CI reporting improvements (#38230)

update

Co-authored-by: ydshieh <ydshieh@users.noreply.github.com>
This commit is contained in:
Yih-Dar
2025-05-20 19:34:58 +02:00
committed by GitHub
parent cb513e35f9
commit feec294dea
8 changed files with 375 additions and 155 deletions

View File

@@ -14,7 +14,6 @@
import ast
import collections
import datetime
import functools
import json
import operator
@@ -26,7 +25,7 @@ from typing import Any, Dict, List, Optional, Union
import requests
from get_ci_error_statistics import get_jobs
from get_previous_daily_ci import get_last_daily_ci_reports
from get_previous_daily_ci import get_last_daily_ci_reports, get_last_daily_ci_run, get_last_daily_ci_workflow_run_id
from huggingface_hub import HfApi
from slack_sdk import WebClient
@@ -109,6 +108,7 @@ class Message:
additional_results: Dict,
selected_warnings: Optional[List] = None,
prev_ci_artifacts=None,
other_ci_artifacts=None,
):
self.title = title
self.ci_title = ci_title
@@ -159,6 +159,7 @@ class Message:
self.selected_warnings = selected_warnings
self.prev_ci_artifacts = prev_ci_artifacts
self.other_ci_artifacts = other_ci_artifacts
@property
def time(self) -> str:
@@ -515,71 +516,83 @@ class Message:
if len(self.selected_warnings) > 0:
blocks.append(self.warnings)
new_failure_blocks = self.get_new_model_failure_blocks(with_header=False)
if len(new_failure_blocks) > 0:
blocks.extend(new_failure_blocks)
for idx, (prev_workflow_run_id, prev_ci_artifacts) in enumerate(
[self.prev_ci_artifacts] + self.other_ci_artifacts
):
if idx == 0:
# This is the truncated version to show on slack. For now.
new_failure_blocks = self.get_new_model_failure_blocks(
prev_ci_artifacts=prev_ci_artifacts, with_header=False
)
if len(new_failure_blocks) > 0:
blocks.extend(new_failure_blocks)
# To save the list of new model failures
extra_blocks = self.get_new_model_failure_blocks(to_truncate=False)
if extra_blocks:
failure_text = extra_blocks[-1]["text"]["text"]
file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/new_model_failures.txt")
with open(file_path, "w", encoding="UTF-8") as fp:
fp.write(failure_text)
# To save the list of new model failures and uploaed to hub repositories
extra_blocks = self.get_new_model_failure_blocks(prev_ci_artifacts=prev_ci_artifacts, to_truncate=False)
if extra_blocks:
filename = "new_model_failures"
if idx > 0:
filename = f"{filename}_against_{prev_workflow_run_id}"
# upload results to Hub dataset
file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/new_model_failures.txt")
commit_info = api.upload_file(
path_or_fileobj=file_path,
path_in_repo=f"{datetime.datetime.today().strftime('%Y-%m-%d')}/ci_results_{job_name}/new_model_failures.txt",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
url = f"https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/{commit_info.oid}/{datetime.datetime.today().strftime('%Y-%m-%d')}/ci_results_{job_name}/new_model_failures.txt"
failure_text = extra_blocks[-1]["text"]["text"]
file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/{filename}.txt")
with open(file_path, "w", encoding="UTF-8") as fp:
fp.write(failure_text)
# extra processing to save to json format
new_failed_tests = {}
for line in failure_text.split():
if "https://github.com/huggingface/transformers/actions/runs" in line:
pattern = r"<(https://github.com/huggingface/transformers/actions/runs/.+?/job/.+?)\|(.+?)>"
items = re.findall(pattern, line)
elif "tests/" in line:
if "tests/models/" in line:
model = line.split("/")[2]
else:
model = line.split("/")[1]
if model not in new_failed_tests:
new_failed_tests[model] = {"single-gpu": [], "multi-gpu": []}
for url, device in items:
new_failed_tests[model][f"{device}-gpu"].append(line)
file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/new_model_failures.json")
with open(file_path, "w", encoding="UTF-8") as fp:
json.dump(new_failed_tests, fp, ensure_ascii=False, indent=4)
# upload results to Hub dataset
file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/{filename}.txt")
commit_info = api.upload_file(
path_or_fileobj=file_path,
path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/{filename}.txt",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
url = f"https://huggingface.co/datasets/hf-internal-testing/transformers_daily_ci/raw/{commit_info.oid}/{report_repo_folder}/ci_results_{job_name}/{filename}.txt"
# upload results to Hub dataset
file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/new_model_failures.json")
_ = api.upload_file(
path_or_fileobj=file_path,
path_in_repo=f"{datetime.datetime.today().strftime('%Y-%m-%d')}/ci_results_{job_name}/new_model_failures.json",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
# extra processing to save to json format
new_failed_tests = {}
for line in failure_text.split():
if "https://github.com/huggingface/transformers/actions/runs" in line:
pattern = r"<(https://github.com/huggingface/transformers/actions/runs/.+?/job/.+?)\|(.+?)>"
items = re.findall(pattern, line)
elif "tests/" in line:
if "tests/models/" in line:
model = line.split("/")[2]
else:
model = line.split("/")[1]
if model not in new_failed_tests:
new_failed_tests[model] = {"single-gpu": [], "multi-gpu": []}
for url, device in items:
new_failed_tests[model][f"{device}-gpu"].append(line)
file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/{filename}.json")
with open(file_path, "w", encoding="UTF-8") as fp:
json.dump(new_failed_tests, fp, ensure_ascii=False, indent=4)
block = {
"type": "section",
"text": {
"type": "plain_text",
"text": " ",
},
"accessory": {
"type": "button",
"text": {"type": "plain_text", "text": "Check New model failures"},
"url": url,
},
}
blocks.append(block)
# upload results to Hub dataset
file_path = os.path.join(os.getcwd(), f"ci_results_{job_name}/{filename}.json")
_ = api.upload_file(
path_or_fileobj=file_path,
path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/{filename}.json",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
if idx == 0:
block = {
"type": "section",
"text": {
"type": "plain_text",
"text": " ",
},
"accessory": {
"type": "button",
"text": {"type": "plain_text", "text": "Check New model failures"},
"url": url,
},
}
blocks.append(block)
return json.dumps(blocks)
@@ -700,18 +713,18 @@ class Message:
{"type": "section", "text": {"type": "mrkdwn", "text": failure_text}},
]
def get_new_model_failure_blocks(self, with_header=True, to_truncate=True):
if self.prev_ci_artifacts is None:
def get_new_model_failure_blocks(self, prev_ci_artifacts, with_header=True, to_truncate=True):
if prev_ci_artifacts is None:
return []
sorted_dict = sorted(self.model_results.items(), key=lambda t: t[0])
prev_model_results = {}
if (
f"ci_results_{job_name}" in self.prev_ci_artifacts
and "model_results.json" in self.prev_ci_artifacts[f"ci_results_{job_name}"]
f"ci_results_{job_name}" in prev_ci_artifacts
and "model_results.json" in prev_ci_artifacts[f"ci_results_{job_name}"]
):
prev_model_results = json.loads(self.prev_ci_artifacts[f"ci_results_{job_name}"]["model_results.json"])
prev_model_results = json.loads(prev_ci_artifacts[f"ci_results_{job_name}"]["model_results.json"])
all_failure_lines = {}
for job, job_result in sorted_dict:
@@ -812,20 +825,6 @@ class Message:
time.sleep(1)
blocks = self.get_new_model_failure_blocks()
if blocks:
print("Sending the following reply")
print(json.dumps({"blocks": blocks}))
client.chat_postMessage(
channel=SLACK_REPORT_CHANNEL_ID,
text="Results for new failures",
blocks=blocks,
thread_ts=self.thread_ts["ts"],
)
time.sleep(1)
def retrieve_artifact(artifact_path: str, gpu: Optional[str]):
if gpu not in [None, "single", "multi"]:
@@ -1168,6 +1167,23 @@ if __name__ == "__main__":
"run_torch_cuda_extensions_gpu": "DeepSpeed",
}
# if it is not a scheduled run, upload the reports to a subfolder under `report_repo_folder`
report_repo_subfolder = ""
if os.getenv("GITHUB_EVENT_NAME") != "schedule":
report_repo_subfolder = f"{os.getenv('GITHUB_RUN_NUMBER')}-{os.getenv('GITHUB_RUN_ID')}"
report_repo_subfolder = f"runs/{report_repo_subfolder}"
workflow_run = get_last_daily_ci_run(
token=os.environ["ACCESS_REPO_INFO_TOKEN"], workflow_run_id=os.getenv("GITHUB_RUN_ID")
)
workflow_run_created_time = workflow_run["created_at"]
workflow_id = workflow_run["workflow_id"]
report_repo_folder = workflow_run_created_time.split("T")[0]
if report_repo_subfolder:
report_repo_folder = f"{report_repo_folder}/{report_repo_subfolder}"
# Remove some entries in `additional_files` if they are not concerned.
test_name = None
if job_name in job_to_test_map:
@@ -1241,8 +1257,9 @@ if __name__ == "__main__":
if not os.path.isdir(os.path.join(os.getcwd(), f"ci_results_{job_name}")):
os.makedirs(os.path.join(os.getcwd(), f"ci_results_{job_name}"))
target_workflow = "huggingface/transformers/.github/workflows/self-scheduled-caller.yml@refs/heads/main"
is_scheduled_ci_run = os.environ.get("CI_WORKFLOW_REF") == target_workflow
nvidia_daily_ci_workflow = "huggingface/transformers/.github/workflows/self-scheduled-caller.yml"
is_nvidia_daily_ci_workflow = os.environ.get("GITHUB_WORKFLOW_REF").startswith(nvidia_daily_ci_workflow)
is_scheduled_ci_run = os.environ.get("GITHUB_EVENT_NAME") == "schedule"
# Only the model testing job is concerned: this condition is to avoid other jobs to upload the empty list as
# results.
@@ -1250,15 +1267,13 @@ if __name__ == "__main__":
with open(f"ci_results_{job_name}/model_results.json", "w", encoding="UTF-8") as fp:
json.dump(model_results, fp, indent=4, ensure_ascii=False)
# upload results to Hub dataset (only for the scheduled daily CI run on `main`)
if is_scheduled_ci_run:
api.upload_file(
path_or_fileobj=f"ci_results_{job_name}/model_results.json",
path_in_repo=f"{datetime.datetime.today().strftime('%Y-%m-%d')}/ci_results_{job_name}/model_results.json",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
api.upload_file(
path_or_fileobj=f"ci_results_{job_name}/model_results.json",
path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/model_results.json",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
# Let's create a file contain job --> job link
model_job_links = {}
@@ -1272,15 +1287,13 @@ if __name__ == "__main__":
with open(f"ci_results_{job_name}/model_job_links.json", "w", encoding="UTF-8") as fp:
json.dump(model_job_links, fp, indent=4, ensure_ascii=False)
# upload results to Hub dataset (only for the scheduled daily CI run on `main`)
if is_scheduled_ci_run:
api.upload_file(
path_or_fileobj=f"ci_results_{job_name}/model_job_links.json",
path_in_repo=f"{datetime.datetime.today().strftime('%Y-%m-%d')}/ci_results_{job_name}/model_job_links.json",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
api.upload_file(
path_or_fileobj=f"ci_results_{job_name}/model_job_links.json",
path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/model_job_links.json",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
# Must have the same keys as in `additional_results`.
# The values are used as the file names where to save the corresponding CI job results.
@@ -1294,26 +1307,57 @@ if __name__ == "__main__":
with open(f"ci_results_{job_name}/{test_to_result_name[job]}_results.json", "w", encoding="UTF-8") as fp:
json.dump(job_result, fp, indent=4, ensure_ascii=False)
# upload results to Hub dataset (only for the scheduled daily CI run on `main`)
if is_scheduled_ci_run:
api.upload_file(
path_or_fileobj=f"ci_results_{job_name}/{test_to_result_name[job]}_results.json",
path_in_repo=f"{datetime.datetime.today().strftime('%Y-%m-%d')}/ci_results_{job_name}/{test_to_result_name[job]}_results.json",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
api.upload_file(
path_or_fileobj=f"ci_results_{job_name}/{test_to_result_name[job]}_results.json",
path_in_repo=f"{report_repo_folder}/ci_results_{job_name}/{test_to_result_name[job]}_results.json",
repo_id="hf-internal-testing/transformers_daily_ci",
repo_type="dataset",
token=os.environ.get("TRANSFORMERS_CI_RESULTS_UPLOAD_TOKEN", None),
)
prev_workflow_run_id = None
other_workflow_run_ids = []
prev_ci_artifacts = None
if is_scheduled_ci_run:
# TODO: remove `if job_name == "run_models_gpu"`
if job_name == "run_models_gpu":
# Get the last previously completed CI's failure tables
prev_workflow_run_id = get_last_daily_ci_workflow_run_id(
token=os.environ["ACCESS_REPO_INFO_TOKEN"], workflow_id=workflow_id
)
# For a scheduled run that is not the Nvidia's scheduled daily CI, add Nvidia's scheduled daily CI run as a target to compare.
if not is_nvidia_daily_ci_workflow:
# The id of the workflow `.github/workflows/self-scheduled-caller.yml` (not of a workflow run of it).
other_workflow_id = "90575235"
# We need to get the Nvidia's scheduled daily CI run that match the current run (i.e. run with the same commit SHA)
other_workflow_run_id = get_last_daily_ci_workflow_run_id(
token=os.environ["ACCESS_REPO_INFO_TOKEN"], workflow_id=other_workflow_id, commit_sha=ci_sha
)
other_workflow_run_ids.append(other_workflow_run_id)
else:
prev_workflow_run_id = os.environ["PREV_WORKFLOW_RUN_ID"]
other_workflow_run_id = os.environ["OTHER_WORKFLOW_RUN_ID"]
other_workflow_run_ids.append(other_workflow_run_id)
prev_ci_artifacts = (None, None)
other_ci_artifacts = []
for idx, target_workflow_run_id in enumerate([prev_workflow_run_id] + other_workflow_run_ids):
if target_workflow_run_id is None or target_workflow_run_id == "":
continue
else:
artifact_names = [f"ci_results_{job_name}"]
output_dir = os.path.join(os.getcwd(), "previous_reports")
os.makedirs(output_dir, exist_ok=True)
prev_ci_artifacts = get_last_daily_ci_reports(
artifact_names=artifact_names, output_dir=output_dir, token=os.environ["ACCESS_REPO_INFO_TOKEN"]
ci_artifacts = get_last_daily_ci_reports(
artifact_names=artifact_names,
output_dir=output_dir,
token=os.environ["ACCESS_REPO_INFO_TOKEN"],
workflow_run_id=target_workflow_run_id,
)
if idx == 0:
prev_ci_artifacts = (target_workflow_run_id, ci_artifacts)
else:
other_ci_artifacts.append((target_workflow_run_id, ci_artifacts))
job_to_test_map.update(
{
@@ -1335,6 +1379,7 @@ if __name__ == "__main__":
additional_results,
selected_warnings=selected_warnings,
prev_ci_artifacts=prev_ci_artifacts,
other_ci_artifacts=other_ci_artifacts,
)
# send report only if there is any failure (for push CI)