TF version of the trainer (#4017)

* First commit to add a TF version of the trainer.

* Make the TF trainer closer to what looks the PT trainer

* Refactoring common code between the PT and TF trainer into an util file.

* Some bugfix + better similarity with the PT trainer

* Add missing class in transformers init

* Bugfix over prediction + use classification report instead of simple metrics

* Fix name error

* Fix optimization tests + style

* Apply style

* Several bugfix for multi-gpu training

* Apply style

* Apply style

* Add glue example for the TF trainer

* Several bugix + address the reviews

* Fix on the TF training args file

* Add a debug mode

* Bugfix in utils_ner.py when segment_ids is None

* Apply style

* Apply style

* Add TPU strategy

* Fix selection strategy
This commit is contained in:
Julien Plu
2020-05-06 18:56:52 +02:00
committed by GitHub
parent 25296b12aa
commit aad50151f3
10 changed files with 1206 additions and 819 deletions

View File

@@ -145,7 +145,9 @@ from .tokenization_utils import PreTrainedTokenizer
from .tokenization_xlm import XLMTokenizer
from .tokenization_xlm_roberta import XLMRobertaTokenizer
from .tokenization_xlnet import SPIECE_UNDERLINE, XLNetTokenizer
from .trainer_utils import EvalPrediction
from .training_args import TrainingArguments
from .training_args_tf import TFTrainingArguments
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
@@ -502,6 +504,9 @@ if is_tf_available():
# Optimization
from .optimization_tf import WarmUp, create_optimizer, AdamWeightDecay, GradientAccumulator
# Trainer
from .trainer_tf import TFTrainer
if not is_tf_available() and not is_torch_available():
logger.warning(

View File

@@ -21,9 +21,11 @@ import tensorflow as tf
class WarmUp(tf.keras.optimizers.schedules.LearningRateSchedule):
"""Applys a warmup schedule on a given learning rate decay schedule."""
"""Applies a warmup schedule on a given learning rate decay schedule."""
def __init__(self, initial_learning_rate, decay_schedule_fn, warmup_steps, power=1.0, name=None):
def __init__(
self, initial_learning_rate, decay_schedule_fn, warmup_steps, power=1.0, name=None,
):
super().__init__()
self.initial_learning_rate = initial_learning_rate
self.warmup_steps = warmup_steps
@@ -56,34 +58,34 @@ class WarmUp(tf.keras.optimizers.schedules.LearningRateSchedule):
}
def create_optimizer(init_lr, num_train_steps, num_warmup_steps):
def create_optimizer(init_lr, num_train_steps, num_warmup_steps, end_lr=0.0, optimizer_type="adamw"):
"""Creates an optimizer with learning rate schedule."""
# Implements linear decay of the learning rate.
learning_rate_fn = tf.keras.optimizers.schedules.PolynomialDecay(
initial_learning_rate=init_lr, decay_steps=num_train_steps, end_learning_rate=0.0
lr_schedule = tf.keras.optimizers.schedules.PolynomialDecay(
initial_learning_rate=init_lr, decay_steps=num_train_steps, end_learning_rate=end_lr,
)
if num_warmup_steps:
learning_rate_fn = WarmUp(
initial_learning_rate=init_lr, decay_schedule_fn=learning_rate_fn, warmup_steps=num_warmup_steps
lr_schedule = WarmUp(
initial_learning_rate=init_lr, decay_schedule_fn=lr_schedule, warmup_steps=num_warmup_steps,
)
optimizer = AdamWeightDecay(
learning_rate=learning_rate_fn,
learning_rate=lr_schedule,
weight_decay_rate=0.01,
beta_1=0.9,
beta_2=0.999,
epsilon=1e-6,
exclude_from_weight_decay=["layer_norm", "bias"],
)
return optimizer
class AdamWeightDecay(tf.keras.optimizers.Adam):
"""Adam enables L2 weight decay and clip_by_global_norm on gradients.
Just adding the square of the weights to the loss function is *not* the
correct way of using L2 regularization/weight decay with Adam, since that will
interact with the m and v parameters in strange ways.
Instead we want ot decay the weights in a manner that doesn't interact with
the m/v parameters. This is equivalent to adding the square of the weights to
the loss with plain (non-momentum) SGD.
@@ -111,24 +113,26 @@ class AdamWeightDecay(tf.keras.optimizers.Adam):
def from_config(cls, config):
"""Creates an optimizer from its config with WarmUp custom object."""
custom_objects = {"WarmUp": WarmUp}
return super().from_config(config, custom_objects=custom_objects)
return super(AdamWeightDecay, cls).from_config(config, custom_objects=custom_objects)
def _prepare_local(self, var_device, var_dtype, apply_state):
super()._prepare_local(var_device, var_dtype, apply_state)
apply_state["weight_decay_rate"] = tf.constant(self.weight_decay_rate, name="adam_weight_decay_rate")
super(AdamWeightDecay, self)._prepare_local(var_device, var_dtype, apply_state)
apply_state[(var_device, var_dtype)]["weight_decay_rate"] = tf.constant(
self.weight_decay_rate, name="adam_weight_decay_rate"
)
def _decay_weights_op(self, var, learning_rate, apply_state):
do_decay = self._do_use_weight_decay(var.name)
if do_decay:
return var.assign_sub(
learning_rate * var * apply_state["weight_decay_rate"], use_locking=self._use_locking
learning_rate * var * apply_state[(var.device, var.dtype.base_dtype)]["weight_decay_rate"],
use_locking=self._use_locking,
)
return tf.no_op()
def apply_gradients(self, grads_and_vars, clip_norm, name=None):
def apply_gradients(self, grads_and_vars, name=None):
grads, tvars = list(zip(*grads_and_vars))
(grads, _) = tf.clip_by_global_norm(grads, clip_norm=clip_norm)
return super().apply_gradients(zip(grads, tvars))
return super(AdamWeightDecay, self).apply_gradients(zip(grads, tvars), name=name,)
def _get_lr(self, var_device, var_dtype, apply_state):
"""Retrieves the learning rate with the given state."""
@@ -147,13 +151,13 @@ class AdamWeightDecay(tf.keras.optimizers.Adam):
lr_t, kwargs = self._get_lr(var.device, var.dtype.base_dtype, apply_state)
decay = self._decay_weights_op(var, lr_t, apply_state)
with tf.control_dependencies([decay]):
return super()._resource_apply_dense(grad, var, **kwargs)
return super(AdamWeightDecay, self)._resource_apply_dense(grad, var, **kwargs)
def _resource_apply_sparse(self, grad, var, indices, apply_state=None):
lr_t, kwargs = self._get_lr(var.device, var.dtype.base_dtype, apply_state)
decay = self._decay_weights_op(var, lr_t, apply_state)
with tf.control_dependencies([decay]):
return super()._resource_apply_sparse(grad, var, indices, **kwargs)
return super(AdamWeightDecay, self)._resource_apply_sparse(grad, var, indices, **kwargs)
def get_config(self):
config = super().get_config()
@@ -177,71 +181,65 @@ class AdamWeightDecay(tf.keras.optimizers.Adam):
return True
# Inspired from https://github.com/OpenNMT/OpenNMT-tf/blob/master/opennmt/optimizers/utils.py
# Extracted from https://github.com/OpenNMT/OpenNMT-tf/blob/master/opennmt/optimizers/utils.py
class GradientAccumulator(object):
"""Distribution strategies-aware gradient accumulation utility."""
"""Gradient accumulation utility.
When used with a distribution strategy, the accumulator should be called in a
replica context. Gradients will be accumulated locally on each replica and
without synchronization. Users should then call ``.gradients``, scale the
gradients if required, and pass the result to ``apply_gradients``.
"""
# We use the ON_READ synchronization policy so that no synchronization is
# performed on assignment. To get the value, we call .value() which returns the
# value on the current replica without synchronization.
def __init__(self):
"""Initializes the accumulator."""
self._gradients = []
self._accum_steps = tf.Variable(
initial_value=0, dtype=tf.int64, trainable=False, aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA
)
self._accum_steps = None
@property
def step(self):
"""Number of accumulated steps."""
if self._accum_steps is None:
self._accum_steps = tf.Variable(
tf.constant(0, dtype=tf.int64), trainable=False, synchronization=tf.VariableSynchronization.ON_READ,
)
return self._accum_steps.value()
@property
def gradients(self):
"""The accumulated gradients."""
return list(
gradient.value() if gradient is not None else gradient for gradient in self._get_replica_gradients()
)
"""The accumulated gradients on the current replica."""
if not self._gradients:
raise ValueError("The accumulator should be called first to initialize the gradients")
return list(gradient.value() for gradient in self._gradients)
def __call__(self, gradients):
"""Accumulates :obj:`gradients`."""
"""Accumulates :obj:`gradients` on the current replica."""
if not self._gradients:
_ = self.step # Create the step variable.
self._gradients.extend(
[
tf.Variable(tf.zeros_like(gradient), trainable=False) if gradient is not None else gradient
tf.Variable(
tf.zeros_like(gradient), trainable=False, synchronization=tf.VariableSynchronization.ON_READ,
)
for gradient in gradients
]
)
if len(gradients) != len(self._gradients):
raise ValueError("Expected %s gradients, but got %d" % (len(self._gradients), len(gradients)))
for accum_gradient, gradient in zip(self._get_replica_gradients(), gradients):
if accum_gradient is not None and gradient is not None:
accum_gradient.assign_add(gradient)
for accum_gradient, gradient in zip(self._gradients, gradients):
accum_gradient.assign_add(gradient)
self._accum_steps.assign_add(1)
def reset(self):
"""Resets the accumulated gradients."""
if self._gradients:
self._accum_steps.assign(0)
for gradient in self._get_replica_gradients():
if gradient is not None:
gradient.assign(tf.zeros_like(gradient))
def _get_replica_gradients(self):
if tf.distribute.has_strategy():
# In a replica context, we want to accumulate gradients on each replica
# without synchronization, so we directly assign the value of the
# current replica.
replica_context = tf.distribute.get_replica_context()
if replica_context is None or tf.distribute.get_strategy().num_replicas_in_sync == 1:
return self._gradients
return (
gradient.device_map.select_for_current_replica(gradient.values, replica_context)
for gradient in self._gradients
if gradient is not None
)
else:
return self._gradients
"""Resets the accumulated gradients on the current replica."""
if not self._gradients:
return
self._accum_steps.assign(0)
for gradient in self._gradients:
gradient.assign(tf.zeros_like(gradient))

View File

@@ -6,7 +6,7 @@ import re
import shutil
from contextlib import contextmanager
from pathlib import Path
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple
from typing import Callable, Dict, List, Optional, Tuple
import numpy as np
import torch
@@ -20,6 +20,7 @@ from tqdm.auto import tqdm, trange
from .data.data_collator import DataCollator, DefaultDataCollator
from .modeling_utils import PreTrainedModel
from .optimization import AdamW, get_linear_schedule_with_warmup
from .trainer_utils import PREFIX_CHECKPOINT_DIR, EvalPrediction, PredictionOutput, TrainOutput
from .training_args import TrainingArguments
@@ -87,30 +88,6 @@ def torch_distributed_zero_first(local_rank: int):
torch.distributed.barrier()
class EvalPrediction(NamedTuple):
"""
Evaluation output (always contains labels), to be used
to compute metrics.
"""
predictions: np.ndarray
label_ids: np.ndarray
class PredictionOutput(NamedTuple):
predictions: np.ndarray
label_ids: Optional[np.ndarray]
metrics: Optional[Dict[str, float]]
class TrainOutput(NamedTuple):
global_step: int
training_loss: float
PREFIX_CHECKPOINT_DIR = "checkpoint"
class Trainer:
"""
Trainer is a simple but feature-complete training and eval loop for PyTorch,

View File

@@ -0,0 +1,429 @@
"""Tensorflow trainer class."""
import logging
import math
import os
from typing import Callable, Dict, Optional
import numpy as np
import tensorflow as tf
from .modeling_tf_utils import TFPreTrainedModel, shape_list
from .optimization_tf import GradientAccumulator, create_optimizer
from .trainer_utils import PREFIX_CHECKPOINT_DIR, EvalPrediction, PredictionOutput
from .training_args_tf import TFTrainingArguments
logger = logging.getLogger(__name__)
class TFTrainer:
model: TFPreTrainedModel
args: TFTrainingArguments
# something similar to a PT Dataset.
# This is just temporary before to have
# a framework-agnostic approach for datasets.
train_dataset: Optional[tf.data.Dataset]
eval_dataset: Optional[tf.data.Dataset]
compute_metrics: Optional[Callable[[EvalPrediction], Dict]] = None
prediction_loss_only: bool
def __init__(
self,
model: TFPreTrainedModel,
args: TFTrainingArguments,
train_dataset: Optional[tf.data.Dataset] = None,
eval_dataset: Optional[tf.data.Dataset] = None,
compute_metrics: Optional[Callable[[EvalPrediction], Dict]] = None,
prediction_loss_only=False,
):
self.model = model
self.args = args
self.train_dataset = train_dataset
self.eval_dataset = eval_dataset
self.compute_metrics = compute_metrics
self.prediction_loss_only = prediction_loss_only
self.gradient_accumulator = GradientAccumulator()
self._setup_training()
def _setup_training(self) -> None:
"""
Setup the different steps to train a model:
- check if all the data are given
- create the proper strategy
- create the features
- prepare the model settings
"""
self._prepare_dataset()
with self.args.strategy.scope():
self._create_optimizer()
_ = self.optimizer.iterations
self._set_loss_and_metric()
self._create_checkpoint_manager()
self._create_summary_writer()
def _set_loss_and_metric(self) -> None:
"""
Create the training loss and metric with their name. Allowed names are those listed
in the Tensorflow documentation and those contained in the transformers library.
"""
try:
self.loss = tf.keras.losses.get(
{
"class_name": self.args.loss_name,
"config": {"from_logits": True, "reduction": tf.keras.losses.Reduction.NONE},
}
)
except TypeError:
self.loss = tf.keras.losses.get(
{"class_name": self.args.loss_name, "config": {"reduction": tf.keras.losses.Reduction.NONE}}
)
def _create_summary_writer(self) -> None:
"""
Create a summary writer to be able to read the logs in Tensorboard.
"""
self.writer = tf.summary.create_file_writer(self.args.logging_dir)
def _prepare_dataset(self) -> None:
"""
Prepare the training, validation and test data.
"""
if self.train_dataset is not None:
self.num_train_examples = self.train_dataset.reduce(tf.constant(0), lambda x, _: x + 1).numpy()
if self.args.max_steps > 0:
self.train_steps = self.args.max_steps
else:
self.train_steps: int = math.ceil(self.num_train_examples / self.args.train_batch_size)
self.train_dataset = (
self.train_dataset.cache()
.shuffle(self.num_train_examples)
.batch(self.args.train_batch_size)
.prefetch(tf.data.experimental.AUTOTUNE)
)
if self.args.max_steps > 0:
self.train_dataset = self.train_dataset.repeat(-1)
self.train_dataset = self.args.strategy.experimental_distribute_dataset(self.train_dataset)
else:
self.train_steps = 0
if self.eval_dataset is not None:
self.eval_dataset = (
self.eval_dataset.batch(self.args.eval_batch_size).cache().prefetch(tf.data.experimental.AUTOTUNE)
)
self.eval_dataset = self.args.strategy.experimental_distribute_dataset(self.eval_dataset)
def _create_optimizer(self) -> None:
"""
Create the training optimizer with its name. Allowed names are those listed
in the Tensorflow documentation and those contained in the transformers library.
"""
if self.args.optimizer_name == "adamw":
self.optimizer = create_optimizer(self.args.learning_rate, self.train_steps, self.args.warmup_steps)
else:
try:
self.optimizer = tf.keras.optimizers.get(
{
"class_name": self.args.optimizer_name,
"config": {"learning_rate": self.args.learning_rate, "epsilon": self.args.adam_epsilon},
}
)
except TypeError:
# This is for the case where the optimizer is not Adam-like such as SGD
self.optimizer = tf.keras.optimizers.get(
{"class_name": self.args.optimizer_name, "config": {"learning_rate": self.args.learning_rate}}
)
def _create_checkpoint_manager(self, max_to_keep: int = 5, load_model: bool = True) -> None:
"""
Create a checkpoint manager in order to be able to make the training
fault-tolerant.
Args:
max_to_keep: the maximum number of checkpoints to keep in the checkpoint path.
load_model: if we want to start the training from the latest checkpoint.
"""
ckpt = tf.train.Checkpoint(optimizer=self.optimizer, model=self.model)
self.model.ckpt_manager = tf.train.CheckpointManager(ckpt, PREFIX_CHECKPOINT_DIR, max_to_keep=max_to_keep)
if load_model:
ckpt.restore(self.model.ckpt_manager.latest_checkpoint).expect_partial()
@tf.function
def _evaluate_steps(self, per_replica_features, per_replica_labels):
"""
One step evaluation across replica.
Args:
per_replica_features: the batched features.
per_replica_labels: the batched labels.
Returns:
The loss corresponding to the given batch.
"""
per_replica_loss, per_replica_logits = self.args.strategy.experimental_run_v2(
self._run_model, args=(per_replica_features, per_replica_labels, False)
)
try:
reduced_loss = self.args.strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica_loss, axis=0)
except ValueError:
reduced_loss = self.args.strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica_loss, None)
return reduced_loss, per_replica_logits
def _prediction_loop(
self, dataset: tf.data.Dataset, description: str, prediction_loss_only: Optional[bool] = None
) -> PredictionOutput:
logger.info("***** Running %s *****", description)
logger.info(" Batch size = %d", self.args.eval_batch_size)
label_ids: np.ndarray = None
preds: np.ndarray = None
step: int = 1
for features, labels in dataset:
step = tf.convert_to_tensor(step, dtype=tf.int64)
loss, logits = self._evaluate_steps(features, labels)
loss = tf.reduce_mean(loss)
if not prediction_loss_only:
if self.args.n_gpu > 1:
for val in logits.values:
if preds is None:
preds = val.numpy()
else:
preds = np.append(preds, val.numpy(), axis=0)
for val in labels.values:
if label_ids is None:
label_ids = val.numpy()
else:
label_ids = np.append(label_ids, val.numpy(), axis=0)
else:
if preds is None:
preds = logits.numpy()
else:
preds = np.append(preds, logits.numpy(), axis=0)
if label_ids is None:
label_ids = labels.numpy()
else:
label_ids = np.append(label_ids, labels.numpy(), axis=0)
step += 1
if self.compute_metrics is not None and preds is not None and label_ids is not None:
metrics = self.compute_metrics(EvalPrediction(predictions=preds, label_ids=label_ids))
else:
metrics = {}
metrics["loss"] = loss.numpy()
return PredictionOutput(predictions=preds, label_ids=label_ids, metrics=metrics)
def evaluate(
self, eval_dataset: Optional[tf.data.Dataset] = None, prediction_loss_only: Optional[bool] = None
) -> Dict[str, float]:
"""
Prediction/evaluation loop, shared by `evaluate()` and `predict()`.
"""
if eval_dataset is None:
eval_dataset = self.eval_dataset
output = self._prediction_loop(eval_dataset, description="Evaluation")
return output.metrics
def train(self) -> None:
"""
Train method to train the model.
"""
if self.args.debug:
tf.summary.trace_on(graph=True, profiler=True)
self.gradient_accumulator.reset()
iterations = self.optimizer.iterations
if iterations.numpy() > 0:
logger.info("Start the training from the last checkpoint")
start_epoch = (iterations.numpy() // self.train_steps) + 1
else:
start_epoch = 1
tf.summary.experimental.set_step(iterations)
epochs = 1 if self.args.max_steps > 0 else self.args.num_train_epochs
logger.info("***** Running training *****")
logger.info(" Num examples = %d", self.num_train_examples)
logger.info(" Num Epochs = %d", epochs)
logger.info(" Total optimization steps = %d", self.train_steps)
for epoch in range(start_epoch, int(epochs + 1)):
for training_loss in self._training_steps():
step = iterations.numpy()
if self.args.debug:
with self.writer.as_default():
tf.summary.scalar("loss", training_loss, step=step)
if step == 1 and self.args.debug:
with self.writer.as_default():
tf.summary.trace_export(name="training", step=step, profiler_outdir=self.args.logging_dir)
if self.args.evaluate_during_training and step % self.args.eval_steps == 0:
logs = {}
results = self.evaluate()
for key, value in results.items():
eval_key = "eval_{}".format(key)
logs[eval_key] = value
if callable(self.optimizer.learning_rate):
logs["learning_rate"] = self.optimizer.learning_rate(step).numpy()
else:
logs["learning_rate"] = self.optimizer.learning_rate.numpy()
logger.info("Epoch {} Step {} Validation Metrics {}".format(epoch, step, logs))
with self.writer.as_default():
for k, v in logs.items():
tf.summary.scalar(k, v, step=step)
if step % self.args.logging_steps == 0:
logger.info("Epoch {} Step {} Train Loss {:.4f}".format(epoch, step, training_loss.numpy()))
if step % self.args.save_steps == 0:
ckpt_save_path = self.model.ckpt_manager.save()
logger.info("Saving checkpoint for step {} at {}".format(step, ckpt_save_path))
if step % self.train_steps == 0:
break
def _training_steps(self):
"""
Returns a generator over training steps (i.e. parameters update).
"""
for i, loss in enumerate(self._accumulate_next_gradients()):
if i % self.args.gradient_accumulation_steps == 0:
self._apply_gradients()
yield loss
@tf.function
def _apply_gradients(self):
"""Applies the gradients (cross-replica)."""
self.args.strategy.experimental_run_v2(self._step)
def _step(self):
"""Applies gradients and resets accumulation."""
gradient_scale = self.gradient_accumulator.step * self.args.strategy.num_replicas_in_sync
gradients = [
gradient / tf.cast(gradient_scale, gradient.dtype) for gradient in self.gradient_accumulator.gradients
]
gradients = [(tf.clip_by_value(grad, -self.args.max_grad_norm, self.args.max_grad_norm)) for grad in gradients]
vars = self.model.trainable_variables
if self.args.mode == "token-classification":
vars = [var for var in self.model.trainable_variables if "pooler" not in var.name]
self.optimizer.apply_gradients(list(zip(gradients, vars)))
self.gradient_accumulator.reset()
def _accumulate_next_gradients(self):
"""Accumulates the gradients from the next element in dataset."""
iterator = iter(self.train_dataset)
@tf.function
def _accumulate_next():
per_replica_features, per_replica_labels = next(iterator)
return self._accumulate_gradients(per_replica_features, per_replica_labels)
while True:
try:
yield _accumulate_next()
except tf.errors.OutOfRangeError:
break
def _accumulate_gradients(self, per_replica_features, per_replica_labels):
"""Accumulates the gradients across all the replica."""
per_replica_loss = self.args.strategy.experimental_run_v2(
self._forward, args=(per_replica_features, per_replica_labels)
)
try:
reduced_loss = self.args.strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica_loss, axis=0)
except ValueError:
reduced_loss = self.args.strategy.reduce(tf.distribute.ReduceOp.MEAN, per_replica_loss, None)
return reduced_loss
def _forward(self, features, labels):
"""Forwards a training example and accumulates the gradients."""
per_example_loss, _ = self._run_model(features, labels, True)
vars = self.model.trainable_variables
if self.args.mode == "token-classification":
vars = [var for var in self.model.trainable_variables if "pooler" not in var.name]
gradients = self.optimizer.get_gradients(per_example_loss, vars)
self.gradient_accumulator(gradients)
return per_example_loss
def _run_model(self, features, labels, training):
"""
Computes the loss of the given features and labels pair.
Args:
features: the batched features.
labels: the batched labels.
training: run the model in training mode or not
"""
if self.args.mode == "sequence-classification" or self.args.mode == "token-classification":
logits = self.model(features, training=training)[0]
else:
logits = self.model(features, training=training)
if self.args.mode == "token-classification":
active_loss = tf.reshape(labels, (-1,)) != -1
reduced_logits = tf.boolean_mask(tf.reshape(logits, (-1, shape_list(logits)[2])), active_loss)
labels = tf.boolean_mask(tf.reshape(labels, (-1,)), active_loss)
loss = self.loss(labels, reduced_logits)
else:
loss = self.loss(labels, logits)
loss += sum(self.model.losses) * (1.0 / self.args.n_gpu)
return loss, logits
def predict(self, test_dataset: tf.data.Dataset) -> PredictionOutput:
"""
Run prediction and return predictions and potential metrics.
Depending on the dataset and your use case, your test dataset may contain labels.
In that case, this method will also return metrics, like in evaluate().
Args:
test_dataset: something similar to a PT Dataset. This is just
temporary before to have a framework-agnostic approach for datasets.
"""
test_dataset = test_dataset.batch(self.args.eval_batch_size)
test_dataset = self.args.strategy.experimental_distribute_dataset(test_dataset)
return self._prediction_loop(test_dataset, description="Prediction")
def save_model(self) -> None:
"""
Save the pretrained model and create a Tensorflow saved model.
"""
logger.info("Saving model in {}".format(self.args.output_dir))
path = os.path.join(self.args.output_dir, "saved_model")
os.makedirs(path, exist_ok=True)
self.model.save_pretrained(self.args.output_dir)

View File

@@ -0,0 +1,27 @@
from typing import Dict, NamedTuple, Optional
import numpy as np
class EvalPrediction(NamedTuple):
"""
Evaluation output (always contains labels), to be used
to compute metrics.
"""
predictions: np.ndarray
label_ids: np.ndarray
class PredictionOutput(NamedTuple):
predictions: np.ndarray
label_ids: Optional[np.ndarray]
metrics: Optional[Dict[str, float]]
class TrainOutput(NamedTuple):
global_step: int
training_loss: float
PREFIX_CHECKPOINT_DIR = "checkpoint"

View File

@@ -0,0 +1,75 @@
import logging
from dataclasses import dataclass, field
from typing import Tuple
from .file_utils import cached_property, is_tf_available, tf_required
from .training_args import TrainingArguments
logger = logging.getLogger(__name__)
if is_tf_available():
import tensorflow as tf
@dataclass
class TFTrainingArguments(TrainingArguments):
optimizer_name: str = field(
default="adam",
metadata={
"help": 'Name of a Tensorflow optimizer among "adadelta, adagrad, adam, adamax, ftrl, nadam, rmsprop, sgd, adamw"'
},
)
mode: str = field(
default="sequence-classification",
metadata={"help": 'Type of task, one of "sequence-classification", "token-classification" '},
)
loss_name: str = field(
default="SparseCategoricalCrossentropy",
metadata={
"help": "Name of a Tensorflow loss. For the list see: https://www.tensorflow.org/api_docs/python/tf/keras/losses"
},
)
eval_steps: int = field(default=1000, metadata={"help": "Run an evaluation every X steps."})
debug: bool = field(
default=False, metadata={"help": "Activate the trace to record computation graphs and profiling information"}
)
@cached_property
@tf_required
def _setup_strategy(self) -> Tuple["tf.distribute.Strategy", int]:
logger.info("Tensorflow: setting up strategy")
gpus = tf.config.list_physical_devices("GPU")
if self.no_cuda:
strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
else:
try:
tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
except ValueError:
tpu = None
if tpu:
tf.config.experimental_connect_to_cluster(tpu)
tf.tpu.experimental.initialize_tpu_system(tpu)
strategy = tf.distribute.experimental.TPUStrategy(tpu)
elif len(gpus) == 0:
strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
elif len(gpus) > 1:
# If you only want to use a specific subset of GPUs use `CUDA_VISIBLE_DEVICES=0`
strategy = tf.distribute.MirroredStrategy(gpus)
else:
raise ValueError("Cannot find the proper strategy please check your environment properties.")
return strategy
@property
@tf_required
def strategy(self) -> "tf.distribute.Strategy":
return self._setup_strategy
@property
@tf_required
def n_gpu(self) -> int:
return self._setup_strategy.num_replicas_in_sync