From 2c73b9309974bc0157fd5937a495fe5c964199f9 Mon Sep 17 00:00:00 2001 From: Stas Bekman Date: Fri, 4 Jun 2021 08:58:23 -0700 Subject: [PATCH] [Deepspeed] Assert on mismatches between ds and hf args (#12021) * wip * add mismatch validation + test * renames * Update docs/source/main_classes/deepspeed.rst Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com> * renames Co-authored-by: Sylvain Gugger <35901082+sgugger@users.noreply.github.com> --- docs/source/main_classes/deepspeed.rst | 7 ++ src/transformers/deepspeed.py | 120 +++++++++++++++---------- tests/deepspeed/test_deepspeed.py | 82 ++++++++++++++--- 3 files changed, 149 insertions(+), 60 deletions(-) diff --git a/docs/source/main_classes/deepspeed.rst b/docs/source/main_classes/deepspeed.rst index d3cc4b92fa..98c4246e0a 100644 --- a/docs/source/main_classes/deepspeed.rst +++ b/docs/source/main_classes/deepspeed.rst @@ -537,7 +537,14 @@ difficult to detect ways. You have been warned. There are multiple other values that are specific to DeepSpeed-only and those you will have to set manually to suit your needs. +In your own programs, you can also use the following approach if you'd like to modify the DeepSpeed config as a master +and configure :class:`~transformers.TrainingArguments` based on that. The steps are: +1. Create or load the DeepSpeed configuration to be used as a master configuration +2. Create the :class:`~transformers.TrainingArguments` object based on these values + +Do note that some values, such as :obj:`scheduler.params.total_num_steps` are calculated by +:class:`~transformers.Trainer` during ``train``, but you can of course do the math yourself. .. _deepspeed-zero: diff --git a/src/transformers/deepspeed.py b/src/transformers/deepspeed.py index 63185562c9..31595fca90 100644 --- a/src/transformers/deepspeed.py +++ b/src/transformers/deepspeed.py @@ -20,6 +20,7 @@ import io import json import weakref from copy import deepcopy +from functools import partialmethod from .dependency_versions_check import dep_version_check from .utils import logging @@ -32,19 +33,6 @@ def is_deepspeed_available(): return importlib.util.find_spec("deepspeed") is not None -def _is_true(config, key): - if config is None: - return False - return bool(config.get(key)) - - -def _set_if_auto(config, key, val): - if config is None: - return - if config.get(key) == "auto": - config[key] = val - - class HfDeepSpeedConfig: """ This object contains a DeepSpeed configuration dictionary and can be quickly queried for things like zero stage. @@ -89,7 +77,7 @@ class HfDeepSpeedConfig: self.offload = False config_zero = config.get("zero_optimization", {}) if self.is_zero2(): - self.offload = _is_true(config_zero, "cpu_offload") + self.offload = self.is_true(config_zero, "cpu_offload") elif self.is_zero3(): offload_devices = ["cpu", "nvme"] if config_zero.get("offload_optimizer", {}).get("device") in offload_devices: @@ -106,6 +94,12 @@ class HfDeepSpeedConfig: def is_offload(self): return self.offload + @staticmethod + def is_true(config, key): + if config is None: + return False + return bool(config.get(key)) + class HfTrainerDeepSpeedConfig(HfDeepSpeedConfig): """ @@ -116,37 +110,67 @@ class HfTrainerDeepSpeedConfig(HfDeepSpeedConfig): def __init__(self, config_file_or_dict): super().__init__(config_file_or_dict) + self.mismatches = [] + + def fill_match(self, ds_key_long, hf_val, hf_key=None, must_match=True): + """ + A utility method that massages the config file and can optionally verify that the values match. + + 1. Replace "auto" values with ``TrainingArguments`` value. + + 2. If it wasn't "auto" and ``must_match`` is true, then check that DS config matches Trainer + config values and if mismatched add the entry to ``self.mismatched`` - will assert during + ``trainer_config_finalize`` for one or more mismatches. + + """ + + config = self.config + + # find the config node of interest if it exists + nodes = ds_key_long.split(".") + ds_key = nodes.pop() + for node in nodes: + config = config.get(node) + if config is None: + return + + if config.get(ds_key) == "auto": + config[ds_key] = hf_val + return + + if not must_match: + return + + ds_val = config.get(ds_key) + if ds_val is not None and ds_val != hf_val: + self.mismatches.append(f"- ds {ds_key_long}={ds_val} vs hf {hf_key}={hf_val}") + + fill_only = partialmethod(fill_match, must_match=False) def trainer_config_process(self, args): """ Adjust the config with ``TrainingArguments`` values. This stage is run during ``TrainingArguments`` object creation. """ - config = self.config - # DeepSpeed does: # train_batch_size = world_size * train_micro_batch_size_per_gpu * gradient_accumulation_steps train_batch_size = args.world_size * args.per_device_train_batch_size * args.gradient_accumulation_steps - _set_if_auto(config, "train_micro_batch_size_per_gpu", args.per_device_train_batch_size) - _set_if_auto(config, "gradient_accumulation_steps", args.gradient_accumulation_steps) - _set_if_auto(config, "train_batch_size", train_batch_size) - _set_if_auto(config, "gradient_clipping", args.max_grad_norm) + self.fill_match( + "train_micro_batch_size_per_gpu", args.per_device_train_batch_size, "per_device_train_batch_size" + ) + self.fill_match("gradient_accumulation_steps", args.gradient_accumulation_steps, "gradient_accumulation_steps") + self.fill_match("train_batch_size", train_batch_size, "train_batch_size (calculated)") + self.fill_match("gradient_clipping", args.max_grad_norm, "max_grad_norm") - config_optim = config.get("optimizer", {}) - if config_optim != {}: - config_optim_params = config_optim.get("params") - _set_if_auto(config_optim_params, "lr", args.learning_rate) - _set_if_auto(config_optim_params, "betas", [args.adam_beta1, args.adam_beta2]) - _set_if_auto(config_optim_params, "eps", args.adam_epsilon) - _set_if_auto(config_optim_params, "weight_decay", args.weight_decay) + self.fill_match("optimizer.params.lr", args.learning_rate, "learning_rate") + self.fill_match("optimizer.params.betas", [args.adam_beta1, args.adam_beta2], "adam_beta1+adam_beta2") + self.fill_match("optimizer.params.eps", args.adam_epsilon, "adam_epsilon") + self.fill_match("optimizer.params.weight_decay", args.weight_decay, "weight_decay") - config_sched = config.get("scheduler", {}) - if config_sched != {}: - config_sched_params = config_sched.get("params") - _set_if_auto(config_sched_params, "warmup_min_lr", 0) - _set_if_auto(config_sched_params, "warmup_max_lr", args.learning_rate) - _set_if_auto(config_sched_params, "warmup_num_steps", args.warmup_steps) - # total_num_steps - will get set in trainer_config_finalize + self.fill_only("scheduler.params.warmup_min_lr", 0) # not a trainer arg + self.fill_match("scheduler.params.warmup_max_lr", args.learning_rate, "learning_rate") + self.fill_match("scheduler.params.warmup_num_steps", args.warmup_steps, "warmup_steps") + # total_num_steps - will get set in trainer_config_finalize # fp16 if args.fp16: @@ -156,14 +180,12 @@ class HfTrainerDeepSpeedConfig(HfDeepSpeedConfig): # amp: similar to the pytorch native amp - it has a bunch of optional params but we won't set # any here unless the user did the work - config_fp16 = config.get("fp16") - _set_if_auto(config_fp16, "enabled", fp16_backend == "amp") + self.fill_match("fp16.enabled", fp16_backend == "amp", "fp16+fp16_backend(amp)") # apex: delegates amp work to apex (which needs to be available), but it cannot be used with any # ZeRO features - config_amp = config.get("amp") - _set_if_auto(config_amp, "enabled", fp16_backend == "apex") - _set_if_auto(config_amp, "opt_level", args.fp16_opt_level) + self.fill_match("amp.enabled", fp16_backend == "apex", "fp16+fp16_backend(apex)") + self.fill_match("amp.opt_level", args.fp16_opt_level, "fp16_opt_level") def trainer_config_finalize(self, args, model, num_training_steps): """ @@ -171,21 +193,23 @@ class HfTrainerDeepSpeedConfig(HfDeepSpeedConfig): Now we we can complete the configuration process. """ - config = self.config - # zero - config_zero = config.get("zero_optimization", {}) if self.is_zero3(): # automatically assign the optimal config values based on model config hidden_size = model.config.hidden_size - _set_if_auto(config_zero, "reduce_bucket_size", hidden_size * hidden_size) - _set_if_auto(config_zero, "stage3_prefetch_bucket_size", 0.9 * hidden_size * hidden_size) - _set_if_auto(config_zero, "stage3_param_persistence_threshold", 10 * hidden_size) + self.fill_only("zero_optimization.reduce_bucket_size", hidden_size * hidden_size) + self.fill_only("zero_optimization.stage3_prefetch_bucket_size", 0.9 * hidden_size * hidden_size) + self.fill_only("zero_optimization.stage3_param_persistence_threshold", 10 * hidden_size) # scheduler - config_sched = config.get("scheduler", {}) - config_sched_params = config_sched.get("params", {}) - _set_if_auto(config_sched_params, "total_num_steps", num_training_steps) + self.fill_match("scheduler.params.total_num_steps", num_training_steps, "num_training_steps (calculated)") + + if len(self.mismatches) > 0: + mismatches = "\n".join(self.mismatches) + raise ValueError( + f"Please correct the following DeepSpeed config values that mismatch TrainingArguments values:\n{mismatches}\n" + "The easiest method is to set these DeepSpeed config values to 'auto'." + ) # keep the config object global to be able to access it anywhere during TrainingArguments life-cycle diff --git a/tests/deepspeed/test_deepspeed.py b/tests/deepspeed/test_deepspeed.py index 149cbd4d20..3e7412b7f9 100644 --- a/tests/deepspeed/test_deepspeed.py +++ b/tests/deepspeed/test_deepspeed.py @@ -205,6 +205,58 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): # --- These tests are enough to run on one of zero stages --- # + def test_hf_ds_config_mismatch(self): + + ds_config = self.get_config_dict(ZERO2) + + # Purposefully configure these values to mismatch TrainingArguments values. + # This currently doesn't cover all keys (but it could) + per_device_train_batch_size = 2 + ds_config["train_micro_batch_size_per_gpu"] = per_device_train_batch_size + 2 + + ds_config["train_batch_size"] = 1000 + + gradient_accumulation_steps = 2 + ds_config["gradient_accumulation_steps"] = gradient_accumulation_steps + 2 + + max_grad_norm = 1.0 + ds_config["gradient_clipping"] = max_grad_norm + 0.1 + + adam_beta1, adam_beta2 = 0.9, 0.99 + ds_config["optimizer"]["params"]["betas"] = [adam_beta1 - 0.1, adam_beta2 - 0.1] + + fp16 = True + ds_config["fp16"]["enabled"] = not fp16 + + keys = [ + "per_device_train_batch_size", + "train_batch_size", + "gradient_accumulation_steps", + "max_grad_norm", + "betas", + "fp16", + ] + + with mockenv_context(**self.dist_env_1_gpu): + trainer = get_regression_trainer( + local_rank=0, + fp16=fp16, + deepspeed=ds_config, + per_device_train_batch_size=per_device_train_batch_size, + gradient_accumulation_steps=gradient_accumulation_steps, + max_grad_norm=max_grad_norm, + adam_beta1=adam_beta1, + adam_beta2=adam_beta2, + ) + with self.assertRaises(Exception) as context: + trainer.train() + + for key in keys: + self.assertTrue( + key in str(context.exception), + f"{key} is not in the exception message:\n{context.exception}", + ) + # Test various combos # 1. DS scheduler + DS optimizer: this is already tested by most other tests # 2. HF scheduler + HF optimizer: @@ -219,7 +271,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): del ds_config_zero2_dict["scheduler"] # force default HF Trainer scheduler ds_config_zero2_dict["zero_optimization"]["cpu_offload"] = False ds_config_zero2_dict["fp16"]["initial_scale_power"] = 1 # force optimizer on the first step - trainer = get_regression_trainer(a=a, local_rank=0, deepspeed=ds_config_zero2_dict) + trainer = get_regression_trainer(a=a, local_rank=0, fp16=True, deepspeed=ds_config_zero2_dict) trainer.train() new_a = trainer.model.a.item() self.assertNotEqual(new_a, a) @@ -231,7 +283,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): del ds_config_zero2_dict["optimizer"] # force default HF Trainer optimizer ds_config_zero2_dict["zero_optimization"]["cpu_offload"] = False ds_config_zero2_dict["fp16"]["initial_scale_power"] = 1 # force optimizer on the first step - trainer = get_regression_trainer(a=a, local_rank=0, deepspeed=ds_config_zero2_dict) + trainer = get_regression_trainer(a=a, local_rank=0, fp16=True, deepspeed=ds_config_zero2_dict) trainer.train() new_a = trainer.model.a.item() self.assertNotEqual(new_a, a) @@ -243,7 +295,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): del ds_config_zero2_dict["scheduler"] # force default HF Trainer scheduler ds_config_zero2_dict["zero_optimization"]["cpu_offload"] = False ds_config_zero2_dict["fp16"]["initial_scale_power"] = 1 # force optimizer on the first step - trainer = get_regression_trainer(local_rank=0, deepspeed=ds_config_zero2_dict) + trainer = get_regression_trainer(local_rank=0, fp16=True, deepspeed=ds_config_zero2_dict) with self.assertRaises(Exception) as context: trainer.train() self.assertTrue( @@ -261,7 +313,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): ds_config_zero3_dict = self.get_config_dict(ZERO3) ds_config_zero3_dict["zero_optimization"]["offload_optimizer"] = nvme_config ds_config_zero3_dict["zero_optimization"]["offload_param"] = nvme_config - trainer = get_regression_trainer(local_rank=0, deepspeed=ds_config_zero3_dict) + trainer = get_regression_trainer(local_rank=0, fp16=True, deepspeed=ds_config_zero3_dict) with CaptureLogger(deepspeed_logger) as cl: trainer.train() self.assertIn("DeepSpeed info", cl.out, "expected DeepSpeed logger output but got none") @@ -279,7 +331,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): elif stage == "stage3": ds_config_dict["zero_optimization"]["offload_optimizer"]["device"] = "cpu" with mockenv_context(**self.dist_env_1_gpu): - trainer = get_regression_trainer(local_rank=0, deepspeed=ds_config_dict) + trainer = get_regression_trainer(local_rank=0, fp16=True, deepspeed=ds_config_dict) with self.assertRaises(Exception) as context: trainer.train() self.assertIn( @@ -297,7 +349,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): # it's run not as a first test as `sys.stdout` will no longer be the same. So we either have # to reset `deepspeed_logger.handlers[0].setStream(sys.stdout)` or directly capture from the deepspeed_logger. with mockenv_context(**self.dist_env_1_gpu): - trainer = get_regression_trainer(local_rank=0, deepspeed=self.get_config_dict(stage)) + trainer = get_regression_trainer(local_rank=0, fp16=True, deepspeed=self.get_config_dict(stage)) with CaptureLogger(deepspeed_logger) as cl: trainer.train() self.assertIn("DeepSpeed info", cl.out, "expected DeepSpeed logger output but got none") @@ -317,6 +369,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): b=b, local_rank=0, train_len=8, + fp16=True, deepspeed=self.get_config_dict(stage), per_device_train_batch_size=8, logging_steps=1, @@ -360,6 +413,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): b=b, local_rank=0, train_len=train_len, + fp16=True, deepspeed=self.get_config_dict(stage), per_device_train_batch_size=8, gradient_accumulation_steps=1, @@ -377,6 +431,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): b=b, local_rank=0, train_len=train_len, + fp16=True, deepspeed=self.get_config_dict(stage), per_device_train_batch_size=4, gradient_accumulation_steps=2, @@ -450,6 +505,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): trainer = get_regression_trainer( output_dir=output_dir, save_steps=freq, + fp16=True, deepspeed=ds_config_dict, ) trainer.train() @@ -463,7 +519,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): with mockenv_context(**self.dist_env_1_gpu): ds_config_dict = self.get_config_dict(stage) output_dir = self.get_auto_remove_tmp_dir() - trainer = get_regression_trainer(output_dir=output_dir, deepspeed=ds_config_dict) + trainer = get_regression_trainer(output_dir=output_dir, fp16=True, deepspeed=ds_config_dict) # 1. fail to find any checkpoint - due a fresh output_dir with self.assertRaises(Exception) as context: @@ -491,7 +547,9 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): if stage == ZERO3: ds_config_dict["zero_optimization"]["stage3_gather_fp16_weights_on_model_save"] = True - kwargs = dict(output_dir=output_dir, train_len=128, save_steps=5, learning_rate=0.1, deepspeed=ds_config_dict) + kwargs = dict( + output_dir=output_dir, train_len=128, save_steps=5, learning_rate=0.1, fp16=True, deepspeed=ds_config_dict + ) with mockenv_context(**self.dist_env_1_gpu): trainer = get_regression_trainer(**kwargs) @@ -528,12 +586,12 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon): # test that we can switch from zero2 to zero3 in the same process for example # test is_zero, etc. output_dir = self.get_auto_remove_tmp_dir() - kwargs = dict(output_dir=output_dir, train_len=8) + kwargs = dict(output_dir=output_dir, train_len=8, fp16=True) + + ds_config_zero3_dict = self.get_config_dict("zero3") + ds_config_zero2_dict = self.get_config_dict("zero2") with mockenv_context(**self.dist_env_1_gpu): - ds_config_zero3_dict = self.get_config_dict("zero3") - ds_config_zero2_dict = self.get_config_dict("zero2") - trainer = get_regression_trainer(deepspeed=ds_config_zero3_dict, **kwargs) self.assertTrue(is_deepspeed_zero3_enabled())