[DeepSpeed] restore memory for evaluation (#10114)
* free up memory at the end of train * rework tests * consistent formatting * correction
This commit is contained in:
@@ -24,10 +24,7 @@
|
|||||||
"type": "AdamW",
|
"type": "AdamW",
|
||||||
"params": {
|
"params": {
|
||||||
"lr": 3e-5,
|
"lr": 3e-5,
|
||||||
"betas": [
|
"betas": [0.8, 0.999],
|
||||||
0.8,
|
|
||||||
0.999
|
|
||||||
],
|
|
||||||
"eps": 1e-8,
|
"eps": 1e-8,
|
||||||
"weight_decay": 3e-7
|
"weight_decay": 3e-7
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,8 +17,14 @@ import os
|
|||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from transformers.integrations import is_deepspeed_available
|
from transformers.integrations import is_deepspeed_available
|
||||||
from transformers.testing_utils import TestCasePlus, execute_subprocess_async, require_torch_multi_gpu
|
from transformers.testing_utils import (
|
||||||
from transformers.trainer_callback import TrainerState
|
TestCasePlus,
|
||||||
|
execute_subprocess_async,
|
||||||
|
get_gpu_count,
|
||||||
|
require_torch_gpu,
|
||||||
|
require_torch_multi_gpu,
|
||||||
|
slow,
|
||||||
|
)
|
||||||
from transformers.trainer_utils import set_seed
|
from transformers.trainer_utils import set_seed
|
||||||
|
|
||||||
|
|
||||||
@@ -42,37 +48,45 @@ def require_deepspeed(test_case):
|
|||||||
return test_case
|
return test_case
|
||||||
|
|
||||||
|
|
||||||
|
@slow
|
||||||
@require_deepspeed
|
@require_deepspeed
|
||||||
|
@require_torch_gpu
|
||||||
class TestDeepSpeed(TestCasePlus):
|
class TestDeepSpeed(TestCasePlus):
|
||||||
|
|
||||||
# XXX: need to do better validation beyond just that the run was successful
|
|
||||||
def run_quick(self, distributed=None, extra_args_str=None, remove_args_str=None):
|
|
||||||
output_dir = self.run_trainer(1, "12", MBART_TINY, 1, distributed, extra_args_str, remove_args_str)
|
|
||||||
logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history
|
|
||||||
eval_metrics = [log for log in logs if "eval_loss" in log.keys()]
|
|
||||||
first_step_stats = eval_metrics[0]
|
|
||||||
assert "eval_bleu" in first_step_stats
|
|
||||||
|
|
||||||
def run_quick_no_train(self, distributed=None, extra_args_str=None):
|
|
||||||
remove_args_str = "--do_train"
|
|
||||||
output_dir = self.run_trainer(1, "12", MBART_TINY, 1, distributed, extra_args_str, remove_args_str)
|
|
||||||
val_metrics = load_json(os.path.join(output_dir, "val_results.json"))
|
|
||||||
assert "val_bleu" in val_metrics
|
|
||||||
test_metrics = load_json(os.path.join(output_dir, "test_results.json"))
|
|
||||||
assert "test_bleu" in test_metrics
|
|
||||||
|
|
||||||
@require_torch_multi_gpu
|
@require_torch_multi_gpu
|
||||||
def test_basic(self):
|
def test_basic_distributed(self):
|
||||||
self.run_quick()
|
self.run_quick(distributed=True)
|
||||||
|
|
||||||
@require_torch_multi_gpu
|
@require_torch_multi_gpu
|
||||||
def test_grad_acum(self):
|
def test_grad_acum(self):
|
||||||
self.run_quick(extra_args_str="--gradient_accumulation_steps 2")
|
self.run_quick(distributed=True, extra_args_str="--gradient_accumulation_steps 2")
|
||||||
|
|
||||||
@require_torch_multi_gpu
|
def test_do_eval_no_train(self):
|
||||||
def test_no_train(self):
|
|
||||||
# we should not fail if train is skipped
|
# we should not fail if train is skipped
|
||||||
self.run_quick_no_train()
|
output_dir = self.run_trainer(
|
||||||
|
eval_steps=1,
|
||||||
|
max_len=12,
|
||||||
|
model_name=MBART_TINY,
|
||||||
|
num_train_epochs=1,
|
||||||
|
distributed=False,
|
||||||
|
extra_args_str="--do_eval",
|
||||||
|
remove_args_str="--do_train",
|
||||||
|
)
|
||||||
|
val_metrics = load_json(os.path.join(output_dir, "val_results.json"))
|
||||||
|
assert "val_bleu" in val_metrics
|
||||||
|
|
||||||
|
# XXX: need to do better validation beyond just that the run was successful
|
||||||
|
def run_quick(self, distributed=True, extra_args_str=None, remove_args_str=None):
|
||||||
|
output_dir = self.run_trainer(
|
||||||
|
eval_steps=1,
|
||||||
|
max_len=12,
|
||||||
|
model_name=MBART_TINY,
|
||||||
|
num_train_epochs=1,
|
||||||
|
distributed=distributed,
|
||||||
|
extra_args_str=extra_args_str,
|
||||||
|
remove_args_str=remove_args_str,
|
||||||
|
)
|
||||||
|
train_metrics = load_json(os.path.join(output_dir, "train_results.json"))
|
||||||
|
assert "train_runtime" in train_metrics
|
||||||
|
|
||||||
def run_trainer(
|
def run_trainer(
|
||||||
self,
|
self,
|
||||||
@@ -80,7 +94,7 @@ class TestDeepSpeed(TestCasePlus):
|
|||||||
max_len: str,
|
max_len: str,
|
||||||
model_name: str,
|
model_name: str,
|
||||||
num_train_epochs: int,
|
num_train_epochs: int,
|
||||||
distributed: bool = False,
|
distributed: bool = True,
|
||||||
extra_args_str: str = None,
|
extra_args_str: str = None,
|
||||||
remove_args_str: str = None,
|
remove_args_str: str = None,
|
||||||
):
|
):
|
||||||
@@ -97,18 +111,13 @@ class TestDeepSpeed(TestCasePlus):
|
|||||||
--max_target_length {max_len}
|
--max_target_length {max_len}
|
||||||
--val_max_target_length {max_len}
|
--val_max_target_length {max_len}
|
||||||
--do_train
|
--do_train
|
||||||
--do_eval
|
|
||||||
--do_predict
|
|
||||||
--num_train_epochs {str(num_train_epochs)}
|
--num_train_epochs {str(num_train_epochs)}
|
||||||
--per_device_train_batch_size 4
|
--per_device_train_batch_size 4
|
||||||
--per_device_eval_batch_size 4
|
|
||||||
--learning_rate 3e-3
|
--learning_rate 3e-3
|
||||||
--warmup_steps 8
|
--warmup_steps 8
|
||||||
--evaluation_strategy steps
|
|
||||||
--predict_with_generate
|
--predict_with_generate
|
||||||
--logging_steps 0
|
--logging_steps 0
|
||||||
--save_steps {str(eval_steps)}
|
--save_steps {str(eval_steps)}
|
||||||
--eval_steps {str(eval_steps)}
|
|
||||||
--group_by_length
|
--group_by_length
|
||||||
--label_smoothing_factor 0.1
|
--label_smoothing_factor 0.1
|
||||||
--adafactor
|
--adafactor
|
||||||
@@ -116,7 +125,6 @@ class TestDeepSpeed(TestCasePlus):
|
|||||||
--tgt_lang ro_RO
|
--tgt_lang ro_RO
|
||||||
--src_lang en_XX
|
--src_lang en_XX
|
||||||
""".split()
|
""".split()
|
||||||
# --eval_beams 2
|
|
||||||
|
|
||||||
if extra_args_str is not None:
|
if extra_args_str is not None:
|
||||||
args.extend(extra_args_str.split())
|
args.extend(extra_args_str.split())
|
||||||
@@ -126,12 +134,13 @@ class TestDeepSpeed(TestCasePlus):
|
|||||||
args = [x for x in args if x not in remove_args]
|
args = [x for x in args if x not in remove_args]
|
||||||
|
|
||||||
ds_args = f"--deepspeed {self.test_file_dir_str}/ds_config.json".split()
|
ds_args = f"--deepspeed {self.test_file_dir_str}/ds_config.json".split()
|
||||||
distributed_args = f"""
|
script = [f"{self.examples_dir_str}/seq2seq/finetune_trainer.py"]
|
||||||
{self.test_file_dir}/../../seq2seq/finetune_trainer.py
|
num_gpus = get_gpu_count() if distributed else 1
|
||||||
""".split()
|
launcher = f"deepspeed --num_gpus {num_gpus}".split()
|
||||||
cmd = ["deepspeed"] + distributed_args + args + ds_args
|
|
||||||
|
cmd = launcher + script + args + ds_args
|
||||||
# keep for quick debug
|
# keep for quick debug
|
||||||
# print(" ".join(cmd)); die
|
# print(" ".join([f"PYTHONPATH={self.src_dir_str}"] +cmd)); die
|
||||||
execute_subprocess_async(cmd, env=self.get_env())
|
execute_subprocess_async(cmd, env=self.get_env())
|
||||||
|
|
||||||
return output_dir
|
return output_dir
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ The Trainer class, to easily train a 🤗 Transformers from scratch or finetune
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import collections
|
import collections
|
||||||
|
import gc
|
||||||
import inspect
|
import inspect
|
||||||
import math
|
import math
|
||||||
import os
|
import os
|
||||||
@@ -266,8 +267,9 @@ class Trainer:
|
|||||||
|
|
||||||
# postpone switching model to cuda when:
|
# postpone switching model to cuda when:
|
||||||
# 1. MP - since we are trying to fit a much bigger than 1 gpu model
|
# 1. MP - since we are trying to fit a much bigger than 1 gpu model
|
||||||
# 2. fp16-enabled DeepSpeed loads the model in half the size and it doesn't need .to() anyway
|
# 2. fp16-enabled DeepSpeed loads the model in half the size and it doesn't need .to() anyway,
|
||||||
if not (self.is_model_parallel or args.deepspeed):
|
# and we only use deepspeed for training at the moment
|
||||||
|
if not self.is_model_parallel and not (args.deepspeed and args.do_train):
|
||||||
model = model.to(args.device)
|
model = model.to(args.device)
|
||||||
|
|
||||||
# Force n_gpu to 1 to avoid DataParallel as MP will manage the GPUs
|
# Force n_gpu to 1 to avoid DataParallel as MP will manage the GPUs
|
||||||
@@ -817,7 +819,7 @@ class Trainer:
|
|||||||
|
|
||||||
# important: at this point:
|
# important: at this point:
|
||||||
# self.model is the Transformers Model
|
# self.model is the Transformers Model
|
||||||
# self.model_wrapped is DDP(Transformers Model), DDP(Deepspeed(Transformers Model)), etc.
|
# self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), etc.
|
||||||
|
|
||||||
# Train!
|
# Train!
|
||||||
if is_torch_tpu_available():
|
if is_torch_tpu_available():
|
||||||
@@ -1036,6 +1038,14 @@ class Trainer:
|
|||||||
# add remaining tr_loss
|
# add remaining tr_loss
|
||||||
self._total_loss_scalar += tr_loss.item()
|
self._total_loss_scalar += tr_loss.item()
|
||||||
|
|
||||||
|
if self.deepspeed:
|
||||||
|
# free up any memory that might be useful for eval
|
||||||
|
self.deepspeed = None
|
||||||
|
self.optimizer = None
|
||||||
|
self.lr_scheduler = None
|
||||||
|
self.model_wrapped = self.model
|
||||||
|
gc.collect() # force memory release
|
||||||
|
|
||||||
return TrainOutput(self.state.global_step, self._total_loss_scalar / self.state.global_step, metrics)
|
return TrainOutput(self.state.global_step, self._total_loss_scalar / self.state.global_step, metrics)
|
||||||
|
|
||||||
def _maybe_log_save_evaluate(self, tr_loss, model, trial, epoch):
|
def _maybe_log_save_evaluate(self, tr_loss, model, trial, epoch):
|
||||||
@@ -1593,13 +1603,9 @@ class Trainer:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if self.args.deepspeed and not self.args.do_train:
|
if self.args.deepspeed and not self.args.do_train:
|
||||||
# In the future we probably can run deepspeed for inference too, but this will require
|
# no harm, but flagging to the user that deepspeed config is ignored for eval
|
||||||
# some thinking about how to best run it - since while it works DeepSpeed wasn't
|
# flagging only for when --do_train wasn't passed as only then it's redundant
|
||||||
# designed for inference
|
logger.info("Detected the deepspeed argument but it will not be used for evaluation")
|
||||||
|
|
||||||
# since we have to postpone model.to() till training for DeepSpeed, if there was no
|
|
||||||
# training, we must put the model on the right device
|
|
||||||
self.model = self.model.to(self.args.device)
|
|
||||||
|
|
||||||
model = self.model
|
model = self.model
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user