M-CTC-T Model (#16402)
* added cbs to notebooks, made copy-paste error fix in generation_utils * initial push for mctc model * mctc feature extractor done * added processor, tokenizer and their tests for MCTC. Have added an MCTC modeling test, adjusting model code accordingly. * added processor, tokenizer and their tests for MCTC. Have added an MCTC modeling test, adjusting model code accordingly. * passing attention, now struggling to figure out how attention masks make sense here * works when excluding attention masks. ask later how one would integrate attention maskshere * bizarre configuration error (model prefix comes first in config dict json and messes up the order) * all passing but bizzarre config dict ordering issue when to_dict * passing all major tests * feature extraction, processor, tokenizer added & tests passing * style & consistency & other logistical fixes * copy paste fix * model after feature extraction working * commiting final feature extraction results; need to fix normalization * feature extraction passing tests; probably should add tests on the specific flashlight-copied functions? * delete print ; format code a bit * fixing tests * passing major tests * fixing styles * completed tokenization test with real example; not sure if these values are entirely correct. * last test fixes from local * reverting accidentally included custom setup configs * remove load tf weights; fix config error * testing couldnt import featureextractor * fix docs * fix docs * resolving comments * style fixes * style fixes * Update to MCTCConv1dSubSampler Co-authored-by: Patrick von Platen <patrick.v.platen@gmail.com> * relposemb fixes * conv1d name issue; expecting config fail with paraentheses * fix config issue * fix config issue * fix config issue * change everything to MCTCT * fixing naming change errors * archive list * copyrights and docs * copyrights and docs * copyrights and docs * merge resolution * move tests, fix to changed optionaldependency structure * test directories changed * fixing tests * how to avoid tf tests? * how to avoid tf tests? * tests passing locally * allow mctctprocessor imported any env * allow mctctprocessor imported any env * fixed second round of feedback, need to fix docs * doc changes not being applied * all fixed * style fix * feedback fixes * fix copies and feature extraction style fix * Update tests/models/visual_bert/test_modeling_visual_bert.py Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com> * copy paste huggingface:main visual bert * added eof newline to visual bert; all tests are passing otherwise * fix slow tests by adding attention mask * change model id to speechbrain * make fix-copies * fix readme unwanted deletes * fixing readmes, make fix-copies * consistent M-CTC-T naming * Update src/transformers/models/mctct/__init__.py Co-authored-by: Patrick von Platen <patrick.v.platen@gmail.com> * all fixed but variable naming * adjust double quotes * fixed variable names * copyright and mr quilter * Apply suggestions from code review Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com> * correct slow tests * make fix-copies * Update src/transformers/models/mctct/configuration_mctct.py Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com> * Update src/transformers/models/mctct/configuration_mctct.py Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com> * m-ctc-t not mctct Co-authored-by: Patrick von Platen <patrick.v.platen@gmail.com> Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>
This commit is contained in:
0
tests/models/mctct/__init__.py
Normal file
0
tests/models/mctct/__init__.py
Normal file
274
tests/models/mctct/test_feature_extraction_mctct.py
Normal file
274
tests/models/mctct/test_feature_extraction_mctct.py
Normal file
@@ -0,0 +1,274 @@
|
||||
# coding=utf-8
|
||||
# Copyright 2022 HuggingFace Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import itertools
|
||||
import random
|
||||
import unittest
|
||||
|
||||
import numpy as np
|
||||
|
||||
from transformers import is_speech_available
|
||||
from transformers.testing_utils import require_torch, require_torchaudio
|
||||
|
||||
from ...test_sequence_feature_extraction_common import SequenceFeatureExtractionTestMixin
|
||||
|
||||
|
||||
if is_speech_available():
|
||||
from transformers import MCTCTFeatureExtractor
|
||||
|
||||
global_rng = random.Random()
|
||||
|
||||
|
||||
def floats_list(shape, scale=1.0, rng=None, name=None):
|
||||
"""Creates a random float32 tensor"""
|
||||
if rng is None:
|
||||
rng = global_rng
|
||||
|
||||
values = []
|
||||
for _batch_idx in range(shape[0]):
|
||||
values.append([])
|
||||
for _ in range(shape[1]):
|
||||
values[-1].append(rng.random() * scale)
|
||||
|
||||
return values
|
||||
|
||||
|
||||
@require_torch
|
||||
@require_torchaudio
|
||||
class MCTCTFeatureExtractionTester(unittest.TestCase):
|
||||
def __init__(
|
||||
self,
|
||||
parent,
|
||||
batch_size=7,
|
||||
min_seq_length=400,
|
||||
max_seq_length=2000,
|
||||
feature_size=24,
|
||||
num_mel_bins=24,
|
||||
padding_value=0.0,
|
||||
sampling_rate=16_000,
|
||||
return_attention_mask=True,
|
||||
do_normalize=True,
|
||||
):
|
||||
self.parent = parent
|
||||
self.batch_size = batch_size
|
||||
self.min_seq_length = min_seq_length
|
||||
self.max_seq_length = max_seq_length
|
||||
self.seq_length_diff = (self.max_seq_length - self.min_seq_length) // (self.batch_size - 1)
|
||||
self.feature_size = feature_size
|
||||
self.num_mel_bins = num_mel_bins
|
||||
self.padding_value = padding_value
|
||||
self.sampling_rate = sampling_rate
|
||||
self.return_attention_mask = return_attention_mask
|
||||
self.do_normalize = do_normalize
|
||||
|
||||
def prepare_feat_extract_dict(self):
|
||||
return {
|
||||
"feature_size": self.feature_size,
|
||||
"num_mel_bins": self.num_mel_bins,
|
||||
"padding_value": self.padding_value,
|
||||
"sampling_rate": self.sampling_rate,
|
||||
"return_attention_mask": self.return_attention_mask,
|
||||
"do_normalize": self.do_normalize,
|
||||
}
|
||||
|
||||
def prepare_inputs_for_common(self, equal_length=False, numpify=False):
|
||||
def _flatten(list_of_lists):
|
||||
return list(itertools.chain(*list_of_lists))
|
||||
|
||||
if equal_length:
|
||||
speech_inputs = [floats_list((self.max_seq_length, self.feature_size)) for _ in range(self.batch_size)]
|
||||
else:
|
||||
# make sure that inputs increase in size
|
||||
speech_inputs = [
|
||||
floats_list((x, self.feature_size))
|
||||
for x in range(self.min_seq_length, self.max_seq_length, self.seq_length_diff)
|
||||
]
|
||||
if numpify:
|
||||
speech_inputs = [np.asarray(x) for x in speech_inputs]
|
||||
return speech_inputs
|
||||
|
||||
|
||||
@require_torch
|
||||
@require_torchaudio
|
||||
class MCTCTFeatureExtractionTest(SequenceFeatureExtractionTestMixin, unittest.TestCase):
|
||||
|
||||
feature_extraction_class = MCTCTFeatureExtractor if is_speech_available() else None
|
||||
|
||||
def setUp(self):
|
||||
self.feat_extract_tester = MCTCTFeatureExtractionTester(self)
|
||||
|
||||
def _check_zero_mean_unit_variance(self, input_vector):
|
||||
self.assertTrue(np.all(np.mean(input_vector) < 1e-3))
|
||||
self.assertTrue(np.all(np.abs(np.var(input_vector) - 1) < 1e-3))
|
||||
|
||||
def test_call(self):
|
||||
# Tests that all call wrap to encode_plus and batch_encode_plus
|
||||
feature_extractor = self.feature_extraction_class(**self.feat_extract_tester.prepare_feat_extract_dict())
|
||||
# create three inputs of length 800, 1000, and 12000
|
||||
speech_inputs = [floats_list((1, x))[0] for x in range(8000, 14000, 2000)]
|
||||
np_speech_inputs = [np.asarray(speech_input) for speech_input in speech_inputs]
|
||||
|
||||
# Test feature size
|
||||
input_features = feature_extractor(np_speech_inputs, padding=True, return_tensors="np").input_features
|
||||
self.assertTrue(input_features.ndim == 3)
|
||||
self.assertTrue(input_features.shape[-1] == feature_extractor.feature_size)
|
||||
|
||||
# Test not batched input
|
||||
encoded_sequences_1 = feature_extractor(speech_inputs[0], return_tensors="np").input_features
|
||||
encoded_sequences_2 = feature_extractor(np_speech_inputs[0], return_tensors="np").input_features
|
||||
self.assertTrue(np.allclose(encoded_sequences_1, encoded_sequences_2, atol=1e-3))
|
||||
|
||||
# Test batched
|
||||
encoded_sequences_1 = feature_extractor(speech_inputs, return_tensors="np").input_features
|
||||
encoded_sequences_2 = feature_extractor(np_speech_inputs, return_tensors="np").input_features
|
||||
for enc_seq_1, enc_seq_2 in zip(encoded_sequences_1, encoded_sequences_2):
|
||||
self.assertTrue(np.allclose(enc_seq_1, enc_seq_2, atol=1e-3))
|
||||
|
||||
def test_cepstral_mean_and_variance_normalization(self):
|
||||
feature_extractor = self.feature_extraction_class(**self.feat_extract_tester.prepare_feat_extract_dict())
|
||||
speech_inputs = [floats_list((1, x))[0] for x in range(8000, 14000, 2000)]
|
||||
|
||||
paddings = ["longest", "max_length", "do_not_pad"]
|
||||
max_lengths = [None, 16, None]
|
||||
for max_length, padding in zip(max_lengths, paddings):
|
||||
inputs = feature_extractor(
|
||||
speech_inputs,
|
||||
padding=padding,
|
||||
max_length=max_length,
|
||||
return_attention_mask=True,
|
||||
truncation=max_length is not None, # reference to #16419
|
||||
)
|
||||
input_features = inputs.input_features
|
||||
attention_mask = inputs.attention_mask
|
||||
fbank_feat_lengths = [np.sum(x) for x in attention_mask]
|
||||
self._check_zero_mean_unit_variance(input_features[0][: fbank_feat_lengths[0]])
|
||||
self._check_zero_mean_unit_variance(input_features[1][: fbank_feat_lengths[1]])
|
||||
self._check_zero_mean_unit_variance(input_features[2][: fbank_feat_lengths[2]])
|
||||
|
||||
def test_cepstral_mean_and_variance_normalization_np(self):
|
||||
feature_extractor = self.feature_extraction_class(**self.feat_extract_tester.prepare_feat_extract_dict())
|
||||
speech_inputs = [floats_list((1, x))[0] for x in range(8000, 14000, 2000)]
|
||||
|
||||
paddings = ["longest", "max_length", "do_not_pad"]
|
||||
max_lengths = [None, 16, None]
|
||||
for max_length, padding in zip(max_lengths, paddings):
|
||||
inputs = feature_extractor(
|
||||
speech_inputs,
|
||||
max_length=max_length,
|
||||
padding=padding,
|
||||
return_tensors="np",
|
||||
return_attention_mask=True,
|
||||
truncation=max_length is not None,
|
||||
)
|
||||
input_features = inputs.input_features
|
||||
attention_mask = inputs.attention_mask
|
||||
fbank_feat_lengths = [np.sum(x) for x in attention_mask]
|
||||
|
||||
self._check_zero_mean_unit_variance(input_features[0][: fbank_feat_lengths[0]])
|
||||
self.assertTrue(input_features[0][fbank_feat_lengths[0] :].sum() < 1e-6)
|
||||
self._check_zero_mean_unit_variance(input_features[1][: fbank_feat_lengths[1]])
|
||||
self.assertTrue(input_features[0][fbank_feat_lengths[1] :].sum() < 1e-6)
|
||||
self._check_zero_mean_unit_variance(input_features[2][: fbank_feat_lengths[2]])
|
||||
|
||||
def test_cepstral_mean_and_variance_normalization_trunc_max_length(self):
|
||||
feature_extractor = self.feature_extraction_class(**self.feat_extract_tester.prepare_feat_extract_dict())
|
||||
speech_inputs = [floats_list((1, x))[0] for x in range(8000, 14000, 2000)]
|
||||
inputs = feature_extractor(
|
||||
speech_inputs,
|
||||
padding="max_length",
|
||||
max_length=4,
|
||||
truncation=True,
|
||||
return_tensors="np",
|
||||
return_attention_mask=True,
|
||||
)
|
||||
input_features = inputs.input_features
|
||||
attention_mask = inputs.attention_mask
|
||||
fbank_feat_lengths = np.sum(attention_mask == 1, axis=1)
|
||||
|
||||
self._check_zero_mean_unit_variance(input_features[0, : fbank_feat_lengths[0]])
|
||||
self._check_zero_mean_unit_variance(input_features[1])
|
||||
self._check_zero_mean_unit_variance(input_features[2])
|
||||
|
||||
def test_cepstral_mean_and_variance_normalization_trunc_longest(self):
|
||||
feature_extractor = self.feature_extraction_class(**self.feat_extract_tester.prepare_feat_extract_dict())
|
||||
speech_inputs = [floats_list((1, x))[0] for x in range(8000, 14000, 2000)]
|
||||
inputs = feature_extractor(
|
||||
speech_inputs,
|
||||
padding="longest",
|
||||
max_length=4,
|
||||
truncation=True,
|
||||
return_tensors="np",
|
||||
return_attention_mask=True,
|
||||
)
|
||||
input_features = inputs.input_features
|
||||
attention_mask = inputs.attention_mask
|
||||
fbank_feat_lengths = np.sum(attention_mask == 1, axis=1)
|
||||
|
||||
self._check_zero_mean_unit_variance(input_features[0, : fbank_feat_lengths[0]])
|
||||
self._check_zero_mean_unit_variance(input_features[1, : fbank_feat_lengths[1]])
|
||||
self._check_zero_mean_unit_variance(input_features[2])
|
||||
|
||||
# make sure that if max_length < longest -> then pad to max_length
|
||||
self.assertEqual(input_features.shape, (3, 4, 24))
|
||||
|
||||
speech_inputs = [floats_list((1, x))[0] for x in range(8000, 14000, 2000)]
|
||||
inputs = feature_extractor(
|
||||
speech_inputs,
|
||||
padding="longest",
|
||||
max_length=16,
|
||||
truncation=True,
|
||||
return_tensors="np",
|
||||
return_attention_mask=True,
|
||||
)
|
||||
input_features = inputs.input_features
|
||||
attention_mask = inputs.attention_mask
|
||||
fbank_feat_lengths = np.sum(attention_mask == 1, axis=1)
|
||||
|
||||
self._check_zero_mean_unit_variance(input_features[0, : fbank_feat_lengths[0]])
|
||||
self._check_zero_mean_unit_variance(input_features[1, : fbank_feat_lengths[1]])
|
||||
self._check_zero_mean_unit_variance(input_features[2])
|
||||
|
||||
# make sure that if max_length < longest -> then pad to max_length
|
||||
self.assertEqual(input_features.shape, (3, 16, 24))
|
||||
|
||||
def test_double_precision_pad(self):
|
||||
import torch
|
||||
|
||||
feature_extractor = self.feature_extraction_class(**self.feat_extract_tester.prepare_feat_extract_dict())
|
||||
np_speech_inputs = np.random.rand(100, 32).astype(np.float64)
|
||||
py_speech_inputs = np_speech_inputs.tolist()
|
||||
|
||||
for inputs in [py_speech_inputs, np_speech_inputs]:
|
||||
np_processed = feature_extractor.pad([{"input_features": inputs}], return_tensors="np")
|
||||
self.assertTrue(np_processed.input_features.dtype == np.float32)
|
||||
pt_processed = feature_extractor.pad([{"input_features": inputs}], return_tensors="pt")
|
||||
self.assertTrue(pt_processed.input_features.dtype == torch.float32)
|
||||
|
||||
def test_different_window(self):
|
||||
import torch
|
||||
|
||||
init_dict = self.feat_extract_tester.prepare_feat_extract_dict()
|
||||
init_dict["win_function"] = "hann_window"
|
||||
|
||||
feature_extractor = self.feature_extraction_class(**init_dict)
|
||||
np_speech_inputs = np.random.rand(100, 32).astype(np.float64)
|
||||
py_speech_inputs = np_speech_inputs.tolist()
|
||||
|
||||
for inputs in [py_speech_inputs, np_speech_inputs]:
|
||||
np_processed = feature_extractor.pad([{"input_features": inputs}], return_tensors="np")
|
||||
self.assertTrue(np_processed.input_features.dtype == np.float32)
|
||||
pt_processed = feature_extractor.pad([{"input_features": inputs}], return_tensors="pt")
|
||||
self.assertTrue(pt_processed.input_features.dtype == torch.float32)
|
||||
647
tests/models/mctct/test_modeling_mctct.py
Normal file
647
tests/models/mctct/test_modeling_mctct.py
Normal file
@@ -0,0 +1,647 @@
|
||||
# coding=utf-8
|
||||
# Copyright 2022 The HuggingFace Inc. team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
""" Testing suite for the PyTorch MCTCT model. """
|
||||
|
||||
import inspect
|
||||
import math
|
||||
import unittest
|
||||
|
||||
from datasets import load_dataset
|
||||
|
||||
from transformers import MCTCTConfig, is_torch_available
|
||||
from transformers.testing_utils import require_soundfile, require_torch, slow, torch_device
|
||||
|
||||
from ...test_configuration_common import ConfigTester
|
||||
from ...test_modeling_common import ModelTesterMixin, _config_zero_init, floats_tensor, ids_tensor
|
||||
|
||||
|
||||
if is_torch_available():
|
||||
import torch
|
||||
|
||||
from transformers import MCTCTForCTC, MCTCTModel, MCTCTProcessor
|
||||
|
||||
|
||||
class MCTCTModelTester:
|
||||
def __init__(
|
||||
self,
|
||||
parent,
|
||||
batch_size=10,
|
||||
seq_length=40, # speech is longer
|
||||
is_training=False,
|
||||
vocab_size=32,
|
||||
hidden_size=128 * 4,
|
||||
num_hidden_layers=4,
|
||||
intermediate_size=20,
|
||||
num_attention_heads=4,
|
||||
attention_head_dim=128,
|
||||
max_position_embeddings=920,
|
||||
layer_norm_eps=1e-5,
|
||||
layerdrop=0.3,
|
||||
hidden_act="relu",
|
||||
initializer_range=0.02,
|
||||
hidden_dropout_prob=0.3,
|
||||
attention_probs_dropout_prob=0.3,
|
||||
conv_glu_dim=1,
|
||||
conv_dropout=0.3,
|
||||
num_conv_layers=1,
|
||||
conv_kernel=(7,),
|
||||
conv_stride=(3,),
|
||||
input_feat_per_channel=80,
|
||||
input_channels=1,
|
||||
conv_channels=None,
|
||||
):
|
||||
self.parent = parent
|
||||
self.batch_size = batch_size
|
||||
self.seq_length = seq_length # speech is longer
|
||||
self.is_training = is_training
|
||||
|
||||
self.vocab_size = vocab_size
|
||||
self.hidden_size = hidden_size
|
||||
self.num_hidden_layers = num_hidden_layers
|
||||
self.intermediate_size = intermediate_size
|
||||
self.num_attention_heads = num_attention_heads
|
||||
|
||||
self.attention_head_dim = attention_head_dim
|
||||
self.max_position_embeddings = max_position_embeddings
|
||||
|
||||
self.layer_norm_eps = layer_norm_eps
|
||||
self.layerdrop = layerdrop
|
||||
self.hidden_act = hidden_act
|
||||
self.initializer_range = initializer_range
|
||||
self.hidden_dropout_prob = hidden_dropout_prob
|
||||
self.attention_probs_dropout_prob = attention_probs_dropout_prob
|
||||
|
||||
self.conv_glu_dim = conv_glu_dim
|
||||
self.conv_dropout = conv_dropout
|
||||
self.num_conv_layers = num_conv_layers
|
||||
self.conv_kernel = conv_kernel
|
||||
self.conv_stride = conv_stride
|
||||
self.input_feat_per_channel = input_feat_per_channel
|
||||
self.input_channels = input_channels
|
||||
self.conv_channels = conv_channels
|
||||
|
||||
output_seq_length = self.seq_length
|
||||
dilation = 1
|
||||
for _, kernel_sz, stride in zip(range(self.num_conv_layers), self.conv_kernel, self.conv_stride):
|
||||
padding = kernel_sz // 2
|
||||
output_seq_length = output_seq_length + 2 * padding - dilation * (kernel_sz - 1) - 1
|
||||
output_seq_length = torch.div(output_seq_length, stride, rounding_mode="trunc") + 1
|
||||
|
||||
self.output_seq_length = int(math.ceil(output_seq_length))
|
||||
self.encoder_seq_length = self.output_seq_length
|
||||
|
||||
def prepare_config_and_inputs(self):
|
||||
input_features = floats_tensor(
|
||||
[self.batch_size, self.seq_length, self.input_feat_per_channel], self.vocab_size
|
||||
)
|
||||
attention_mask = torch.ones([self.batch_size, self.seq_length], dtype=torch.long, device=torch_device)
|
||||
|
||||
config = self.get_config()
|
||||
|
||||
return config, input_features, attention_mask
|
||||
|
||||
def get_config(self):
|
||||
return MCTCTConfig(
|
||||
vocab_size=self.vocab_size,
|
||||
hidden_size=self.hidden_size,
|
||||
num_hidden_layers=self.num_hidden_layers,
|
||||
intermediate_size=self.intermediate_size,
|
||||
num_attention_heads=self.num_attention_heads,
|
||||
attention_head_dim=self.attention_head_dim,
|
||||
max_position_embeddings=self.max_position_embeddings,
|
||||
layer_norm_eps=self.layer_norm_eps,
|
||||
layerdrop=self.layerdrop,
|
||||
hidden_act=self.hidden_act,
|
||||
initializer_range=self.initializer_range,
|
||||
hidden_dropout_prob=self.hidden_dropout_prob,
|
||||
attention_probs_dropout_prob=self.attention_probs_dropout_prob,
|
||||
conv_glu_dim=self.conv_glu_dim,
|
||||
conv_dropout=self.conv_dropout,
|
||||
num_conv_layers=self.num_conv_layers,
|
||||
conv_kernel=self.conv_kernel,
|
||||
conv_stride=self.conv_stride,
|
||||
input_feat_per_channel=self.input_feat_per_channel,
|
||||
input_channels=self.input_channels,
|
||||
conv_channels=self.conv_channels,
|
||||
)
|
||||
|
||||
def create_and_check_model(self, config, input_features, attention_mask):
|
||||
model = MCTCTModel(config=config)
|
||||
model.to(torch_device)
|
||||
model.eval()
|
||||
result = model(input_features, attention_mask=attention_mask)
|
||||
|
||||
self.parent.assertEqual(
|
||||
result.last_hidden_state.shape, (self.batch_size, self.output_seq_length, self.hidden_size)
|
||||
)
|
||||
|
||||
def create_and_check_model_for_ctc(self, config, input_features, attention_mask):
|
||||
config.add_adapter = True
|
||||
config.output_hidden_size = 2 * config.hidden_size
|
||||
model = MCTCTForCTC(config=config)
|
||||
model.to(torch_device)
|
||||
model.eval()
|
||||
result = model(input_features, attention_mask=attention_mask)
|
||||
self.parent.assertEqual(
|
||||
result.logits.shape, (self.batch_size, self.adapter_output_seq_length, self.vocab_size)
|
||||
)
|
||||
|
||||
def create_and_check_batch_inference(self, config, input_features, *args):
|
||||
# test does not pass for models making use of `group_norm`
|
||||
# check: https://github.com/pytorch/fairseq/issues/3227
|
||||
model = MCTCTModel(config=config)
|
||||
model.to(torch_device)
|
||||
model.eval()
|
||||
|
||||
input_features = input_features[:3]
|
||||
attention_mask = torch.ones(input_features.shape[:-1], device=torch_device, dtype=torch.bool)
|
||||
|
||||
input_lengths = [input_features.shape[-1] // i for i in [2, 2, 1]]
|
||||
|
||||
# pad input
|
||||
for i in range(len(input_lengths)):
|
||||
input_features[i, input_lengths[i] :] = 0.0
|
||||
attention_mask[i, input_lengths[i] :] = 0.0
|
||||
|
||||
batch_outputs = model(input_features, attention_mask=attention_mask).last_hidden_state
|
||||
|
||||
for i in range(input_features.shape[0]):
|
||||
input_slice = input_features[i : i + 1, : input_lengths[i]]
|
||||
output = model(input_slice).last_hidden_state
|
||||
|
||||
batch_output = batch_outputs[i : i + 1, : output.shape[1]]
|
||||
self.parent.assertTrue(torch.allclose(output, batch_output, atol=1e-3))
|
||||
|
||||
def check_ctc_loss(self, config, input_features, *args):
|
||||
model = MCTCTForCTC(config=config)
|
||||
model.to(torch_device)
|
||||
|
||||
# make sure that dropout is disabled
|
||||
model.eval()
|
||||
|
||||
input_features = input_features[:3]
|
||||
|
||||
# input_features is a 2D window for each sequence
|
||||
attention_mask = torch.ones(input_features.shape[:-1], device=torch_device, dtype=torch.long)
|
||||
|
||||
# -2 since input_features is a 2D window for each sequence in batch
|
||||
input_lengths = [input_features.shape[-2] // i for i in [2, 2, 1]]
|
||||
max_length_labels = model._get_feat_extract_output_lengths(torch.tensor(input_lengths))
|
||||
labels = ids_tensor((input_features.shape[0], min(max_length_labels) - 1), model.config.vocab_size)
|
||||
# pad input
|
||||
for i in range(len(input_lengths)):
|
||||
input_features[i, input_lengths[i] :] = 0.0
|
||||
attention_mask[i, input_lengths[i] :] = 0
|
||||
|
||||
model.config.ctc_loss_reduction = "sum"
|
||||
sum_loss = model(input_features, attention_mask=attention_mask, labels=labels).loss.item()
|
||||
|
||||
model.config.ctc_loss_reduction = "mean"
|
||||
mean_loss = model(input_features, attention_mask=attention_mask, labels=labels).loss.item()
|
||||
|
||||
self.parent.assertTrue(isinstance(sum_loss, float))
|
||||
self.parent.assertTrue(isinstance(mean_loss, float))
|
||||
|
||||
def check_ctc_training(self, config, input_features, *args):
|
||||
config.ctc_zero_infinity = True
|
||||
model = MCTCTForCTC(config=config)
|
||||
model.to(torch_device)
|
||||
model.train()
|
||||
|
||||
input_features = input_features[:3]
|
||||
|
||||
input_lengths = [input_features.shape[-2] // i for i in [2, 2, 1]]
|
||||
max_length_labels = model._get_feat_extract_output_lengths(torch.tensor(input_lengths))
|
||||
labels = ids_tensor((input_features.shape[0], max(max_length_labels) - 1), model.config.vocab_size)
|
||||
|
||||
# pad input
|
||||
for i in range(len(input_lengths)):
|
||||
input_features[i, input_lengths[i] :] = 0.0
|
||||
|
||||
if max_length_labels[i] < labels.shape[-1]:
|
||||
# it's important that we make sure that target lenghts are at least
|
||||
# one shorter than logit lenghts to prevent -inf
|
||||
labels[i, max_length_labels[i] - 1 :] = -100
|
||||
|
||||
loss = model(input_features, labels=labels).loss
|
||||
self.parent.assertFalse(torch.isinf(loss).item())
|
||||
|
||||
loss.backward()
|
||||
|
||||
def check_labels_out_of_vocab(self, config, input_features, *args):
|
||||
model = MCTCTForCTC(config)
|
||||
model.to(torch_device)
|
||||
model.train()
|
||||
|
||||
input_features = input_features[:3]
|
||||
|
||||
input_lengths = [input_features.shape[-1] // i for i in [4, 2, 1]]
|
||||
max_length_labels = model._get_feat_extract_output_lengths(torch.tensor(input_lengths))
|
||||
labels = ids_tensor((input_features.shape[0], max(max_length_labels) - 2), model.config.vocab_size + 100)
|
||||
|
||||
with self.parent.assertRaises(ValueError):
|
||||
model(input_features, labels=labels)
|
||||
|
||||
def prepare_config_and_inputs_for_common(self):
|
||||
config, input_features, attention_mask = self.prepare_config_and_inputs()
|
||||
inputs_dict = {"input_features": input_features, "attention_mask": attention_mask}
|
||||
return config, inputs_dict
|
||||
|
||||
|
||||
@require_torch
|
||||
class MCTCTModelTest(ModelTesterMixin, unittest.TestCase):
|
||||
all_model_classes = (MCTCTForCTC, MCTCTModel) if is_torch_available() else ()
|
||||
test_pruning = False
|
||||
test_headmasking = False
|
||||
test_torchscript = False
|
||||
|
||||
def setUp(self):
|
||||
self.model_tester = MCTCTModelTester(self)
|
||||
self.config_tester = ConfigTester(self, config_class=MCTCTConfig, hidden_size=37)
|
||||
|
||||
def test_config(self):
|
||||
self.config_tester.run_common_tests()
|
||||
|
||||
def test_model(self):
|
||||
config_and_inputs = self.model_tester.prepare_config_and_inputs()
|
||||
self.model_tester.create_and_check_model(*config_and_inputs)
|
||||
|
||||
def test_ctc_loss_inference(self):
|
||||
config_and_inputs = self.model_tester.prepare_config_and_inputs()
|
||||
self.model_tester.check_ctc_loss(*config_and_inputs)
|
||||
|
||||
def test_ctc_train(self):
|
||||
config_and_inputs = self.model_tester.prepare_config_and_inputs()
|
||||
self.model_tester.check_ctc_training(*config_and_inputs)
|
||||
|
||||
def test_labels_out_of_vocab(self):
|
||||
config_and_inputs = self.model_tester.prepare_config_and_inputs()
|
||||
self.model_tester.check_labels_out_of_vocab(*config_and_inputs)
|
||||
|
||||
# MCTCT has no inputs_embeds
|
||||
def test_inputs_embeds(self):
|
||||
pass
|
||||
|
||||
# `input_ids` is renamed to `input_features`
|
||||
def test_forward_signature(self):
|
||||
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
|
||||
|
||||
for model_class in self.all_model_classes:
|
||||
model = model_class(config)
|
||||
signature = inspect.signature(model.forward)
|
||||
# signature.parameters is an OrderedDict => so arg_names order is deterministic
|
||||
arg_names = [*signature.parameters.keys()]
|
||||
|
||||
expected_arg_names = [
|
||||
"input_features",
|
||||
"attention_mask",
|
||||
"head_mask",
|
||||
"output_attentions",
|
||||
"output_hidden_states",
|
||||
"return_dict",
|
||||
]
|
||||
self.assertListEqual(arg_names[: len(expected_arg_names)], expected_arg_names)
|
||||
|
||||
# MCTCT cannot resize token embeddings
|
||||
# since it has no tokens embeddings
|
||||
def test_resize_tokens_embeddings(self):
|
||||
pass
|
||||
|
||||
# MCTCT has no inputs_embeds
|
||||
def test_model_common_attributes(self):
|
||||
pass
|
||||
|
||||
def test_retain_grad_hidden_states_attentions(self):
|
||||
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
|
||||
config.output_hidden_states = True
|
||||
config.output_attentions = True
|
||||
config.layerdrop = 0.0
|
||||
|
||||
# no need to test all models as different heads yield the same functionality
|
||||
model_class = self.all_model_classes[0]
|
||||
model = model_class(config)
|
||||
model.to(torch_device)
|
||||
|
||||
input_features = inputs_dict["input_features"]
|
||||
|
||||
input_lengths = torch.tensor(
|
||||
[input_features.shape[1] for _ in range(input_features.shape[0])], dtype=torch.long, device=torch_device
|
||||
)
|
||||
output_lengths = model._get_feat_extract_output_lengths(input_lengths)
|
||||
|
||||
labels = ids_tensor((input_features.shape[0], output_lengths[0] - 2), self.model_tester.vocab_size)
|
||||
inputs_dict["attention_mask"] = torch.ones_like(inputs_dict["attention_mask"])
|
||||
inputs_dict["labels"] = labels
|
||||
|
||||
outputs = model(**inputs_dict)
|
||||
|
||||
output = outputs[0]
|
||||
|
||||
# Encoder-/Decoder-only models
|
||||
hidden_states = outputs.hidden_states[0]
|
||||
attentions = outputs.attentions[0]
|
||||
|
||||
hidden_states.retain_grad()
|
||||
attentions.retain_grad()
|
||||
|
||||
output.flatten()[0].backward(retain_graph=True)
|
||||
|
||||
self.assertIsNotNone(hidden_states.grad)
|
||||
self.assertIsNotNone(attentions.grad)
|
||||
|
||||
def test_initialization(self):
|
||||
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
|
||||
|
||||
configs_no_init = _config_zero_init(config)
|
||||
for model_class in self.all_model_classes:
|
||||
model = model_class(config=configs_no_init)
|
||||
for name, param in model.named_parameters():
|
||||
uniform_init_parms = [
|
||||
"conv.weight",
|
||||
"masked_spec_embed",
|
||||
"codevectors",
|
||||
"quantizer.weight_proj.weight",
|
||||
"project_hid.weight",
|
||||
"project_hid.bias",
|
||||
"project_q.weight",
|
||||
"project_q.bias",
|
||||
"feature_projection.projection.weight",
|
||||
"feature_projection.projection.bias",
|
||||
"objective.weight",
|
||||
]
|
||||
if param.requires_grad:
|
||||
if any([x in name for x in uniform_init_parms]):
|
||||
self.assertTrue(
|
||||
-1.0 <= ((param.data.mean() * 1e9).round() / 1e9).item() <= 1.0,
|
||||
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
|
||||
)
|
||||
else:
|
||||
self.assertIn(
|
||||
((param.data.mean() * 1e9).round() / 1e9).item(),
|
||||
[0.0, 1.0],
|
||||
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
|
||||
)
|
||||
|
||||
# overwrite from test_modeling_common
|
||||
def _mock_init_weights(self, module):
|
||||
if hasattr(module, "weight") and module.weight is not None:
|
||||
module.weight.data.fill_(3)
|
||||
if hasattr(module, "weight_g") and module.weight_g is not None:
|
||||
module.weight_g.data.fill_(3)
|
||||
if hasattr(module, "weight_v") and module.weight_v is not None:
|
||||
module.weight_v.data.fill_(3)
|
||||
if hasattr(module, "bias") and module.bias is not None:
|
||||
module.bias.data.fill_(3)
|
||||
if hasattr(module, "codevectors") and module.codevectors is not None:
|
||||
module.codevectors.data.fill_(3)
|
||||
if hasattr(module, "masked_spec_embed") and module.masked_spec_embed is not None:
|
||||
module.masked_spec_embed.data.fill_(3)
|
||||
|
||||
@slow
|
||||
def test_model_from_pretrained(self):
|
||||
model = MCTCTModel.from_pretrained("speechbrain/m-ctc-t-large")
|
||||
self.assertIsNotNone(model)
|
||||
|
||||
|
||||
@require_torch
|
||||
class MCTCTRobustModelTest(ModelTesterMixin, unittest.TestCase):
|
||||
all_model_classes = (MCTCTForCTC, MCTCTModel) if is_torch_available() else ()
|
||||
test_pruning = False
|
||||
test_headmasking = False
|
||||
test_torchscript = False
|
||||
|
||||
def setUp(self):
|
||||
self.model_tester = MCTCTModelTester(self)
|
||||
self.config_tester = ConfigTester(self, config_class=MCTCTConfig, hidden_size=37)
|
||||
|
||||
def test_config(self):
|
||||
self.config_tester.run_common_tests()
|
||||
|
||||
def test_model(self):
|
||||
config_and_inputs = self.model_tester.prepare_config_and_inputs()
|
||||
self.model_tester.create_and_check_model(*config_and_inputs)
|
||||
|
||||
def test_batched_inference(self):
|
||||
config_and_inputs = self.model_tester.prepare_config_and_inputs()
|
||||
self.model_tester.create_and_check_batch_inference(*config_and_inputs)
|
||||
|
||||
def test_ctc_loss_inference(self):
|
||||
config_and_inputs = self.model_tester.prepare_config_and_inputs()
|
||||
self.model_tester.check_ctc_loss(*config_and_inputs)
|
||||
|
||||
def test_ctc_train(self):
|
||||
config_and_inputs = self.model_tester.prepare_config_and_inputs()
|
||||
self.model_tester.check_ctc_training(*config_and_inputs)
|
||||
|
||||
def test_labels_out_of_vocab(self):
|
||||
config_and_inputs = self.model_tester.prepare_config_and_inputs()
|
||||
self.model_tester.check_labels_out_of_vocab(*config_and_inputs)
|
||||
|
||||
# MCTCT has no inputs_embeds
|
||||
def test_inputs_embeds(self):
|
||||
pass
|
||||
|
||||
# `input_ids` is renamed to `input_features`
|
||||
def test_forward_signature(self):
|
||||
pass
|
||||
|
||||
# MCTCT cannot resize token embeddings
|
||||
# since it has no tokens embeddings
|
||||
def test_resize_tokens_embeddings(self):
|
||||
pass
|
||||
|
||||
# MCTCT has no inputs_embeds
|
||||
# and thus the `get_input_embeddings` fn
|
||||
# is not implemented
|
||||
def test_model_common_attributes(self):
|
||||
pass
|
||||
|
||||
def test_retain_grad_hidden_states_attentions(self):
|
||||
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
|
||||
config.output_hidden_states = True
|
||||
config.output_attentions = True
|
||||
|
||||
# no need to test all models as different heads yield the same functionality
|
||||
model_class = self.all_model_classes[0]
|
||||
model = model_class(config)
|
||||
model.to(torch_device)
|
||||
|
||||
# set layer drop to 0
|
||||
model.config.layerdrop = 0.0
|
||||
|
||||
input_features = inputs_dict["input_features"]
|
||||
|
||||
input_lengths = torch.tensor(
|
||||
[input_features.shape[1] for _ in range(input_features.shape[0])], dtype=torch.long, device=torch_device
|
||||
)
|
||||
output_lengths = model._get_feat_extract_output_lengths(input_lengths)
|
||||
|
||||
labels = ids_tensor((input_features.shape[0], output_lengths[0] - 2), self.model_tester.vocab_size)
|
||||
inputs_dict["attention_mask"] = torch.ones_like(inputs_dict["attention_mask"])
|
||||
inputs_dict["labels"] = labels
|
||||
|
||||
outputs = model(**inputs_dict)
|
||||
|
||||
output = outputs[0]
|
||||
|
||||
# Encoder-/Decoder-only models
|
||||
hidden_states = outputs.hidden_states[0]
|
||||
attentions = outputs.attentions[0]
|
||||
|
||||
hidden_states.retain_grad()
|
||||
attentions.retain_grad()
|
||||
|
||||
output.flatten()[0].backward(retain_graph=True)
|
||||
|
||||
self.assertIsNotNone(hidden_states.grad)
|
||||
self.assertIsNotNone(attentions.grad)
|
||||
|
||||
def test_initialization(self):
|
||||
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common()
|
||||
|
||||
configs_no_init = _config_zero_init(config)
|
||||
for model_class in self.all_model_classes:
|
||||
model = model_class(config=configs_no_init)
|
||||
for name, param in model.named_parameters():
|
||||
uniform_init_parms = [
|
||||
"conv.weight",
|
||||
"masked_spec_embed",
|
||||
"codevectors",
|
||||
"quantizer.weight_proj.weight",
|
||||
"project_hid.weight",
|
||||
"project_hid.bias",
|
||||
"project_q.weight",
|
||||
"project_q.bias",
|
||||
"feature_projection.projection.weight",
|
||||
"feature_projection.projection.bias",
|
||||
"objective.weight",
|
||||
]
|
||||
if param.requires_grad:
|
||||
if any([x in name for x in uniform_init_parms]):
|
||||
self.assertTrue(
|
||||
-1.0 <= ((param.data.mean() * 1e9).round() / 1e9).item() <= 1.0,
|
||||
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
|
||||
)
|
||||
else:
|
||||
self.assertIn(
|
||||
((param.data.mean() * 1e9).round() / 1e9).item(),
|
||||
[0.0, 1.0],
|
||||
msg=f"Parameter {name} of model {model_class} seems not properly initialized",
|
||||
)
|
||||
|
||||
# overwrite from test_modeling_common
|
||||
def _mock_init_weights(self, module):
|
||||
if hasattr(module, "weight") and module.weight is not None:
|
||||
module.weight.data.fill_(3)
|
||||
if hasattr(module, "weight_g") and module.weight_g is not None:
|
||||
module.weight_g.data.fill_(3)
|
||||
if hasattr(module, "weight_v") and module.weight_v is not None:
|
||||
module.weight_v.data.fill_(3)
|
||||
if hasattr(module, "bias") and module.bias is not None:
|
||||
module.bias.data.fill_(3)
|
||||
if hasattr(module, "codevectors") and module.codevectors is not None:
|
||||
module.codevectors.data.fill_(3)
|
||||
if hasattr(module, "masked_spec_embed") and module.masked_spec_embed is not None:
|
||||
module.masked_spec_embed.data.fill_(3)
|
||||
|
||||
@unittest.skip(reason="Feed forward chunking is not implemented")
|
||||
def test_feed_forward_chunking(self):
|
||||
pass
|
||||
|
||||
@slow
|
||||
def test_model_from_pretrained(self):
|
||||
model = MCTCTModel.from_pretrained("speechbrain/m-ctc-t-large")
|
||||
self.assertIsNotNone(model)
|
||||
|
||||
|
||||
@require_torch
|
||||
@require_soundfile
|
||||
@slow
|
||||
class MCTCTModelIntegrationTest(unittest.TestCase):
|
||||
def _load_datasamples(self, num_samples):
|
||||
ds = load_dataset("hf-internal-testing/librispeech_asr_dummy", "clean", split="validation")
|
||||
# automatic decoding with librispeech
|
||||
speech_samples = ds.sort("id").filter(
|
||||
lambda x: x["id"] in [f"1272-141231-000{i}" for i in range(num_samples)]
|
||||
)[:num_samples]["audio"]
|
||||
|
||||
return [x["array"] for x in speech_samples]
|
||||
|
||||
def test_inference_ctc_normal(self):
|
||||
model = MCTCTForCTC.from_pretrained("speechbrain/m-ctc-t-large")
|
||||
model.to(torch_device)
|
||||
processor = MCTCTProcessor.from_pretrained("speechbrain/m-ctc-t-large", do_lower_case=True)
|
||||
input_speech = self._load_datasamples(1)
|
||||
|
||||
input_features = processor(input_speech, return_tensors="pt").input_features.to(torch_device)
|
||||
|
||||
with torch.no_grad():
|
||||
logits = model(input_features).logits
|
||||
|
||||
predicted_ids = torch.argmax(logits, dim=-1)
|
||||
predicted_trans = processor.batch_decode(predicted_ids)
|
||||
|
||||
EXPECTED_TRANSCRIPTIONS = ["a man said to the universe, sir, i exist."]
|
||||
self.assertListEqual(predicted_trans, EXPECTED_TRANSCRIPTIONS)
|
||||
|
||||
def test_inference_ctc_normal_batched(self):
|
||||
model = MCTCTForCTC.from_pretrained("speechbrain/m-ctc-t-large")
|
||||
model.to(torch_device)
|
||||
processor = MCTCTProcessor.from_pretrained("speechbrain/m-ctc-t-large", do_lower_case=True)
|
||||
|
||||
input_speech = self._load_datasamples(2)
|
||||
|
||||
inputs = processor(input_speech, return_tensors="pt", padding=True)
|
||||
|
||||
input_features = inputs.input_features.to(torch_device)
|
||||
attention_mask = inputs.attention_mask.to(torch_device)
|
||||
|
||||
with torch.no_grad():
|
||||
logits = model(input_features, attention_mask=attention_mask).logits
|
||||
|
||||
predicted_ids = torch.argmax(logits, dim=-1)
|
||||
predicted_trans = processor.batch_decode(predicted_ids)
|
||||
|
||||
EXPECTED_TRANSCRIPTIONS = [
|
||||
"a man said to the universe, sir, i exist.",
|
||||
'"sweat-covered brion\'s body, trickling into the tight-lowing clossa was the only germent huor."',
|
||||
]
|
||||
self.assertListEqual(predicted_trans, EXPECTED_TRANSCRIPTIONS)
|
||||
|
||||
def test_inference_ctc_robust_batched(self):
|
||||
model = MCTCTForCTC.from_pretrained("speechbrain/m-ctc-t-large").to(torch_device)
|
||||
processor = MCTCTProcessor.from_pretrained("speechbrain/m-ctc-t-large", do_lower_case=True)
|
||||
|
||||
input_speech = self._load_datasamples(4)
|
||||
|
||||
inputs = processor(input_speech, return_tensors="pt", padding=True, return_attention_mask=True)
|
||||
|
||||
input_features = inputs.input_features.to(torch_device)
|
||||
attention_mask = inputs.attention_mask.to(torch_device)
|
||||
|
||||
with torch.no_grad():
|
||||
logits = model(input_features, attention_mask=attention_mask).logits
|
||||
|
||||
predicted_ids = torch.argmax(logits, dim=-1)
|
||||
predicted_trans = processor.batch_decode(predicted_ids)
|
||||
|
||||
EXPECTED_TRANSCRIPTIONS = [
|
||||
"a man said to the universe, sir, i exist.",
|
||||
'"sweat-covered brion\'s body, trickling into the tight-lowing clossa was the only germent huor." "',
|
||||
"\"the cadona's chest still-dripping bloodthe acofis overstrained eyes, even the soring arena around him"
|
||||
" with thousands of spectators retrivialities not worth-thinking about.",
|
||||
"his instant panic was followed by a small sharp blow high on his chestr.",
|
||||
]
|
||||
self.assertListEqual(predicted_trans, EXPECTED_TRANSCRIPTIONS)
|
||||
147
tests/models/mctct/test_processor_mctct.py
Normal file
147
tests/models/mctct/test_processor_mctct.py
Normal file
@@ -0,0 +1,147 @@
|
||||
# Copyright 2022 The HuggingFace Team. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from transformers import MCTCTProcessor, is_speech_available, is_torch_available
|
||||
from transformers.file_utils import FEATURE_EXTRACTOR_NAME
|
||||
from transformers.models.wav2vec2.tokenization_wav2vec2 import VOCAB_FILES_NAMES, Wav2Vec2CTCTokenizer
|
||||
from transformers.testing_utils import require_torch, require_torchaudio
|
||||
|
||||
|
||||
if is_speech_available() and is_torch_available():
|
||||
from transformers import MCTCTFeatureExtractor
|
||||
|
||||
from .test_feature_extraction_mctct import floats_list
|
||||
|
||||
|
||||
@require_torch
|
||||
@require_torchaudio
|
||||
class MCTCTProcessorTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
vocab = "<pad> <s> </s> <unk> | E T A O N I H S R D L U M W C F G Y P B V K ' X J Q Z".split(" ")
|
||||
vocab_tokens = dict(zip(vocab, range(len(vocab))))
|
||||
|
||||
self.add_kwargs_tokens_map = {
|
||||
"pad_token": "<pad>",
|
||||
"unk_token": "<unk>",
|
||||
"bos_token": "<s>",
|
||||
"eos_token": "</s>",
|
||||
}
|
||||
feature_extractor_map = {
|
||||
"feature_size": 1,
|
||||
"padding_value": 0.0,
|
||||
"sampling_rate": 16000,
|
||||
"return_attention_mask": False,
|
||||
"do_normalize": True,
|
||||
}
|
||||
|
||||
self.tmpdirname = tempfile.mkdtemp()
|
||||
self.vocab_file = os.path.join(self.tmpdirname, VOCAB_FILES_NAMES["vocab_file"])
|
||||
self.feature_extraction_file = os.path.join(self.tmpdirname, FEATURE_EXTRACTOR_NAME)
|
||||
with open(self.vocab_file, "w", encoding="utf-8") as fp:
|
||||
fp.write(json.dumps(vocab_tokens) + "\n")
|
||||
|
||||
with open(self.feature_extraction_file, "w", encoding="utf-8") as fp:
|
||||
fp.write(json.dumps(feature_extractor_map) + "\n")
|
||||
|
||||
def get_tokenizer(self, **kwargs_init):
|
||||
kwargs = self.add_kwargs_tokens_map.copy()
|
||||
kwargs.update(kwargs_init)
|
||||
return Wav2Vec2CTCTokenizer.from_pretrained(self.tmpdirname, **kwargs)
|
||||
|
||||
def get_feature_extractor(self, **kwargs):
|
||||
return MCTCTFeatureExtractor.from_pretrained(self.tmpdirname, **kwargs)
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmpdirname)
|
||||
|
||||
def test_save_load_pretrained_default(self):
|
||||
tokenizer = self.get_tokenizer()
|
||||
feature_extractor = self.get_feature_extractor()
|
||||
|
||||
processor = MCTCTProcessor(tokenizer=tokenizer, feature_extractor=feature_extractor)
|
||||
|
||||
processor.save_pretrained(self.tmpdirname)
|
||||
processor = MCTCTProcessor.from_pretrained(self.tmpdirname)
|
||||
|
||||
self.assertEqual(processor.tokenizer.get_vocab(), tokenizer.get_vocab())
|
||||
self.assertIsInstance(processor.tokenizer, Wav2Vec2CTCTokenizer)
|
||||
|
||||
self.assertEqual(processor.feature_extractor.to_json_string(), feature_extractor.to_json_string())
|
||||
self.assertIsInstance(processor.feature_extractor, MCTCTFeatureExtractor)
|
||||
|
||||
def test_save_load_pretrained_additional_features(self):
|
||||
processor = MCTCTProcessor(tokenizer=self.get_tokenizer(), feature_extractor=self.get_feature_extractor())
|
||||
processor.save_pretrained(self.tmpdirname)
|
||||
|
||||
tokenizer_add_kwargs = self.get_tokenizer(bos_token="(BOS)", eos_token="(EOS)")
|
||||
feature_extractor_add_kwargs = self.get_feature_extractor(do_normalize=False, padding_value=1.0)
|
||||
|
||||
processor = MCTCTProcessor.from_pretrained(
|
||||
self.tmpdirname, bos_token="(BOS)", eos_token="(EOS)", do_normalize=False, padding_value=1.0
|
||||
)
|
||||
|
||||
self.assertEqual(processor.tokenizer.get_vocab(), tokenizer_add_kwargs.get_vocab())
|
||||
self.assertIsInstance(processor.tokenizer, Wav2Vec2CTCTokenizer)
|
||||
|
||||
self.assertEqual(processor.feature_extractor.to_json_string(), feature_extractor_add_kwargs.to_json_string())
|
||||
self.assertIsInstance(processor.feature_extractor, MCTCTFeatureExtractor)
|
||||
|
||||
def test_feature_extractor(self):
|
||||
feature_extractor = self.get_feature_extractor()
|
||||
tokenizer = self.get_tokenizer()
|
||||
|
||||
processor = MCTCTProcessor(tokenizer=tokenizer, feature_extractor=feature_extractor)
|
||||
|
||||
raw_speech = floats_list((3, 1000))
|
||||
|
||||
input_feat_extract = feature_extractor(raw_speech, return_tensors="np")
|
||||
input_processor = processor(raw_speech, return_tensors="np")
|
||||
|
||||
for key in input_feat_extract.keys():
|
||||
self.assertAlmostEqual(input_feat_extract[key].sum(), input_processor[key].sum(), delta=1e-2)
|
||||
|
||||
def test_tokenizer(self):
|
||||
feature_extractor = self.get_feature_extractor()
|
||||
tokenizer = self.get_tokenizer()
|
||||
|
||||
processor = MCTCTProcessor(tokenizer=tokenizer, feature_extractor=feature_extractor)
|
||||
|
||||
input_str = "This is a test string"
|
||||
|
||||
with processor.as_target_processor():
|
||||
encoded_processor = processor(input_str)
|
||||
|
||||
encoded_tok = tokenizer(input_str)
|
||||
|
||||
for key in encoded_tok.keys():
|
||||
self.assertListEqual(encoded_tok[key], encoded_processor[key])
|
||||
|
||||
def test_tokenizer_decode(self):
|
||||
feature_extractor = self.get_feature_extractor()
|
||||
tokenizer = self.get_tokenizer()
|
||||
|
||||
processor = MCTCTProcessor(tokenizer=tokenizer, feature_extractor=feature_extractor)
|
||||
|
||||
predicted_ids = [[1, 4, 5, 8, 1, 0, 8], [3, 4, 3, 1, 1, 8, 9]]
|
||||
|
||||
decoded_processor = processor.batch_decode(predicted_ids)
|
||||
decoded_tok = tokenizer.batch_decode(predicted_ids)
|
||||
|
||||
self.assertListEqual(decoded_tok, decoded_processor)
|
||||
Reference in New Issue
Block a user