[trainer] deepspeed integration (#9211)

* deepspeed integration

* style

* add test

* ds wants to do its own backward

* fp16 assert

* Update src/transformers/training_args.py

Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>

* style

* for clarity extract what args are being passed to deepspeed

* introduce the concept of self.wrapped_model

* s/self.wrapped_model/self.model_wrapped/

* complete transition to self.wrapped_model / self.model

* fix

* doc

* give ds its own init

* add custom overrides, handle bs correctly

* fix test

* clean up model_init logic, fix small bug

* complete fix

* collapse --deepspeed_config into --deepspeed

* style

* start adding doc notes

* style

* implement hf2ds optimizer and scheduler configuration remapping

* oops

* call get_num_training_steps absolutely when needed

* workaround broken auto-formatter

* deepspeed_config arg is no longer needed - fixed in deepspeed master

* use hf's fp16 args in config

* clean

* start on the docs

* rebase cleanup

* finish up --fp16

* clarify the supported stages

* big refactor thanks to discovering deepspeed.init_distributed

* cleanup

* revert fp16 part

* add checkpoint-support

* more init ds into integrations

* extend docs

* cleanup

* unfix docs

* clean up old code

* imports

* move docs

* fix logic

* make it clear which file it's referring to

* document nodes/gpus

* style

* wrong format

* style

* deepspeed handles gradient clipping

* easier to read

* major doc rewrite

* Apply suggestions from code review

Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>

* docs

* switch to AdamW optimizer

* style

* Apply suggestions from code review

Co-authored-by: Lysandre Debut <lysandre@huggingface.co>

* clarify doc

Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com>
Co-authored-by: Lysandre Debut <lysandre@huggingface.co>
This commit is contained in:
Stas Bekman
2021-01-12 19:05:18 -08:00
committed by GitHub
parent 5f6721032a
commit 2df34f4aba
7 changed files with 741 additions and 57 deletions

View File

@@ -15,13 +15,17 @@
Integrations with other Python libraries.
"""
import importlib.util
import io
import json
import math
import numbers
import os
import re
import tempfile
from pathlib import Path
from types import SimpleNamespace
from .trainer_utils import SchedulerType
from .utils import logging
@@ -43,7 +47,6 @@ if _has_comet:
except (ImportError, ValueError):
_has_comet = False
from .file_utils import ENV_VARS_TRUE_VALUES, is_torch_tpu_available # noqa: E402
from .trainer_callback import TrainerCallback # noqa: E402
from .trainer_utils import PREFIX_CHECKPOINT_DIR, BestRun, EvaluationStrategy # noqa: E402
@@ -94,6 +97,10 @@ def is_fairscale_available():
return importlib.util.find_spec("fairscale") is not None
def is_deepspeed_available():
return importlib.util.find_spec("deepspeed") is not None
def hp_params(trial):
if is_optuna_available():
import optuna
@@ -230,6 +237,157 @@ def rewrite_logs(d):
return new_d
def init_deepspeed(trainer, num_training_steps):
"""
Init DeepSpeed, after converting any relevant Trainer's args into DeepSpeed configuration
Args:
trainer: Trainer object
num_training_steps: per single gpu
Returns: model, optimizer, lr_scheduler
"""
import deepspeed
args = trainer.args
ds_config_file = args.deepspeed
model = trainer.model
with io.open(ds_config_file, "r", encoding="utf-8") as f:
config = json.load(f)
# The following code translates relevant trainer's cl args into the DS config
# First to ensure that there is no mismatch between cl args values and presets in the config
# file, ask to not set in ds config file:
# - "train_batch_size",
# - "train_micro_batch_size_per_gpu",
# - "gradient_accumulation_steps"
bs_keys = ["train_batch_size", "train_micro_batch_size_per_gpu"]
if len([x for x in bs_keys if x in config.keys()]):
raise ValueError(
f"Do not include {bs_keys} entries in the ds config file, as they will be set via --per_device_train_batch_size or its default"
)
if "gradient_accumulation_steps" in config.keys():
raise ValueError(
"Do not include gradient_accumulation_steps entries in the ds config file, as they will be set via --gradient_accumulation_steps or its default"
)
# DeepSpeed does:
# train_batch_size = n_gpus * train_micro_batch_size_per_gpu * gradient_accumulation_steps
# therefore we just need to set:
config["train_micro_batch_size_per_gpu"] = args.per_device_train_batch_size
config["gradient_accumulation_steps"] = args.gradient_accumulation_steps
if "gradient_clipping" in config:
logger.info(
f"Keeping the `gradient_clipping` config from {ds_config_file} intact, ignoring any gradient clipping-specific cl args"
)
else: # override only if the ds config doesn't already have this section
config["gradient_clipping"] = args.max_grad_norm
if "optimizer" in config:
logger.info(
f"Keeping the `optimizer` config from {ds_config_file} intact, ignoring any optimizer-specific cl args"
)
else: # override only if the ds config doesn't already have this section
# ds supports Adam, OneBitAdam, and Lamb optimizers and can import other optimizers from torch.
# But trainer uses AdamW by default.
# To use other optimizers so using a different scheduler requires voiding warranty with: `zero_allow_untested_optimizer`
optimizer_configs = {
"AdamW": {
"lr": args.learning_rate,
"betas": [args.adam_beta1, args.adam_beta2],
"eps": args.adam_epsilon,
"weight_decay": args.weight_decay,
}
}
optimizer = "AdamW"
config["zero_allow_untested_optimizer"] = True
config["optimizer"] = {
"type": optimizer,
"params": optimizer_configs[optimizer],
}
# DS schedulers (deepspeed/runtime/lr_schedules.py):
#
# DS name | --lr_scheduler_type | HF func | Notes
# -------------| ---------------------|-----------------------------------|--------------------
# LRRangeTest | na | na | LRRT
# OneCycle | na | na | 1CLR
# WarmupLR | constant_with_warmup | get_constant_schedule_with_warmup | w/ warmup_min_lr=0
# WarmupDecayLR| linear | get_linear_schedule_with_warmup |
if "scheduler" in config:
logger.info(
f"Keeping the `scheduler` config from {ds_config_file} intact, ignoring any scheduler-specific cl args"
)
else: # override only if the ds config doesn't already have this section
if args.lr_scheduler_type == SchedulerType.LINEAR:
scheduler = "WarmupDecayLR"
params = {
"last_batch_iteration": -1,
"total_num_steps": num_training_steps,
"warmup_min_lr": 0,
"warmup_max_lr": args.learning_rate,
"warmup_num_steps": args.warmup_steps,
}
elif args.lr_scheduler_type == SchedulerType.CONSTANT_WITH_WARMUP:
scheduler = "WarmupLR"
params = {
"warmup_min_lr": 0,
"warmup_max_lr": args.learning_rate,
"warmup_num_steps": args.warmup_steps,
}
else:
raise ValueError(f"{args.lr_scheduler_type} scheduler type is not supported by DeepSpeed")
config["scheduler"] = {
"type": scheduler,
"params": params,
}
# fp16
if trainer.fp16_backend is not None:
# Deepspeed has 2 possible fp16 config entries:
# - `fp16`: for the native amp - it has a bunch of optional params but we won't set any here unless the user did the work
# - `amp`: which delegates amp work to apex (which needs to be available), but it cannot be used with any ZeRO features, so probably best to be avoided.
if trainer.fp16_backend == "apex":
if "amp" in config:
logger.info(
f"Keeping the `amp` config from {ds_config_file} intact, ignoring any amp-specific cl args"
)
else:
config["amp"] = {
"enabled": True,
"opt_level": args.fp16_opt_level,
}
elif trainer.fp16_backend == "amp":
if "fp16" in config:
logger.info(
f"Keeping the `fp16` config from {ds_config_file} intact, ignoring any fp16-specific cl args"
)
else:
config["fp16"] = {
"enabled": True,
}
# for clarity extract the specific cl args that are being passed to deepspeed
ds_args = dict(local_rank=args.local_rank)
# init that takes part of the config via `args`, and the bulk of it via `config_params`
model_parameters = filter(lambda p: p.requires_grad, model.parameters())
model, optimizer, _, lr_scheduler = deepspeed.initialize(
args=SimpleNamespace(**ds_args), # expects an obj
model=model,
model_parameters=model_parameters,
config_params=config,
)
return model, optimizer, lr_scheduler
class TensorBoardCallback(TrainerCallback):
"""
A :class:`~transformers.TrainerCallback` that sends the logs to `TensorBoard

View File

@@ -42,6 +42,7 @@ from .integrations import ( # isort: split
is_wandb_available,
run_hp_search_optuna,
run_hp_search_ray,
init_deepspeed,
)
import numpy as np
@@ -252,6 +253,7 @@ class Trainer:
# Seed must be set before instantiating the model when using model
set_seed(self.args.seed)
self.hp_name = None
self.deepspeed = None
if model is None:
if model_init is not None:
@@ -338,20 +340,25 @@ class Trainer:
raise ValueError("Using sharded DDP only works in distributed training.")
elif not is_fairscale_available():
raise ImportError("Sharded DDP training requires fairscale: `pip install fairscale`.")
elif args.deepspeed:
raise ValueError("can't use --sharded_ddp together with --deepspeed.")
else:
self.sharded_dpp = True
# Mixed precision setup
self.use_apex = False
self.use_amp = False
self.fp16_backend = None
if args.fp16:
if args.fp16_backend == "auto":
backend = "amp" if _is_native_amp_available else "apex"
self.fp16_backend = "amp" if _is_native_amp_available else "apex"
else:
backend = args.fp16_backend
logger.info(f"Using {backend} fp16 backend")
self.fp16_backend = args.fp16_backend
logger.info(f"Using {self.fp16_backend} fp16 backend")
if backend == "amp":
if args.fp16 and not args.deepspeed: # deepspeed manages its own fp16
if self.fp16_backend == "amp":
self.use_amp = True
self.scaler = ShardedGradScaler() if self.sharded_dpp else torch.cuda.amp.GradScaler()
else:
@@ -714,7 +721,16 @@ class Trainer:
num_train_epochs = 1
num_update_steps_per_epoch = max_steps
self.create_optimizer_and_scheduler(num_training_steps=max_steps)
if self.args.deepspeed:
model, optimizer, lr_scheduler = init_deepspeed(self, num_training_steps=max_steps)
self.model = model.module
self.model_wrapped = model # will get further wrapped in DDP
self.deepspeed = model # DeepSpeedEngine object
self.optimizer = optimizer
self.lr_scheduler = lr_scheduler
else:
self.create_optimizer_and_scheduler(num_training_steps=max_steps)
self.state = TrainerState()
self.state.is_hyper_param_search = trial is not None
@@ -878,7 +894,9 @@ class Trainer:
and (step + 1) == steps_in_epoch
):
# Gradient clipping
if self.args.max_grad_norm is not None and self.args.max_grad_norm > 0:
if self.args.max_grad_norm is not None and self.args.max_grad_norm > 0 and not self.deepspeed:
# deepspeed does its own clipping
if self.use_amp:
# AMP: gradients need unscaling
self.scaler.unscale_(self.optimizer)
@@ -945,6 +963,11 @@ class Trainer:
state_dict = torch.load(os.path.join(self.state.best_model_checkpoint, WEIGHTS_NAME))
self.model.load_state_dict(state_dict)
if self.deepspeed:
self.deepspeed.load_checkpoint(
self.state.best_model_checkpoint, load_optimizer_states=False, load_lr_scheduler_states=False
)
metrics = speed_metrics("train", start_time, self.state.max_steps)
if self._total_flos is not None:
self.store_flos()
@@ -1006,18 +1029,23 @@ class Trainer:
output_dir = os.path.join(self.args.output_dir, checkpoint_folder)
self.store_flos()
self.save_model(output_dir)
if self.deepspeed:
self.deepspeed.save_checkpoint(output_dir)
# Save optimizer and scheduler
if self.sharded_dpp:
self.optimizer.consolidate_state_dict()
if is_torch_tpu_available():
xm.rendezvous("saving_optimizer_states")
xm.save(self.optimizer.state_dict(), os.path.join(output_dir, "optimizer.pt"))
with warnings.catch_warnings(record=True) as caught_warnings:
xm.save(self.lr_scheduler.state_dict(), os.path.join(output_dir, "scheduler.pt"))
reissue_pt_warnings(caught_warnings)
elif self.is_world_process_zero():
elif self.is_world_process_zero() and not self.deepspeed:
# deepspeed.save_checkpoint above saves model/optim/sched
torch.save(self.optimizer.state_dict(), os.path.join(output_dir, "optimizer.pt"))
with warnings.catch_warnings(record=True) as caught_warnings:
torch.save(self.lr_scheduler.state_dict(), os.path.join(output_dir, "scheduler.pt"))
@@ -1049,10 +1077,11 @@ class Trainer:
def _load_optimizer_and_scheduler(self, model_path):
"""If optimizer and scheduler states exist, load them."""
if (
model_path is not None
and os.path.isfile(os.path.join(model_path, "optimizer.pt"))
and os.path.isfile(os.path.join(model_path, "scheduler.pt"))
if model_path is None:
return
if os.path.isfile(os.path.join(model_path, "optimizer.pt")) and os.path.isfile(
os.path.join(model_path, "scheduler.pt")
):
# Load in optimizer and scheduler states
if is_torch_tpu_available():
@@ -1075,6 +1104,10 @@ class Trainer:
self.lr_scheduler.load_state_dict(torch.load(os.path.join(model_path, "scheduler.pt")))
reissue_pt_warnings(caught_warnings)
if self.deepspeed:
# Not sure how to check if there is a saved deepspeed checkpoint, but since it just return None if it fails to find a deepspeed checkpoint this is sort of a check-n-load function
self.deepspeed.load_checkpoint(model_path, load_optimizer_states=True, load_lr_scheduler_states=True)
def hyperparameter_search(
self,
hp_space: Optional[Callable[["optuna.Trial"], Dict[str, float]]] = None,
@@ -1227,6 +1260,9 @@ class Trainer:
elif self.use_apex:
with amp.scale_loss(loss, self.optimizer) as scaled_loss:
scaled_loss.backward()
elif self.deepspeed:
# calling on DS engine (model_wrapped == DDP(Deepspeed(PretrainedModule)))
self.model_wrapped.module.backward(loss)
else:
loss.backward()

View File

@@ -217,6 +217,9 @@ class TrainingArguments:
sharded_ddp (:obj:`bool`, `optional`, defaults to :obj:`False`):
Use Sharded DDP training from `FairScale <https://github.com/facebookresearch/fairscale>`__ (in distributed
training only). This is an experimental feature.
deepspeed (:obj:`str`, `optional`):
Use `Deepspeed <https://github.com/microsoft/deepspeed>`__. This is an experimental feature and its API may
evolve in the future. The value is the location of its json config file (usually ``ds_config.json``).
label_smoothing_factor (:obj:`float`, `optional`, defaults to 0.0):
The label smoothing factor to use. Zero means no label smoothing, otherwise the underlying onehot-encoded
labels are changed from 0s and 1s to :obj:`label_smoothing_factor/num_labels` and :obj:`1 -
@@ -394,6 +397,10 @@ class TrainingArguments:
default=False,
metadata={"help": "Whether or not to use sharded DDP training (in distributed training only)."},
)
deepspeed: Optional[str] = field(
default=None,
metadata={"help": "Enable deepspeed and pass the path to deepspeed json config file (e.g. ds_config.json)"},
)
label_smoothing_factor: float = field(
default=0.0, metadata={"help": "The label smoothing epsilon to apply (zero means no label smoothing)."}
)
@@ -480,7 +487,21 @@ class TrainingArguments:
else:
# Here, we'll use torch.distributed.
# Initializes the distributed backend which will take care of synchronizing nodes/GPUs
torch.distributed.init_process_group(backend="nccl")
#
# deepspeed performs its own DDP internally, and requires the program to be started with:
# deepspeed ./program.py
# rather than:
# python -m torch.distributed.launch --nproc_per_node=2 ./program.py
if self.deepspeed:
from .integrations import is_deepspeed_available
if not is_deepspeed_available():
raise ImportError("--deepspeed requires deepspeed: `pip install deepspeed`.")
import deepspeed
deepspeed.init_distributed()
else:
torch.distributed.init_process_group(backend="nccl")
device = torch.device("cuda", self.local_rank)
n_gpu = 1