diff --git a/examples/research_projects/self-training-text-classification/README.md b/examples/research_projects/self-training-text-classification/README.md new file mode 100644 index 0000000000..7e0f3f9714 --- /dev/null +++ b/examples/research_projects/self-training-text-classification/README.md @@ -0,0 +1,128 @@ +# Self-training + +This is an implementation of the self-training algorithm (without task augmentation) introduced in the [EMNLP 2021](https://2021.emnlp.org/) paper: [STraTA: Self-Training with Task Augmentation for Better Few-shot Learning](https://arxiv.org/abs/2109.06270). Please check out https://github.com/google-research/google-research/tree/master/STraTA for the original codebase. + +**Note**: The code can be used as a tool for automatic data labeling. + +## Table of Contents + + * [Installation](#installation) + * [Self-training](#self-training) + * [Running self-training with a base model](#running-self-training-with-a-base-model) + * [Hyperparameters for self-training](#hyperparameters-for-self-training) + * [Distributed training](#distributed-training) + * [Demo](#demo) + * [How to cite](#how-to-cite) + +## Installation +This repository is tested on Python 3.8+, PyTorch 1.10+, and 🤗 Transformers 4.16+. + +You should install all necessary Python packages in a [virtual environment](https://docs.python.org/3/library/venv.html). If you are unfamiliar with Python virtual environments, please check out the [user guide](https://packaging.python.org/guides/installing-using-pip-and-virtual-environments/). + +Below, we create a virtual environment with the [Anaconda Python distribution](https://www.anaconda.com/products/distribution) and activate it. +```sh +conda create -n strata python=3.9 +conda activate strata +``` +Next, you need to install 🤗 Transformers. Please refer to the [🤗 Transformers installation page](https://github.com/huggingface/transformers#installation) for a detailed guide. 
+```sh +pip install transformers +``` +Finally, install all necessary Python packages for our self-training algorithm. + +```sh +pip install -r STraTA/selftraining/requirements.txt +``` +This will install PyTorch as a backend. + +## Self-training +### Running self-training with a base model +The following example code shows how to run our self-training algorithm with a base model (e.g., `BERT`) on the `SciTail` science entailment dataset, which has two classes `['entails', 'neutral']`. We assume that you have a data directory that includes some training data (e.g., `train.csv`), evaluation data (e.g., `eval.csv`), and unlabeled data (e.g., `infer.csv`). + +```python +import os +from selftraining import selftrain + +data_dir = '/path/to/your/data/dir' +parameters_dict = { + 'max_selftrain_iterations': 100, + 'model_name_or_path': '/path/to/your/base/model', # could be the id of a model hosted by 🤗 Transformers + 'output_dir': '/path/to/your/output/dir', + 'train_file': os.path.join(data_dir, 'train.csv'), + 'infer_file': os.path.join(data_dir, 'infer.csv'), + 'eval_file': os.path.join(data_dir, 'eval.csv'), + 'evaluation_strategy': 'steps', + 'task_name': 'scitail', + 'label_list': ['entails', 'neutral'], + 'per_device_train_batch_size': 32, + 'per_device_eval_batch_size': 8, + 'max_length': 128, + 'learning_rate': 2e-5, + 'max_steps': 100000, + 'eval_steps': 1, + 'early_stopping_patience': 50, + 'overwrite_output_dir': True, + 'do_filter_by_confidence': False, + # 'confidence_threshold': 0.3, + 'do_filter_by_val_performance': True, + 'finetune_on_labeled_data': False, + 'seed': 42, +} +selftrain(**parameters_dict) +``` + +**Note**: We checkpoint periodically during self-training. In case of preemptions, just re-run the above script and self-training will resume from the latest iteration. + +### Hyperparameters for self-training +If you have development data, you might want to tune some hyperparameters for self-training. 
+Below are hyperparameters that could provide additional gains for your task. + + - `finetune_on_labeled_data`: If set to `True`, the resulting model from each self-training iteration is further fine-tuned on the original labeled data before the next self-training iteration. Intuitively, this gives the model a chance to "correct" itself after being trained on pseudo-labeled data. + - `do_filter_by_confidence`: If set to `True`, the pseudo-labeled data in each self-training iteration is filtered based on the model confidence. For instance, if `confidence_threshold` is set to `0.3`, pseudo-labeled examples with a confidence score less than or equal to `0.3` will be discarded. Note that `confidence_threshold` should be greater than or equal to `1/num_labels`, where `num_labels` is the number of class labels. Filtering out the lowest-confidence pseudo-labeled examples could be helpful in some cases. + - `do_filter_by_val_performance`: If set to `True`, the pseudo-labeled data in each self-training iteration is filtered based on the current validation performance. For instance, if your validation performance is 80% accuracy, you might want to get rid of the 20% of the pseudo-labeled data with the lowest confidence scores. + +### Distributed training +We strongly recommend distributed training with multiple accelerators. To activate distributed training, please try one of the following methods: + +1. Run `accelerate config` and answer the questions asked. This will save a `default_config.yaml` file in your cache folder for 🤗 Accelerate. Now, you can run your script with the following command: + +```sh +accelerate launch your_script.py --args_to_your_script +``` + +2. Run your script with the following command: + +```sh +python -m torch.distributed.launch --nnodes="${NUM_NODES}" --nproc_per_node="${NUM_TRAINERS}" your_script.py --args_to_your_script +``` + +3. 
Run your script with the following command: + +```sh +torchrun --nnodes="${NUM_NODES}" --nproc_per_node="${NUM_TRAINERS}" your_script.py --args_to_your_script +``` + +## Demo +Please check out `run.sh` to see how to run our self-training algorithm with a `BERT` Base model on the SciTail science entailment dataset using 8 labeled examples per class. You can configure your training environment by specifying `NUM_NODES` and `NUM_TRAINERS` (number of processes per node). To launch the script, simply run `source run.sh`. + +## How to cite +If you extend or use this code, please cite the [paper](https://arxiv.org/abs/2109.06270) where it was introduced: + +```bibtex +@inproceedings{vu-etal-2021-strata, + title = "{ST}ra{TA}: Self-Training with Task Augmentation for Better Few-shot Learning", + author = "Vu, Tu and + Luong, Minh-Thang and + Le, Quoc and + Simon, Grady and + Iyyer, Mohit", + booktitle = "Proceedings of the 2021 Conference on Empirical Methods in Natural Language Processing", + month = nov, + year = "2021", + address = "Online and Punta Cana, Dominican Republic", + publisher = "Association for Computational Linguistics", + url = "https://aclanthology.org/2021.emnlp-main.462", + doi = "10.18653/v1/2021.emnlp-main.462", + pages = "5715--5731", +} +``` diff --git a/examples/research_projects/self-training-text-classification/finetuning.py b/examples/research_projects/self-training-text-classification/finetuning.py new file mode 100644 index 0000000000..8ad92359b6 --- /dev/null +++ b/examples/research_projects/self-training-text-classification/finetuning.py @@ -0,0 +1,796 @@ +# coding=utf-8 +# Copyright 2022 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Fine-tuning the library models for sequence classification.""" + +import argparse +import dataclasses +import json +import logging +import math +import os +import random +import shutil +from typing import List, Optional + +import datasets +import numpy as np +import pandas as pd +import torch +from datasets import load_dataset, load_metric +from torch.utils.data import DataLoader +from tqdm.auto import tqdm + +from transformers import ( + AdamW, + AutoConfig, + AutoModelForSequenceClassification, + AutoTokenizer, + DataCollatorWithPadding, + default_data_collator, + get_scheduler, + set_seed, +) +from transformers.file_utils import ExplicitEnum +from transformers.trainer_utils import IntervalStrategy + + +logger = logging.getLogger(__name__) + + +class Split(ExplicitEnum): + TRAIN = "train" + EVAL = "eval" + TEST = "test" + INFER = "infer" + + +@dataclasses.dataclass +class FTModelArguments: + """Arguments pertaining to which config/tokenizer/model we are going to fine-tune from.""" + + model_name_or_path: str = dataclasses.field( + metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models."} + ) + use_fast_tokenizer: Optional[bool] = dataclasses.field( + default=True, + metadata={"help": "Whether to use one of the fast tokenizer (backed by the tokenizers library) or not."}, + ) + cache_dir: Optional[str] = dataclasses.field( + default=None, + metadata={"help": "Where do you want to store the pretrained models downloaded from huggingface.co."}, + ) + + +@dataclasses.dataclass +class FTDataArguments: + """Arguments 
pertaining to what data we are going to input to our model for training and evaluation.""" + + train_file: str = dataclasses.field( + default=None, metadata={"help": "A csv or a json file containing the training data."} + ) + eval_file: Optional[str] = dataclasses.field( + default=None, metadata={"help": "A csv or a json file containing the validation data."} + ) + test_file: Optional[str] = dataclasses.field( + default=None, metadata={"help": "A csv or a json file containing the test data."} + ) + infer_file: Optional[str] = dataclasses.field( + default=None, metadata={"help": "A csv or a json file containing the data to predict on."} + ) + task_name: Optional[str] = dataclasses.field( + default=None, + metadata={"help": "The name of the task to train on."}, + ) + label_list: Optional[List[str]] = dataclasses.field( + default=None, metadata={"help": "The list of labels for the task."} + ) + + max_length: Optional[int] = dataclasses.field( + default=128, + metadata={ + "help": "The maximum total input sequence length after tokenization. Sequences longer " + "than this will be truncated, sequences shorter will be padded." + }, + ) + pad_to_max_length: Optional[bool] = dataclasses.field( + default=False, + metadata={ + "help": "Whether to pad all samples to `max_length`. " + "If False, will pad the samples dynamically when batching to the maximum length in the batch." 
+ }, + ) + + +@dataclasses.dataclass +class FTTrainingArguments: + """Training arguments pertaining to the training loop itself.""" + + output_dir: str = dataclasses.field( + metadata={"help": "The output directory where the model predictions and checkpoints will be written."} + ) + do_train: Optional[bool] = dataclasses.field( + default=False, + metadata={"help": "Whether to run training or not."}, + ) + do_eval: Optional[bool] = dataclasses.field( + default=False, + metadata={"help": "Whether to run evaluation on the validation set or not."}, + ) + do_predict: Optional[bool] = dataclasses.field( + default=False, + metadata={"help": "Whether to run inference on the inference set or not."}, + ) + seed: Optional[int] = dataclasses.field( + default=42, + metadata={"help": "Random seed that will be set at the beginning of training."}, + ) + per_device_train_batch_size: Optional[int] = dataclasses.field( + default=8, + metadata={"help": "The batch size per GPU/TPU core/CPU for training."}, + ) + per_device_eval_batch_size: Optional[int] = dataclasses.field( + default=8, + metadata={"help": "The batch size per GPU/TPU core/CPU for evaluation."}, + ) + weight_decay: Optional[float] = dataclasses.field( + default=0.0, + metadata={ + "help": "The weight decay to apply (if not zero) to all layers except all bias and LayerNorm weights in [`AdamW`] optimizer." + }, + ) + learning_rate: Optional[float] = dataclasses.field( + default=5e-5, + metadata={"help": "The initial learning rate for [`AdamW`] optimizer."}, + ) + gradient_accumulation_steps: Optional[int] = dataclasses.field( + default=1, + metadata={ + "help": "Number of update steps to accumulate the gradients for, before performing a backward/update pass." + }, + ) + max_steps: Optional[int] = dataclasses.field( + default=-1, + metadata={ + "help": "If set to a positive number, the total number of training steps to perform. Overrides `num_train_epochs`." 
+ }, + ) + lr_scheduler_type: Optional[str] = dataclasses.field( + default="linear", metadata={"help": "The scheduler type to use."} + ) + warmup_steps: Optional[int] = dataclasses.field( + default=1, + metadata={ + "help": "Number of steps used for a linear warmup from 0 to `learning_rate`. Overrides any effect of `warmup_ratio`." + }, + ) + evaluation_strategy: Optional[str] = dataclasses.field( + default="no", + metadata={ + "help": 'The evaluation strategy to adopt during training. Possible values are: ["no", "steps", "epoch"]' + }, + ) + eval_steps: Optional[int] = dataclasses.field( + default=1, + metadata={"help": 'Number of update steps between two evaluations if `evaluation_strategy="steps"`.'}, + ) + eval_metric: Optional[str] = dataclasses.field( + default="accuracy", metadata={"help": "The evaluation metric used for the task."} + ) + keep_checkpoint_max: Optional[int] = dataclasses.field( + default=1, + metadata={"help": "The maximum number of best checkpoint files to keep."}, + ) + early_stopping_patience: Optional[int] = dataclasses.field( + default=10, + metadata={"help": "Number of evaluation calls with no improvement after which training will be stopped."}, + ) + early_stopping_threshold: Optional[float] = dataclasses.field( + default=0.0, + metadata={ + "help": "How much the specified evaluation metric must improve to satisfy early stopping conditions." + }, + ) + + +def train(args, accelerator, model, tokenizer, train_dataloader, optimizer, lr_scheduler, eval_dataloader=None): + """Train a model on the given training data.""" + + total_batch_size = args.per_device_train_batch_size * accelerator.num_processes * args.gradient_accumulation_steps + + logger.info("***** Running training *****") + logger.info(" Num examples = %d", args.num_examples[Split.TRAIN.value]) + logger.info(" Instantaneous batch size per device = %d", args.per_device_train_batch_size) + logger.info(" Total train batch size (w. 
parallel, distributed & accumulation) = %d", total_batch_size) + logger.info(" Gradient Accumulation steps = %d", args.gradient_accumulation_steps) + logger.info(" Total optimization steps = %d", args.max_steps) + + # Only show the progress bar once on each machine. + progress_bar = tqdm(range(args.max_steps), disable=not accelerator.is_local_main_process) + + checkpoints = None + eval_results = None + best_checkpoint = None + best_eval_result = None + early_stopping_patience_counter = 0 + should_training_stop = False + epoch = 0 + completed_steps = 0 + train_loss = 0.0 + model.zero_grad() + + for _ in range(args.num_train_epochs): + epoch += 1 + model.train() + for step, batch in enumerate(train_dataloader): + outputs = model(**batch) + loss = outputs.loss + loss = loss / args.gradient_accumulation_steps + accelerator.backward(loss) + train_loss += loss.item() + + if step % args.gradient_accumulation_steps == 0 or step == len(train_dataloader) - 1: + optimizer.step() + lr_scheduler.step() + optimizer.zero_grad() + progress_bar.update(1) + completed_steps += 1 + + # Evaluate during training + if ( + eval_dataloader is not None + and args.evaluation_strategy == IntervalStrategy.STEPS.value + and args.eval_steps > 0 + and completed_steps % args.eval_steps == 0 + ): + accelerator.wait_for_everyone() + new_checkpoint = f"checkpoint-{IntervalStrategy.STEPS.value}-{completed_steps}" + new_eval_result = evaluate(args, accelerator, eval_dataloader, "eval", model, new_checkpoint)[ + args.eval_metric + ] + logger.info( + "Evaluation result at step %d: %s = %f", completed_steps, args.eval_metric, new_eval_result + ) + if checkpoints is None: + checkpoints = np.array([new_checkpoint]) + eval_results = np.array([new_eval_result]) + best_checkpoint = new_checkpoint + best_eval_result = new_eval_result + else: + if new_eval_result - best_eval_result > args.early_stopping_threshold: + best_checkpoint = new_checkpoint + best_eval_result = new_eval_result + 
early_stopping_patience_counter = 0 + else: + if new_eval_result == best_eval_result: + best_checkpoint = new_checkpoint + best_eval_result = new_eval_result + early_stopping_patience_counter += 1 + + if early_stopping_patience_counter >= args.early_stopping_patience: + should_training_stop = True + + checkpoints = np.append(checkpoints, [new_checkpoint], axis=0) + eval_results = np.append(eval_results, [new_eval_result], axis=0) + sorted_ids = np.argsort(eval_results) + eval_results = eval_results[sorted_ids] + checkpoints = checkpoints[sorted_ids] + + if len(checkpoints) > args.keep_checkpoint_max: + # Delete the current worst checkpoint + checkpoint_to_remove, *checkpoints = checkpoints + eval_results = eval_results[1:] + if checkpoint_to_remove != new_checkpoint: + if accelerator.is_main_process: + shutil.rmtree(os.path.join(args.output_dir, checkpoint_to_remove), ignore_errors=True) + accelerator.wait_for_everyone() + + if new_checkpoint in checkpoints: + # Save model checkpoint + checkpoint_output_dir = os.path.join(args.output_dir, new_checkpoint) + if accelerator.is_main_process: + if not os.path.exists(checkpoint_output_dir): + os.makedirs(checkpoint_output_dir) + accelerator.wait_for_everyone() + unwrapped_model = accelerator.unwrap_model(model) + unwrapped_model.save_pretrained(checkpoint_output_dir, save_function=accelerator.save) + if accelerator.is_main_process: + tokenizer.save_pretrained(checkpoint_output_dir) + logger.info("Saving model checkpoint to %s", checkpoint_output_dir) + + if completed_steps >= args.max_steps: + break + + if should_training_stop: + break + + # Evaluate during training + if eval_dataloader is not None and args.evaluation_strategy == IntervalStrategy.EPOCH.value: + accelerator.wait_for_everyone() + new_checkpoint = f"checkpoint-{IntervalStrategy.EPOCH.value}-{epoch}" + new_eval_result = evaluate(args, accelerator, eval_dataloader, "eval", model, new_checkpoint)[ + args.eval_metric + ] + logger.info("Evaluation result at 
epoch %d: %s = %f", epoch, args.eval_metric, new_eval_result) + + if checkpoints is None: + checkpoints = np.array([new_checkpoint]) + eval_results = np.array([new_eval_result]) + best_checkpoint = new_checkpoint + best_eval_result = new_eval_result + else: + if new_eval_result - best_eval_result > args.early_stopping_threshold: + best_checkpoint = new_checkpoint + best_eval_result = new_eval_result + early_stopping_patience_counter = 0 + else: + if new_eval_result == best_eval_result: + best_checkpoint = new_checkpoint + best_eval_result = new_eval_result + early_stopping_patience_counter += 1 + + if early_stopping_patience_counter >= args.early_stopping_patience: + should_training_stop = True + + checkpoints = np.append(checkpoints, [new_checkpoint], axis=0) + eval_results = np.append(eval_results, [new_eval_result], axis=0) + sorted_ids = np.argsort(eval_results) + eval_results = eval_results[sorted_ids] + checkpoints = checkpoints[sorted_ids] + + if len(checkpoints) > args.keep_checkpoint_max: + # Delete the current worst checkpoint + checkpoint_to_remove, *checkpoints = checkpoints + eval_results = eval_results[1:] + if checkpoint_to_remove != new_checkpoint: + if accelerator.is_main_process: + shutil.rmtree(os.path.join(args.output_dir, checkpoint_to_remove), ignore_errors=True) + accelerator.wait_for_everyone() + + if new_checkpoint in checkpoints: + # Save model checkpoint + checkpoint_output_dir = os.path.join(args.output_dir, new_checkpoint) + if accelerator.is_main_process: + if not os.path.exists(checkpoint_output_dir): + os.makedirs(checkpoint_output_dir) + accelerator.wait_for_everyone() + unwrapped_model = accelerator.unwrap_model(model) + unwrapped_model.save_pretrained(checkpoint_output_dir, save_function=accelerator.save) + if accelerator.is_main_process: + tokenizer.save_pretrained(checkpoint_output_dir) + logger.info("Saving model checkpoint to %s", checkpoint_output_dir) + + if completed_steps >= args.max_steps: + break + + if 
should_training_stop: + break + + if best_checkpoint is not None: + # Save the best checkpoint + logger.info("Best checkpoint: %s", best_checkpoint) + logger.info("Best evaluation result: %s = %f", args.eval_metric, best_eval_result) + best_checkpoint_output_dir = os.path.join(args.output_dir, best_checkpoint) + if accelerator.is_main_process: + shutil.move(best_checkpoint_output_dir, os.path.join(args.output_dir, "best-checkpoint")) + shutil.rmtree(best_checkpoint_output_dir, ignore_errors=True) + accelerator.wait_for_everyone() + + else: + # Assume that the last checkpoint is the best checkpoint and save it + checkpoint_output_dir = os.path.join(args.output_dir, "best-checkpoint") + if not os.path.exists(checkpoint_output_dir): + os.makedirs(checkpoint_output_dir) + + accelerator.wait_for_everyone() + unwrapped_model = accelerator.unwrap_model(model) + unwrapped_model.save_pretrained(checkpoint_output_dir, save_function=accelerator.save) + if accelerator.is_main_process: + tokenizer.save_pretrained(checkpoint_output_dir) + logger.info("Saving model checkpoint to %s", checkpoint_output_dir) + return completed_steps, train_loss / completed_steps + + +def evaluate(args, accelerator, dataloader, eval_set, model, checkpoint, has_labels=True, write_to_file=True): + """Evaluate a model checkpoint on the given evaluation data.""" + + num_examples = args.num_examples[eval_set] + eval_metric = None + completed_steps = 0 + eval_loss = 0.0 + all_predictions = None + all_references = None + all_probabilities = None + + if has_labels: + # Get the metric function + eval_metric = load_metric(args.eval_metric) + + eval_results = {} + model.eval() + for _, batch in enumerate(dataloader): + with torch.no_grad(): + outputs = model(**batch) + + eval_loss += outputs.loss.item() + logits = outputs.logits + predictions = logits.argmax(dim=-1) if not args.is_regression else logits.squeeze() + predictions = accelerator.gather(predictions) + + if all_predictions is None: + all_predictions 
= predictions.detach().cpu().numpy() + else: + all_predictions = np.append(all_predictions, predictions.detach().cpu().numpy(), axis=0) + + if not args.is_regression: + probabilities = logits.softmax(dim=-1).max(dim=-1).values + probabilities = accelerator.gather(probabilities) + if all_probabilities is None: + all_probabilities = probabilities.detach().cpu().numpy() + else: + all_probabilities = np.append(all_probabilities, probabilities.detach().cpu().numpy(), axis=0) + + if has_labels: + references = batch["labels"] + references = accelerator.gather(references) + if all_references is None: + all_references = references.detach().cpu().numpy() + else: + all_references = np.append(all_references, references.detach().cpu().numpy(), axis=0) + + eval_metric.add_batch( + predictions=predictions, + references=references, + ) + completed_steps += 1 + + if has_labels: + eval_results.update(eval_metric.compute()) + eval_results["completed_steps"] = completed_steps + eval_results["avg_eval_loss"] = eval_loss / completed_steps + + if write_to_file: + accelerator.wait_for_everyone() + if accelerator.is_main_process: + results_file = os.path.join(args.output_dir, f"{eval_set}_results_{checkpoint}.json") + with open(results_file, "w") as f: + json.dump(eval_results, f, indent=4, sort_keys=True) + + if write_to_file: + accelerator.wait_for_everyone() + if accelerator.is_main_process: + output_file = os.path.join(args.output_dir, f"{eval_set}_output_{checkpoint}.csv") + if not args.is_regression: + assert len(all_predictions) == len(all_probabilities) + df = pd.DataFrame(list(zip(all_predictions, all_probabilities)), columns=["prediction", "probability"]) + else: + df = pd.DataFrame(all_predictions, columns=["prediction"]) + df = df.head(num_examples) + df.to_csv(output_file, header=True, index=False) + return eval_results + + +def load_from_pretrained(args, pretrained_model_name_or_path): + """Load the pretrained model and tokenizer.""" + + # In distributed training, the 
.from_pretrained methods guarantee that only + # one local process can concurrently perform this procedure. + + config = AutoConfig.from_pretrained( + pretrained_model_name_or_path, + num_labels=args.num_labels if hasattr(args, "num_labels") else None, + finetuning_task=args.task_name.lower(), + cache_dir=args.cache_dir, + ) + tokenizer = AutoTokenizer.from_pretrained( + pretrained_model_name_or_path, use_fast=args.use_fast_tokenizer, cache_dir=args.cache_dir + ) + model = AutoModelForSequenceClassification.from_pretrained( + pretrained_model_name_or_path, + from_tf=bool(".ckpt" in args.model_name_or_path), + config=config, + ignore_mismatched_sizes=True, + cache_dir=args.cache_dir, + ) + return config, tokenizer, model + + +def finetune(accelerator, model_name_or_path, train_file, output_dir, **kwargs): + """Fine-tuning a pre-trained model on a downstream task. + + Args: + accelerator: An instance of an accelerator for distributed training (on + multi-GPU, TPU) or mixed precision training. + model_name_or_path: Path to pretrained model or model identifier from + huggingface.co/models. + train_file: A csv or a json file containing the training data. + output_dir: The output directory where the model predictions and checkpoints + will be written. + **kwargs: Dictionary of key/value pairs with which to update the + configuration object after loading. The values in kwargs of any keys which + are configuration attributes will be used to override the loaded values. + """ + # Make one log on every process with the configuration for debugging. + logging.basicConfig( + format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", + datefmt="%m/%d/%Y %H:%M:%S", + level=logging.INFO, + ) + logger.info(accelerator.state) + + # Setup logging, we only want one process per machine to log things on the + # screen. accelerator.is_local_main_process is only True for one process per + # machine. 
+ logger.setLevel(logging.INFO if accelerator.is_local_main_process else logging.ERROR) + + model_args = FTModelArguments(model_name_or_path=model_name_or_path) + data_args = FTDataArguments(train_file=train_file) + training_args = FTTrainingArguments(output_dir=output_dir) + args = argparse.Namespace() + + for arg_class in (model_args, data_args, training_args): + for key, value in vars(arg_class).items(): + setattr(args, key, value) + + for key, value in kwargs.items(): + if hasattr(args, key): + setattr(args, key, value) + + # Sanity checks + data_files = {} + args.data_file_extension = None + + # You need to provide the training data as we always run training + args.do_train = True + assert args.train_file is not None + data_files[Split.TRAIN.value] = args.train_file + + if args.do_eval or args.evaluation_strategy != IntervalStrategy.NO.value: + assert args.eval_file is not None + data_files[Split.EVAL.value] = args.eval_file + + if args.do_eval and args.test_file is not None: + data_files[Split.TEST.value] = args.test_file + + if args.do_predict: + assert args.infer_file is not None + data_files[Split.INFER.value] = args.infer_file + + for key in data_files: + extension = data_files[key].split(".")[-1] + assert extension in ["csv", "json"], f"`{key}_file` should be a csv or a json file." + if args.data_file_extension is None: + args.data_file_extension = extension + else: + assert extension == args.data_file_extension, f"`{key}_file` should be a {args.data_file_extension} file`." + + assert ( + args.eval_metric in datasets.list_metrics() + ), f"{args.eval_metric} not in the list of supported metrics {datasets.list_metrics()}." + + # Handle the output directory creation + if accelerator.is_main_process: + if args.output_dir is not None: + os.makedirs(args.output_dir, exist_ok=True) + accelerator.wait_for_everyone() + + # If passed along, set the training seed now. 
+ if args.seed is not None: + set_seed(args.seed) + + # You need to provide your CSV/JSON data files. + # + # For CSV/JSON files, this script will use as labels the column called 'label' + # and as pair of sentences the sentences in columns called 'sentence1' and + # 'sentence2' if these columns exist or the first two columns not named + # 'label' if at least two columns are provided. + # + # If the CSVs/JSONs contain only one non-label column, the script does single + # sentence classification on this single column. + # + # In distributed training, the load_dataset function guarantees that only one + # local process can download the dataset. + + # Loading the dataset from local csv or json files. + raw_datasets = load_dataset(args.data_file_extension, data_files=data_files) + + # Labels + is_regression = raw_datasets[Split.TRAIN.value].features["label"].dtype in ["float32", "float64"] + args.is_regression = is_regression + + if args.is_regression: + label_list = None + num_labels = 1 + else: + label_list = args.label_list + assert label_list is not None + label_list.sort() # Let's sort it for determinism + num_labels = len(label_list) + args.num_labels = num_labels + + # Load pre-trained model + config, tokenizer, model = load_from_pretrained(args, args.model_name_or_path) + + # Preprocessing the datasets + non_label_column_names = [name for name in raw_datasets[Split.TRAIN.value].column_names if name != "label"] + if "sentence1" in non_label_column_names and "sentence2" in non_label_column_names: + sentence1_key, sentence2_key = "sentence1", "sentence2" + else: + if len(non_label_column_names) >= 2: + sentence1_key, sentence2_key = non_label_column_names[:2] + else: + sentence1_key, sentence2_key = non_label_column_names[0], None + + label_to_id = {v: i for i, v in enumerate(label_list)} + config.label2id = label_to_id + config.id2label = {id: label for label, id in config.label2id.items()} + padding = "max_length" if args.pad_to_max_length else False + + def 
preprocess_function(examples): + # Tokenize the texts + texts = ( + (examples[sentence1_key],) if sentence2_key is None else (examples[sentence1_key], examples[sentence2_key]) + ) + result = tokenizer(*texts, padding=padding, max_length=args.max_length, truncation=True) + + if "label" in examples: + if label_to_id is not None: + # Map labels to IDs (not necessary for GLUE tasks) + result["labels"] = [label_to_id[l] for l in examples["label"]] + else: + # In all cases, rename the column to labels because the model will + # expect that. + result["labels"] = examples["label"] + return result + + with accelerator.main_process_first(): + processed_datasets = raw_datasets.map( + preprocess_function, + batched=True, + remove_columns=raw_datasets[Split.TRAIN.value].column_names, + desc="Running tokenizer on dataset", + ) + + num_examples = {} + splits = [s.value for s in Split] + for split in splits: + if split in processed_datasets: + num_examples[split] = len(processed_datasets[split]) + args.num_examples = num_examples + + train_dataset = processed_datasets[Split.TRAIN.value] + eval_dataset = processed_datasets[Split.EVAL.value] if Split.EVAL.value in processed_datasets else None + test_dataset = processed_datasets[Split.TEST.value] if Split.TEST.value in processed_datasets else None + infer_dataset = processed_datasets[Split.INFER.value] if Split.INFER.value in processed_datasets else None + + # Log a few random samples from the training set: + for index in random.sample(range(len(train_dataset)), 3): + logger.info("Sample %d of the training set: %s.", index, train_dataset[index]) + + # DataLoaders creation: + if args.pad_to_max_length: + # If padding was already done to max length, we use the default data + # collator that will just convert everything to tensors. + data_collator = default_data_collator + else: + # Otherwise, `DataCollatorWithPadding` will apply dynamic padding for us (by + # padding to the maximum length of the samples passed). 
When using mixed + # precision, we add `pad_to_multiple_of=8` to pad all tensors to multiple of + # 8s, which will enable the use of Tensor Cores on NVIDIA hardware with + # compute capability >= 7.5 (Volta). + data_collator = DataCollatorWithPadding(tokenizer, pad_to_multiple_of=(8 if accelerator.use_fp16 else None)) + + train_dataloader = DataLoader( + train_dataset, + batch_size=args.per_device_train_batch_size, + shuffle=True, + collate_fn=data_collator, + ) + eval_dataloader, test_dataloader, infer_dataloader = None, None, None + + if eval_dataset is not None: + eval_dataloader = DataLoader( + eval_dataset, batch_size=args.per_device_eval_batch_size, collate_fn=data_collator + ) + + if test_dataset is not None: + test_dataloader = DataLoader( + test_dataset, batch_size=args.per_device_eval_batch_size, collate_fn=data_collator + ) + + if infer_dataset is not None: + infer_dataloader = DataLoader( + infer_dataset, batch_size=args.per_device_eval_batch_size, collate_fn=data_collator + ) + + # Optimizer + # Split weights in two groups, one with weight decay and the other not. + no_decay = ["bias", "LayerNorm.weight"] + optimizer_grouped_parameters = [ + { + "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)], + "weight_decay": args.weight_decay, + }, + { + "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)], + "weight_decay": 0.0, + }, + ] + optimizer = AdamW(optimizer_grouped_parameters, lr=args.learning_rate) + + # Prepare everything with our `accelerator`. + model, optimizer, train_dataloader, eval_dataloader, test_dataloader, infer_dataloader = accelerator.prepare( + model, optimizer, train_dataloader, eval_dataloader, test_dataloader, infer_dataloader + ) + + # Note -> the training dataloader needs to be prepared before we grab its + # length below (cause its length will be shorter in multiprocess) + + # Scheduler and math around the number of training steps. 
+ num_update_steps_per_epoch = math.ceil(len(train_dataloader) / args.gradient_accumulation_steps) + if args.max_steps == -1: + args.max_steps = args.num_train_epochs * num_update_steps_per_epoch + else: + args.num_train_epochs = math.ceil(args.max_steps / num_update_steps_per_epoch) + + lr_scheduler = get_scheduler( + name=args.lr_scheduler_type, + optimizer=optimizer, + num_warmup_steps=args.warmup_steps, + num_training_steps=args.max_steps, + ) + + # Train + completed_steps, avg_train_loss = train( + args, accelerator, model, tokenizer, train_dataloader, optimizer, lr_scheduler, eval_dataloader + ) + accelerator.wait_for_everyone() + logger.info("Training job completed: completed_steps = %d, avg_train_loss = %f", completed_steps, avg_train_loss) + + args.model_name_or_path = os.path.join(args.output_dir, "best-checkpoint") + logger.info("Loading the best checkpoint: %s", args.model_name_or_path) + config, tokenizer, model = load_from_pretrained(args, args.model_name_or_path) + model = accelerator.prepare(model) + + if args.do_eval: + # Evaluate + if eval_dataloader is not None: + logger.info("***** Running evaluation on the eval data using the best checkpoint *****") + eval_results = evaluate(args, accelerator, eval_dataloader, Split.EVAL.value, model, "best-checkpoint") + avg_eval_loss = eval_results["avg_eval_loss"] + eval_metric = eval_results[args.eval_metric] + logger.info("Evaluation job completed: avg_eval_loss = %f", avg_eval_loss) + logger.info("Evaluation result for the best checkpoint: %s = %f", args.eval_metric, eval_metric) + + if test_dataloader is not None: + logger.info("***** Running evaluation on the test data using the best checkpoint *****") + eval_results = evaluate(args, accelerator, test_dataloader, Split.TEST.value, model, "best-checkpoint") + avg_eval_loss = eval_results["avg_eval_loss"] + eval_metric = eval_results[args.eval_metric] + logger.info("Test job completed: avg_test_loss = %f", avg_eval_loss) + logger.info("Test result for the 
best checkpoint: %s = %f", args.eval_metric, eval_metric) + + if args.do_predict: + # Predict + if infer_dataloader is not None: + logger.info("***** Running inference using the best checkpoint *****") + evaluate( + args, accelerator, infer_dataloader, Split.INFER.value, model, "best-checkpoint", has_labels=False + ) + logger.info("Inference job completed.") + + # Release all references to the internal objects stored and call the garbage + # collector. You should call this method between two trainings with different + # models/optimizers. + accelerator.free_memory() diff --git a/examples/research_projects/self-training-text-classification/requirements.txt b/examples/research_projects/self-training-text-classification/requirements.txt new file mode 100644 index 0000000000..25d66c8b6a --- /dev/null +++ b/examples/research_projects/self-training-text-classification/requirements.txt @@ -0,0 +1,7 @@ +accelerate +datasets >= 1.8.0 +protobuf +scikit-learn +scipy +sentencepiece != 0.1.92 +torch >= 1.3 diff --git a/examples/research_projects/self-training-text-classification/run.sh b/examples/research_projects/self-training-text-classification/run.sh new file mode 100755 index 0000000000..435a414618 --- /dev/null +++ b/examples/research_projects/self-training-text-classification/run.sh @@ -0,0 +1,81 @@ +# Copyright 2022 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +#!/bin/bash + +# Create a virtual environment +conda deactivate +conda update conda -y +conda update anaconda -y +pip install --upgrade pip +python3 -m pip install --user virtualenv +conda create -n strata python=3.9 -y +conda activate strata +# Install all necessary packages +pip install transformers +pip install -r requirements.txt + +# Download and prepare data +WORK_DIR="/tmp/strata" +rm -rf "${WORK_DIR}" && mkdir -p "${WORK_DIR}" +wget https://storage.googleapis.com/gresearch/strata/demo.zip -P "${WORK_DIR}" +DEMO_ZIP_FILE="${WORK_DIR}/demo.zip" +unzip "${DEMO_ZIP_FILE}" -d "${WORK_DIR}" && rm "${DEMO_ZIP_FILE}" +DATA_DIR="${WORK_DIR}/demo/scitail-8" +OUTPUT_DIR="/tmp/output" +rm -rf "${OUTPUT_DIR}" && mkdir -p "${OUTPUT_DIR}" + +# Specific hyperparameters +MODEL_NAME_OR_PATH="bert-base-uncased" +NUM_NODES=1 +NUM_TRAINERS=4 +LAUNCH_SCRIPT="torchrun --nnodes='${NUM_NODES}' --nproc_per_node='${NUM_TRAINERS}' python -c" +MAX_SELFTRAIN_ITERATIONS=100 +TRAIN_FILE="train.csv" +INFER_FILE="infer.csv" +EVAL_FILE="eval_256.csv" +MAX_STEPS=100000 + +# Start self-training +${LAUNCH_SCRIPT} " +import os +from selftraining import selftrain + +data_dir = '${DATA_DIR}' +parameters_dict = { + 'max_selftrain_iterations': ${MAX_SELFTRAIN_ITERATIONS}, + 'model_name_or_path': '${MODEL_NAME_OR_PATH}', + 'output_dir': '${OUTPUT_DIR}', + 'train_file': os.path.join(data_dir, '${TRAIN_FILE}'), + 'infer_file': os.path.join(data_dir, '${INFER_FILE}'), + 'eval_file': os.path.join(data_dir, '${EVAL_FILE}'), + 'evaluation_strategy': 'steps', + 'task_name': 'scitail', + 'label_list': ['entails', 'neutral'], + 'per_device_train_batch_size': 32, + 'per_device_eval_batch_size': 8, + 'max_length': 128, + 'learning_rate': 2e-5, + 'max_steps': ${MAX_STEPS}, + 'eval_steps': 1, + 'early_stopping_patience': 50, + 'overwrite_output_dir': True, + 'do_filter_by_confidence': False, + 'do_filter_by_val_performance': True, + 'finetune_on_labeled_data': False, + 'seed': 42, +} + 
+selftrain(**parameters_dict) +" diff --git a/examples/research_projects/self-training-text-classification/selftraining.py b/examples/research_projects/self-training-text-classification/selftraining.py new file mode 100644 index 0000000000..7fde2fd1b8 --- /dev/null +++ b/examples/research_projects/self-training-text-classification/selftraining.py @@ -0,0 +1,388 @@ +# coding=utf-8 +# Copyright 2022 The Google Research Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Self-training for sequence classification.""" + +import argparse +import dataclasses +import json +import logging +import os +import shutil +from typing import List, Optional + +import datasets +from datasets import load_dataset +from tqdm.auto import tqdm + +import transformers +from accelerate import Accelerator +from finetuning import finetune +from transformers import AutoConfig, set_seed +from transformers.trainer_utils import IntervalStrategy + + +logger = logging.getLogger(__name__) + +MODEL_BIN_FILE = "pytorch_model.bin" + + +@dataclasses.dataclass +class STModelArguments: + """Arguments pertaining to which config/tokenizer/model we are going to fine-tune from.""" + + model_name_or_path: str = dataclasses.field( + metadata={"help": "Path to pretrained model or model identifier from huggingface.co/models."} + ) + cache_dir: Optional[str] = dataclasses.field( + default=None, + metadata={"help": "Where do you want to store the pretrained models downloaded from huggingface.co."}, + ) + + 
+@dataclasses.dataclass +class STDataArguments: + """Arguments pertaining to what data we are going to input our model for training and evaluation.""" + + train_file: str = dataclasses.field(metadata={"help": "A csv or a json file containing the training data."}) + infer_file: str = dataclasses.field(metadata={"help": "A csv or a json file containing the data to predict on."}) + eval_file: Optional[str] = dataclasses.field( + default=None, metadata={"help": "A csv or a json file containing the validation data."} + ) + task_name: Optional[str] = dataclasses.field( + default=None, + metadata={"help": "The name of the task to train on."}, + ) + label_list: Optional[List[str]] = dataclasses.field( + default=None, metadata={"help": "The list of labels for the task."} + ) + + +@dataclasses.dataclass +class STTrainingArguments: + """Training arguments pertaining to the training loop itself.""" + + output_dir: str = dataclasses.field( + metadata={"help": "The output directory where the model predictions and checkpoints will be written."} + ) + eval_metric: Optional[str] = dataclasses.field( + default="accuracy", metadata={"help": "The evaluation metric used for the task."} + ) + evaluation_strategy: Optional[str] = dataclasses.field( + default="no", + metadata={ + "help": 'The evaluation strategy to adopt during training. Possible values are: ["no", "steps", "epoch"]' + }, + ) + early_stopping_patience: Optional[int] = dataclasses.field( + default=10, + metadata={"help": "Number of evaluation calls with no improvement after which training will be stopped."}, + ) + early_stopping_threshold: Optional[float] = dataclasses.field( + default=0.0, + metadata={ + "help": "How much the specified evaluation metric must improve to satisfy early stopping conditions."
+ }, + ) + do_filter_by_confidence: Optional[bool] = dataclasses.field( + default=False, + metadata={"help": "Whether to filter the pseudo-labeled data based on the confidence score."}, + ) + do_filter_by_val_performance: Optional[bool] = dataclasses.field( + default=False, + metadata={"help": "Whether to filter the pseudo-labeled data based on the validation performance."}, + ) + finetune_on_labeled_data: Optional[bool] = dataclasses.field( + default=False, + metadata={"help": "Whether to fine-tune on labeled data after pseudo training."}, + ) + confidence_threshold: Optional[float] = dataclasses.field( + default=0.0, + metadata={"help": "Confidence threshold for pseudo-labeled data filtering."}, + ) + max_selftrain_iterations: Optional[int] = dataclasses.field( + default=100, + metadata={"help": "Maximum number of self-training iterations to run."}, + ) + seed: Optional[int] = dataclasses.field( + default=None, + metadata={"help": "Random seed for initialization."}, + ) + + +def create_pseudo_labeled_data(args, infer_input, infer_output, eval_result, id2label, next_data_dir): + """Create pseudo-labeled data for the next self-training iteration.""" + + dataset = datasets.concatenate_datasets([infer_input, infer_output], axis=1) + + if args.do_filter_by_confidence: + dataset = dataset.filter(lambda example: example["probability"] > args.confidence_threshold) + + if args.do_filter_by_val_performance: + assert eval_result >= 0.0 and eval_result <= 1.0 + num_selected_rows = int(eval_result * len(dataset)) + logger.info("Number of selected pseudo-labeled examples: %d", num_selected_rows) + dataset = dataset.sort("probability", reverse=True) + dataset = dataset.select(range(num_selected_rows)) + + dataset = dataset.remove_columns(["label", "probability"]) + dataset = dataset.rename_column("prediction", "label") + dataset = dataset.map(lambda example: {"label": id2label[example["label"]]}) + dataset = dataset.shuffle(seed=args.seed) + + pseudo_labeled_data_file = os.path.join(next_data_dir,
f"train_pseudo.{args.data_file_extension}") + if args.data_file_extension == "csv": + dataset.to_csv(pseudo_labeled_data_file, index=False) + else: + dataset.to_json(pseudo_labeled_data_file) + + +def selftrain(model_name_or_path, train_file, infer_file, output_dir, **kwargs): + """Self-training a pre-trained model on a downstream task. + + Args: + model_name_or_path: Path to pretrained model or model identifier from + huggingface.co/models. + train_file: A csv or a json file containing the training data. + infer_file: A csv or a json file containing the data to predict on. + output_dir: The output directory where the model predictions and checkpoints + will be written. + **kwargs: Dictionary of key/value pairs with which to update the + configuration object after loading. The values in kwargs of any keys which + are configuration attributes will be used to override the loaded values. + """ + # Initialize the accelerator. We will let the accelerator handle device + # placement for us. + accelerator = Accelerator() + # Make one log on every process with the configuration for debugging. + logging.basicConfig( + format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", + datefmt="%m/%d/%Y %H:%M:%S", + level=logging.INFO, + ) + logger.info(accelerator.state) + + # Setup logging, we only want one process per machine to log things on the + # screen. accelerator.is_local_main_process is only True for one process per + # machine. 
+ logger.setLevel(logging.INFO if accelerator.is_local_main_process else logging.ERROR) + + if accelerator.is_local_main_process: + datasets.utils.logging.set_verbosity_warning() + transformers.utils.logging.set_verbosity_info() + else: + datasets.utils.logging.set_verbosity_error() + transformers.utils.logging.set_verbosity_error() + + model_args = STModelArguments(model_name_or_path=model_name_or_path) + data_args = STDataArguments(train_file=train_file, infer_file=infer_file) + training_args = STTrainingArguments(output_dir=output_dir) + args = argparse.Namespace() + + for arg_class in (model_args, data_args, training_args): + for key, value in vars(arg_class).items(): + setattr(args, key, value) + + for key, value in kwargs.items(): + if hasattr(args, key): + setattr(args, key, value) + + # Sanity checks + data_files = {} + args.data_file_extension = None + + # You need to provide the training data and the data to predict on + assert args.train_file is not None + assert args.infer_file is not None + data_files["train"] = args.train_file + data_files["infer"] = args.infer_file + + if args.evaluation_strategy != IntervalStrategy.NO.value: + assert args.eval_file is not None + data_files["eval"] = args.eval_file + + for key in data_files: + extension = data_files[key].split(".")[-1] + assert extension in ["csv", "json"], f"`{key}_file` should be a csv or a json file." + if args.data_file_extension is None: + args.data_file_extension = extension + else: + assert extension == args.data_file_extension, f"`{key}_file` should be a {args.data_file_extension} file." + + assert ( + args.eval_metric in datasets.list_metrics() + ), f"{args.eval_metric} not in the list of supported metrics {datasets.list_metrics()}." + + # If passed along, set the training seed now.
+ if args.seed is not None: + set_seed(args.seed) + + logger.info("Creating the initial data directory for self-training...") + data_dir_format = f"{args.output_dir}/self-train_iter-{{}}".format + initial_data_dir = data_dir_format(0) + + if accelerator.is_main_process: + if args.output_dir is not None: + os.makedirs(args.output_dir, exist_ok=True) + os.makedirs(initial_data_dir, exist_ok=True) + accelerator.wait_for_everyone() + + best_iteration = None + best_eval_result = None + early_stopping_patience_counter = 0 + should_training_stop = False + # Show the progress bar + progress_bar = tqdm(range(args.max_selftrain_iterations), disable=not accelerator.is_local_main_process) + + # Self-train + for iteration in range(0, int(args.max_selftrain_iterations)): + current_data_dir = data_dir_format(iteration) + assert os.path.exists(current_data_dir) + + # Stage 1: initial fine-tuning for iteration = 0 or pseudo-training for + # iteration > 0 + current_output_dir = os.path.join(current_data_dir, "stage-1") + arguments_dict = { + "accelerator": accelerator, + "model_name_or_path": args.model_name_or_path, + "cache_dir": args.cache_dir, + "do_train": True, + "train_file": data_files["train"] if iteration == 0 else data_files["train_pseudo"], + "do_eval": True if args.eval_file is not None else False, + "eval_file": data_files["eval"], + "do_predict": True, + "infer_file": data_files["infer"], + "task_name": args.task_name, + "label_list": args.label_list, + "output_dir": current_output_dir, + "eval_metric": args.eval_metric, + "evaluation_strategy": args.evaluation_strategy, + "early_stopping_patience": args.early_stopping_patience, + "early_stopping_threshold": args.early_stopping_threshold, + "seed": args.seed, + } + # Add additional training arguments + for key, value in kwargs.items(): + if key not in arguments_dict and not hasattr(training_args, key): + arguments_dict.update({key: value}) + + model_bin_file_path = os.path.join(current_output_dir, "best-checkpoint", 
MODEL_BIN_FILE) + if os.path.exists(model_bin_file_path): + logger.info( + "Found existing model checkpoint at %s. Skipping self-training: iteration: %d, stage: 1.", + model_bin_file_path, + iteration, + ) + else: + logger.info("***** Running self-training: iteration: %d, stage: 1 *****", iteration) + finetune(**arguments_dict) + accelerator.wait_for_everyone() + assert os.path.exists(model_bin_file_path) + logger.info("Self-training job completed: iteration: %d, stage: 1.", iteration) + + if iteration > 0 and args.finetune_on_labeled_data: + # Stage 2 (optional): fine-tuning on the original labeled data + model_path = os.path.join(current_output_dir, "best-checkpoint") + current_output_dir = os.path.join(current_data_dir, "stage-2") + # Update arguments_dict + arguments_dict["model_name_or_path"] = model_path + arguments_dict["train_file"] = data_files["train"] + arguments_dict["output_dir"] = current_output_dir + + model_bin_file_path = os.path.join(current_output_dir, "best-checkpoint", MODEL_BIN_FILE) + if os.path.exists(model_bin_file_path): + logger.info( + "Found existing model checkpoint at %s. 
Skipping self-training: iteration: %d, stage: 2.", + model_bin_file_path, + iteration, + ) + else: + logger.info("***** Running self-training: iteration: %d, stage: 2 *****", iteration) + finetune(**arguments_dict) + accelerator.wait_for_everyone() + assert os.path.exists(model_bin_file_path) + logger.info("Self-training job completed: iteration: %d, stage: 2.", iteration) + + new_iteration = iteration + next_data_dir = data_dir_format(iteration + 1) + + config = AutoConfig.from_pretrained(os.path.join(current_output_dir, "best-checkpoint")) + id2label = config.id2label + eval_results_file = os.path.join(current_output_dir, "eval_results_best-checkpoint.json") + test_results_file = os.path.join(current_output_dir, "test_results_best-checkpoint.json") + assert os.path.exists(eval_results_file) + + with open(eval_results_file, "r") as f: + eval_result = float(json.load(f)[args.eval_metric]) + infer_output_file = os.path.join(current_output_dir, "infer_output_best-checkpoint.csv") + assert os.path.exists(infer_output_file) + # Loading the dataset from local csv or json files. 
+ infer_input = load_dataset(args.data_file_extension, data_files={"data": data_files["infer"]})["data"] + infer_output = load_dataset("csv", data_files={"data": infer_output_file})["data"] + + if accelerator.is_main_process: + os.makedirs(next_data_dir, exist_ok=True) + shutil.copy(eval_results_file, os.path.join(output_dir, f"eval_results_iter-{iteration}.json")) + if os.path.exists(test_results_file): + shutil.copy(test_results_file, os.path.join(output_dir, f"test_results_iter-{iteration}.json")) + create_pseudo_labeled_data(args, infer_input, infer_output, eval_result, id2label, next_data_dir) + accelerator.wait_for_everyone() + + data_files["train_pseudo"] = os.path.join(next_data_dir, f"train_pseudo.{args.data_file_extension}") + + if args.evaluation_strategy != IntervalStrategy.NO.value: + new_eval_result = eval_result + + if best_iteration is None: + best_iteration = new_iteration + best_eval_result = new_eval_result + else: + if new_eval_result - best_eval_result > args.early_stopping_threshold: + best_iteration = new_iteration + best_eval_result = new_eval_result + early_stopping_patience_counter = 0 + else: + if new_eval_result == best_eval_result: + best_iteration = new_iteration + best_eval_result = new_eval_result + early_stopping_patience_counter += 1 + + if early_stopping_patience_counter >= args.early_stopping_patience: + should_training_stop = True + + progress_bar.update(1) + + if should_training_stop: + break + + if best_iteration is not None: + # Save the best iteration + logger.info("Best iteration: %d", best_iteration) + logger.info("Best evaluation result: %s = %f", args.eval_metric, best_eval_result) + accelerator.wait_for_everyone() + if accelerator.is_main_process: + shutil.copy( + os.path.join(output_dir, f"eval_results_iter-{best_iteration}.json"), + os.path.join(output_dir, "eval_results_best-iteration.json"), + ) + else: + # Assume that the last iteration is the best + logger.info("Best iteration: %d", args.max_selftrain_iterations - 1) +
logger.info("Best evaluation result: %s = %f", args.eval_metric, eval_result) + accelerator.wait_for_everyone() + if accelerator.is_main_process: + shutil.copy( + os.path.join(output_dir, f"eval_results_iter-{args.max_selftrain_iterations - 1}.json"), + os.path.join(output_dir, "eval_results_best-iteration.json"), + )
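
The pseudo-label selection performed by `create_pseudo_labeled_data` can be illustrated in isolation. The following is a minimal sketch, not the code from this patch: it uses plain Python lists of dicts instead of 🤗 Datasets objects, and the helper name `select_pseudo_labels` and the sample rows are hypothetical. It assumes, as in the function above, that each row carries a `probability` field, that the optional confidence filter runs first, and that the validation score (in `[0, 1]`) then decides what fraction of the most confident rows is kept.

```python
from typing import Dict, List, Optional


def select_pseudo_labels(
    rows: List[Dict],
    eval_result: float,
    confidence_threshold: Optional[float] = None,
) -> List[Dict]:
    """Keep the most confident predictions (hypothetical helper for illustration)."""
    if not 0.0 <= eval_result <= 1.0:
        raise ValueError("eval_result must be in [0, 1]")

    # Optional step: drop rows whose confidence is not above the threshold.
    if confidence_threshold is not None:
        rows = [row for row in rows if row["probability"] > confidence_threshold]

    # Keep a fraction of the remaining rows equal to the validation score,
    # choosing the rows with the highest confidence first.
    num_selected = int(eval_result * len(rows))
    ranked = sorted(rows, key=lambda row: row["probability"], reverse=True)
    return ranked[:num_selected]


# Made-up example rows; the field names mirror the columns used above.
rows = [
    {"sentence1": "a", "prediction": 0, "probability": 0.9},
    {"sentence1": "b", "prediction": 1, "probability": 0.6},
    {"sentence1": "c", "prediction": 0, "probability": 0.8},
    {"sentence1": "d", "prediction": 1, "probability": 0.2},
]
selected = select_pseudo_labels(rows, eval_result=0.5, confidence_threshold=0.3)
```

With these inputs, the threshold removes row `d`, and a validation score of 0.5 keeps `int(0.5 * 3) = 1` row, the most confident one. A weaker model (lower validation score) therefore contributes fewer pseudo-labels to the next iteration.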