fix run_seq2seq.py; porting trainer tests to it (#10162)

* fix run_seq2seq.py; porting DeepSpeed tests to it

* unrefactor

* defensive programming

* defensive programming 2

* port the rest of the trainer tests

* style

* a cleaner scripts dir finder

* cleanup
This commit is contained in:
Stas Bekman
2021-02-15 09:12:17 -08:00
committed by GitHub
parent 31b0560ab4
commit 0b1f552a24
6 changed files with 145 additions and 59 deletions

View File

@@ -18,6 +18,7 @@ Fine-tuning the library models for sequence to sequence.
"""
# You can also adapt this script on your own sequence to sequence task. Pointers for this are left as comments.
import json
import logging
import os
import re
@@ -38,6 +39,7 @@ from transformers import (
DataCollatorForSeq2Seq,
HfArgumentParser,
MBartTokenizer,
MBartTokenizerFast,
Seq2SeqTrainer,
Seq2SeqTrainingArguments,
default_data_collator,
@@ -53,6 +55,11 @@ with FileLock(".lock") as lock:
logger = logging.getLogger(__name__)
def save_json(content, path, indent=4, **json_dump_kwargs):
with open(path, "w") as f:
json.dump(content, f, indent=indent, sort_keys=True, **json_dump_kwargs)
@dataclass
class ModelArguments:
"""
@@ -351,8 +358,15 @@ def main():
)
# Set decoder_start_token_id
if model.config.decoder_start_token_id is None and isinstance(tokenizer, MBartTokenizer):
model.config.decoder_start_token_id = tokenizer.lang_code_to_id[data_args.target_lang]
if model.config.decoder_start_token_id is None and isinstance(tokenizer, (MBartTokenizer, MBartTokenizerFast)):
assert (
data_args.target_lang is not None and data_args.source_lang is not None
), "mBart requires --target_lang and --source_lang"
if isinstance(tokenizer, MBartTokenizer):
model.config.decoder_start_token_id = tokenizer.lang_code_to_id[data_args.target_lang]
else:
model.config.decoder_start_token_id = tokenizer.convert_tokens_to_ids(data_args.target_lang)
if model.config.decoder_start_token_id is None:
raise ValueError("Make sure that `config.decoder_start_token_id` is correctly defined")
@@ -448,6 +462,8 @@ def main():
if training_args.do_train:
train_dataset = datasets["train"]
if "train" not in datasets:
raise ValueError("--do_train requires a train dataset")
if data_args.max_train_samples is not None:
train_dataset = train_dataset.select(range(data_args.max_train_samples))
train_dataset = train_dataset.map(
@@ -460,6 +476,8 @@ def main():
if training_args.do_eval:
max_target_length = data_args.val_max_target_length
if "validation" not in datasets:
raise ValueError("--do_eval requires a validation dataset")
eval_dataset = datasets["validation"]
if data_args.max_val_samples is not None:
eval_dataset = eval_dataset.select(range(data_args.max_val_samples))
@@ -473,6 +491,8 @@ def main():
if training_args.do_predict:
max_target_length = data_args.val_max_target_length
if "test" not in datasets:
raise ValueError("--do_predict requires a test dataset")
test_dataset = datasets["test"]
if data_args.max_test_samples is not None:
test_dataset = test_dataset.select(range(data_args.max_test_samples))
@@ -550,6 +570,7 @@ def main():
compute_metrics=compute_metrics if training_args.predict_with_generate else None,
)
all_metrics = {}
# Training
if training_args.do_train:
if last_checkpoint is not None:
@@ -561,13 +582,17 @@ def main():
train_result = trainer.train(resume_from_checkpoint=checkpoint)
trainer.save_model() # Saves the tokenizer too for easy upload
output_train_file = os.path.join(training_args.output_dir, "train_results.txt")
metrics = train_result.metrics
max_train_samples = (
data_args.max_train_samples if data_args.max_train_samples is not None else len(train_dataset)
)
metrics["train_samples"] = min(max_train_samples, len(train_dataset))
if trainer.is_world_process_zero():
with open(output_train_file, "w") as writer:
logger.info("***** Train results *****")
for key, value in sorted(train_result.metrics.items()):
logger.info(f" {key} = {value}")
writer.write(f"{key} = {value}\n")
logger.info("***** train metrics *****")
for key in sorted(metrics.keys()):
logger.info(f" {key} = {metrics[key]}")
save_json(metrics, os.path.join(training_args.output_dir, "train_results.json"))
all_metrics.update(metrics)
# Need to save the state, since Trainer.save_model saves only the tokenizer with the model
trainer.state.save_to_json(os.path.join(training_args.output_dir, "trainer_state.json"))
@@ -577,16 +602,19 @@ def main():
if training_args.do_eval:
logger.info("*** Evaluate ***")
results = trainer.evaluate(max_length=data_args.val_max_target_length, num_beams=data_args.num_beams)
results = {k: round(v, 4) for k, v in results.items()}
metrics = trainer.evaluate(
max_length=data_args.val_max_target_length, num_beams=data_args.num_beams, metric_key_prefix="val"
)
metrics = {k: round(v, 4) for k, v in metrics.items()}
max_val_samples = data_args.max_val_samples if data_args.max_val_samples is not None else len(eval_dataset)
metrics["val_samples"] = min(max_val_samples, len(eval_dataset))
output_eval_file = os.path.join(training_args.output_dir, "eval_results_seq2seq.txt")
if trainer.is_world_process_zero():
with open(output_eval_file, "w") as writer:
logger.info("***** Eval results *****")
for key, value in sorted(results.items()):
logger.info(f" {key} = {value}")
writer.write(f"{key} = {value}\n")
logger.info("***** val metrics *****")
for key in sorted(metrics.keys()):
logger.info(f" {key} = {metrics[key]}")
save_json(metrics, os.path.join(training_args.output_dir, "val_results.json"))
all_metrics.update(metrics)
if training_args.do_predict:
logger.info("*** Test ***")
@@ -597,16 +625,17 @@ def main():
max_length=data_args.val_max_target_length,
num_beams=data_args.num_beams,
)
test_metrics = test_results.metrics
test_metrics["test_loss"] = round(test_metrics["test_loss"], 4)
metrics = test_results.metrics
max_test_samples = data_args.max_test_samples if data_args.max_test_samples is not None else len(test_dataset)
metrics["test_samples"] = min(max_test_samples, len(test_dataset))
metrics = {k: round(v, 4) for k, v in metrics.items()}
output_test_result_file = os.path.join(training_args.output_dir, "test_results_seq2seq.txt")
if trainer.is_world_process_zero():
with open(output_test_result_file, "w") as writer:
logger.info("***** Test results *****")
for key, value in sorted(test_metrics.items()):
logger.info(f" {key} = {value}")
writer.write(f"{key} = {value}\n")
logger.info("***** test metrics *****")
for key in sorted(metrics.keys()):
logger.info(f" {key} = {metrics[key]}")
save_json(metrics, os.path.join(training_args.output_dir, "test_results.json"))
all_metrics.update(metrics)
if training_args.predict_with_generate:
test_preds = tokenizer.batch_decode(
@@ -617,6 +646,9 @@ def main():
with open(output_test_preds_file, "w") as writer:
writer.write("\n".join(test_preds))
if trainer.is_world_process_zero():
save_json(all_metrics, os.path.join(training_args.output_dir, "all_results.json"))
return results

View File

@@ -1,181 +0,0 @@
# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import unittest
from unittest.mock import patch
from transformers.file_utils import is_apex_available
from transformers.integrations import is_fairscale_available
from transformers.testing_utils import (
TestCasePlus,
execute_subprocess_async,
get_gpu_count,
require_torch_multi_gpu,
require_torch_non_multi_gpu,
slow,
)
from transformers.trainer_callback import TrainerState
from transformers.trainer_utils import set_seed
from .finetune_trainer import main
set_seed(42)
MARIAN_MODEL = "sshleifer/student_marian_en_ro_6_1"
MBART_TINY = "sshleifer/tiny-mbart"
# a candidate for testing_utils
def require_fairscale(test_case):
"""
Decorator marking a test that requires fairscale
"""
if not is_fairscale_available():
return unittest.skip("test requires fairscale")(test_case)
else:
return test_case
# a candidate for testing_utils
def require_apex(test_case):
"""
Decorator marking a test that requires apex
"""
if not is_apex_available():
return unittest.skip("test requires apex")(test_case)
else:
return test_case
class TestFinetuneTrainer(TestCasePlus):
def finetune_trainer_quick(self, distributed=None, extra_args_str=None):
output_dir = self.run_trainer(1, "12", MBART_TINY, 1, distributed, extra_args_str)
logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history
eval_metrics = [log for log in logs if "eval_loss" in log.keys()]
first_step_stats = eval_metrics[0]
assert "eval_bleu" in first_step_stats
@require_torch_non_multi_gpu
def test_finetune_trainer_no_dist(self):
self.finetune_trainer_quick()
# the following 2 tests verify that the trainer can handle distributed and non-distributed with n_gpu > 1
@require_torch_multi_gpu
def test_finetune_trainer_dp(self):
self.finetune_trainer_quick(distributed=False)
@require_torch_multi_gpu
def test_finetune_trainer_ddp(self):
self.finetune_trainer_quick(distributed=True)
# it's crucial to test --sharded_ddp w/ and w/o --fp16
@require_torch_multi_gpu
@require_fairscale
def test_finetune_trainer_ddp_sharded_ddp(self):
self.finetune_trainer_quick(distributed=True, extra_args_str="--sharded_ddp")
@require_torch_multi_gpu
@require_fairscale
def test_finetune_trainer_ddp_sharded_ddp_fp16(self):
self.finetune_trainer_quick(distributed=True, extra_args_str="--sharded_ddp --fp16")
@require_apex
def test_finetune_trainer_apex(self):
self.finetune_trainer_quick(extra_args_str="--fp16 --fp16_backend=apex")
@slow
def test_finetune_trainer_slow(self):
# There is a missing call to __init__process_group somewhere
output_dir = self.run_trainer(
eval_steps=2, max_len="128", model_name=MARIAN_MODEL, num_train_epochs=10, distributed=False
)
# Check metrics
logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history
eval_metrics = [log for log in logs if "eval_loss" in log.keys()]
first_step_stats = eval_metrics[0]
last_step_stats = eval_metrics[-1]
assert first_step_stats["eval_bleu"] < last_step_stats["eval_bleu"] # model learned nothing
assert isinstance(last_step_stats["eval_bleu"], float)
# test if do_predict saves generations and metrics
contents = os.listdir(output_dir)
contents = {os.path.basename(p) for p in contents}
assert "test_generations.txt" in contents
assert "test_results.json" in contents
def run_trainer(
self,
eval_steps: int,
max_len: str,
model_name: str,
num_train_epochs: int,
distributed: bool = False,
extra_args_str: str = None,
):
data_dir = self.examples_dir / "seq2seq/test_data/wmt_en_ro"
output_dir = self.get_auto_remove_tmp_dir()
args = f"""
--model_name_or_path {model_name}
--data_dir {data_dir}
--output_dir {output_dir}
--overwrite_output_dir
--n_train 8
--n_val 8
--max_source_length {max_len}
--max_target_length {max_len}
--val_max_target_length {max_len}
--do_train
--do_eval
--do_predict
--num_train_epochs {str(num_train_epochs)}
--per_device_train_batch_size 4
--per_device_eval_batch_size 4
--learning_rate 3e-3
--warmup_steps 8
--evaluation_strategy steps
--predict_with_generate
--logging_steps 0
--save_steps {str(eval_steps)}
--eval_steps {str(eval_steps)}
--group_by_length
--label_smoothing_factor 0.1
--adafactor
--task translation
--tgt_lang ro_RO
--src_lang en_XX
""".split()
# --eval_beams 2
if extra_args_str is not None:
args.extend(extra_args_str.split())
if distributed:
n_gpu = get_gpu_count()
distributed_args = f"""
-m torch.distributed.launch
--nproc_per_node={n_gpu}
{self.test_file_dir}/finetune_trainer.py
""".split()
cmd = [sys.executable] + distributed_args + args
execute_subprocess_async(cmd, env=self.get_env())
else:
testargs = ["finetune_trainer.py"] + args
with patch.object(sys, "argv", testargs):
main()
return output_dir