From 382ba670ed2376a9454c3c841fae4819118ec4f5 Mon Sep 17 00:00:00 2001 From: Sourab Mangrulkar <13534540+pacman100@users.noreply.github.com> Date: Wed, 20 Sep 2023 10:26:16 +0530 Subject: [PATCH] FSDP tests and checkpointing fixes (#26180) * add fsdp tests * Update test_fsdp.py * Update test_fsdp.py * fixes * checks * Update trainer.py * fix * fixes for saving/resuming checkpoints * fixes * add tests and delete debug statements * fixing tests * Update test_fsdp.py * fix tests * fix tests * minor nits * fix code style and quality * refactor and modularize test code * reduce the time of tests * reduce the test time * fix test * reduce test time * reduce test time * fix failing tests * fix * Apply suggestions from code review Co-authored-by: Arthur <48595927+ArthurZucker@users.noreply.github.com> * resolve comments --------- Co-authored-by: Arthur <48595927+ArthurZucker@users.noreply.github.com> --- src/transformers/modeling_utils.py | 2 +- src/transformers/testing_utils.py | 10 + src/transformers/trainer.py | 66 ++++--- src/transformers/utils/__init__.py | 1 + src/transformers/utils/import_utils.py | 4 + tests/fsdp/test_fsdp.py | 251 +++++++++++++++++++++++++ 6 files changed, 310 insertions(+), 24 deletions(-) create mode 100644 tests/fsdp/test_fsdp.py diff --git a/src/transformers/modeling_utils.py b/src/transformers/modeling_utils.py index 057caaedba..1432c3b78a 100644 --- a/src/transformers/modeling_utils.py +++ b/src/transformers/modeling_utils.py @@ -128,7 +128,7 @@ def is_fsdp_enabled(): def is_fsdp_enabled_and_dist_rank_0(): - return is_fsdp_enabled() and torch.distributed.get_rank() == 0 + return is_fsdp_enabled() and int(os.environ.get("LOCAL_RANK", -1)) == 0 if is_sagemaker_mp_enabled(): diff --git a/src/transformers/testing_utils.py b/src/transformers/testing_utils.py index ccdc7eb6dd..eaa44ff224 100644 --- a/src/transformers/testing_utils.py +++ b/src/transformers/testing_utils.py @@ -61,6 +61,7 @@ from .utils import ( is_essentia_available, is_faiss_available, is_flax_available, + is_fsdp_available, is_ftfy_available, is_ipex_available, is_jieba_available, @@ -316,6 +317,15 @@ def require_accelerate(test_case): return unittest.skipUnless(is_accelerate_available(), "test requires accelerate")(test_case) +def require_fsdp(test_case, min_version: str = "1.12.0"): + """ + Decorator marking a test that requires fsdp. These tests are skipped when fsdp isn't installed. + """ + return unittest.skipUnless(is_fsdp_available(min_version), f"test requires torch version >= {min_version}")( + test_case + ) + + def require_safetensors(test_case): """ Decorator marking a test that requires safetensors. These tests are skipped when safetensors isn't installed. diff --git a/src/transformers/trainer.py b/src/transformers/trainer.py index 168269da48..3cad3cbfec 100755 --- a/src/transformers/trainer.py +++ b/src/transformers/trainer.py @@ -1700,9 +1700,6 @@ class Trainer: model = self._wrap_model(self.model_wrapped) - if (is_sagemaker_mp_enabled() or self.is_fsdp_enabled) and resume_from_checkpoint is not None: - self._load_from_checkpoint(resume_from_checkpoint, model) - # as the model is wrapped, don't use `accelerator.prepare` # this is for unhandled cases such as # Fairscale Sharded DDP, FSDP-XLA, SageMaker MP/DP, DataParallel, IPEX @@ -1728,7 +1725,7 @@ class Trainer: ) if self.is_fsdp_enabled: - self.model = model + self.model = self.model_wrapped = model # for the rest of this function `model` is the outside model, whether it was wrapped or not if model is not self.model: @@ -1738,16 +1735,20 @@ class Trainer: if self.is_deepspeed_enabled: self.deepspeed = self.model_wrapped - # deepspeed ckpt loading - if resume_from_checkpoint is not None and self.is_deepspeed_enabled: - deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint) + # ckpt loading + if resume_from_checkpoint is not None: + if self.is_deepspeed_enabled: + deepspeed_load_checkpoint(self.model_wrapped, resume_from_checkpoint) + elif is_sagemaker_mp_enabled() or self.is_fsdp_enabled: + self._load_from_checkpoint(resume_from_checkpoint, self.model_wrapped) # Check if saved optimizer or scheduler states exist self._load_optimizer_and_scheduler(resume_from_checkpoint) # important: at this point: # self.model is the Transformers Model - # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), etc. + # self.model_wrapped is DDP(Transformers Model), Deepspeed(Transformers Model), + # FSDP(Transformers Model), Dynamo Optimized Module(Transformers Model) etc. # Train! logger.info("***** Running training *****") @@ -2088,17 +2089,28 @@ class Trainer: weights_index_file = os.path.join(resume_from_checkpoint, WEIGHTS_INDEX_NAME) safe_weights_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_NAME) safe_weights_index_file = os.path.join(resume_from_checkpoint, SAFE_WEIGHTS_INDEX_NAME) + is_fsdp_ckpt = os.path.isdir(resume_from_checkpoint) and any( + WEIGHTS_NAME.split(".")[0] in folder_name + for folder_name in os.listdir(resume_from_checkpoint) + if os.path.isdir(os.path.join(resume_from_checkpoint, folder_name)) + ) - if not any( - os.path.isfile(f) - for f in [ - weights_file, - safe_weights_file, - weights_index_file, - safe_weights_index_file, - adapter_weights_file, - adapter_safe_weights_file, - ] + if is_fsdp_ckpt and not self.is_fsdp_enabled: + raise ValueError(f"Checkpoint found at {resume_from_checkpoint} is only supported when using PyTorch FSDP") + + if not ( + any( + os.path.isfile(f) + for f in [ + weights_file, + safe_weights_file, + weights_index_file, + safe_weights_index_file, + adapter_weights_file, + adapter_safe_weights_file, + ] + ) + or is_fsdp_ckpt ): raise ValueError(f"Can't find a valid checkpoint at {resume_from_checkpoint}") @@ -2114,7 +2126,7 @@ class Trainer: "yield to errors or unwanted behaviors." ) - if os.path.isfile(weights_file) or os.path.isfile(safe_weights_file): + if os.path.isfile(weights_file) or os.path.isfile(safe_weights_file) or is_fsdp_ckpt: # If the model is on the GPU, it still works! if is_sagemaker_mp_enabled(): if os.path.isfile(os.path.join(resume_from_checkpoint, "user_content.pt")): @@ -2184,6 +2196,10 @@ class Trainer: model = self.model_wrapped if is_sagemaker_mp_enabled() else self.model if self.is_deepspeed_enabled: deepspeed_load_checkpoint(self.model_wrapped, self.state.best_model_checkpoint) + elif self.is_fsdp_enabled: + load_result = load_fsdp_model( + self.accelerator.state.fsdp_plugin, self.accelerator, model, self.state.best_model_checkpoint + ) elif ( os.path.exists(best_model_path) or os.path.exists(best_safe_model_path) @@ -2211,10 +2227,6 @@ class Trainer: state_dict["_smp_is_partial"] = False load_result = model.load_state_dict(state_dict, strict=True) - elif self.is_fsdp_enabled: - load_result = load_fsdp_model( - self.accelerator.state.fsdp_plugin, self.accelerator, model, self.state.best_model_checkpoint - ) else: if is_peft_available() and isinstance(model, PeftModel): # If train a model using PEFT & LoRA, assume that adapter have been saved properly. @@ -2503,6 +2515,14 @@ class Trainer: else ( os.path.isfile(os.path.join(checkpoint, OPTIMIZER_NAME)) or os.path.isfile(os.path.join(checkpoint, OPTIMIZER_NAME_BIN)) + or ( + os.path.isdir(checkpoint) + and any( + OPTIMIZER_NAME_BIN.split(".")[0] in folder_name + for folder_name in os.listdir(checkpoint) + if os.path.isdir(os.path.join(checkpoint, folder_name)) + ) + ) ) ) if checkpoint_file_exists and os.path.isfile(os.path.join(checkpoint, SCHEDULER_NAME)): diff --git a/src/transformers/utils/__init__.py b/src/transformers/utils/__init__.py index ac9beb7856..5ee787738d 100644 --- a/src/transformers/utils/__init__.py +++ b/src/transformers/utils/__init__.py @@ -115,6 +115,7 @@ from .import_utils import ( is_essentia_available, is_faiss_available, is_flax_available, + is_fsdp_available, is_ftfy_available, is_in_notebook, is_ipex_available, diff --git a/src/transformers/utils/import_utils.py b/src/transformers/utils/import_utils.py index aeb351db96..345a2cba76 100644 --- a/src/transformers/utils/import_utils.py +++ b/src/transformers/utils/import_utils.py @@ -606,6 +606,10 @@ def is_accelerate_available(min_version: str = None): return _accelerate_available +def is_fsdp_available(min_version: str = "1.12.0"): + return version.parse(_torch_version) >= version.parse(min_version) + + def is_optimum_available(): return _optimum_available diff --git a/tests/fsdp/test_fsdp.py b/tests/fsdp/test_fsdp.py new file mode 100644 index 0000000000..1a968f68fa --- /dev/null +++ b/tests/fsdp/test_fsdp.py @@ -0,0 +1,251 @@ +# Copyright 2023 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import os +from functools import partial + +from parameterized import parameterized + +import tests.trainer.test_trainer +from tests.trainer.test_trainer import TrainerIntegrationCommon # noqa +from transformers import is_torch_available +from transformers.testing_utils import ( + TestCasePlus, + execute_subprocess_async, + get_gpu_count, + mockenv_context, + require_accelerate, + require_fsdp, + require_torch_gpu, + require_torch_multi_gpu, + slow, +) +from transformers.trainer_callback import TrainerState +from transformers.trainer_utils import FSDPOption, set_seed +from transformers.utils import is_accelerate_available, is_torch_bf16_gpu_available + + +# default torch.distributed port +DEFAULT_MASTER_PORT = "10999" +dtypes = ["fp16"] +if is_torch_bf16_gpu_available(): + dtypes += ["bf16"] +sharding_strategies = ["full_shard", "shard_grad_op"] +state_dict_types = ["FULL_STATE_DICT", "SHARDED_STATE_DICT"] +set_seed(42) +params = list(itertools.product(sharding_strategies, dtypes)) + + +def get_master_port(real_launcher=False): + """ + When using a single gpu launcher emulation (i.e. not deepspeed or python -m torch.distributed) + the issue is that once the port is tied it can't be used anywhere else outside of this process, + since torch.dist doesn't free the port until the process exits. Therefore for the sake of being + able to run both emulated launcher and normal launcher tests we need 2 distinct ports. + + This function will give the right port in the right context. For real launcher it'll give the + base port, for emulated launcher it'll give the base port + 1. In both cases a string is + returned. + + Args: + `real_launcher`: whether a real launcher is going to be used, or the emulated one + + """ + + master_port_base = os.environ.get("DS_TEST_PORT", DEFAULT_MASTER_PORT) + if not real_launcher: + master_port_base = str(int(master_port_base) + 1) + return master_port_base + + +if is_torch_available(): + from tests.trainer.test_trainer import ( # noqa + RegressionModelConfig, + RegressionPreTrainedModel, + ) + + # hack to restore original logging level pre #21700 + get_regression_trainer = partial(tests.trainer.test_trainer.get_regression_trainer, log_level="info") + +if is_accelerate_available(): + from accelerate.utils.constants import ( + FSDP_PYTORCH_VERSION, + FSDP_SHARDING_STRATEGY, + ) + + require_fsdp_version = partial(require_fsdp, min_version=FSDP_PYTORCH_VERSION) + + +def get_launcher(distributed=False, use_accelerate=False): + # 1. explicitly set --num_nodes=1 just in case these tests end up run on a multi-node setup + # - it won't be able to handle that + # 2. for now testing with just 2 gpus max (since some quality tests may give different + # results with mode gpus because we use very little data) + num_gpus = min(2, get_gpu_count()) if distributed else 1 + master_port = get_master_port(real_launcher=True) + if use_accelerate: + return f"""accelerate launch + --num_processes {num_gpus} + --main_process_port {master_port} + --use_fsdp + --fsdp_auto_wrap_policy TRANSFORMER_BASED_WRAP + --fsdp_state_dict_type SHARDED_STATE_DICT + --fsdp_transformer_layer_cls_to_wrap BertLayer""".split() + return f"torchrun --nnodes 1 --nproc-per-node {num_gpus} --master-port {master_port}".split() + + +def _parameterized_custom_name_func(func, param_num, param): + # customize the test name generator function as we want both params to appear in the sub-test + # name, as by default it shows only the first param + param_based_name = parameterized.to_safe_name("_".join(str(x) for x in param.args)) + return f"{func.__name__}_{param_based_name}" + + +@require_accelerate +@require_torch_gpu +@require_fsdp_version +class TrainerIntegrationFSDP(TestCasePlus, TrainerIntegrationCommon): + def setUp(self): + super().setUp() + master_port = get_master_port(real_launcher=False) + self.dist_env_1_gpu = { + "MASTER_ADDR": "localhost", + "MASTER_PORT": master_port, + "RANK": "0", + "LOCAL_RANK": "0", + "WORLD_SIZE": "1", + } + + self.fsdp_config = { + "backward_prefetch": "backward_pre", + "forward_prefetch": "False", + "limit_all_gathers": "False", + "use_orig_params": "True", + "sync_module_states": "True", + "activation_checkpointing": "False", + "min_num_params": 1, + } + + def tearDown(self): + super().tearDown() + + @parameterized.expand(params, name_func=_parameterized_custom_name_func) + def test_fsdp_config(self, sharding_strategy, dtype): + output_dir = self.get_auto_remove_tmp_dir() + kwargs = { + "output_dir": output_dir, + "train_len": 128, + "save_steps": 5, + "learning_rate": 0.1, + "fsdp": f"{sharding_strategy} offload auto_wrap", + "fsdp_config": self.fsdp_config, + } + kwargs[dtype] = True + with mockenv_context(**self.dist_env_1_gpu): + trainer = get_regression_trainer(**kwargs) + self.assertEqual(trainer.args.fsdp[0], sharding_strategy) + self.assertEqual(trainer.args.fsdp[1], FSDPOption.OFFLOAD) + self.assertEqual(trainer.args.fsdp[2], FSDPOption.AUTO_WRAP) + for k, v in trainer.args.fsdp_config.items(): + self.assertEqual(v, self.fsdp_config[k]) + self.assertEqual(os.environ.get("ACCELERATE_USE_FSDP", "false"), "true") + + @parameterized.expand(params, name_func=_parameterized_custom_name_func) + @require_torch_multi_gpu + @slow + def test_basic_run(self, sharding_strategy, dtype): + launcher = get_launcher(distributed=True, use_accelerate=False) + output_dir = self.get_auto_remove_tmp_dir() + args = self.get_base_args(output_dir, 1, 50).split() + [f"--{dtype}"] + fsdp_args = ["--fsdp", f"{sharding_strategy} auto_wrap", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] + script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] + cmd = launcher + script + args + fsdp_args + execute_subprocess_async(cmd, env=self.get_env()) + + @parameterized.expand(dtypes) + @require_torch_multi_gpu + @slow + def test_basic_run_with_cpu_offload(self, dtype): + launcher = get_launcher(distributed=True, use_accelerate=False) + output_dir = self.get_auto_remove_tmp_dir() + args = self.get_base_args(output_dir, 1, 50).split() + [f"--{dtype}", "--max_steps", "10"] + fsdp_args = ["--fsdp", "full_shard auto_wrap offload", "--fsdp_transformer_layer_cls_to_wrap", "BertLayer"] + script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] + cmd = launcher + script + args + fsdp_args + execute_subprocess_async(cmd, env=self.get_env()) + + @parameterized.expand(state_dict_types, name_func=_parameterized_custom_name_func) + @require_torch_multi_gpu + @slow + def test_training_and_can_resume_normally(self, state_dict_type): + output_dir = self.get_auto_remove_tmp_dir("./xxx", after=False) + + sharding_strategy = "full_shard" + use_accelerate = state_dict_type == "SHARDED_STATE_DICT" + launcher = get_launcher(True, use_accelerate=use_accelerate) + args = self.get_base_args(output_dir, 2, 25).split() + script = [f"{self.examples_dir_str}/pytorch/text-classification/run_glue.py"] + logs = self.run_cmd_and_get_logs(use_accelerate, sharding_strategy, launcher, script, args, output_dir) + + # resume from ckpt + checkpoint = os.path.join(output_dir, "checkpoint-115") + resume_args = args + f"--resume_from_checkpoint {checkpoint}".split() + logs_resume = self.run_cmd_and_get_logs( + use_accelerate, sharding_strategy, launcher, script, resume_args, output_dir + ) + + for log, log1 in zip(logs, logs_resume): + if "learning_rate" in log: + self.assertAlmostEqual(log["learning_rate"], log1["learning_rate"], delta=1e-5) + + def run_cmd_and_get_logs(self, use_accelerate, sharding_strategy, launcher, script, args, output_dir): + if not use_accelerate: + fsdp_args = [ + "--fsdp", + f"{sharding_strategy} auto_wrap", + "--fsdp_transformer_layer_cls_to_wrap", + "BertLayer", + ] + cmd = launcher + script + args + fsdp_args + else: + fsdp_config = f""" + --fsdp_sharding_strategy {FSDP_SHARDING_STRATEGY.index(sharding_strategy.upper()) + 1} + """.split() + cmd = launcher + fsdp_config + script + args + + # keep for quick debug + # print(" ".join([f"\nPYTHONPATH={self.src_dir_str}"] +cmd)); die + execute_subprocess_async(cmd, env=self.get_env()) + logs = TrainerState.load_from_json(os.path.join(output_dir, "trainer_state.json")).log_history + return logs + + def get_base_args(self, output_dir, num_epochs, logging_steps): + return f""" + --model_name_or_path bert-base-cased + --task_name mrpc + --output_dir {output_dir} + --overwrite_output_dir + --do_train + --max_seq_length 128 + --per_device_train_batch_size 16 + --learning_rate 5e-5 + --num_train_epochs {num_epochs} + --lr_scheduler_type cosine + --logging_steps {logging_steps} + --save_strategy epoch + --do_eval + --evaluation_strategy epoch + --report_to none + """