[DeepSpeed] fp32 support (#11499)

* prep for deepspeed==0.3.16

* new version

* too soon

* support and test fp32 mode

* troubleshooting doc start

* workaround no longer needed

* add fp32 doc

* style

* cleanup, add tf32 note

* clarify

* release was made
This commit is contained in:
Stas Bekman
2021-04-30 12:51:48 -07:00
committed by GitHub
parent 282f3ac3ef
commit 4e7bf94e72
6 changed files with 139 additions and 70 deletions

View File

@@ -48,6 +48,7 @@ with ExtendSysPath(f"{bindir}/.."):
set_seed(42)
MBART_TINY = "sshleifer/tiny-mbart"
T5_SMALL = "t5-small"
T5_TINY = "patrickvonplaten/t5-tiny-random"
def load_json(path):
@@ -108,25 +109,31 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon):
MASTER_ADDR="localhost", MASTER_PORT="10999", RANK="0", LOCAL_RANK="0", WORLD_SIZE="1"
)
self.ds_config_file = {}
self.ds_config_file[ZERO2] = f"{self.test_file_dir_str}/ds_config_zero2.json"
self.ds_config_file[ZERO3] = f"{self.test_file_dir_str}/ds_config_zero3.json"
self.ds_config_file = dict(
zero2=f"{self.test_file_dir_str}/ds_config_zero2.json",
zero3=f"{self.test_file_dir_str}/ds_config_zero3.json",
)
# use self.get_config_dict(stage) to use these to ensure the original is not modified
self.ds_config_dict = {}
with io.open(self.ds_config_file[ZERO2], "r", encoding="utf-8") as f:
self.ds_config_dict[ZERO2] = json.load(f)
config_zero2 = json.load(f)
# by default use fp16
config_zero2["fp16"]["enabled"] = True
with io.open(self.ds_config_file[ZERO3], "r", encoding="utf-8") as f:
self.ds_config_dict[ZERO3] = json.load(f)
def get_config_dict(self, stage):
"""As the tests modify the dict, always make a copy"""
config = deepcopy(self.ds_config_dict[stage])
if stage == ZERO3:
config_zero3 = json.load(f)
# by default use fp16
config_zero3["fp16"]["enabled"] = True
# This setting slows things down, so don't enable it by default unless needed by a test.
# It's in the file as a demo for users since we want everything to work out of the box even if slower.
config["zero_optimization"]["stage3_gather_fp16_weights_on_model_save"] = False
return config
config_zero3["zero_optimization"]["stage3_gather_fp16_weights_on_model_save"] = False
self.ds_config_dict = dict(
zero2=config_zero2,
zero3=config_zero3,
)
def get_config_dict(self, stage):
# As some tests modify the dict, always make a copy
return deepcopy(self.ds_config_dict[stage])
# --- These tests are enough to run on one of zero stages --- #
@@ -192,24 +199,6 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon):
# --- These tests need to run on both zero stages --- #
@parameterized.expand(stages)
def test_fp32(self, stage):
ds_config_dict = self.get_config_dict(stage)
ds_config_dict["fp16"]["enabled"] = False # force non-fp16 mode
# XXX: do we go via from_pretrained in zero 3 here? need to test zero.Init(dtype=torch.float)
# XXX: rewrite this test once fp32 is supported by DeepSpeed
with mockenv_context(**self.dist_env_1_gpu):
trainer = get_regression_trainer(local_rank=0, deepspeed=ds_config_dict)
with self.assertRaises(Exception) as context:
trainer.train()
self.assertIn(
"ZeRO is only supported if fp16 is enabled",
str(context.exception),
f"got exception: {context.exception}",
)
@parameterized.expand(stages)
def test_hf_optimizer_with_offload(self, stage):
# must not allow non-DS optimizer when using ZERO-offload
@@ -239,7 +228,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon):
# it's run not as a first test as `sys.stdout` will no longer be the same. So we either have
# to reset `deepspeed_logger.handlers[0].setStream(sys.stdout)` or directly capture from the deepspeed_logger.
with mockenv_context(**self.dist_env_1_gpu):
trainer = get_regression_trainer(local_rank=0, deepspeed=self.ds_config_file[stage])
trainer = get_regression_trainer(local_rank=0, deepspeed=self.get_config_dict(stage))
with CaptureLogger(deepspeed_logger) as cs:
trainer.train()
self.assertIn("DeepSpeed info", cs.out, "expected DeepSpeed logger output but got none")
@@ -259,7 +248,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon):
b=b,
local_rank=0,
train_len=8,
deepspeed=self.ds_config_file[stage],
deepspeed=self.get_config_dict(stage),
per_device_train_batch_size=8,
logging_steps=1,
)
@@ -267,7 +256,11 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon):
post_train_a = trainer.model.a.item()
# XXX: for some reason the following check fails with zero3 - not a broken but a
# different qualitative outcome - need to investigate at some point
# different qualitative outcome - as if optimizer did run
# oddly getting 1.0 for both a and b from 0.0 - there is a bug somewhere
# print(trainer.model.a.item())
# print(trainer.model.b.item())
# need to investigate at some point
if stage == ZERO3:
return
@@ -298,7 +291,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon):
b=b,
local_rank=0,
train_len=train_len,
deepspeed=self.ds_config_file[stage],
deepspeed=self.get_config_dict(stage),
per_device_train_batch_size=8,
gradient_accumulation_steps=1,
)
@@ -315,7 +308,7 @@ class TrainerIntegrationDeepSpeed(TestCasePlus, TrainerIntegrationCommon):
b=b,
local_rank=0,
train_len=train_len,
deepspeed=self.ds_config_file[stage],
deepspeed=self.get_config_dict(stage),
per_device_train_batch_size=4,
gradient_accumulation_steps=2,
)
@@ -532,6 +525,35 @@ class TestDeepSpeedWithLauncher(TestCasePlus):
do_eval=True,
)
@parameterized.expand(stages)
def test_fp32_non_distributed(self, stage):
# real model needs too much GPU memory under stage2+fp32, so using tiny random model here -
# therefore no quality checks, just basic completion checks are done
self.run_and_check(
stage=stage,
model_name=T5_TINY,
distributed=False,
do_train=True,
do_eval=True,
quality_checks=False,
fp16=False,
)
@require_torch_multi_gpu
@parameterized.expand(stages)
def test_fp32_distributed(self, stage):
# real model needs too much GPU memory under stage2+fp32, so using tiny random model here -
# therefore no quality checks, just basic completion checks are done
self.run_and_check(
stage=stage,
model_name=T5_TINY,
distributed=True,
do_train=True,
do_eval=True,
quality_checks=False,
fp16=False,
)
@parameterized.expand(stages)
def test_resume_train_not_from_ds_checkpoint(self, stage):
# do normal training and then resume not from the deepspeed checkpoint but explicitly from
@@ -550,44 +572,50 @@ class TestDeepSpeedWithLauncher(TestCasePlus):
self.do_checks(output_dir, do_train=do_train, do_eval=do_eval)
def do_checks(self, output_dir, do_train=True, do_eval=True):
def do_checks(self, output_dir, do_train=True, do_eval=True, quality_checks=True):
if do_train:
train_metrics = load_json(os.path.join(output_dir, "train_results.json"))
self.assertIn("train_samples_per_second", train_metrics)
self.assertGreater(train_metrics["train_samples_per_second"], 0.5)
if quality_checks:
self.assertGreater(train_metrics["train_samples_per_second"], 0.5)
if do_eval:
eval_metrics = load_json(os.path.join(output_dir, "eval_results.json"))
self.assertIn("eval_bleu", eval_metrics)
self.assertGreater(eval_metrics["eval_bleu"], 0)
if quality_checks:
self.assertGreater(eval_metrics["eval_bleu"], 1)
# XXX: need to do better validation beyond just that the run was successful
def run_and_check(
self,
stage,
eval_steps=10,
distributed=True,
do_train=True,
do_eval=True,
extra_args_str=None,
remove_args_str=None,
model_name: str = T5_SMALL,
eval_steps: int = 10,
distributed: bool = True,
do_train: bool = True,
do_eval: bool = True,
quality_checks: bool = True,
fp16: bool = True,
extra_args_str: str = None,
remove_args_str: str = None,
):
# we are doing quality testing so using a small real model
output_dir = self.run_trainer(
stage=stage,
model_name=T5_SMALL,
model_name=model_name,
eval_steps=eval_steps,
num_train_epochs=1,
do_train=do_train,
do_eval=do_eval,
distributed=distributed,
fp16=fp16,
extra_args_str=extra_args_str,
remove_args_str=remove_args_str,
)
self.do_checks(output_dir, do_train=do_train, do_eval=do_eval)
self.do_checks(output_dir, do_train=do_train, do_eval=do_eval, quality_checks=quality_checks)
return output_dir
@@ -600,6 +628,7 @@ class TestDeepSpeedWithLauncher(TestCasePlus):
do_train: bool = False,
do_eval: bool = True,
distributed: bool = True,
fp16: bool = True,
extra_args_str: str = None,
remove_args_str: str = None,
):
@@ -629,6 +658,9 @@ class TestDeepSpeedWithLauncher(TestCasePlus):
""".split()
args.extend(["--source_prefix", '"translate English to Romanian: "'])
if fp16:
args.extend(["--fp16"])
actions = 0
if do_train:
actions += 1
@@ -636,7 +668,7 @@ class TestDeepSpeedWithLauncher(TestCasePlus):
f"""
--do_train
--num_train_epochs {str(num_train_epochs)}
--max_train_samples 100
--max_train_samples 16
--per_device_train_batch_size 2
--learning_rate 3e-3
""".split()
@@ -647,7 +679,7 @@ class TestDeepSpeedWithLauncher(TestCasePlus):
args.extend(
"""
--do_eval
--max_eval_samples 100
--max_eval_samples 16
--per_device_eval_batch_size 2
""".split()
)
@@ -688,13 +720,14 @@ class TestDeepSpeedWithLauncher(TestCasePlus):
--overwrite_output_dir
--do_train
--do_eval
--max_train_samples 10
--max_eval_samples 10
--per_device_train_batch_size 5
--per_device_eval_batch_size 5
--max_train_samples 16
--max_eval_samples 16
--per_device_train_batch_size 2
--per_device_eval_batch_size 2
--num_train_epochs 1
--warmup_steps 8
--block_size 128
--block_size 64
--fp16
--report_to none
""".split()