diff --git a/.circleci/config.yml b/.circleci/config.yml index e94030c1d1..42142928a2 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -301,6 +301,22 @@ jobs: - store_artifacts: path: ~/transformers/reports + run_tests_git_lfs: + working_directory: ~/transformers + docker: + - image: circleci/python:3.7 + resource_class: xlarge + parallelism: 1 + steps: + - checkout + - run: sudo apt-get install git-lfs + - run: | + git config --global user.email "ci@dummy.com" + git config --global user.name "ci" + - run: pip install --upgrade pip + - run: pip install .[testing] + - run: RUN_GIT_LFS_TESTS=1 python -m pytest -sv ./tests/test_hf_api.py -k "HfLargefilesTest" + build_doc: working_directory: ~/transformers docker: @@ -428,6 +444,7 @@ workflows: - run_tests_flax - run_tests_pipelines_torch - run_tests_pipelines_tf + - run_tests_git_lfs - build_doc - deploy_doc: *workflow_filters tpu_testing_jobs: diff --git a/src/transformers/commands/lfs.py b/src/transformers/commands/lfs.py new file mode 100644 index 0000000000..5ca2c2cace --- /dev/null +++ b/src/transformers/commands/lfs.py @@ -0,0 +1,219 @@ +""" +Implementation of a custom transfer agent for the transfer type "multipart" for git-lfs. + +Inspired by: github.com/cbartz/git-lfs-swift-transfer-agent/blob/master/git_lfs_swift_transfer.py + +Spec is: github.com/git-lfs/git-lfs/blob/master/docs/custom-transfers.md + + +To launch debugger while developing: + +``` [lfs "customtransfer.multipart"] + +path = /path/to/transformers/.env/bin/python + +args = -m debugpy --listen 5678 --wait-for-client /path/to/transformers/src/transformers/commands/transformers_cli.py +lfs-multipart-upload ``` +""" + +import json +import os +import subprocess +import sys +from argparse import ArgumentParser +from contextlib import AbstractContextManager +from typing import Dict, List, Optional + +import requests +from transformers.commands import BaseTransformersCLICommand + +from ..utils import logging + + +logger = logging.get_logger(__name__) # pylint: disable=invalid-name + + +LFS_MULTIPART_UPLOAD_COMMAND = "lfs-multipart-upload" + + +class LfsCommands(BaseTransformersCLICommand): + """ + Implementation of a custom transfer agent for the transfer type "multipart" for git-lfs. This lets users upload + large files >5GB 🔥. Spec for LFS custom transfer agent is: + https://github.com/git-lfs/git-lfs/blob/master/docs/custom-transfers.md + + This introduces two commands to the CLI: + + 1. $ transformers-cli lfs-enable-largefiles + + This should be executed once for each model repo that contains a model file >5GB. It's documented in the error + message you get if you just try to git push a 5GB file without having enabled it before. + + 2. $ transformers-cli lfs-multipart-upload + + This command is called by lfs directly and is not meant to be called by the user. + """ + + @staticmethod + def register_subcommand(parser: ArgumentParser): + enable_parser = parser.add_parser( + "lfs-enable-largefiles", help="Configure your repository to enable upload of files > 5GB." + ) + enable_parser.add_argument("path", type=str, help="Local path to repository you want to configure.") + enable_parser.set_defaults(func=lambda args: LfsEnableCommand(args)) + + upload_parser = parser.add_parser( + LFS_MULTIPART_UPLOAD_COMMAND, help="Command will get called by git-lfs, do not call it directly." + ) + upload_parser.set_defaults(func=lambda args: LfsUploadCommand(args)) + + +class LfsEnableCommand: + def __init__(self, args): + self.args = args + + def run(self): + local_path = os.path.abspath(self.args.path) + if not os.path.isdir(local_path): + print("This does not look like a valid git repo.") + exit(1) + subprocess.run( + "git config lfs.customtransfer.multipart.path transformers-cli".split(), check=True, cwd=local_path + ) + subprocess.run( + f"git config lfs.customtransfer.multipart.args {LFS_MULTIPART_UPLOAD_COMMAND}".split(), + check=True, + cwd=local_path, + ) + print("Local repo set up for largefiles") + + +def write_msg(msg: Dict): + """Write out the message in Line delimited JSON.""" + msg = json.dumps(msg) + "\n" + sys.stdout.write(msg) + sys.stdout.flush() + + +def read_msg() -> Optional[Dict]: + """Read Line delimited JSON from stdin. """ + msg = json.loads(sys.stdin.readline().strip()) + + if "terminate" in (msg.get("type"), msg.get("event")): + # terminate message received + return None + + if msg.get("event") not in ("download", "upload"): + logger.critical("Received unexpected message") + sys.exit(1) + + return msg + + +class FileSlice(AbstractContextManager): + """ + File-like object that only reads a slice of a file + + Inspired by stackoverflow.com/a/29838711/593036 + """ + + def __init__(self, filepath: str, seek_from: int, read_limit: int): + self.filepath = filepath + self.seek_from = seek_from + self.read_limit = read_limit + self.n_seen = 0 + + def __enter__(self): + self.f = open(self.filepath, "rb") + self.f.seek(self.seek_from) + return self + + def __len__(self): + total_length = os.fstat(self.f.fileno()).st_size + return min(self.read_limit, total_length - self.seek_from) + + def read(self, n=-1): + if self.n_seen >= self.read_limit: + return b"" + remaining_amount = self.read_limit - self.n_seen + data = self.f.read(remaining_amount if n < 0 else min(n, remaining_amount)) + self.n_seen += len(data) + return data + + def __iter__(self): + yield self.read(n=4 * 1024 * 1024) + + def __exit__(self, *args): + self.f.close() + + +class LfsUploadCommand: + def __init__(self, args): + self.args = args + + def run(self): + # Immediately after invoking a custom transfer process, git-lfs + # sends initiation data to the process over stdin. + # This tells the process useful information about the configuration. + init_msg = json.loads(sys.stdin.readline().strip()) + if not (init_msg.get("event") == "init" and init_msg.get("operation") == "upload"): + write_msg({"error": {"code": 32, "message": "Wrong lfs init operation"}}) + sys.exit(1) + + # The transfer process should use the information it needs from the + # initiation structure, and also perform any one-off setup tasks it + # needs to do. It should then respond on stdout with a simple empty + # confirmation structure, as follows: + write_msg({}) + + # After the initiation exchange, git-lfs will send any number of + # transfer requests to the stdin of the transfer process, in a serial sequence. + while True: + msg = read_msg() + if msg is None: + # When all transfers have been processed, git-lfs will send + # a terminate event to the stdin of the transfer process. + # On receiving this message the transfer process should + # clean up and terminate. No response is expected. + sys.exit(0) + + oid = msg["oid"] + filepath = msg["path"] + completion_url = msg["action"]["href"] + header = msg["action"]["header"] + chunk_size = int(header.pop("chunk_size")) + presigned_urls: List[str] = list(header.values()) + + parts = [] + for i, presigned_url in enumerate(presigned_urls): + with FileSlice(filepath, seek_from=i * chunk_size, read_limit=chunk_size) as data: + r = requests.put(presigned_url, data=data) + r.raise_for_status() + parts.append( + { + "etag": r.headers.get("etag"), + "partNumber": i + 1, + } + ) + # In order to support progress reporting while data is uploading / downloading, + # the transfer process should post messages to stdout + write_msg( + { + "event": "progress", + "oid": oid, + "bytesSoFar": (i + 1) * chunk_size, + "bytesSinceLast": chunk_size, + } + ) + # Not precise but that's ok. + + r = requests.post( + completion_url, + json={ + "oid": oid, + "parts": parts, + }, + ) + r.raise_for_status() + + write_msg({"event": "complete", "oid": oid}) diff --git a/src/transformers/commands/transformers_cli.py b/src/transformers/commands/transformers_cli.py index eaa2bcaa22..be95d56db0 100644 --- a/src/transformers/commands/transformers_cli.py +++ b/src/transformers/commands/transformers_cli.py @@ -5,6 +5,7 @@ from transformers.commands.add_new_model import AddNewModelCommand from transformers.commands.convert import ConvertCommand from transformers.commands.download import DownloadCommand from transformers.commands.env import EnvironmentCommand +from transformers.commands.lfs import LfsCommands from transformers.commands.run import RunCommand from transformers.commands.serving import ServeCommand from transformers.commands.user import UserCommands @@ -22,6 +23,7 @@ def main(): ServeCommand.register_subcommand(commands_parser) UserCommands.register_subcommand(commands_parser) AddNewModelCommand.register_subcommand(commands_parser) + LfsCommands.register_subcommand(commands_parser) # Let's go args = parser.parse_args() diff --git a/src/transformers/hf_api.py b/src/transformers/hf_api.py index d5f9977608..59578a2a79 100644 --- a/src/transformers/hf_api.py +++ b/src/transformers/hf_api.py @@ -95,6 +95,8 @@ class ModelInfo: class HfApi: + ALLOWED_S3_FILE_TYPES = ["datasets", "metrics"] + def __init__(self, endpoint=None): self.endpoint = endpoint if endpoint is not None else ENDPOINT @@ -130,13 +132,14 @@ class HfApi: r = requests.post(path, headers={"authorization": "Bearer {}".format(token)}) r.raise_for_status() - def presign(self, token: str, filename: str, organization: Optional[str] = None) -> PresignedUrl: + def presign(self, token: str, filetype: str, filename: str, organization: Optional[str] = None) -> PresignedUrl: """ HuggingFace S3-based system, used for datasets and metrics. Call HF API to get a presigned url to upload `filename` to S3. """ - path = "{}/api/datasets/presign".format(self.endpoint) + assert filetype in self.ALLOWED_S3_FILE_TYPES, f"Please specify filetype from {self.ALLOWED_S3_FILE_TYPES}" + path = f"{self.endpoint}/api/{filetype}/presign" r = requests.post( path, headers={"authorization": "Bearer {}".format(token)}, @@ -146,7 +149,9 @@ class HfApi: d = r.json() return PresignedUrl(**d) - def presign_and_upload(self, token: str, filename: str, filepath: str, organization: Optional[str] = None) -> str: + def presign_and_upload( + self, token: str, filetype: str, filename: str, filepath: str, organization: Optional[str] = None + ) -> str: """ HuggingFace S3-based system, used for datasets and metrics. @@ -154,7 +159,8 @@ class HfApi: Outputs: url: Read-only url for the stored file on S3. """ - urls = self.presign(token, filename=filename, organization=organization) + assert filetype in self.ALLOWED_S3_FILE_TYPES, f"Please specify filetype from {self.ALLOWED_S3_FILE_TYPES}" + urls = self.presign(token, filetype=filetype, filename=filename, organization=organization) # streaming upload: # https://2.python-requests.org/en/master/user/advanced/#streaming-uploads # @@ -169,26 +175,28 @@ class HfApi: pf.close() return urls.access - def list_objs(self, token: str, organization: Optional[str] = None) -> List[S3Obj]: + def list_objs(self, token: str, filetype: str, organization: Optional[str] = None) -> List[S3Obj]: """ HuggingFace S3-based system, used for datasets and metrics. Call HF API to list all stored files for user (or one of their organizations). """ - path = "{}/api/datasets/listObjs".format(self.endpoint) + assert filetype in self.ALLOWED_S3_FILE_TYPES, f"Please specify filetype from {self.ALLOWED_S3_FILE_TYPES}" + path = "{}/api/{}/listObjs".format(self.endpoint, filetype) params = {"organization": organization} if organization is not None else None r = requests.get(path, params=params, headers={"authorization": "Bearer {}".format(token)}) r.raise_for_status() d = r.json() return [S3Obj(**x) for x in d] - def delete_obj(self, token: str, filename: str, organization: Optional[str] = None): + def delete_obj(self, token: str, filetype: str, filename: str, organization: Optional[str] = None): """ HuggingFace S3-based system, used for datasets and metrics. Call HF API to delete a file stored by user """ - path = "{}/api/datasets/deleteObj".format(self.endpoint) + assert filetype in self.ALLOWED_S3_FILE_TYPES, f"Please specify filetype from {self.ALLOWED_S3_FILE_TYPES}" + path = "{}/api/{}/deleteObj".format(self.endpoint, filetype) r = requests.delete( path, headers={"authorization": "Bearer {}".format(token)}, @@ -219,17 +227,25 @@ class HfApi: d = r.json() return [RepoObj(**x) for x in d] - def create_repo(self, token: str, name: str, organization: Optional[str] = None) -> str: + def create_repo( + self, token: str, name: str, organization: Optional[str] = None, lfsmultipartthresh: Optional[int] = None + ) -> str: """ HuggingFace git-based system, used for models. Call HF API to create a whole repo. + + Params: + lfsmultipartthresh: Optional: internal param for testing purposes. """ path = "{}/api/repos/create".format(self.endpoint) + json = {"name": name, "organization": organization} + if lfsmultipartthresh is not None: + json["lfsmultipartthresh"] = lfsmultipartthresh r = requests.post( path, headers={"authorization": "Bearer {}".format(token)}, - json={"name": name, "organization": organization}, + json=json, ) r.raise_for_status() d = r.json() diff --git a/src/transformers/testing_utils.py b/src/transformers/testing_utils.py index 03b53c1768..786a3c2486 100644 --- a/src/transformers/testing_utils.py +++ b/src/transformers/testing_utils.py @@ -62,6 +62,7 @@ _run_slow_tests = parse_flag_from_env("RUN_SLOW", default=False) _run_pt_tf_cross_tests = parse_flag_from_env("RUN_PT_TF_CROSS_TESTS", default=False) _run_custom_tokenizers = parse_flag_from_env("RUN_CUSTOM_TOKENIZERS", default=False) _run_pipeline_tests = parse_flag_from_env("RUN_PIPELINE_TESTS", default=False) +_run_git_lfs_tests = parse_flag_from_env("RUN_GIT_LFS_TESTS", default=False) _tf_gpu_memory_limit = parse_int_from_env("TF_GPU_MEMORY_LIMIT", default=None) @@ -129,6 +130,19 @@ def custom_tokenizers(test_case): return test_case +def require_git_lfs(test_case): + """ + Decorator marking a test that requires git-lfs. + + git-lfs requires additional dependencies, and tests are skipped by default. Set the RUN_GIT_LFS_TESTS environment + variable to a truthy value to run them. + """ + if not _run_git_lfs_tests: + return unittest.skip("test of git lfs workflow")(test_case) + else: + return test_case + + def require_torch(test_case): """ Decorator marking a test that requires PyTorch. diff --git a/tests/test_hf_api.py b/tests/test_hf_api.py index 040d756ff3..7c93164565 100644 --- a/tests/test_hf_api.py +++ b/tests/test_hf_api.py @@ -15,12 +15,15 @@ import os +import shutil +import subprocess import time import unittest import requests from requests.exceptions import HTTPError from transformers.hf_api import HfApi, HfFolder, ModelInfo, PresignedUrl, RepoObj, S3Obj +from transformers.testing_utils import require_git_lfs USER = "__DUMMY_TRANSFORMERS_USER__" @@ -35,8 +38,14 @@ FILES = [ os.path.join(os.path.dirname(os.path.abspath(__file__)), "fixtures/empty.txt"), ), ] -REPO_NAME = "my-model-{}".format(int(time.time())) ENDPOINT_STAGING = "https://moon-staging.huggingface.co" +ENDPOINT_STAGING_BASIC_AUTH = f"https://{USER}:{PASS}@moon-staging.huggingface.co" + +REPO_NAME = "my-model-{}".format(int(time.time())) +REPO_NAME_LARGE_FILE = "my-model-largefiles-{}".format(int(time.time())) +WORKING_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fixtures/working_repo") +LARGE_FILE_14MB = "https://cdn-media.huggingface.co/lfs-largefiles/progit.epub" +LARGE_FILE_18MB = "https://cdn-media.huggingface.co/lfs-largefiles/progit.pdf" class HfApiCommonTest(unittest.TestCase): @@ -64,7 +73,7 @@ class HfApiEndpointsTest(HfApiCommonTest): @classmethod def tearDownClass(cls): for FILE_KEY, FILE_PATH in FILES: - cls._api.delete_obj(token=cls._token, filename=FILE_KEY) + cls._api.delete_obj(token=cls._token, filetype="datasets", filename=FILE_KEY) def test_whoami(self): user, orgs = self._api.whoami(token=self._token) @@ -73,21 +82,27 @@ class HfApiEndpointsTest(HfApiCommonTest): def test_presign_invalid_org(self): with self.assertRaises(HTTPError): - _ = self._api.presign(token=self._token, filename="nested/fake_org.txt", organization="fake") + _ = self._api.presign( + token=self._token, filetype="datasets", filename="nested/fake_org.txt", organization="fake" + ) def test_presign_valid_org(self): - urls = self._api.presign(token=self._token, filename="nested/valid_org.txt", organization="valid_org") + urls = self._api.presign( + token=self._token, filetype="datasets", filename="nested/valid_org.txt", organization="valid_org" + ) self.assertIsInstance(urls, PresignedUrl) def test_presign(self): for FILE_KEY, FILE_PATH in FILES: - urls = self._api.presign(token=self._token, filename=FILE_KEY) + urls = self._api.presign(token=self._token, filetype="datasets", filename=FILE_KEY) self.assertIsInstance(urls, PresignedUrl) self.assertEqual(urls.type, "text/plain") def test_presign_and_upload(self): for FILE_KEY, FILE_PATH in FILES: - access_url = self._api.presign_and_upload(token=self._token, filename=FILE_KEY, filepath=FILE_PATH) + access_url = self._api.presign_and_upload( + token=self._token, filetype="datasets", filename=FILE_KEY, filepath=FILE_PATH + ) self.assertIsInstance(access_url, str) with open(FILE_PATH, "r") as f: body = f.read() @@ -95,7 +110,7 @@ class HfApiEndpointsTest(HfApiCommonTest): self.assertEqual(r.text, body) def test_list_objs(self): - objs = self._api.list_objs(token=self._token) + objs = self._api.list_objs(token=self._token, filetype="datasets") self.assertIsInstance(objs, list) if len(objs) > 0: o = objs[-1] @@ -108,7 +123,6 @@ class HfApiEndpointsTest(HfApiCommonTest): o = objs[-1] self.assertIsInstance(o, RepoObj) - @unittest.skip("Until @julien-c or @pierrci debugs") def test_create_and_delete_repo(self): self._api.create_repo(token=self._token, name=REPO_NAME) self._api.delete_repo(token=self._token, name=REPO_NAME) @@ -140,3 +154,75 @@ class HfFolderTest(unittest.TestCase): # ^^ not an error, we test that the # second call does not fail. self.assertEqual(HfFolder.get_token(), None) + + +@require_git_lfs +class HfLargefilesTest(HfApiCommonTest): + @classmethod + def setUpClass(cls): + """ + Share this valid token in all tests below. + """ + cls._token = cls._api.login(username=USER, password=PASS) + + def setUp(self): + try: + shutil.rmtree(WORKING_REPO_DIR) + except FileNotFoundError: + pass + + def tearDown(self): + self._api.delete_repo(token=self._token, name=REPO_NAME_LARGE_FILE) + + def setup_local_clone(self, REMOTE_URL): + REMOTE_URL_AUTH = REMOTE_URL.replace(ENDPOINT_STAGING, ENDPOINT_STAGING_BASIC_AUTH) + subprocess.run(["git", "clone", REMOTE_URL_AUTH, WORKING_REPO_DIR], check=True, capture_output=True) + subprocess.run(["git", "lfs", "track", "*.pdf"], check=True, cwd=WORKING_REPO_DIR) + subprocess.run(["git", "lfs", "track", "*.epub"], check=True, cwd=WORKING_REPO_DIR) + + def test_end_to_end_thresh_6M(self): + REMOTE_URL = self._api.create_repo( + token=self._token, name=REPO_NAME_LARGE_FILE, lfsmultipartthresh=6 * 10 ** 6 + ) + self.setup_local_clone(REMOTE_URL) + + subprocess.run(["wget", LARGE_FILE_18MB], check=True, capture_output=True, cwd=WORKING_REPO_DIR) + subprocess.run(["git", "add", "*"], check=True, cwd=WORKING_REPO_DIR) + subprocess.run(["git", "commit", "-m", "commit message"], check=True, cwd=WORKING_REPO_DIR) + + # This will fail as we haven't set up our custom transfer agent yet. + failed_process = subprocess.run(["git", "push"], capture_output=True, cwd=WORKING_REPO_DIR) + self.assertEqual(failed_process.returncode, 1) + self.assertIn("transformers-cli lfs-enable-largefiles", failed_process.stderr.decode()) + # ^ Instructions on how to fix this are included in the error message. + + subprocess.run(["transformers-cli", "lfs-enable-largefiles", WORKING_REPO_DIR], check=True) + + start_time = time.time() + subprocess.run(["git", "push"], check=True, cwd=WORKING_REPO_DIR) + print("took", time.time() - start_time) + + # To be 100% sure, let's download the resolved file + pdf_url = f"{REMOTE_URL}/resolve/main/progit.pdf" + DEST_FILENAME = "uploaded.pdf" + subprocess.run(["wget", pdf_url, "-O", DEST_FILENAME], check=True, capture_output=True, cwd=WORKING_REPO_DIR) + dest_filesize = os.stat(os.path.join(WORKING_REPO_DIR, DEST_FILENAME)).st_size + self.assertEqual(dest_filesize, 18685041) + + def test_end_to_end_thresh_16M(self): + # Here we'll push one multipart and one non-multipart file in the same commit, and see what happens + REMOTE_URL = self._api.create_repo( + token=self._token, name=REPO_NAME_LARGE_FILE, lfsmultipartthresh=16 * 10 ** 6 + ) + self.setup_local_clone(REMOTE_URL) + + subprocess.run(["wget", LARGE_FILE_18MB], check=True, capture_output=True, cwd=WORKING_REPO_DIR) + subprocess.run(["wget", LARGE_FILE_14MB], check=True, capture_output=True, cwd=WORKING_REPO_DIR) + subprocess.run(["git", "add", "*"], check=True, cwd=WORKING_REPO_DIR) + subprocess.run(["git", "commit", "-m", "both files in same commit"], check=True, cwd=WORKING_REPO_DIR) + + subprocess.run(["transformers-cli", "lfs-enable-largefiles", WORKING_REPO_DIR], check=True) + + start_time = time.time() + subprocess.run(["git", "push"], check=True, cwd=WORKING_REPO_DIR) + print("took", time.time() - start_time)