From 4868a830db5f19f56712f540979d637368221d50 Mon Sep 17 00:00:00 2001 From: Jia LI Date: Mon, 11 Apr 2022 11:24:32 +0200 Subject: [PATCH] Jia multi gpu eval (#16428) * add simple multi gpu complet * add human_eval_multi_gpu * use copy strategy to distribute across gpu, to avoid padding * add doc string * update code style * use task id to arrange output * truncate input to avoid zero pad * Stop the copy mechanism * update style * restore copies to scale better in distributed mode * update style * replace human eval * Apply suggestions from code review 1. Tokenize all input at the same time 2. use attention_mask to get the input length 3. other small fixes Co-authored-by: Leandro von Werra * correct typo and update docstring * update code style * remove num sample division constraint * remove max len calculation * use accelerator.gather once to speed up * use accelerate set_seed; update accelerate version * correct gather bug Co-authored-by: Leandro von Werra --- .../codeparrot/requirements.txt | 2 +- .../codeparrot/scripts/human_eval.py | 179 ++++++++++++++---- 2 files changed, 141 insertions(+), 40 deletions(-) diff --git a/examples/research_projects/codeparrot/requirements.txt b/examples/research_projects/codeparrot/requirements.txt index 14b27c8e25..a8aadb4ed9 100644 --- a/examples/research_projects/codeparrot/requirements.txt +++ b/examples/research_projects/codeparrot/requirements.txt @@ -1,6 +1,6 @@ transformers==4.15.0 datasets==1.16.0 -accelerate==0.5.1 +accelerate==0.6.2 wandb==0.12.0 tensorboard==2.6.0 torch==1.9.0 diff --git a/examples/research_projects/codeparrot/scripts/human_eval.py b/examples/research_projects/codeparrot/scripts/human_eval.py index ea9eb82146..1eb5555cd7 100644 --- a/examples/research_projects/codeparrot/scripts/human_eval.py +++ b/examples/research_projects/codeparrot/scripts/human_eval.py @@ -2,26 +2,51 @@ import json import multiprocessing import os import re +from collections import defaultdict +import torch from datasets import load_dataset, load_metric +from torch.utils.data import IterableDataset +from torch.utils.data.dataloader import DataLoader from tqdm import tqdm import transformers +from accelerate import Accelerator +from accelerate.utils import set_seed from arguments import HumanEvalArguments -from transformers import ( - AutoModelForCausalLM, - AutoTokenizer, - HfArgumentParser, - StoppingCriteria, - StoppingCriteriaList, - pipeline, - set_seed, -) +from transformers import AutoModelForCausalLM, AutoTokenizer, HfArgumentParser, StoppingCriteria, StoppingCriteriaList EOF_STRINGS = ["\nclass", "\ndef", "\n#", "\n@", "\nprint", "\nif"] +class TokenizedDataset(IterableDataset): + """Tokenize and preprocess the dataset + Multiple copies of the same prompt are sent sequentially. + See compute_code for more details. + """ + + def __init__(self, tokenizer, dataset, n_tasks=None, n_copies=1): + self.tokenizer = tokenizer + self.dataset = dataset + self.n_tasks = len(dataset) if n_tasks is None else n_tasks + self.n_copies = n_copies + + def __iter__(self): + prompts = [] + for task in range(self.n_tasks): + # without strip, the model generate commented codes ... + prompts.append(self.tokenizer.eos_token + self.dataset[task]["prompt"].strip()) + outputs = self.tokenizer(prompts, padding=True, return_tensors="pt") + for task in range(self.n_tasks): + for _ in range(self.n_copies): + yield { + "ids": outputs.input_ids[task], + "task_id": task, + "input_len": outputs.attention_mask[task].sum(), + } + + class EndOfFunctionCriteria(StoppingCriteria): """Custom `StoppingCriteria` which checks if all generated functions in the batch are completed.""" @@ -39,16 +64,77 @@ class EndOfFunctionCriteria(StoppingCriteria): return all(done) -def first_block(string): - """Split off first block of code by scanning for class, def etc. on newlines.""" - return re.split("|".join(EOF_STRINGS), string)[0].rstrip() +def remove_last_block(string): + """Remove the last block of the code containing EOF_STRINGS""" + string_list = re.split("(%s)" % "|".join(EOF_STRINGS), string) + # last string should be "" + return "".join(string_list[:-2]) -def complete_code(pipe, prompt, num_completions=1, **gen_kwargs): - """Complete prompt with text generation pipeline and return num_completions.""" - prompt = pipe.tokenizer.eos_token + prompt - code_gens = pipe(prompt, num_return_sequences=num_completions, **gen_kwargs) - return [first_block(code_gen["generated_text"][len(prompt) :]) for code_gen in code_gens] +def complete_code(accelerator, model, tokenizer, dataloader, n_tasks, batch_size=20, **gen_kwargs): + """Generate multiple codes for each task in the dataset. This function leverage accelerator to distribute + the processing to multiple GPUs. + dataloader, a wrapper around a TokenizeDataset objectm is supposed to send all the prompts from + the evalution dataset to the modelm as the following: + [p_0_0, p_0_1, ..., p_0_nc-1, p_1_0, ..., p_nt-1_nc-1] + where nc is the number of copies of the prompt, and nt is the number of tasks. + nc is such that num_sample = nc * batch_size + + Parameters + ---------- + accelerator: Accelerator + + model: transformers.PreTrainedModel + Code generation model. AutoTokenizer.from_pretrained(model_ckpt), ex model_ckpt = "lvwerra/codeparrot" + + tokenizer: transformers.AutoTokenizer + The tokenizer used to train model + + dataloader: DataLoader + The dataloader is a wrapper around a TokenizeDataset object. It is designed to be used with multiple GPUs. + + n_tasks: int + The number of tasks in the dataset. It is used to determine the length of the output. + Should be aligned with the number of tasks in the TokenizeDataset. + + batch_size: int + num_return_sequences per copy of the prompt such that num_sample = batch_size * n_copies + + gen_kwargs: dict + Keyword arguments for the generation function of the model. + + Returns + ------- + code_gens: list of list of str, of length n_tasks + List of generated codes for each task. + Each element is a list of generated codes for each task, with length num_samples + """ + gen_token_dict = defaultdict(list) # dict of list of generated tokens + for step, batch in tqdm(enumerate(dataloader)): + with torch.no_grad(): + gen_kwargs["stopping_criteria"][0].start_length = batch["ids"].shape[-1] + generated_tokens = accelerator.unwrap_model(model).generate( + input_ids=batch["ids"][:, : batch["input_len"]], num_return_sequences=batch_size, **gen_kwargs + ) + # each task is generated batch_size times + generated_tasks = batch["task_id"].repeat(batch_size) + generated_tokens = accelerator.pad_across_processes( + generated_tokens, dim=1, pad_index=tokenizer.pad_token_id + ) + + generated_tokens, generated_tasks = accelerator.gather((generated_tokens, generated_tasks)) + generated_tokens = generated_tokens.cpu().numpy() + generated_tasks = generated_tasks.cpu().numpy() + + for task, generated_tokens in zip(generated_tasks, generated_tokens): + gen_token_dict[task].append(generated_tokens) + + code_gens = [[] for _ in range(n_tasks)] + for task, generated_tokens in gen_token_dict.items(): + for s in generated_tokens: + gen_code = tokenizer.decode(s, skip_special_tokens=True, clean_up_tokenization_spaces=True) + code_gens[task].append(remove_last_block(gen_code)) + return code_gens def main(): @@ -65,12 +151,14 @@ def main(): if args.num_workers is None: args.num_workers = multiprocessing.cpu_count() - set_seed(args.seed) + # Use dataset load to feed to accelerate + accelerator = Accelerator() + set_seed(args.seed, device_specific=True) # Load model and tokenizer tokenizer = AutoTokenizer.from_pretrained(args.model_ckpt) + tokenizer.pad_token = tokenizer.eos_token model = AutoModelForCausalLM.from_pretrained(args.model_ckpt) - pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, device=args.device_int) # Generation settings gen_kwargs = { @@ -86,6 +174,13 @@ def main(): human_eval = load_dataset("openai_humaneval") code_eval_metric = load_metric("code_eval") + n_tasks = args.num_tasks if args.num_tasks is not None else len(human_eval["test"]) + n_copies = args.n_samples // args.batch_size + + human_eval_tokenized = TokenizedDataset(tokenizer, human_eval["test"], n_copies=n_copies, n_tasks=n_tasks) + # do not confuse args.batch_size, which is actually the num_return_sequences + human_eval_loader = DataLoader(human_eval_tokenized, batch_size=1) + # Run a quick test to see if code evaluation is enabled try: _ = code_eval_metric.compute(references=[""], predictions=[[""]]) @@ -95,29 +190,35 @@ def main(): ) raise exception - # Generate completions for evaluation set - n_tasks = args.num_tasks if args.num_tasks is not None else len(human_eval["test"]) - generations, references = [], [] - for task in tqdm(range(n_tasks)): - task_generations = [] - prompt = human_eval["test"][task]["prompt"].strip() - gen_kwargs["stopping_criteria"][0].start_length = len(tokenizer(prompt)["input_ids"]) - for batch in range(args.n_samples // args.batch_size): - task_generations.extend(complete_code(pipe, prompt, num_completions=args.batch_size, **gen_kwargs)) - generations.append([prompt + gen for gen in task_generations]) - test_func = human_eval["test"][task]["test"] - entry_point = f"check({human_eval['test'][task]['entry_point']})" - references.append("\n" + test_func + "\n" + entry_point) + model, human_eval_loader = accelerator.prepare(model, human_eval_loader) - # Evaluate completions with "code_eval" metric - pass_at_k, _ = code_eval_metric.compute( - references=references, predictions=generations, num_workers=args.num_workers + generations = complete_code( + accelerator, + model, + tokenizer, + human_eval_loader, + n_tasks=n_tasks, + batch_size=args.batch_size, + **gen_kwargs, ) - print(f"Results: {pass_at_k}") - # Save results to json file - with open(args.output_file, "w") as fp: - json.dump(pass_at_k, fp) + if accelerator.is_main_process: + references = [] + + for task in tqdm(range(n_tasks)): + test_func = human_eval["test"][task]["test"] + entry_point = f"check({human_eval['test'][task]['entry_point']})" + references.append("\n" + test_func + "\n" + entry_point) + + # Evaluate completions with "code_eval" metric + pass_at_k, _ = code_eval_metric.compute( + references=references, predictions=generations, num_workers=args.num_workers + ) + print(f"Results: {pass_at_k}") + + # Save results to json file + with open(args.output_file, "w") as fp: + json.dump(pass_at_k, fp) # For some reason the folliwng seems to be necessary sometimes for code_eval to work nice with multiprocessing