diff --git a/docs/source/index.rst b/docs/source/index.rst index 188a2a406d..31dd86753e 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -468,6 +468,7 @@ Flax), PyTorch, and/or TensorFlow. main_classes/processors main_classes/tokenizer main_classes/trainer + main_classes/deepspeed main_classes/feature_extractor .. toctree:: diff --git a/docs/source/main_classes/deepspeed.rst b/docs/source/main_classes/deepspeed.rst new file mode 100644 index 0000000000..4677d0e1d2 --- /dev/null +++ b/docs/source/main_classes/deepspeed.rst @@ -0,0 +1,56 @@ +.. + Copyright 2020 The HuggingFace Team. All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + specific language governing permissions and limitations under the License. + +HfDeepSpeedConfig +----------------------------------------------------------------------------------------------------------------------- + +The :class:`~transformers.integrations.HfDeepSpeedConfig` is used to integrate DeepSpeed into the 🤗 Transformer core +functionality, when :class:`~transformers.Trainer` is not used. + +When using :class:`~transformers.Trainer` everything is automatically taken care of. + +When not using :class:`~transformers.Trainer`, to efficiently deploy DeepSpeed stage 3, you must instantiate the +:class:`~transformers.integrations.HfDeepSpeedConfig` object before instantiating the model. + +For example for a pretrained model: + +.. code-block:: python + + from transformers.integrations import HfDeepSpeedConfig + from transformers import AutoModel + + ds_config = { ... 
} # deepspeed config object or path to the file + # must run before instantiating the model + dschf = HfDeepSpeedConfig(ds_config) # keep this object alive + model = AutoModel.from_pretrained("gpt2") + engine = deepspeed.initialize(model=model, config_params=ds_config, ...) + +or for a non-pretrained model: + +.. code-block:: python + + from transformers.integrations import HfDeepSpeedConfig + from transformers import AutoModel, AutoConfig + + ds_config = { ... } # deepspeed config object or path to the file + # must run before instantiating the model + dschf = HfDeepSpeedConfig(ds_config) # keep this object alive + config = AutoConfig.from_pretrained("gpt2") + model = AutoModel.from_config(config) + engine = deepspeed.initialize(model=model, config_params=ds_config, ...) + + +HfDeepSpeedConfig +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. autoclass:: transformers.integrations.HfDeepSpeedConfig + :members: diff --git a/src/transformers/integrations.py b/src/transformers/integrations.py index e05d1331f4..7629d21b65 100644 --- a/src/transformers/integrations.py +++ b/src/transformers/integrations.py @@ -286,28 +286,57 @@ def _set_if_auto(config, key, val): config[key] = val -class DeepSpeedConfigHF: +class HfDeepSpeedConfig: """ - This object contains Deepspeed configuration and can be quickly queried for things like zero stage. + This object contains a DeepSpeed configuration dictionary and can be quickly queried for things like zero stage. - We store a ``weakref`` of this object in the module's global to be able to access the config from areas where the - Trainer is not available (e.g. `from_pretrained` and `_get_resized_embeddings`). + A ``weakref`` of this object is stored in the module's globals to be able to access the config from areas where + things like the Trainer object are not available (e.g. ``from_pretrained`` and ``_get_resized_embeddings``). 
+ Therefore it's important that this object remains alive while the program is still running. + + :class:`~transformers.Trainer` uses the ``HfTrainerDeepSpeedConfig`` subclass instead. That subclass has logic to + sync the configuration with values of :class:`~transformers.TrainingArguments` by replacing special placeholder + values: ``"auto"``. Without this special logic the DeepSpeed configuration is not modified in any way. + + Args: + config_file_or_dict (:obj:`Union[str, Dict]`): path to DeepSpeed config file or dict. - The ``DeepSpeedConfigHF`` object is meant to be created during ``TrainingArguments`` object creation and has the - same lifespan as the latter. """ - def __init__(self, args): - self.config = None - self.stage = 0 - self.offload = False + def __init__(self, config_file_or_dict): + # set global weakref object + set_hf_deepspeed_config(self) dep_version_check("deepspeed") - self.config_process(args) + if isinstance(config_file_or_dict, dict): + # Don't modify user's data should they want to reuse it (e.g. in tests), because once we + # modified it, it will not be accepted here again, since `auto` values would have been overridden + config = deepcopy(config_file_or_dict) + elif isinstance(config_file_or_dict, str): + with io.open(config_file_or_dict, "r", encoding="utf-8") as f: + config = json.load(f) + else: + raise ValueError("expecting either a path to a DeepSpeed config file or a pre-populated dict") + self.config = config - # set global weakref object - deepspeed_config_hf_set(self) + # zero stage - this is done as early as possible, before model is created, to allow + # ``is_deepspeed_zero3_enabled`` query and getting to the early deepspeed config object + # during ``zero.Init()`` which needs whether fp16 is enabled, dtype, etc. 
+ config_zero = config.get("zero_optimization", {}) + self.stage = config_zero.get("stage", 0) + + # offload + self.offload = False + config_zero = config.get("zero_optimization", {}) + if self.is_zero2(): + self.offload = _is_true(config_zero, "cpu_offload") + elif self.is_zero3(): + offload_devices = ["cpu", "nvme"] + if config_zero.get("offload_optimizer", {}).get("device") in offload_devices: + self.offload = True + if config_zero.get("offload_param", {}).get("device") in offload_devices: + self.offload = True def is_zero2(self): return self.stage == 2 @@ -318,28 +347,23 @@ class DeepSpeedConfigHF: def is_offload(self): return self.offload - def config_process(self, args): + +class HfTrainerDeepSpeedConfig(HfDeepSpeedConfig): + """ + The ``HfTrainerDeepSpeedConfig`` object is meant to be created during ``TrainingArguments`` object creation and has + the same lifespan as the latter. + + """ + + def __init__(self, config_file_or_dict): + super().__init__(config_file_or_dict) + + def trainer_config_process(self, args): """ - 1. load json if the ``args.deepspeed`` is a path - 2. replace any ``auto`` values in the config with the correct or recommended value - - This is done as early as possible, before model is created, to allow ``is_deepspeed_zero3_enabled`` query and - getting to the early deepspeed config object during ``zero.Init()`` which needs whether fp16 is enabled, dtype, - etc. - + Adjust the config with ``TrainingArguments`` values. This stage is run during ``TrainingArguments`` object + creation. """ - config_file_or_dict = args.deepspeed - if isinstance(config_file_or_dict, dict): - # Don't modify user's data should they want to reuse it (e.g. 
in tests), because once we - # modified it, it will not be accepted here again, since `auto` values would have been overriden - config = deepcopy(config_file_or_dict) - elif isinstance(config_file_or_dict, str): - with io.open(config_file_or_dict, "r", encoding="utf-8") as f: - config = json.load(f) - else: - raise ValueError("expecting either a path to a config file or a pre-populated dict") - - self.config = config + config = self.config # DeepSpeed does: # train_batch_size = world_size * train_micro_batch_size_per_gpu * gradient_accumulation_steps @@ -349,10 +373,6 @@ class DeepSpeedConfigHF: _set_if_auto(config, "train_batch_size", train_batch_size) _set_if_auto(config, "gradient_clipping", args.max_grad_norm) - # zero - config_zero = config.get("zero_optimization", {}) - self.stage = config_zero.get("stage", 0) - config_optim = config.get("optimizer", {}) if config_optim != {}: config_optim_params = config_optim.get("params") @@ -367,7 +387,7 @@ class DeepSpeedConfigHF: _set_if_auto(config_sched_params, "warmup_min_lr", 0) _set_if_auto(config_sched_params, "warmup_max_lr", args.learning_rate) _set_if_auto(config_sched_params, "warmup_num_steps", args.warmup_steps) - # total_num_steps - will get set in deepspeed_init + # total_num_steps - will get set in trainer_config_finalize # fp16 if args.fp16: @@ -381,27 +401,16 @@ class DeepSpeedConfigHF: _set_if_auto(config_fp16, "enabled", fp16_backend == "amp") # apex: delegates amp work to apex (which needs to be available), but it cannot be used with any - # ZeRO features, so probably best to be avoided. 
+ # ZeRO features config_amp = config.get("amp") _set_if_auto(config_amp, "enabled", fp16_backend == "apex") _set_if_auto(config_amp, "opt_level", args.fp16_opt_level) - config_zero = config.get("zero_optimization", {}) - if self.is_zero2(): - self.offload = _is_true(config_zero, "cpu_offload") - elif self.is_zero3(): - offload_devices = ["cpu", "nvme"] - if config_zero.get("offload_optimizer", {}).get("device") in offload_devices: - self.offload = True - if config_zero.get("offload_param", {}).get("device") in offload_devices: - self.offload = True - - def config_finalize(self, args, model, num_training_steps): + def trainer_config_finalize(self, args, model, num_training_steps): """ This stage is run after we have the model and know num_training_steps. Now we we can complete the configuration process. - """ config = self.config @@ -421,27 +430,27 @@ class DeepSpeedConfigHF: # keep the config object global to be able to access it anywhere during TrainingArguments life-cycle -_deepspeed_config_hf_weak_ref = None +_hf_deepspeed_config_weak_ref = None -def deepspeed_config_hf_set(deepspeed_config_hf_obj): +def set_hf_deepspeed_config(hf_deepspeed_config_obj): # this is a special weakref global object to allow us to get to Deepspeed config from APIs # that don't have an easy way to get to the Deepspeed config outside of the Trainer domain. 
- global _deepspeed_config_hf_weak_ref - # will go away automatically when DeepSpeedConfigHF is destroyed (when TrainingArguments is destroyed) - _deepspeed_config_hf_weak_ref = weakref.ref(deepspeed_config_hf_obj) + global _hf_deepspeed_config_weak_ref + # will go away automatically when HfDeepSpeedConfig is destroyed (when TrainingArguments is destroyed) + _hf_deepspeed_config_weak_ref = weakref.ref(hf_deepspeed_config_obj) def is_deepspeed_zero3_enabled(): - if _deepspeed_config_hf_weak_ref is not None and _deepspeed_config_hf_weak_ref() is not None: - return _deepspeed_config_hf_weak_ref().is_zero3() + if _hf_deepspeed_config_weak_ref is not None and _hf_deepspeed_config_weak_ref() is not None: + return _hf_deepspeed_config_weak_ref().is_zero3() else: return False def deepspeed_config(): - if _deepspeed_config_hf_weak_ref is not None and _deepspeed_config_hf_weak_ref() is not None: - return _deepspeed_config_hf_weak_ref().config + if _hf_deepspeed_config_weak_ref is not None and _hf_deepspeed_config_weak_ref() is not None: + return _hf_deepspeed_config_weak_ref().config else: return None @@ -464,11 +473,11 @@ def deepspeed_init(trainer, num_training_steps, resume_from_checkpoint=None): model = trainer.model - deepspeed_config_hf = trainer.args.deepspeed_config_hf - deepspeed_config_hf.config_finalize(trainer.args, model, num_training_steps) + hf_deepspeed_config = trainer.args.hf_deepspeed_config + hf_deepspeed_config.trainer_config_finalize(trainer.args, model, num_training_steps) # resume config update - some bits like `model` and `num_training_steps` only become available during train - config = deepspeed_config_hf.config + config = hf_deepspeed_config.config # Optimizer + Scheduler # Currently supported combos: @@ -485,7 +494,7 @@ def deepspeed_init(trainer, num_training_steps, resume_from_checkpoint=None): optimizer = None if "optimizer" not in config: - if deepspeed_config_hf.is_offload(): + if hf_deepspeed_config.is_offload(): raise ValueError("ZeRO 
Offload can only work with DeepSpeed optimizers") # ds supports Adam, OneBitAdam, and Lamb optimizers and can import other optimizers from torch. diff --git a/src/transformers/testing_utils.py b/src/transformers/testing_utils.py index 81d74a9a42..8cd90ad573 100644 --- a/src/transformers/testing_utils.py +++ b/src/transformers/testing_utils.py @@ -26,6 +26,8 @@ from io import StringIO from pathlib import Path from typing import Iterator, Union +from transformers import logging as transformers_logging + from .file_utils import ( is_datasets_available, is_faiss_available, @@ -648,6 +650,26 @@ class CaptureLogger: return f"captured: {self.out}\n" +@contextlib.contextmanager +def LoggingLevel(level): + """ + This is a context manager to temporarily change transformers modules logging level to the desired value and have it + restored to the original setting at the end of the scope. + + For example :: + + with LoggingLevel(logging.INFO): + AutoModel.from_pretrained("gpt2") # calls logger.info() several times + + """ + orig_level = transformers_logging.get_verbosity() + try: + transformers_logging.set_verbosity(level) + yield + finally: + transformers_logging.set_verbosity(orig_level) + + @contextlib.contextmanager # adapted from https://stackoverflow.com/a/64789046/9201239 def ExtendSysPath(path: Union[str, os.PathLike]) -> Iterator[None]: diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index fd1a039307..0673174879 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -863,9 +863,10 @@ class Trainer: logger.info("Trial:", trial.params) if self.args.deepspeed: # Rebuild the deepspeed config to reflect the updated training parameters - from transformers.integrations import DeepSpeedConfigHF + from transformers.integrations import HfTrainerDeepSpeedConfig - self.args.deepspeed_config_hf = DeepSpeedConfigHF(self.args) + self.args.hf_deepspeed_config = HfTrainerDeepSpeedConfig(self.args.deepspeed) + self.args.hf_deepspeed_config.trainer_config_process(self.args) def _report_to_hp_search( self, trial: 
Union["optuna.Trial", Dict[str, Any]], epoch: int, metrics: Dict[str, float] diff --git a/src/transformers/training_args.py b/src/transformers/training_args.py index a99dbe69b5..b00bbdf581 100644 --- a/src/transformers/training_args.py +++ b/src/transformers/training_args.py @@ -671,10 +671,12 @@ class TrainingArguments: if self.deepspeed: # - must be run very last in arg parsing, since it will use a lot of these settings. # - must be run before the model is created. - from transformers.integrations import DeepSpeedConfigHF + from transformers.integrations import HfTrainerDeepSpeedConfig - # will be used later by the Trainer (leave self.deepspeed unmodified in case a user relies on it not to be modified) - self.deepspeed_config_hf = DeepSpeedConfigHF(self) + # will be used later by the Trainer + # note: leave self.deepspeed unmodified in case a user relies on it not to be modified) + self.hf_deepspeed_config = HfTrainerDeepSpeedConfig(self.deepspeed) + self.hf_deepspeed_config.trainer_config_process(self) def __repr__(self): # We override the default repr to remove deprecated arguments from the repr. 
This method should be removed once diff --git a/tests/deepspeed/test_deepspeed.py b/tests/deepspeed/test_deepspeed.py index e8f961a066..3cdc85f44e 100644 --- a/tests/deepspeed/test_deepspeed.py +++ b/tests/deepspeed/test_deepspeed.py @@ -20,13 +20,14 @@ import unittest from copy import deepcopy from parameterized import parameterized -from transformers import TrainingArguments, is_torch_available +from transformers import AutoModel, TrainingArguments, is_torch_available, logging from transformers.file_utils import WEIGHTS_NAME -from transformers.integrations import is_deepspeed_available +from transformers.integrations import HfDeepSpeedConfig, is_deepspeed_available from transformers.testing_utils import ( CaptureLogger, CaptureStderr, ExtendSysPath, + LoggingLevel, TestCasePlus, execute_subprocess_async, get_gpu_count, @@ -77,6 +78,56 @@ ZERO3 = "zero3" stages = [ZERO2, ZERO3] +@require_deepspeed +@require_torch_gpu +class CoreIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): + """ + Testing non-Trainer DeepSpeed integration + """ + + def setUp(self): + super().setUp() + + self.dist_env_1_gpu = dict( + MASTER_ADDR="localhost", MASTER_PORT="10999", RANK="0", LOCAL_RANK="0", WORLD_SIZE="1" + ) + + def test_init_zero3(self): + # test that zero.Init() works correctly under zero3 + ds_config = { + "train_batch_size": 1, + "zero_optimization": { + "stage": 3, + }, + } + + dschf = HfDeepSpeedConfig(ds_config) + + self.assertTrue(dschf.is_zero3()) + self.assertTrue(is_deepspeed_zero3_enabled()) + + with LoggingLevel(logging.INFO): + with mockenv_context(**self.dist_env_1_gpu): + logger = logging.get_logger("transformers.modeling_utils") + with CaptureLogger(logger) as cl: + AutoModel.from_pretrained(T5_TINY) + self.assertIn("Detected DeepSpeed ZeRO-3", cl.out) + + # now remove zero optimization + del ds_config["zero_optimization"] + dschf = HfDeepSpeedConfig(ds_config) + + self.assertFalse(dschf.is_zero3()) + self.assertFalse(is_deepspeed_zero3_enabled()) + + 
with LoggingLevel(logging.INFO): + with mockenv_context(**self.dist_env_1_gpu): + logger = logging.get_logger("transformers.modeling_utils") + with CaptureLogger(logger) as cl: + AutoModel.from_pretrained(T5_TINY) + self.assertNotIn("Detected DeepSpeed ZeRO-3", cl.out) + + @require_deepspeed @require_torch_gpu class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): @@ -194,9 +245,9 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): ds_config_zero3_dict["zero_optimization"]["offload_optimizer"] = nvme_config ds_config_zero3_dict["zero_optimization"]["offload_param"] = nvme_config trainer = get_regression_trainer(local_rank=0, deepspeed=ds_config_zero3_dict) - with CaptureLogger(deepspeed_logger) as cs: + with CaptureLogger(deepspeed_logger) as cl: trainer.train() - self.assertIn("DeepSpeed info", cs.out, "expected DeepSpeed logger output but got none") + self.assertIn("DeepSpeed info", cl.out, "expected DeepSpeed logger output but got none") # --- These tests need to run on both zero stages --- # @@ -230,9 +281,9 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): # to reset `deepspeed_logger.handlers[0].setStream(sys.stdout)` or directly capture from the deepspeed_logger. with mockenv_context(**self.dist_env_1_gpu): trainer = get_regression_trainer(local_rank=0, deepspeed=self.get_config_dict(stage)) - with CaptureLogger(deepspeed_logger) as cs: + with CaptureLogger(deepspeed_logger) as cl: trainer.train() - self.assertIn("DeepSpeed info", cs.out, "expected DeepSpeed logger output but got none") + self.assertIn("DeepSpeed info", cl.out, "expected DeepSpeed logger output but got none") @parameterized.expand(stages) def test_early_get_last_lr(self, stage):