diff --git a/docs/source/en/main_classes/callback.mdx b/docs/source/en/main_classes/callback.mdx index 1d7d0b03d2..7c5b48f5d4 100644 --- a/docs/source/en/main_classes/callback.mdx +++ b/docs/source/en/main_classes/callback.mdx @@ -32,6 +32,7 @@ By default a [`Trainer`] will use the following callbacks: - [`~integrations.WandbCallback`] if [wandb](https://www.wandb.com/) is installed. - [`~integrations.CometCallback`] if [comet_ml](https://www.comet.ml/site/) is installed. - [`~integrations.MLflowCallback`] if [mlflow](https://www.mlflow.org/) is installed. +- [`~integrations.NeptuneCallback`] if [neptune](https://neptune.ai/) is installed. - [`~integrations.AzureMLCallback`] if [azureml-sdk](https://pypi.org/project/azureml-sdk/) is installed. - [`~integrations.CodeCarbonCallback`] if [codecarbon](https://pypi.org/project/codecarbon/) is @@ -70,6 +71,8 @@ Here is the list of the available [`TrainerCallback`] in the library: [[autodoc]] integrations.CodeCarbonCallback +[[autodoc]] integrations.NeptuneCallback + ## TrainerCallback [[autodoc]] TrainerCallback diff --git a/examples/pytorch/README.md b/examples/pytorch/README.md index 442511ead9..b0b099bae2 100644 --- a/examples/pytorch/README.md +++ b/examples/pytorch/README.md @@ -198,6 +198,7 @@ You can easily log and monitor your runs code. The following are currently suppo * [TensorBoard](https://www.tensorflow.org/tensorboard) * [Weights & Biases](https://docs.wandb.ai/integrations/huggingface) * [Comet ML](https://www.comet.ml/docs/python-sdk/huggingface/) +* [Neptune](https://docs.neptune.ai/integrations-and-supported-tools/model-training/hugging-face) ### Weights & Biases @@ -251,3 +252,86 @@ or if in a Conda environment: ```bash conda install -c comet_ml -c anaconda -c conda-forge comet_ml ``` + +### Neptune + +First, install the Neptune client library. 
You can do it with either `pip` or `conda`: + +`pip`: + +```bash +pip install neptune-client +``` + +`conda`: + +```bash +conda install -c conda-forge neptune-client +``` + +Next, in your model training script, import `NeptuneCallback`: + +```python +from transformers.integrations import NeptuneCallback +``` + +To enable Neptune logging, in your `TrainingArguments`, set the `report_to` argument to `"neptune"`: + +```python +training_args = TrainingArguments( + "quick-training-distilbert-mrpc", + evaluation_strategy="steps", + eval_steps = 20, + report_to = "neptune", +) + +trainer = Trainer( + model, + training_args, + ... +) +``` + +Alternatively, for more logging options, create a Neptune callback: + +```python +neptune_callback = NeptuneCallback() +``` + +To add more detail to the tracked run, you can supply optional arguments to `NeptuneCallback`. + +Some examples: + +```python +neptune_callback = NeptuneCallback( + name = "DistilBERT", + description = "DistilBERT fine-tuned on GLUE/MRPC", + tags = ["args-callback", "fine-tune", "MRPC"], # tags help you manage runs in Neptune + base_namespace="callback", # the default is "finetuning" + log_checkpoints = "best", # other options are "last", "same", and None + capture_hardware_metrics = False, # additional keyword arguments for a Neptune run +) +``` + +Pass the callback to the Trainer: + +```python +training_args = TrainingArguments(..., report_to = "none") +trainer = Trainer( + model, + training_args, + ... + callbacks=[neptune_callback], +) +``` + +Now, when you start the training with `trainer.train()`, your metadata will be logged in Neptune. + +**Note:** Although you can pass your **Neptune API token** and **project name** as arguments when creating the callback, the recommended way is to save them as environment variables: + +| Environment variable | Value | +| :------------------- | :--------------------------------------------------- | +| `NEPTUNE_API_TOKEN` | Your Neptune API token. 
To find and copy it, click your Neptune avatar and select **Get your API token**. | +| `NEPTUNE_PROJECT` | The full name of your Neptune project (`workspace-name/project-name`). To find and copy it, head to **project settings** → **Properties**. | + +For detailed instructions and examples, see the [Neptune docs](https://docs.neptune.ai/integrations-and-supported-tools/model-training/hugging-face). diff --git a/src/transformers/__init__.py b/src/transformers/__init__.py index 3bea1e4ad4..ee23c79db6 100755 --- a/src/transformers/__init__.py +++ b/src/transformers/__init__.py @@ -100,6 +100,7 @@ _import_structure = { "hf_argparser": ["HfArgumentParser"], "integrations": [ "is_comet_available", + "is_neptune_available", "is_optuna_available", "is_ray_available", "is_ray_tune_available", @@ -2986,6 +2987,7 @@ if TYPE_CHECKING: # Integrations from .integrations import ( is_comet_available, + is_neptune_available, is_optuna_available, is_ray_available, is_ray_tune_available, diff --git a/src/transformers/integrations.py b/src/transformers/integrations.py index 42569d6e22..89493f7790 100644 --- a/src/transformers/integrations.py +++ b/src/transformers/integrations.py @@ -19,10 +19,15 @@ import importlib.util import json import numbers import os +import shutil import sys import tempfile from pathlib import Path +from typing import TYPE_CHECKING, Dict, Optional +import numpy as np + +from . 
import __version__ as version from .utils import flatten_dict, is_datasets_available, logging @@ -44,6 +49,10 @@ if _has_comet: except (ImportError, ValueError): _has_comet = False +_has_neptune = importlib.util.find_spec("neptune") is not None +if TYPE_CHECKING and _has_neptune: + from neptune.new.metadata_containers.run import Run + from .trainer_callback import ProgressCallback, TrainerCallback # noqa: E402 from .trainer_utils import PREFIX_CHECKPOINT_DIR, BestRun, IntervalStrategy # noqa: E402 from .utils import ENV_VARS_TRUE_VALUES, is_torch_tpu_available # noqa: E402 @@ -106,7 +115,7 @@ def is_fairscale_available(): def is_neptune_available(): - return importlib.util.find_spec("neptune") is not None + return _has_neptune def is_codecarbon_available(): @@ -449,6 +458,8 @@ def get_available_reporting_integrations(): integrations.append("comet_ml") if is_mlflow_available(): integrations.append("mlflow") + if is_neptune_available(): + integrations.append("neptune") if is_tensorboard_available(): integrations.append("tensorboard") if is_wandb_available(): @@ -925,75 +936,276 @@ class MLflowCallback(TrainerCallback): self._ml_flow.end_run() +class NeptuneMissingConfiguration(Exception): + def __init__(self): + super().__init__( + """ + ------ Unsupported ---- We were not able to create new runs. You provided a custom Neptune run to + `NeptuneCallback` with the `run` argument. For the integration to work fully, provide your `api_token` and + `project` by saving them as environment variables or passing them to the callback. + """ + ) + + class NeptuneCallback(TrainerCallback): - """ - A [`TrainerCallback`] that sends the logs to [Neptune](https://neptune.ai). + """TrainerCallback that sends the logs to [Neptune](https://neptune.ai). + + Args: + api_token (`str`, optional): + Neptune API token obtained upon registration. You can leave this argument out if you have saved your token + to the `NEPTUNE_API_TOKEN` environment variable (strongly recommended). 
See full setup instructions in the + [docs](https://docs.neptune.ai/getting-started/installation). + project (`str`, optional): + Name of an existing Neptune project, in the form: "workspace-name/project-name". You can find and copy the + name from the project Settings -> Properties in Neptune. If None (default), the value of the + `NEPTUNE_PROJECT` environment variable will be used. + name (`str`, optional): Custom name for the run. + base_namespace (`str`, optional, defaults to "finetuning"): In the Neptune run, the root namespace + that will contain all of the logged metadata. + log_parameters (`bool`, optional, defaults to True): + If True, logs all Trainer arguments and model parameters provided by the Trainer. + log_checkpoints (`str`, optional, defaults to None): + If "same", uploads checkpoints whenever they are saved by the Trainer. If "last", uploads only the most + recently saved checkpoint. If "best", uploads the best checkpoint (among the ones saved by the Trainer). If + None, does not upload checkpoints. + run (`Run`, optional): + Pass a Neptune run object if you want to continue logging to an existing run. Read more about resuming runs + in the [docs](https://docs.neptune.ai/how-to-guides/neptune-api/resume-run). + **neptune_run_kwargs (optional): + Additional keyword arguments to be passed directly to the + [neptune.init_run()](https://docs.neptune.ai/api-reference/neptune#.init_run) function when a new run is + created. 
""" - def __init__(self): + integration_version_key = "source_code/integrations/transformers" + model_parameters_key = "model_parameters" + trial_name_key = "trial" + trial_params_key = "trial_params" + trainer_parameters_key = "trainer_parameters" + flat_metrics = {"train/epoch"} + + def __init__( + self, + *, + api_token: Optional[str] = None, + project: Optional[str] = None, + name: Optional[str] = None, + base_namespace: str = "finetuning", + run: Optional["Run"] = None, + log_parameters: bool = True, + log_checkpoints: Optional[str] = None, + **neptune_run_kwargs + ): if not is_neptune_available(): raise ValueError( - "NeptuneCallback requires neptune-client to be installed. Run `pip install neptune-client`." + "NeptuneCallback requires the Neptune client library to be installed. " + "To install the library, run `pip install neptune-client`." ) - import neptune.new as neptune - self._neptune = neptune - self._initialized = False - self._log_artifacts = False + from neptune.new.metadata_containers.run import Run - def setup(self, args, state, model): - """ - Setup the Neptune integration. + try: + from neptune.new.integrations.utils import verify_type + except ImportError: + from neptune.new.internal.utils import verify_type - Environment: - NEPTUNE_PROJECT (`str`, *required*): - The project ID for neptune.ai account. Should be in format *workspace_name/project_name* - NEPTUNE_API_TOKEN (`str`, *required*): - API-token for neptune.ai account - NEPTUNE_CONNECTION_MODE (`str`, *optional*): - Neptune connection mode. 
*async* by default - NEPTUNE_RUN_NAME (`str`, *optional*): - The name of run process on Neptune dashboard - """ - if state.is_world_process_zero: - self._neptune_run = self._neptune.init( - project=os.getenv("NEPTUNE_PROJECT"), - api_token=os.getenv("NEPTUNE_API_TOKEN"), - mode=os.getenv("NEPTUNE_CONNECTION_MODE", "async"), - name=os.getenv("NEPTUNE_RUN_NAME", None), - run=os.getenv("NEPTUNE_RUN_ID", None), - ) - combined_dict = args.to_dict() - if hasattr(model, "config") and model.config is not None: - model_config = model.config.to_dict() - combined_dict = {**model_config, **combined_dict} - self._neptune_run["parameters"] = combined_dict - self._initialized = True + verify_type("api_token", api_token, (str, type(None))) + verify_type("project", project, (str, type(None))) + verify_type("name", name, (str, type(None))) + verify_type("base_namespace", base_namespace, str) + verify_type("run", run, (Run, type(None))) + verify_type("log_parameters", log_parameters, bool) + verify_type("log_checkpoints", log_checkpoints, (str, type(None))) + + self._base_namespace_path = base_namespace + self._log_parameters = log_parameters + self._log_checkpoints = log_checkpoints + self._initial_run: Optional[Run] = run + + self._run = None + self._is_monitoring_run = False + self._run_id = None + self._force_reset_monitoring_run = False + self._init_run_kwargs = {"api_token": api_token, "project": project, "name": name, **neptune_run_kwargs} + + self._volatile_checkpoints_dir = None + self._should_upload_checkpoint = self._log_checkpoints is not None + self._recent_checkpoint_path = None + + if self._log_checkpoints in {"last", "best"}: + self._target_checkpoints_namespace = f"checkpoints/{self._log_checkpoints}" + self._should_clean_recently_uploaded_checkpoint = True + else: + self._target_checkpoints_namespace = "checkpoints" + self._should_clean_recently_uploaded_checkpoint = False + + def _stop_run_if_exists(self): + if self._run: + self._run.stop() + del self._run + 
self._run = None + + def _initialize_run(self, **additional_neptune_kwargs): + from neptune.new import init_run + from neptune.new.exceptions import NeptuneMissingApiTokenException, NeptuneMissingProjectNameException + + self._stop_run_if_exists() + + try: + self._run = init_run(**self._init_run_kwargs, **additional_neptune_kwargs) + self._run_id = self._run["sys/id"].fetch() + except (NeptuneMissingProjectNameException, NeptuneMissingApiTokenException) as e: + raise NeptuneMissingConfiguration() from e + + def _use_initial_run(self): + self._run = self._initial_run + self._is_monitoring_run = True + self._run_id = self._run["sys/id"].fetch() + self._initial_run = None + + def _ensure_run_with_monitoring(self): + if self._initial_run is not None: + self._use_initial_run() + else: + if not self._force_reset_monitoring_run and self._is_monitoring_run: + return + + if self._run and not self._is_monitoring_run and not self._force_reset_monitoring_run: + self._initialize_run(run=self._run_id) + self._is_monitoring_run = True + else: + self._initialize_run() + self._force_reset_monitoring_run = False + + def _ensure_at_least_run_without_monitoring(self): + if self._initial_run is not None: + self._use_initial_run() + else: + if not self._run: + self._initialize_run( + run=self._run_id, + capture_stdout=False, + capture_stderr=False, + capture_hardware_metrics=False, + capture_traceback=False, + ) + self._is_monitoring_run = False + + @property + def run(self): + if self._run is None: + self._ensure_at_least_run_without_monitoring() + return self._run + + @property + def _metadata_namespace(self): + return self.run[self._base_namespace_path] + + def _log_integration_version(self): + self.run[NeptuneCallback.integration_version_key] = version + + def _log_trainer_parameters(self, args): + self._metadata_namespace[NeptuneCallback.trainer_parameters_key] = args.to_sanitized_dict() + + def _log_model_parameters(self, model): + if model and hasattr(model, "config") and 
model.config is not None: + self._metadata_namespace[NeptuneCallback.model_parameters_key] = model.config.to_dict() + + def _log_hyper_param_search_parameters(self, state): + if state and hasattr(state, "trial_name"): + self._metadata_namespace[NeptuneCallback.trial_name_key] = state.trial_name + + if state and hasattr(state, "trial_params") and state.trial_params is not None: + self._metadata_namespace[NeptuneCallback.trial_params_key] = state.trial_params + + def _log_model_checkpoint(self, source_directory: str, checkpoint: str): + target_path = relative_path = os.path.join(source_directory, checkpoint) + + if self._volatile_checkpoints_dir is not None: + consistent_checkpoint_path = os.path.join(self._volatile_checkpoints_dir, checkpoint) + try: + shutil.copytree(relative_path, os.path.join(consistent_checkpoint_path, relative_path)) + target_path = consistent_checkpoint_path + except IOError as e: + logger.warning( + "NeptuneCallback was unable to made a copy of checkpoint due to I/O exception: '{}'." 
+ "Could fail trying to upload.".format(e) + ) + + self._metadata_namespace[self._target_checkpoints_namespace].upload_files(target_path) + + if self._should_clean_recently_uploaded_checkpoint and self._recent_checkpoint_path is not None: + self._metadata_namespace[self._target_checkpoints_namespace].delete_files(self._recent_checkpoint_path) + + self._recent_checkpoint_path = relative_path + + def on_init_end(self, args, state, control, **kwargs): + self._volatile_checkpoints_dir = None + if self._log_checkpoints and (args.overwrite_output_dir or args.save_total_limit is not None): + self._volatile_checkpoints_dir = tempfile.TemporaryDirectory().name + + if self._log_checkpoints == "best" and not args.load_best_model_at_end: + raise ValueError("To save the best model checkpoint, the load_best_model_at_end argument must be enabled.") def on_train_begin(self, args, state, control, model=None, **kwargs): - if not self._initialized: - self.setup(args, state, model) + if not state.is_world_process_zero: + return - def on_log(self, args, state, control, logs, model=None, **kwargs): - if not self._initialized: - self.setup(args, state, model) - if state.is_world_process_zero: - for k, v in logs.items(): - self._neptune_run[k].log(v, step=state.global_step) + self._ensure_run_with_monitoring() + self._force_reset_monitoring_run = True + + self._log_integration_version() + if self._log_parameters: + self._log_trainer_parameters(args) + self._log_model_parameters(model) + + if state.is_hyper_param_search: + self._log_hyper_param_search_parameters(state) + + def on_train_end(self, args, state, control, **kwargs): + self._stop_run_if_exists() def __del__(self): - """ - Environment: - NEPTUNE_STOP_TIMEOUT (`int`, *optional*): - Number of seconsds to wait for all Neptune.ai tracking calls to finish, before stopping the tracked - run. If not set it will wait for all tracking calls to finish. 
- """ - try: - stop_timeout = os.getenv("NEPTUNE_STOP_TIMEOUT") - stop_timeout = int(stop_timeout) if stop_timeout else None - self._neptune_run.stop(seconds=stop_timeout) - except AttributeError: - pass + if self._volatile_checkpoints_dir is not None: + shutil.rmtree(self._volatile_checkpoints_dir, ignore_errors=True) + + self._stop_run_if_exists() + + def on_save(self, args, state, control, **kwargs): + if self._should_upload_checkpoint: + self._log_model_checkpoint(args.output_dir, f"checkpoint-{state.global_step}") + + def on_evaluate(self, args, state, control, metrics=None, **kwargs): + if self._log_checkpoints == "best": + best_metric_name = args.metric_for_best_model + if not best_metric_name.startswith("eval_"): + best_metric_name = f"eval_{best_metric_name}" + + metric_value = metrics.get(best_metric_name) + + operator = np.greater if args.greater_is_better else np.less + + self._should_upload_checkpoint = state.best_metric is None or operator(metric_value, state.best_metric) + + @classmethod + def get_run(cls, trainer): + for callback in trainer.callback_handler.callbacks: + if isinstance(callback, cls): + return callback.run + + raise Exception("The trainer doesn't have a NeptuneCallback configured.") + + def on_log(self, args, state, control, logs: Optional[Dict[str, float]] = None, **kwargs): + if not state.is_world_process_zero: + return + + if logs is not None: + for name, value in rewrite_logs(logs).items(): + if isinstance(value, (int, float)): + if name in NeptuneCallback.flat_metrics: + self._metadata_namespace[name] = value + else: + self._metadata_namespace[name].log(value, step=state.global_step) class CodeCarbonCallback(TrainerCallback): diff --git a/src/transformers/training_args.py b/src/transformers/training_args.py index 4cba577342..dd5e455bcf 100644 --- a/src/transformers/training_args.py +++ b/src/transformers/training_args.py @@ -414,8 +414,8 @@ class TrainingArguments: instance of `Dataset`. 
report_to (`str` or `List[str]`, *optional*, defaults to `"all"`): The list of integrations to report the results and logs to. Supported platforms are `"azure_ml"`, - `"comet_ml"`, `"mlflow"`, `"tensorboard"` and `"wandb"`. Use `"all"` to report to all integrations - installed, `"none"` for no integrations. + `"comet_ml"`, `"mlflow"`, `"neptune"`, `"tensorboard"` and `"wandb"`. Use `"all"` to report to all + integrations installed, `"none"` for no integrations. ddp_find_unused_parameters (`bool`, *optional*): When using distributed training, the value of the flag `find_unused_parameters` passed to `DistributedDataParallel`. Will default to `False` if gradient checkpointing is used, `True` otherwise.