Add AWS Neuron torchrun support (#20806)
* Add XLA torchrun support * Clarify that currently DDP doesn't work with torch.distributed XLA backend yet * Enable DDP with torchrun and XLA (now available in PT-XLA 1.13) * Add check for AWS Neuron availability and AWS Neuron specific compiler flag * Change the new test's name to TestTrainerDistributedNeuronCore * Remove "assert" and replace raised exception * Remove compiler flag as it is optional. If needed, will be another PR. * Use TORCHELASTIC_RUN_ID to determine whether torchrun is used
This commit is contained in:
@@ -577,6 +577,7 @@ _import_structure = {
|
|||||||
"is_timm_available",
|
"is_timm_available",
|
||||||
"is_tokenizers_available",
|
"is_tokenizers_available",
|
||||||
"is_torch_available",
|
"is_torch_available",
|
||||||
|
"is_torch_neuroncore_available",
|
||||||
"is_torch_tpu_available",
|
"is_torch_tpu_available",
|
||||||
"is_vision_available",
|
"is_vision_available",
|
||||||
"logging",
|
"logging",
|
||||||
@@ -3947,6 +3948,7 @@ if TYPE_CHECKING:
|
|||||||
is_timm_available,
|
is_timm_available,
|
||||||
is_tokenizers_available,
|
is_tokenizers_available,
|
||||||
is_torch_available,
|
is_torch_available,
|
||||||
|
is_torch_neuroncore_available,
|
||||||
is_torch_tpu_available,
|
is_torch_tpu_available,
|
||||||
is_vision_available,
|
is_vision_available,
|
||||||
logging,
|
logging,
|
||||||
|
|||||||
@@ -83,6 +83,7 @@ from .utils import (
|
|||||||
is_torch_available,
|
is_torch_available,
|
||||||
is_torch_bf16_cpu_available,
|
is_torch_bf16_cpu_available,
|
||||||
is_torch_bf16_gpu_available,
|
is_torch_bf16_gpu_available,
|
||||||
|
is_torch_neuroncore_available,
|
||||||
is_torch_tensorrt_fx_available,
|
is_torch_tensorrt_fx_available,
|
||||||
is_torch_tf32_available,
|
is_torch_tf32_available,
|
||||||
is_torch_tpu_available,
|
is_torch_tpu_available,
|
||||||
@@ -500,6 +501,15 @@ def require_torch_tpu(test_case):
|
|||||||
return unittest.skipUnless(is_torch_tpu_available(check_device=False), "test requires PyTorch TPU")(test_case)
|
return unittest.skipUnless(is_torch_tpu_available(check_device=False), "test requires PyTorch TPU")(test_case)
|
||||||
|
|
||||||
|
|
||||||
|
def require_torch_neuroncore(test_case):
|
||||||
|
"""
|
||||||
|
Decorator marking a test that requires NeuronCore (in PyTorch).
|
||||||
|
"""
|
||||||
|
return unittest.skipUnless(is_torch_neuroncore_available(check_device=False), "test requires PyTorch NeuronCore")(
|
||||||
|
test_case
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
if is_torch_available():
|
if is_torch_available():
|
||||||
# Set env var CUDA_VISIBLE_DEVICES="" to force cpu-mode
|
# Set env var CUDA_VISIBLE_DEVICES="" to force cpu-mode
|
||||||
import torch
|
import torch
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ from .utils import (
|
|||||||
is_torch_available,
|
is_torch_available,
|
||||||
is_torch_bf16_cpu_available,
|
is_torch_bf16_cpu_available,
|
||||||
is_torch_bf16_gpu_available,
|
is_torch_bf16_gpu_available,
|
||||||
|
is_torch_neuroncore_available,
|
||||||
is_torch_tf32_available,
|
is_torch_tf32_available,
|
||||||
is_torch_tpu_available,
|
is_torch_tpu_available,
|
||||||
logging,
|
logging,
|
||||||
@@ -60,6 +61,17 @@ if is_torch_available():
|
|||||||
if is_torch_tpu_available(check_device=False):
|
if is_torch_tpu_available(check_device=False):
|
||||||
import torch_xla.core.xla_model as xm
|
import torch_xla.core.xla_model as xm
|
||||||
|
|
||||||
|
if is_torch_neuroncore_available(check_device=False):
|
||||||
|
# torchrun support
|
||||||
|
# https://github.com/pytorch/xla/pull/3609
|
||||||
|
if os.environ.get("TORCHELASTIC_RUN_ID"):
|
||||||
|
import torch_xla.distributed.xla_backend as xbn
|
||||||
|
|
||||||
|
if not isinstance(torch.distributed.group.WORLD, xbn.ProcessGroupXla):
|
||||||
|
torch.distributed.init_process_group(backend="xla")
|
||||||
|
if not isinstance(torch.distributed.group.WORLD, xbn.ProcessGroupXla):
|
||||||
|
raise AssertionError("Failed to initialize torch.distributed process group using XLA backend.")
|
||||||
|
|
||||||
|
|
||||||
if is_sagemaker_mp_enabled():
|
if is_sagemaker_mp_enabled():
|
||||||
import smdistributed.modelparallel.torch as smp
|
import smdistributed.modelparallel.torch as smp
|
||||||
|
|||||||
@@ -153,6 +153,7 @@ from .import_utils import (
|
|||||||
is_torch_cuda_available,
|
is_torch_cuda_available,
|
||||||
is_torch_fx_available,
|
is_torch_fx_available,
|
||||||
is_torch_fx_proxy,
|
is_torch_fx_proxy,
|
||||||
|
is_torch_neuroncore_available,
|
||||||
is_torch_onnx_dict_inputs_support_available,
|
is_torch_onnx_dict_inputs_support_available,
|
||||||
is_torch_tensorrt_fx_available,
|
is_torch_tensorrt_fx_available,
|
||||||
is_torch_tf32_available,
|
is_torch_tf32_available,
|
||||||
|
|||||||
@@ -451,6 +451,13 @@ def is_torch_tpu_available(check_device=True):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache()
|
||||||
|
def is_torch_neuroncore_available(check_device=True):
|
||||||
|
if importlib.util.find_spec("torch_neuronx") is not None:
|
||||||
|
return is_torch_tpu_available(check_device)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def is_torchdynamo_available():
|
def is_torchdynamo_available():
|
||||||
if not is_torch_available():
|
if not is_torch_available():
|
||||||
return False
|
return False
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ from transformers.testing_utils import (
|
|||||||
execute_subprocess_async,
|
execute_subprocess_async,
|
||||||
get_torch_dist_unique_port,
|
get_torch_dist_unique_port,
|
||||||
require_torch_multi_gpu,
|
require_torch_multi_gpu,
|
||||||
|
require_torch_neuroncore,
|
||||||
)
|
)
|
||||||
from transformers.utils import logging
|
from transformers.utils import logging
|
||||||
|
|
||||||
@@ -62,6 +63,23 @@ if is_torch_available():
|
|||||||
return input_ids
|
return input_ids
|
||||||
|
|
||||||
|
|
||||||
|
class TestTrainerDistributedNeuronCore(TestCasePlus):
|
||||||
|
@require_torch_neuroncore
|
||||||
|
def test_trainer(self):
|
||||||
|
|
||||||
|
distributed_args = f"""
|
||||||
|
-m torch.distributed.launch
|
||||||
|
--nproc_per_node=2
|
||||||
|
--master_port={get_torch_dist_unique_port()}
|
||||||
|
{self.test_file_dir}/test_trainer_distributed.py
|
||||||
|
""".split()
|
||||||
|
output_dir = self.get_auto_remove_tmp_dir()
|
||||||
|
args = f"--output_dir {output_dir}".split()
|
||||||
|
cmd = [sys.executable] + distributed_args + args
|
||||||
|
execute_subprocess_async(cmd, env=self.get_env())
|
||||||
|
# successful return here == success - any errors would have caused an error in the sub-call
|
||||||
|
|
||||||
|
|
||||||
class TestTrainerDistributed(TestCasePlus):
|
class TestTrainerDistributed(TestCasePlus):
|
||||||
@require_torch_multi_gpu
|
@require_torch_multi_gpu
|
||||||
def test_trainer(self):
|
def test_trainer(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user