Jia multi gpu eval (#16428)
* add simple multi gpu complet * add human_eval_multi_gpu * use copy strategy to distribute across gpu, to avoid padding * add doc string * update code style * use task id to arrange output * truncate input to avoid zero pad * Stop the copy mechanism * update style * restore copies to scale better in distributed mode * update style * replace human eval * Apply suggestions from code review 1. Tokenize all input at the same time 2. use attention_mask to get the input length 3. other small fixes Co-authored-by: Leandro von Werra <lvwerra@users.noreply.github.com> * correct typo and update docstring * update code style * remove num sample division constraint * remove max len calculation * use accelerator.gather once to speed up * use accelerate set_seed; update accelerate version * correct gather bug Co-authored-by: Leandro von Werra <lvwerra@users.noreply.github.com>
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
transformers==4.15.0
|
||||
datasets==1.16.0
|
||||
accelerate==0.5.1
|
||||
accelerate==0.6.2
|
||||
wandb==0.12.0
|
||||
tensorboard==2.6.0
|
||||
torch==1.9.0
|
||||
|
||||
@@ -2,26 +2,51 @@ import json
|
||||
import multiprocessing
|
||||
import os
|
||||
import re
|
||||
from collections import defaultdict
|
||||
|
||||
import torch
|
||||
from datasets import load_dataset, load_metric
|
||||
from torch.utils.data import IterableDataset
|
||||
from torch.utils.data.dataloader import DataLoader
|
||||
from tqdm import tqdm
|
||||
|
||||
import transformers
|
||||
from accelerate import Accelerator
|
||||
from accelerate.utils import set_seed
|
||||
from arguments import HumanEvalArguments
|
||||
from transformers import (
|
||||
AutoModelForCausalLM,
|
||||
AutoTokenizer,
|
||||
HfArgumentParser,
|
||||
StoppingCriteria,
|
||||
StoppingCriteriaList,
|
||||
pipeline,
|
||||
set_seed,
|
||||
)
|
||||
from transformers import AutoModelForCausalLM, AutoTokenizer, HfArgumentParser, StoppingCriteria, StoppingCriteriaList
|
||||
|
||||
|
||||
EOF_STRINGS = ["\nclass", "\ndef", "\n#", "\n@", "\nprint", "\nif"]
|
||||
|
||||
|
||||
class TokenizedDataset(IterableDataset):
|
||||
"""Tokenize and preprocess the dataset
|
||||
Multiple copies of the same prompt are sent sequentially.
|
||||
See compute_code for more details.
|
||||
"""
|
||||
|
||||
def __init__(self, tokenizer, dataset, n_tasks=None, n_copies=1):
|
||||
self.tokenizer = tokenizer
|
||||
self.dataset = dataset
|
||||
self.n_tasks = len(dataset) if n_tasks is None else n_tasks
|
||||
self.n_copies = n_copies
|
||||
|
||||
def __iter__(self):
|
||||
prompts = []
|
||||
for task in range(self.n_tasks):
|
||||
# without strip, the model generate commented codes ...
|
||||
prompts.append(self.tokenizer.eos_token + self.dataset[task]["prompt"].strip())
|
||||
outputs = self.tokenizer(prompts, padding=True, return_tensors="pt")
|
||||
for task in range(self.n_tasks):
|
||||
for _ in range(self.n_copies):
|
||||
yield {
|
||||
"ids": outputs.input_ids[task],
|
||||
"task_id": task,
|
||||
"input_len": outputs.attention_mask[task].sum(),
|
||||
}
|
||||
|
||||
|
||||
class EndOfFunctionCriteria(StoppingCriteria):
|
||||
"""Custom `StoppingCriteria` which checks if all generated functions in the batch are completed."""
|
||||
|
||||
@@ -39,16 +64,77 @@ class EndOfFunctionCriteria(StoppingCriteria):
|
||||
return all(done)
|
||||
|
||||
|
||||
def first_block(string):
|
||||
"""Split off first block of code by scanning for class, def etc. on newlines."""
|
||||
return re.split("|".join(EOF_STRINGS), string)[0].rstrip()
|
||||
def remove_last_block(string):
|
||||
"""Remove the last block of the code containing EOF_STRINGS"""
|
||||
string_list = re.split("(%s)" % "|".join(EOF_STRINGS), string)
|
||||
# last string should be ""
|
||||
return "".join(string_list[:-2])
|
||||
|
||||
|
||||
def complete_code(pipe, prompt, num_completions=1, **gen_kwargs):
|
||||
"""Complete prompt with text generation pipeline and return num_completions."""
|
||||
prompt = pipe.tokenizer.eos_token + prompt
|
||||
code_gens = pipe(prompt, num_return_sequences=num_completions, **gen_kwargs)
|
||||
return [first_block(code_gen["generated_text"][len(prompt) :]) for code_gen in code_gens]
|
||||
def complete_code(accelerator, model, tokenizer, dataloader, n_tasks, batch_size=20, **gen_kwargs):
|
||||
"""Generate multiple codes for each task in the dataset. This function leverage accelerator to distribute
|
||||
the processing to multiple GPUs.
|
||||
dataloader, a wrapper around a TokenizeDataset objectm is supposed to send all the prompts from
|
||||
the evalution dataset to the modelm as the following:
|
||||
[p_0_0, p_0_1, ..., p_0_nc-1, p_1_0, ..., p_nt-1_nc-1]
|
||||
where nc is the number of copies of the prompt, and nt is the number of tasks.
|
||||
nc is such that num_sample = nc * batch_size
|
||||
|
||||
Parameters
|
||||
----------
|
||||
accelerator: Accelerator
|
||||
|
||||
model: transformers.PreTrainedModel
|
||||
Code generation model. AutoTokenizer.from_pretrained(model_ckpt), ex model_ckpt = "lvwerra/codeparrot"
|
||||
|
||||
tokenizer: transformers.AutoTokenizer
|
||||
The tokenizer used to train model
|
||||
|
||||
dataloader: DataLoader
|
||||
The dataloader is a wrapper around a TokenizeDataset object. It is designed to be used with multiple GPUs.
|
||||
|
||||
n_tasks: int
|
||||
The number of tasks in the dataset. It is used to determine the length of the output.
|
||||
Should be aligned with the number of tasks in the TokenizeDataset.
|
||||
|
||||
batch_size: int
|
||||
num_return_sequences per copy of the prompt such that num_sample = batch_size * n_copies
|
||||
|
||||
gen_kwargs: dict
|
||||
Keyword arguments for the generation function of the model.
|
||||
|
||||
Returns
|
||||
-------
|
||||
code_gens: list of list of str, of length n_tasks
|
||||
List of generated codes for each task.
|
||||
Each element is a list of generated codes for each task, with length num_samples
|
||||
"""
|
||||
gen_token_dict = defaultdict(list) # dict of list of generated tokens
|
||||
for step, batch in tqdm(enumerate(dataloader)):
|
||||
with torch.no_grad():
|
||||
gen_kwargs["stopping_criteria"][0].start_length = batch["ids"].shape[-1]
|
||||
generated_tokens = accelerator.unwrap_model(model).generate(
|
||||
input_ids=batch["ids"][:, : batch["input_len"]], num_return_sequences=batch_size, **gen_kwargs
|
||||
)
|
||||
# each task is generated batch_size times
|
||||
generated_tasks = batch["task_id"].repeat(batch_size)
|
||||
generated_tokens = accelerator.pad_across_processes(
|
||||
generated_tokens, dim=1, pad_index=tokenizer.pad_token_id
|
||||
)
|
||||
|
||||
generated_tokens, generated_tasks = accelerator.gather((generated_tokens, generated_tasks))
|
||||
generated_tokens = generated_tokens.cpu().numpy()
|
||||
generated_tasks = generated_tasks.cpu().numpy()
|
||||
|
||||
for task, generated_tokens in zip(generated_tasks, generated_tokens):
|
||||
gen_token_dict[task].append(generated_tokens)
|
||||
|
||||
code_gens = [[] for _ in range(n_tasks)]
|
||||
for task, generated_tokens in gen_token_dict.items():
|
||||
for s in generated_tokens:
|
||||
gen_code = tokenizer.decode(s, skip_special_tokens=True, clean_up_tokenization_spaces=True)
|
||||
code_gens[task].append(remove_last_block(gen_code))
|
||||
return code_gens
|
||||
|
||||
|
||||
def main():
|
||||
@@ -65,12 +151,14 @@ def main():
|
||||
if args.num_workers is None:
|
||||
args.num_workers = multiprocessing.cpu_count()
|
||||
|
||||
set_seed(args.seed)
|
||||
# Use dataset load to feed to accelerate
|
||||
accelerator = Accelerator()
|
||||
set_seed(args.seed, device_specific=True)
|
||||
|
||||
# Load model and tokenizer
|
||||
tokenizer = AutoTokenizer.from_pretrained(args.model_ckpt)
|
||||
tokenizer.pad_token = tokenizer.eos_token
|
||||
model = AutoModelForCausalLM.from_pretrained(args.model_ckpt)
|
||||
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer, device=args.device_int)
|
||||
|
||||
# Generation settings
|
||||
gen_kwargs = {
|
||||
@@ -86,6 +174,13 @@ def main():
|
||||
human_eval = load_dataset("openai_humaneval")
|
||||
code_eval_metric = load_metric("code_eval")
|
||||
|
||||
n_tasks = args.num_tasks if args.num_tasks is not None else len(human_eval["test"])
|
||||
n_copies = args.n_samples // args.batch_size
|
||||
|
||||
human_eval_tokenized = TokenizedDataset(tokenizer, human_eval["test"], n_copies=n_copies, n_tasks=n_tasks)
|
||||
# do not confuse args.batch_size, which is actually the num_return_sequences
|
||||
human_eval_loader = DataLoader(human_eval_tokenized, batch_size=1)
|
||||
|
||||
# Run a quick test to see if code evaluation is enabled
|
||||
try:
|
||||
_ = code_eval_metric.compute(references=[""], predictions=[[""]])
|
||||
@@ -95,29 +190,35 @@ def main():
|
||||
)
|
||||
raise exception
|
||||
|
||||
# Generate completions for evaluation set
|
||||
n_tasks = args.num_tasks if args.num_tasks is not None else len(human_eval["test"])
|
||||
generations, references = [], []
|
||||
for task in tqdm(range(n_tasks)):
|
||||
task_generations = []
|
||||
prompt = human_eval["test"][task]["prompt"].strip()
|
||||
gen_kwargs["stopping_criteria"][0].start_length = len(tokenizer(prompt)["input_ids"])
|
||||
for batch in range(args.n_samples // args.batch_size):
|
||||
task_generations.extend(complete_code(pipe, prompt, num_completions=args.batch_size, **gen_kwargs))
|
||||
generations.append([prompt + gen for gen in task_generations])
|
||||
test_func = human_eval["test"][task]["test"]
|
||||
entry_point = f"check({human_eval['test'][task]['entry_point']})"
|
||||
references.append("\n" + test_func + "\n" + entry_point)
|
||||
model, human_eval_loader = accelerator.prepare(model, human_eval_loader)
|
||||
|
||||
# Evaluate completions with "code_eval" metric
|
||||
pass_at_k, _ = code_eval_metric.compute(
|
||||
references=references, predictions=generations, num_workers=args.num_workers
|
||||
generations = complete_code(
|
||||
accelerator,
|
||||
model,
|
||||
tokenizer,
|
||||
human_eval_loader,
|
||||
n_tasks=n_tasks,
|
||||
batch_size=args.batch_size,
|
||||
**gen_kwargs,
|
||||
)
|
||||
print(f"Results: {pass_at_k}")
|
||||
|
||||
# Save results to json file
|
||||
with open(args.output_file, "w") as fp:
|
||||
json.dump(pass_at_k, fp)
|
||||
if accelerator.is_main_process:
|
||||
references = []
|
||||
|
||||
for task in tqdm(range(n_tasks)):
|
||||
test_func = human_eval["test"][task]["test"]
|
||||
entry_point = f"check({human_eval['test'][task]['entry_point']})"
|
||||
references.append("\n" + test_func + "\n" + entry_point)
|
||||
|
||||
# Evaluate completions with "code_eval" metric
|
||||
pass_at_k, _ = code_eval_metric.compute(
|
||||
references=references, predictions=generations, num_workers=args.num_workers
|
||||
)
|
||||
print(f"Results: {pass_at_k}")
|
||||
|
||||
# Save results to json file
|
||||
with open(args.output_file, "w") as fp:
|
||||
json.dump(pass_at_k, fp)
|
||||
|
||||
|
||||
# For some reason the folliwng seems to be necessary sometimes for code_eval to work nice with multiprocessing
|
||||
|
||||
Reference in New Issue
Block a user