From 5f16e61814a903e1b256432557a91230189d72df Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Tue, 8 Oct 2024 19:23:25 -0700 Subject: [PATCH 1/9] initial commit Signed-off-by: Ayush Kamat --- latch/ldata/_transfer/upload.py | 2 +- latch/ldata/type.py | 2 +- latch_cli/services/cp/download/__init__.py | 0 latch_cli/services/cp/download/main.py | 196 ++++++++++++++++++++ latch_cli/services/cp/download/worker.py | 115 ++++++++++++ latch_cli/services/cp/http_utils.py | 72 +++++++ latch_cli/services/cp/main.py | 100 +++++----- latch_cli/services/cp/upload/__init__.py | 0 latch_cli/services/cp/upload/main.py | 120 ++++++++++++ latch_cli/services/cp/upload/worker.py | 206 +++++++++++++++++++++ latch_cli/services/cp/utils.py | 69 ++++++- latch_cli/utils/__init__.py | 3 +- latch_cli/utils/path.py | 48 ++--- 13 files changed, 864 insertions(+), 69 deletions(-) create mode 100644 latch_cli/services/cp/download/__init__.py create mode 100644 latch_cli/services/cp/download/main.py create mode 100644 latch_cli/services/cp/download/worker.py create mode 100644 latch_cli/services/cp/http_utils.py create mode 100644 latch_cli/services/cp/upload/__init__.py create mode 100644 latch_cli/services/cp/upload/main.py create mode 100644 latch_cli/services/cp/upload/worker.py diff --git a/latch/ldata/_transfer/upload.py b/latch/ldata/_transfer/upload.py index 93b798c96..c7a58d0aa 100644 --- a/latch/ldata/_transfer/upload.py +++ b/latch/ldata/_transfer/upload.py @@ -71,7 +71,7 @@ def upload( dest_data = node_data.data[dest] if not (dest_data.exists() or dest_data.is_direct_parent()) and not create_parents: - raise LatchPathError("no such Latch file or directory", dest) + raise LatchPathError("no such Latch file or directory", dest, node_data.acc_id) dest_is_dir = dest_data.type in { LDataNodeType.account_root, diff --git a/latch/ldata/type.py b/latch/ldata/type.py index e1240c0df..e71bf9435 100644 --- a/latch/ldata/type.py +++ b/latch/ldata/type.py @@ -6,7 +6,7 @@ class LatchPathError(RuntimeError): def __init__( self, message: str, - remote_path: Optional[str] = None, + remote_path: str, acc_id: Optional[str] = None, ): super().__init__(message) diff --git a/latch_cli/services/cp/download/__init__.py b/latch_cli/services/cp/download/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/latch_cli/services/cp/download/main.py b/latch_cli/services/cp/download/main.py new file mode 100644 index 000000000..bf5e0e305 --- /dev/null +++ b/latch_cli/services/cp/download/main.py @@ -0,0 +1,196 @@ +import asyncio +import shutil +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path +from textwrap import dedent +from typing import Dict, List, Literal, Optional, TypedDict + +import click +import requests +import requests.adapters +import tqdm + +from ....utils import get_auth_header, human_readable_time, urljoins, with_si_suffix +from ....utils.path import normalize_path +from ..glob import expand_pattern +from ..utils import LDataNodeType, get_node_data +from .worker import Work, worker + +http_session = requests.Session() + +_adapter = requests.adapters.HTTPAdapter( + max_retries=requests.adapters.Retry( + status_forcelist=[429, 500, 502, 503, 504], + backoff_factor=1, + allowed_methods=["GET", "PUT", "POST"], + ) +) +http_session.mount("https://", _adapter) +http_session.mount("http://", _adapter) + + +class GetSignedUrlData(TypedDict): + url: str + + +class GetSignedUrlsRecursiveData(TypedDict): + urls: Dict[str, str] + + +def download( + srcs: List[str], 
+ dest: Path, + progress: Literal["none", "total", "tasks"], + verbose: bool, + expand_globs: bool, + cores: Optional[int], + chunk_size_mib: Optional[int], +): + if cores is None: + cores = 4 + if chunk_size_mib is None: + chunk_size_mib = 16 + + start = time.monotonic() + + if not dest.parent.exists(): + click.secho( + f"Invalid copy destination {dest}. Parent directory {dest.parent} does" + " not exist.", + fg="red", + ) + raise click.exceptions.Exit(1) + + if len(srcs) > 1 and not (dest.exists() and dest.is_dir()): + click.secho( + f"Copy destination {dest} does not exist. Multi-source copies must write to" + " a pre-existing directory.", + fg="red", + ) + raise click.exceptions.Exit(1) + + from latch.ldata.path import _get_node_data + + all_node_data = _get_node_data(*srcs) + work_queue = asyncio.Queue[Work]() + total = 0 + + if expand_globs: + new_srcs = [] + for src in srcs: + new_srcs.extend(expand_pattern(src)) + + srcs = new_srcs + + # todo(ayush): parallelize + for src in srcs: + node_data = all_node_data.data[src] + normalized = normalize_path(src) + + can_have_children = node_data.type in { + LDataNodeType.account_root, + LDataNodeType.dir, + LDataNodeType.mount, + LDataNodeType.mount_gcp, + LDataNodeType.mount_azure, + } + + if not can_have_children: + endpoint = "https://nucleus.latch.bio/ldata/get-signed-url" + else: + endpoint = "https://nucleus.latch.bio/ldata/get-signed-urls-recursive" + + res = http_session.post( + endpoint, + headers={"Authorization": get_auth_header()}, + json={"path": normalized}, + ) + + json = res.json() + + if not can_have_children: + gsud: GetSignedUrlData = json["data"] + total += 1 + + work_dest = dest + if dest.exists() and dest.is_dir(): + work_dest = dest / node_data.name + + if work_dest.exists() and not click.confirm( + f"Copy destination path {work_dest} already exists and its contents may" + " be overwritten. Proceed?" + ): + return + + try: + work_dest.unlink(missing_ok=True) + except OSError: + shutil.rmtree(work_dest) + + work_queue.put_nowait(Work(gsud["url"], work_dest, chunk_size_mib)) + else: + gsurd: GetSignedUrlsRecursiveData = json["data"] + total += len(gsurd["urls"]) + + work_dest = dest + if dest.exists() and not normalized.endswith("/"): + work_dest = dest / node_data.name + + if ( + work_dest.exists() + and work_dest.is_dir() + and not click.confirm( + f"Copy destination path {work_dest} already exists and its contents" + " may be overwritten. Proceed?" 
+ ) + ): + return + + for rel, url in gsurd["urls"].items(): + res = work_dest / rel + + for parent in res.parents: + try: + parent.mkdir(exist_ok=True, parents=True) + break + except NotADirectoryError: # somewhere up the tree is a file + continue + except FileExistsError: + parent.unlink() + break + + # todo(ayush): use only one mkdir call + res.parent.mkdir(exist_ok=True, parents=True) + + work_queue.put_nowait(Work(url, work_dest / rel, chunk_size_mib)) + + tbar = tqdm.tqdm( + total=total, + leave=False, + colour="green", + smoothing=0, + unit="B", + unit_scale=True, + disable=progress == "none", + ) + + workers = min(total, cores) + with ThreadPoolExecutor(workers) as exec: + futs = [ + exec.submit(worker, work_queue, tbar, progress == "tasks", verbose) + for _ in range(workers) + ] + + total_bytes = 0 + for fut in as_completed(futs): + total_bytes += fut.result() + + tbar.clear() + total_time = time.monotonic() - start + + click.echo(dedent(f"""\ + {click.style("Download Complete", fg="green")} + {click.style("Time Elapsed:", fg="blue")} {human_readable_time(total_time)} + {click.style("Files Downloaded:", fg="blue")} {total} ({with_si_suffix(total_bytes)})\ + """)) diff --git a/latch_cli/services/cp/download/worker.py b/latch_cli/services/cp/download/worker.py new file mode 100644 index 000000000..26acf5e5f --- /dev/null +++ b/latch_cli/services/cp/download/worker.py @@ -0,0 +1,115 @@ +import asyncio +import os +import shutil +from dataclasses import dataclass +from pathlib import Path +from typing import Awaitable, List + +import aiohttp +import tqdm +import uvloop + +from ....constants import Units +from ..http_utils import RetryClientSession + + +@dataclass +class Work: + url: str + dest: Path + chunk_size_mib: int = 5 + + +async def download_chunk( + sess: aiohttp.ClientSession, + url: str, + fd: int, + index: int, + chunk_size: int, + pbar: tqdm.tqdm, +): + start = index * chunk_size + end = start + chunk_size - 1 + + res = await sess.get(url, headers={"Range": f"bytes={start}-{end}"}) + content = await res.read() + pbar.update(os.pwrite(fd, content, start)) + + +async def work_loop( + work_queue: asyncio.Queue[Work], + tbar: tqdm.tqdm, + show_task_progress: bool, + print_file_on_completion: bool, +) -> int: + pbar = tqdm.tqdm( + total=0, + leave=False, + unit="B", + unit_scale=True, + disable=not show_task_progress, + ) + + total_bytes = 0 + + async with RetryClientSession(read_timeout=90, conn_timeout=10) as sess: + while True: + try: + work = work_queue.get_nowait() + except asyncio.QueueEmpty: + break + + try: + if work.dest.exists() and work.dest.is_dir(): + shutil.rmtree(work.dest) + + async with sess.get(work.url) as res: + total_size = res.content_length + assert total_size is not None + + total_bytes += total_size + + pbar.total = total_size + pbar.desc = work.dest.name + + chunk_size = work.chunk_size_mib * Units.MiB + + with work.dest.open("wb") as f: + coros: List[Awaitable] = [] + + cur = 0 + while cur * chunk_size < total_size: + coros.append( + download_chunk( + sess, work.url, f.fileno(), cur, chunk_size, pbar + ) + ) + cur += 1 + + await asyncio.gather(*coros) + + if print_file_on_completion: + pbar.write(work.dest.name) + + except Exception as e: + raise Exception(f"{work}: {e}") + + pbar.reset() + tbar.update(1) + + pbar.clear() + return total_bytes + + +def worker( + work_queue: asyncio.Queue[Work], + tbar: tqdm.tqdm, + show_task_progress: bool, + print_file_on_completion: bool, +) -> int: + uvloop.install() + + loop = uvloop.new_event_loop() + return 
loop.run_until_complete( + work_loop(work_queue, tbar, show_task_progress, print_file_on_completion) + ) diff --git a/latch_cli/services/cp/http_utils.py b/latch_cli/services/cp/http_utils.py new file mode 100644 index 000000000..a8dfe3960 --- /dev/null +++ b/latch_cli/services/cp/http_utils.py @@ -0,0 +1,72 @@ +import asyncio +from http import HTTPStatus +from typing import Awaitable, Callable, List, Optional + +import aiohttp +from typing_extensions import ParamSpec + +P = ParamSpec("P") + + +class RetriesExhaustedException(RuntimeError): ... + + +class RetryClientSession(aiohttp.ClientSession): + def __init__( + self, + status_list: Optional[List[HTTPStatus]] = None, + retries: int = 3, + backoff: float = 1, + *args, + **kwargs, + ): + + self.status_list = ( + status_list + if status_list is not None + else [ + HTTPStatus.TOO_MANY_REQUESTS, # 429 + HTTPStatus.INTERNAL_SERVER_ERROR, # 500 + HTTPStatus.BAD_GATEWAY, # 502 + HTTPStatus.SERVICE_UNAVAILABLE, # 503 + HTTPStatus.GATEWAY_TIMEOUT, # 504 + ] + ) + + self.retries = retries + self.backoff = backoff + + super().__init__(*args, **kwargs) + + async def _with_retry( + self, + f: Callable[P, Awaitable[aiohttp.ClientResponse]], + *args: P.args, + **kwargs: P.kwargs, + ) -> aiohttp.ClientResponse: + error: Optional[Exception] = None + + cur = 0 + while cur < self.retries: + if cur > 0: + await asyncio.sleep(self.backoff * 2**cur) + + cur += 1 + + try: + res = await f(*args, **kwargs) + if res.status in self.status_list: + continue + + return res + except Exception as e: + error = e + continue + + if error is None: + raise RetriesExhaustedException + + raise error + + async def _request(self, *args, **kwargs) -> aiohttp.ClientResponse: + return await self._with_retry(super()._request, *args, **kwargs) diff --git a/latch_cli/services/cp/main.py b/latch_cli/services/cp/main.py index 4fbbcb096..094642e37 100644 --- a/latch_cli/services/cp/main.py +++ b/latch_cli/services/cp/main.py @@ -3,6 +3,8 @@ from typing import List, Optional import click +import gql +from latch_sdk_gql.execute import execute from latch.ldata._transfer.download import download as _download from latch.ldata._transfer.progress import Progress @@ -13,6 +15,9 @@ from latch_cli.utils import human_readable_time, with_si_suffix from latch_cli.utils.path import get_path_error, is_remote_path +from .download.main import download +from .upload.main import upload + def _copy_and_print(src: str, dst: str, progress: Progress) -> None: _remote_copy(src, dst) @@ -54,53 +59,62 @@ def cp( ) raise click.exceptions.Exit(1) - dest_remote = is_remote_path(dest) + # todo(ayush): make this a global thats computed in requires_auth() + acc_info = execute(gql.gql(""" + query AccountInfo { + accountInfoCurrent { + id + } + } + """))["accountInfoCurrent"] - for src in srcs: - src_remote = is_remote_path(src) + acc_id = acc_info["id"] - try: - if src_remote and not dest_remote: - if expand_globs: - [ - _download_and_print(p, Path(dest), progress, verbose) - for p in expand_pattern(src) - ] - else: - _download_and_print(src, Path(dest), progress, verbose) - elif not src_remote and dest_remote: - if progress != Progress.none: - click.secho(f"Uploading {src}", fg="blue") - res = _upload( - src, - dest, - progress=progress, - verbose=verbose, - cores=cores, - chunk_size_mib=chunk_size_mib, - ) - if progress != Progress.none: - click.echo(dedent(f""" - {click.style("Upload Complete", fg="green")} - {click.style("Time Elapsed: ", fg="blue")}{human_readable_time(res.total_time)} - {click.style("Files 
Uploaded: ", fg="blue")}{res.num_files} ({with_si_suffix(res.total_bytes)}) - """)) - elif src_remote and dest_remote: + dest_remote = is_remote_path(dest) + srcs_remote = [is_remote_path(src) for src in srcs] + + try: + if not dest_remote and all(srcs_remote): + download( + srcs, + Path(dest), + progress.name, + verbose, + expand_globs, + cores, + chunk_size_mib, + ) + elif dest_remote and not any(srcs_remote): + upload( + srcs, + dest, + progress.name, + verbose, + cores, + chunk_size_mib, + ) + elif dest_remote and all(srcs_remote): + for src in srcs: if expand_globs: [_copy_and_print(p, dest, progress) for p in expand_pattern(src)] else: _copy_and_print(src, dest, progress) - else: - raise ValueError( - dedent(f""" - `latch cp` cannot be used for purely local file copying. + else: + click.secho( + dedent(f"""\ + Invalid arguments. The following argument types are valid: + + (1) All source arguments are remote paths and destination argument is local (download) + (2) All source arguments are local paths and destination argument is remote (upload) + (3) All source arguments are remote paths and destination argument is remote (remote copy)\ + """), + fg="red", + ) + raise click.exceptions.Exit(1) - Please ensure at least one of your arguments is a remote path (beginning with `latch://`) - """).strip("\n"), - ) - except LatchPathError as e: - click.secho(get_path_error(e.remote_path, e.message, e.acc_id), fg="red") - raise click.exceptions.Exit(1) from e - except Exception as e: - click.secho(str(e), fg="red") - raise click.exceptions.Exit(1) from e + except LatchPathError as e: + click.secho(get_path_error(e.remote_path, e.message, acc_id), fg="red") + raise click.exceptions.Exit(1) from e + except Exception as e: + click.secho(str(e), fg="red") + raise click.exceptions.Exit(1) from e diff --git a/latch_cli/services/cp/upload/__init__.py b/latch_cli/services/cp/upload/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/latch_cli/services/cp/upload/main.py b/latch_cli/services/cp/upload/main.py new file mode 100644 index 000000000..4d42d3ef3 --- /dev/null +++ b/latch_cli/services/cp/upload/main.py @@ -0,0 +1,120 @@ +import asyncio +import os +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from pathlib import Path +from textwrap import dedent +from typing import List, Literal, Optional + +import click +import tqdm + +from ....utils import human_readable_time, urljoins, with_si_suffix +from ....utils.path import get_path_error, normalize_path +from ..utils import LDataNodeType, get_node_data +from .worker import Work, worker + + +def upload( + srcs: List[str], + dest: str, + progress: Literal["none", "total", "tasks"], + verbose: bool, + cores: Optional[int], + chunk_size_mib: Optional[int], +): + from latch.ldata.type import LatchPathError + + if cores is None: + cores = 4 + if chunk_size_mib is None: + chunk_size_mib = 16 + + start = time.monotonic() + + dest_data = get_node_data(dest) + dest_is_dir = dest_data.type in { + LDataNodeType.account_root, + LDataNodeType.mount, + LDataNodeType.mount_gcp, + LDataNodeType.mount_azure, + LDataNodeType.dir, + } + + work_queue = asyncio.Queue[Work]() + total_bytes = 0 + num_files = 0 + + for src in srcs: + src_path = Path(src) + if not src_path.exists(): + raise ValueError(f"{src_path}: no such file or directory") + + normalized = normalize_path(dest) + + if not dest_data.exists: + root = normalized + elif src_path.is_dir(): + if not dest_is_dir: + raise 
LatchPathError("not a directory", dest) + if src.endswith("/"): + root = normalized + else: + root = urljoins(normalized, src_path.name) + else: + if dest_is_dir: + root = urljoins(normalized, src_path.name) + else: + root = normalized + + if not src_path.is_dir(): + num_files += 1 + total_bytes += src_path.resolve().stat().st_size + + work_queue.put_nowait(Work(src_path, root, chunk_size_mib)) + else: + + for dir, _, file_names in os.walk(src_path, followlinks=True): + for f in file_names: + rel = Path(dir) / f + + try: + total_bytes += rel.resolve().stat().st_size + except FileNotFoundError: + print(f"WARNING: file {rel} not found, skipping...") + continue + + num_files += 1 + + remote = urljoins(root, str(rel.relative_to(src_path))) + work_queue.put_nowait(Work(rel, remote, chunk_size_mib)) + + total = tqdm.tqdm( + total=num_files, + leave=False, + smoothing=0, + colour="green", + unit="", + unit_scale=True, + disable=progress == "none", + ) + + workers = min(cores, num_files) + with ThreadPoolExecutor(workers) as exec: + futs = [ + exec.submit(worker, work_queue, total, progress == "tasks", verbose) + for _ in range(workers) + ] + + for f in as_completed(futs): + f.result() + + total.clear() + total_time = time.monotonic() - start + + click.echo(dedent(f"""\ + {click.style("Upload Complete", fg="green")} + {click.style("Time Elapsed:", fg="blue")} {human_readable_time(total_time)} + {click.style("Files Uploaded:", fg="blue")} {num_files} ({with_si_suffix(total_bytes)})\ + """)) diff --git a/latch_cli/services/cp/upload/worker.py b/latch_cli/services/cp/upload/worker.py new file mode 100644 index 000000000..5dbeb5c5a --- /dev/null +++ b/latch_cli/services/cp/upload/worker.py @@ -0,0 +1,206 @@ +import asyncio +import math +import mimetypes +import os +import random +from dataclasses import dataclass +from pathlib import Path +from typing import List, TypedDict + +import aiohttp +import click +import tqdm +import uvloop + +from latch_cli.constants import Units, latch_constants +from latch_cli.utils import get_auth_header, with_si_suffix + +from ..http_utils import RetryClientSession + + +@dataclass +class Work: + src: Path + dest: str + chunk_size_mib: int = 16 + + +class StartUploadData(TypedDict): + upload_id: str + urls: List[str] + + +@dataclass +class CompletedPart: + src: Path + etag: str + part: int + + +async def upload_chunk( + session: aiohttp.ClientSession, + src: Path, + url: str, + index: int, + part_size: int, + pbar: tqdm.tqdm, +) -> CompletedPart: + with open(src, "rb") as f: + data = os.pread(f.fileno(), part_size, index * part_size) + + res = await session.put(url, data=data) + if res.status != 200: + raise RuntimeError(f"failed to upload part {index} of {src}: {res.content}") + + etag = res.headers["ETag"] + assert etag is not None, ( + f"Malformed response from chunk upload for {src}, Part {index}," + f" Headers: {res.headers}" + ) + + pbar.update(len(data)) + + return CompletedPart(src=src, etag=etag, part=index + 1) + + +min_part_size = 5 * Units.MiB + + +async def work_loop( + work_queue: asyncio.Queue[Work], + total_pbar: tqdm.tqdm, + show_task_progress: bool, + print_file_on_completion: bool, +): + pbar = tqdm.tqdm( + total=0, + leave=False, + unit="B", + unit_scale=True, + disable=not show_task_progress, + ) + + async with RetryClientSession(read_timeout=90, conn_timeout=10) as sess: + while True: + try: + work = work_queue.get_nowait() + except asyncio.QueueEmpty: + break + + resolved = work.src + if work.src.is_symlink(): + resolved = work.src.resolve() + + 
content_type, _ = mimetypes.guess_type(resolved) + if content_type is None: + with open(resolved, "rb") as f: + sample = f.read(Units.KiB) + + try: + sample.decode() + content_type = "text/plain" + except UnicodeDecodeError: + content_type = "application/octet-stream" + + file_size = resolved.stat().st_size + if file_size > latch_constants.maximum_upload_size: + raise ValueError( + f"{resolved}: file is {with_si_suffix(file_size)} which exceeds the" + " maximum upload size (5TiB)", + ) + + chunk_size = work.chunk_size_mib * Units.MiB + if chunk_size < min_part_size: + click.secho( + """\ + Unable to complete upload - please check your internet connection speed or any firewall settings that may block outbound traffic.\ + """, + fg="red", + ) + raise click.exceptions.Exit(1) + + part_count = min( + latch_constants.maximum_upload_parts, + math.ceil(file_size / chunk_size), + ) + part_size = max( + chunk_size, + math.ceil(file_size / latch_constants.maximum_upload_parts), + ) + + pbar.desc = resolved.name + pbar.total = file_size + + # jitter to not dos nuc-data + await asyncio.sleep(0.1 * random.random()) + + resp = await sess.post( + "https://nucleus.latch.bio/ldata/start-upload", + headers={"Authorization": get_auth_header()}, + json={ + "path": work.dest, + "content_type": content_type, + "part_count": part_count, + }, + ) + resp.raise_for_status() + + json_data = await resp.json() + data: StartUploadData = json_data["data"] + + if "version_id" in data: + total_pbar.update(1) + # file is empty - nothing to do + continue + + try: + parts = await asyncio.gather(*[ + upload_chunk(sess, resolved, url, index, part_size, pbar) + for index, url in enumerate(data["urls"]) + ]) + except TimeoutError: + await work_queue.put( + Work(work.src, work.dest, work.chunk_size_mib // 2) + ) + + continue + + # exception handling + resp = await sess.post( + "https://nucleus.latch.bio/ldata/end-upload", + headers={"Authorization": get_auth_header()}, + json={ + "path": work.dest, + "upload_id": data["upload_id"], + "parts": [ + { + "ETag": part.etag, + "PartNumber": part.part, + } + for part in parts + ], + }, + ) + resp.raise_for_status() + + if print_file_on_completion: + pbar.write(work.src.name) + + pbar.reset() + total_pbar.update(1) + + pbar.clear() + + +def worker( + work_queue: asyncio.Queue[Work], + total: tqdm.tqdm, + show_task_progress: bool, + print_file_on_completion: bool, +): + uvloop.install() + + loop = uvloop.new_event_loop() + loop.run_until_complete( + work_loop(work_queue, total, show_task_progress, print_file_on_completion) + ) diff --git a/latch_cli/services/cp/utils.py b/latch_cli/services/cp/utils.py index 464d3ce4c..a1d9467ae 100644 --- a/latch_cli/services/cp/utils.py +++ b/latch_cli/services/cp/utils.py @@ -1,4 +1,7 @@ -from typing import List, TypedDict +from contextlib import asynccontextmanager +from dataclasses import dataclass +from enum import Enum +from typing import List, Optional, TypedDict try: from functools import cache @@ -101,7 +104,7 @@ class AccountInfoCurrent(TypedDict): # todo(taras): support for gcp and azure mounts # skipping now due to time. 
This decision does not -# influence correcetness of the CLI and only +# influence correctness of the CLI and only # reduces the set of returned autocomplete # suggestions @cache @@ -162,3 +165,65 @@ def _get_known_domains_for_account() -> List[str]: res.extend(f"{x}.mount" for x in buckets) return res + + +class FinalLinkTarget(TypedDict): + id: str + name: str + type: str + + +class LdataNode(TypedDict): + finalLinkTarget: FinalLinkTarget + + +class LdataResolvePathToNode(TypedDict): + path: Optional[str] + ldataNode: LdataNode + + +class LDataNodeType(str, Enum): + account_root = "account_root" + dir = "dir" + obj = "obj" + mount = "mount" + link = "link" + mount_gcp = "mount_gcp" + mount_azure = "mount_azure" + + +@dataclass +class NodeData: + type: LDataNodeType + exists: bool + name: str + + +def get_node_data(remote: str) -> NodeData: + res: Optional[LdataResolvePathToNode] = execute( + gql.gql(""" + query GetNodeData($argPath: String!) { + ldataResolvePathToNode(path: $argPath) { + path + ldataNode { + finalLinkTarget { + id + name + type + } + } + } + } + """), + {"argPath": remote.rstrip("/")}, + )["ldataResolvePathToNode"] + + if res is None: + raise ValueError("unauthorized") + + exists = res["path"] is None or res["path"] == "" + + flt = res["ldataNode"]["finalLinkTarget"] + type = LDataNodeType(flt["type"].lower()) + + return NodeData(type, exists, flt["name"]) diff --git a/latch_cli/utils/__init__.py b/latch_cli/utils/__init__.py index ebe381e7f..3a8c88aee 100644 --- a/latch_cli/utils/__init__.py +++ b/latch_cli/utils/__init__.py @@ -19,7 +19,6 @@ import jwt from latch_sdk_config.user import user_config -from latch.utils import current_workspace from latch_cli.click_utils import bold from latch_cli.constants import latch_constants from latch_cli.tinyrequests import get @@ -154,6 +153,8 @@ def human_readable_time(t_seconds: float) -> str: def hash_directory(dir_path: Path) -> str: + from latch.utils import current_workspace + # todo(maximsmol): store per-file hashes to show which files triggered a version change click.secho("Calculating workflow version based on file content hash", bold=True) click.secho(" Disable with --disable-auto-version/-d", italic=True, dim=True) diff --git a/latch_cli/utils/path.py b/latch_cli/utils/path.py index a18047ad6..69d67d25d 100644 --- a/latch_cli/utils/path.py +++ b/latch_cli/utils/path.py @@ -1,3 +1,4 @@ +import functools import re from pathlib import Path from textwrap import dedent @@ -161,6 +162,8 @@ def normalize_path( re.VERBOSE, ) +_style = functools.partial(click.style, reset=False) + def get_path_error(path: str, message: str, acc_id: str) -> str: with_scheme = append_scheme(path) @@ -178,7 +181,7 @@ def get_path_error(path: str, message: str, acc_id: str) -> str: auth_type = "Execution Token" auth_str = ( - f"{click.style(f'Authorized using:', bold=True, reset=False)} {click.style(auth_type, bold=False, reset=False)}" + f"{_style(f'Authorized using:', bold=True)} {_style(auth_type, bold=False)}" + "\n" ) @@ -186,29 +189,32 @@ def get_path_error(path: str, message: str, acc_id: str) -> str: ws_name = user_config.workspace_name resolve_str = ( - f"{click.style(f'Relative path resolved to:', bold=True, reset=False)} {click.style(normalized, bold=False, reset=False)}" - + "\n" - ) - ws_str = ( - f"{click.style(f'Using Workspace:', bold=True, reset=False)} {click.style(ws_id, bold=False, reset=False)}" + f"{_style(f'Relative path resolved to:', bold=True)} {_style(normalized, bold=False)}" ) + ws_str = f"{_style(f'Using Workspace:', 
bold=True)} {_style(ws_id, bold=False)}" if ws_name is not None: ws_str = f"{ws_str} ({ws_name})" - ws_str += "\n" - - return click.style( - f""" -{click.style(f'{path}: ', bold=True, reset=False)}{click.style(message, bold=False, reset=False)} -{resolve_str if account_relative else ""}{ws_str if account_relative else ""} -{auth_str} -{click.style("Check that:", bold=True, reset=False)} -{click.style("1. The target object exists", bold=False, reset=False)} -{click.style(f"2. Account ", bold=False, reset=False)}{click.style(acc_id, bold=True, reset=False)}{click.style(" has permission to view the target object", bold=False, reset=False)} -{"3. The correct workspace is selected" if account_relative else ""} - -For privacy reasons, non-viewable objects and non-existent objects are indistinguishable""", - fg="red", - ) + + res = [ + "".join([_style(f"{path}: ", bold=True), _style(message, bold=False)]), + *([resolve_str, ws_str] if account_relative else []), + auth_str, + _style("Check that:", bold=True), + _style("1. The target object exists", bold=False), + "".join([ + _style(f"2. Account ", bold=False), + _style(acc_id, bold=True), + _style(" has permission to view the target object", bold=False), + ]), + *(["3. The correct workspace is selected"] if account_relative else []), + "", + ( + "For privacy reasons, non-viewable objects and non-existent objects are" + " indistinguishable." + ), + ] + + return click.style("\n".join(res), fg="red") name = re.compile(r"^.*/(?P[^/]+)/?$") From bc891447912166cd429d94c18621554bdeb2338c Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Wed, 9 Oct 2024 10:46:34 -0700 Subject: [PATCH 2/9] dont use own node_data Signed-off-by: Ayush Kamat --- latch_cli/services/cp/download/main.py | 2 +- latch_cli/services/cp/upload/main.py | 8 ++-- latch_cli/services/cp/utils.py | 62 -------------------------- 3 files changed, 6 insertions(+), 66 deletions(-) diff --git a/latch_cli/services/cp/download/main.py b/latch_cli/services/cp/download/main.py index bf5e0e305..594bdc346 100644 --- a/latch_cli/services/cp/download/main.py +++ b/latch_cli/services/cp/download/main.py @@ -14,7 +14,6 @@ from ....utils import get_auth_header, human_readable_time, urljoins, with_si_suffix from ....utils.path import normalize_path from ..glob import expand_pattern -from ..utils import LDataNodeType, get_node_data from .worker import Work, worker http_session = requests.Session() @@ -71,6 +70,7 @@ def download( raise click.exceptions.Exit(1) from latch.ldata.path import _get_node_data + from latch.ldata.type import LDataNodeType all_node_data = _get_node_data(*srcs) work_queue = asyncio.Queue[Work]() diff --git a/latch_cli/services/cp/upload/main.py b/latch_cli/services/cp/upload/main.py index 4d42d3ef3..4bf8ffb64 100644 --- a/latch_cli/services/cp/upload/main.py +++ b/latch_cli/services/cp/upload/main.py @@ -11,8 +11,7 @@ import tqdm from ....utils import human_readable_time, urljoins, with_si_suffix -from ....utils.path import get_path_error, normalize_path -from ..utils import LDataNodeType, get_node_data +from ....utils.path import normalize_path from .worker import Work, worker @@ -33,7 +32,10 @@ def upload( start = time.monotonic() - dest_data = get_node_data(dest) + from latch.ldata.path import _get_node_data + from latch.ldata.type import LDataNodeType + + dest_data = _get_node_data(dest).data[dest] dest_is_dir = dest_data.type in { LDataNodeType.account_root, LDataNodeType.mount, diff --git a/latch_cli/services/cp/utils.py b/latch_cli/services/cp/utils.py index a1d9467ae..d66ee5c54 
100644 --- a/latch_cli/services/cp/utils.py +++ b/latch_cli/services/cp/utils.py @@ -165,65 +165,3 @@ def _get_known_domains_for_account() -> List[str]: res.extend(f"{x}.mount" for x in buckets) return res - - -class FinalLinkTarget(TypedDict): - id: str - name: str - type: str - - -class LdataNode(TypedDict): - finalLinkTarget: FinalLinkTarget - - -class LdataResolvePathToNode(TypedDict): - path: Optional[str] - ldataNode: LdataNode - - -class LDataNodeType(str, Enum): - account_root = "account_root" - dir = "dir" - obj = "obj" - mount = "mount" - link = "link" - mount_gcp = "mount_gcp" - mount_azure = "mount_azure" - - -@dataclass -class NodeData: - type: LDataNodeType - exists: bool - name: str - - -def get_node_data(remote: str) -> NodeData: - res: Optional[LdataResolvePathToNode] = execute( - gql.gql(""" - query GetNodeData($argPath: String!) { - ldataResolvePathToNode(path: $argPath) { - path - ldataNode { - finalLinkTarget { - id - name - type - } - } - } - } - """), - {"argPath": remote.rstrip("/")}, - )["ldataResolvePathToNode"] - - if res is None: - raise ValueError("unauthorized") - - exists = res["path"] is None or res["path"] == "" - - flt = res["ldataNode"]["finalLinkTarget"] - type = LDataNodeType(flt["type"].lower()) - - return NodeData(type, exists, flt["name"]) From e83b6099eca051aeea565ff97bcbd4e7eaafda3d Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 10 Oct 2024 09:09:59 -0700 Subject: [PATCH 3/9] comments Signed-off-by: Ayush Kamat --- latch_cli/main.py | 10 ++++++++ latch_cli/services/cp/download/main.py | 23 +++++++++++------- latch_cli/services/cp/http_utils.py | 21 +++++++++++----- latch_cli/services/cp/main.py | 12 ++++++---- latch_cli/services/cp/upload/main.py | 11 +++++---- latch_cli/services/cp/upload/worker.py | 33 +++++++++++++++++--------- 6 files changed, 75 insertions(+), 35 deletions(-) diff --git a/latch_cli/main.py b/latch_cli/main.py index 9767ea477..458d0853d 100644 --- a/latch_cli/main.py +++ b/latch_cli/main.py @@ -705,6 +705,14 @@ def get_executions(): default=False, show_default=True, ) +@click.option( + "--force", + "-f", + help="Don't ask to confirm when overwriting files", + is_flag=True, + default=False, + show_default=True, +) @click.option( "--no-glob", "-G", @@ -729,6 +737,7 @@ def cp( dest: str, progress: _Progress, verbose: bool, + force: bool, no_glob: bool, cores: Optional[int] = None, chunk_size_mib: Optional[int] = None, @@ -747,6 +756,7 @@ def cp( src, dest, progress=progress, + force=force, verbose=verbose, expand_globs=not no_glob, cores=cores, diff --git a/latch_cli/services/cp/download/main.py b/latch_cli/services/cp/download/main.py index 594bdc346..d9f5f8d86 100644 --- a/latch_cli/services/cp/download/main.py +++ b/latch_cli/services/cp/download/main.py @@ -42,6 +42,7 @@ def download( dest: Path, progress: Literal["none", "total", "tasks"], verbose: bool, + force: bool, expand_globs: bool, cores: Optional[int], chunk_size_mib: Optional[int], @@ -117,9 +118,13 @@ def download( if dest.exists() and dest.is_dir(): work_dest = dest / node_data.name - if work_dest.exists() and not click.confirm( - f"Copy destination path {work_dest} already exists and its contents may" - " be overwritten. Proceed?" + if ( + work_dest.exists() + and not force + and not click.confirm( + f"Copy destination path {work_dest} already exists and its contents" + " may be overwritten. Proceed?" 
+ ) ): return @@ -140,6 +145,7 @@ def download( if ( work_dest.exists() and work_dest.is_dir() + and not force and not click.confirm( f"Copy destination path {work_dest} already exists and its contents" " may be overwritten. Proceed?" @@ -189,8 +195,9 @@ def download( tbar.clear() total_time = time.monotonic() - start - click.echo(dedent(f"""\ - {click.style("Download Complete", fg="green")} - {click.style("Time Elapsed:", fg="blue")} {human_readable_time(total_time)} - {click.style("Files Downloaded:", fg="blue")} {total} ({with_si_suffix(total_bytes)})\ - """)) + if progress != "none": + click.echo(dedent(f"""\ + {click.style("Download Complete", fg="green")} + {click.style("Time Elapsed:", fg="blue")} {human_readable_time(total_time)} + {click.style("Files Downloaded:", fg="blue")} {total} ({with_si_suffix(total_bytes)})\ + """)) diff --git a/latch_cli/services/cp/http_utils.py b/latch_cli/services/cp/http_utils.py index a8dfe3960..c77b337e0 100644 --- a/latch_cli/services/cp/http_utils.py +++ b/latch_cli/services/cp/http_utils.py @@ -11,12 +11,15 @@ class RetriesExhaustedException(RuntimeError): ... +class RateLimitExceeded(RuntimeError): ... + + class RetryClientSession(aiohttp.ClientSession): def __init__( self, status_list: Optional[List[HTTPStatus]] = None, - retries: int = 3, - backoff: float = 1, + retries: int = 10, + backoff: float = 0.1, *args, **kwargs, ): @@ -45,17 +48,19 @@ async def _with_retry( **kwargs: P.kwargs, ) -> aiohttp.ClientResponse: error: Optional[Exception] = None + last_res: Optional[aiohttp.ClientResponse] = None cur = 0 while cur < self.retries: if cur > 0: - await asyncio.sleep(self.backoff * 2**cur) + await asyncio.sleep(max(self.backoff * 2**cur, 10)) cur += 1 try: res = await f(*args, **kwargs) if res.status in self.status_list: + last_res = res continue return res @@ -63,10 +68,14 @@ async def _with_retry( error = e continue - if error is None: - raise RetriesExhaustedException + if last_res is not None: + return last_res + + if error is not None: + raise error - raise error + # we'll never get here but putting here anyway so the type checker is happy + raise RetriesExhaustedException async def _request(self, *args, **kwargs) -> aiohttp.ClientResponse: return await self._with_retry(super()._request, *args, **kwargs) diff --git a/latch_cli/services/cp/main.py b/latch_cli/services/cp/main.py index 094642e37..cfd1d00b8 100644 --- a/latch_cli/services/cp/main.py +++ b/latch_cli/services/cp/main.py @@ -47,6 +47,7 @@ def cp( *, progress: Progress, verbose: bool, + force: bool, expand_globs: bool, cores: Optional[int] = None, chunk_size_mib: Optional[int] = None, @@ -70,21 +71,22 @@ def cp( acc_id = acc_info["id"] - dest_remote = is_remote_path(dest) - srcs_remote = [is_remote_path(src) for src in srcs] + dest_is_remote = is_remote_path(dest) + srcs_are_remote = [is_remote_path(src) for src in srcs] try: - if not dest_remote and all(srcs_remote): + if not dest_is_remote and all(srcs_are_remote): download( srcs, Path(dest), progress.name, verbose, + force, expand_globs, cores, chunk_size_mib, ) - elif dest_remote and not any(srcs_remote): + elif dest_is_remote and not any(srcs_are_remote): upload( srcs, dest, @@ -93,7 +95,7 @@ def cp( cores, chunk_size_mib, ) - elif dest_remote and all(srcs_remote): + elif dest_is_remote and all(srcs_are_remote): for src in srcs: if expand_globs: [_copy_and_print(p, dest, progress) for p in expand_pattern(src)] diff --git a/latch_cli/services/cp/upload/main.py b/latch_cli/services/cp/upload/main.py index 4bf8ffb64..4cf26909a 
100644 --- a/latch_cli/services/cp/upload/main.py +++ b/latch_cli/services/cp/upload/main.py @@ -115,8 +115,9 @@ def upload( total.clear() total_time = time.monotonic() - start - click.echo(dedent(f"""\ - {click.style("Upload Complete", fg="green")} - {click.style("Time Elapsed:", fg="blue")} {human_readable_time(total_time)} - {click.style("Files Uploaded:", fg="blue")} {num_files} ({with_si_suffix(total_bytes)})\ - """)) + if progress != "none": + click.echo(dedent(f"""\ + {click.style("Upload Complete", fg="green")} + {click.style("Time Elapsed:", fg="blue")} {human_readable_time(total_time)} + {click.style("Files Uploaded:", fg="blue")} {num_files} ({with_si_suffix(total_bytes)})\ + """)) diff --git a/latch_cli/services/cp/upload/worker.py b/latch_cli/services/cp/upload/worker.py index 5dbeb5c5a..d8158d8c8 100644 --- a/latch_cli/services/cp/upload/worker.py +++ b/latch_cli/services/cp/upload/worker.py @@ -15,7 +15,7 @@ from latch_cli.constants import Units, latch_constants from latch_cli.utils import get_auth_header, with_si_suffix -from ..http_utils import RetryClientSession +from ..http_utils import RateLimitExceeded, RetryClientSession @dataclass @@ -53,10 +53,11 @@ async def upload_chunk( raise RuntimeError(f"failed to upload part {index} of {src}: {res.content}") etag = res.headers["ETag"] - assert etag is not None, ( - f"Malformed response from chunk upload for {src}, Part {index}," - f" Headers: {res.headers}" - ) + if etag is None: + raise RuntimeError( + f"Malformed response from chunk upload for {src}, Part {index}," + f" Headers: {res.headers}" + ) pbar.update(len(data)) @@ -111,13 +112,11 @@ async def work_loop( chunk_size = work.chunk_size_mib * Units.MiB if chunk_size < min_part_size: - click.secho( - """\ - Unable to complete upload - please check your internet connection speed or any firewall settings that may block outbound traffic.\ - """, - fg="red", + raise RuntimeError( + "Unable to complete upload - please check your internet" + " connection speed or any firewall settings that may block" + " outbound traffic." ) - raise click.exceptions.Exit(1) part_count = min( latch_constants.maximum_upload_parts, @@ -143,6 +142,12 @@ async def work_loop( "part_count": part_count, }, ) + if resp.status == 429: + raise RateLimitExceeded( + "The service is currently under load and could not complete your" + " request - please try again later." + ) + resp.raise_for_status() json_data = await resp.json() @@ -181,6 +186,12 @@ async def work_loop( ], }, ) + if resp.status == 429: + raise RateLimitExceeded( + "The service is currently under load and could not complete your" + " request - please try again later." 
+ ) + resp.raise_for_status() if print_file_on_completion: From f10e53442886ae94990dc61f56ff64ca876f4e31 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 10 Oct 2024 10:08:05 -0700 Subject: [PATCH 4/9] switch queues Signed-off-by: Ayush Kamat --- latch_cli/services/cp/download/main.py | 7 ++++--- latch_cli/services/cp/download/worker.py | 9 +++++---- latch_cli/services/cp/upload/main.py | 7 ++++--- latch_cli/services/cp/upload/worker.py | 14 ++++++-------- latch_cli/services/cp/utils.py | 5 +---- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/latch_cli/services/cp/download/main.py b/latch_cli/services/cp/download/main.py index d9f5f8d86..a17daff17 100644 --- a/latch_cli/services/cp/download/main.py +++ b/latch_cli/services/cp/download/main.py @@ -1,4 +1,5 @@ import asyncio +import queue import shutil import time from concurrent.futures import ThreadPoolExecutor, as_completed @@ -74,7 +75,7 @@ def download( from latch.ldata.type import LDataNodeType all_node_data = _get_node_data(*srcs) - work_queue = asyncio.Queue[Work]() + work_queue = queue.Queue() total = 0 if expand_globs: @@ -133,7 +134,7 @@ def download( except OSError: shutil.rmtree(work_dest) - work_queue.put_nowait(Work(gsud["url"], work_dest, chunk_size_mib)) + work_queue.put(Work(gsud["url"], work_dest, chunk_size_mib)) else: gsurd: GetSignedUrlsRecursiveData = json["data"] total += len(gsurd["urls"]) @@ -169,7 +170,7 @@ def download( # todo(ayush): use only one mkdir call res.parent.mkdir(exist_ok=True, parents=True) - work_queue.put_nowait(Work(url, work_dest / rel, chunk_size_mib)) + work_queue.put(Work(url, work_dest / rel, chunk_size_mib)) tbar = tqdm.tqdm( total=total, diff --git a/latch_cli/services/cp/download/worker.py b/latch_cli/services/cp/download/worker.py index 26acf5e5f..ac94497d4 100644 --- a/latch_cli/services/cp/download/worker.py +++ b/latch_cli/services/cp/download/worker.py @@ -1,5 +1,6 @@ import asyncio import os +import queue import shutil from dataclasses import dataclass from pathlib import Path @@ -37,7 +38,7 @@ async def download_chunk( async def work_loop( - work_queue: asyncio.Queue[Work], + work_queue: queue.Queue, tbar: tqdm.tqdm, show_task_progress: bool, print_file_on_completion: bool, @@ -55,8 +56,8 @@ async def work_loop( async with RetryClientSession(read_timeout=90, conn_timeout=10) as sess: while True: try: - work = work_queue.get_nowait() - except asyncio.QueueEmpty: + work: Work = work_queue.get_nowait() + except queue.Empty: break try: @@ -102,7 +103,7 @@ async def work_loop( def worker( - work_queue: asyncio.Queue[Work], + work_queue: queue.Queue, tbar: tqdm.tqdm, show_task_progress: bool, print_file_on_completion: bool, diff --git a/latch_cli/services/cp/upload/main.py b/latch_cli/services/cp/upload/main.py index 4cf26909a..4dc4e02ad 100644 --- a/latch_cli/services/cp/upload/main.py +++ b/latch_cli/services/cp/upload/main.py @@ -1,5 +1,6 @@ import asyncio import os +import queue import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass @@ -44,7 +45,7 @@ def upload( LDataNodeType.dir, } - work_queue = asyncio.Queue[Work]() + work_queue = queue.Queue() total_bytes = 0 num_files = 0 @@ -74,7 +75,7 @@ def upload( num_files += 1 total_bytes += src_path.resolve().stat().st_size - work_queue.put_nowait(Work(src_path, root, chunk_size_mib)) + work_queue.put(Work(src_path, root, chunk_size_mib)) else: for dir, _, file_names in os.walk(src_path, followlinks=True): @@ -90,7 +91,7 @@ def upload( num_files += 1 remote = 
urljoins(root, str(rel.relative_to(src_path))) - work_queue.put_nowait(Work(rel, remote, chunk_size_mib)) + work_queue.put(Work(rel, remote, chunk_size_mib)) total = tqdm.tqdm( total=num_files, diff --git a/latch_cli/services/cp/upload/worker.py b/latch_cli/services/cp/upload/worker.py index d8158d8c8..a452e5fb6 100644 --- a/latch_cli/services/cp/upload/worker.py +++ b/latch_cli/services/cp/upload/worker.py @@ -2,6 +2,7 @@ import math import mimetypes import os +import queue import random from dataclasses import dataclass from pathlib import Path @@ -68,7 +69,7 @@ async def upload_chunk( async def work_loop( - work_queue: asyncio.Queue[Work], + work_queue: queue.Queue, total_pbar: tqdm.tqdm, show_task_progress: bool, print_file_on_completion: bool, @@ -84,8 +85,8 @@ async def work_loop( async with RetryClientSession(read_timeout=90, conn_timeout=10) as sess: while True: try: - work = work_queue.get_nowait() - except asyncio.QueueEmpty: + work: Work = work_queue.get_nowait() + except queue.Empty: break resolved = work.src @@ -164,10 +165,7 @@ async def work_loop( for index, url in enumerate(data["urls"]) ]) except TimeoutError: - await work_queue.put( - Work(work.src, work.dest, work.chunk_size_mib // 2) - ) - + work_queue.put(Work(work.src, work.dest, work.chunk_size_mib // 2)) continue # exception handling @@ -204,7 +202,7 @@ async def work_loop( def worker( - work_queue: asyncio.Queue[Work], + work_queue: queue.Queue, total: tqdm.tqdm, show_task_progress: bool, print_file_on_completion: bool, diff --git a/latch_cli/services/cp/utils.py b/latch_cli/services/cp/utils.py index d66ee5c54..3d6bc858e 100644 --- a/latch_cli/services/cp/utils.py +++ b/latch_cli/services/cp/utils.py @@ -1,7 +1,4 @@ -from contextlib import asynccontextmanager -from dataclasses import dataclass -from enum import Enum -from typing import List, Optional, TypedDict +from typing import List, TypedDict try: from functools import cache From 063e787a6695e07179099da2fdf6e34b17989393 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 10 Oct 2024 15:15:36 -0700 Subject: [PATCH 5/9] updates Signed-off-by: Ayush Kamat --- latch_cli/services/cp/download/main.py | 51 +++++++---------- latch_cli/services/cp/download/worker.py | 73 +++++++++++++----------- latch_cli/services/cp/http_utils.py | 28 ++++++--- latch_cli/services/cp/upload/main.py | 31 +++++----- latch_cli/services/cp/upload/worker.py | 52 ++++++++++------- latch_cli/services/cp/utils.py | 24 +++++++- 6 files changed, 145 insertions(+), 114 deletions(-) diff --git a/latch_cli/services/cp/download/main.py b/latch_cli/services/cp/download/main.py index a17daff17..c40f452e8 100644 --- a/latch_cli/services/cp/download/main.py +++ b/latch_cli/services/cp/download/main.py @@ -2,7 +2,6 @@ import queue import shutil import time -from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path from textwrap import dedent from typing import Dict, List, Literal, Optional, TypedDict @@ -11,11 +10,12 @@ import requests import requests.adapters import tqdm +import uvloop -from ....utils import get_auth_header, human_readable_time, urljoins, with_si_suffix +from ....utils import get_auth_header, human_readable_time, with_si_suffix from ....utils.path import normalize_path from ..glob import expand_pattern -from .worker import Work, worker +from .worker import Work, run_workers http_session = requests.Session() @@ -75,7 +75,7 @@ def download( from latch.ldata.type import LDataNodeType all_node_data = _get_node_data(*srcs) - work_queue = queue.Queue() + 
work_queue = asyncio.Queue[Work]() total = 0 if expand_globs: @@ -131,10 +131,10 @@ def download( try: work_dest.unlink(missing_ok=True) + work_queue.put_nowait(Work(gsud["url"], work_dest, chunk_size_mib)) except OSError: - shutil.rmtree(work_dest) + click.echo(f"Cannot write file to {work_dest} - directory exists.") - work_queue.put(Work(gsud["url"], work_dest, chunk_size_mib)) else: gsurd: GetSignedUrlsRecursiveData = json["data"] total += len(gsurd["urls"]) @@ -157,20 +157,14 @@ def download( for rel, url in gsurd["urls"].items(): res = work_dest / rel - for parent in res.parents: - try: - parent.mkdir(exist_ok=True, parents=True) - break - except NotADirectoryError: # somewhere up the tree is a file - continue - except FileExistsError: - parent.unlink() - break - - # todo(ayush): use only one mkdir call - res.parent.mkdir(exist_ok=True, parents=True) - - work_queue.put(Work(url, work_dest / rel, chunk_size_mib)) + try: + res.parent.mkdir(exist_ok=True, parents=True) + work_queue.put_nowait(Work(url, work_dest / rel, chunk_size_mib)) + except (NotADirectoryError, FileExistsError): + click.echo( + f"Cannot write file to {work_dest / rel} - upstream file" + " exists." + ) tbar = tqdm.tqdm( total=total, @@ -182,16 +176,15 @@ def download( disable=progress == "none", ) - workers = min(total, cores) - with ThreadPoolExecutor(workers) as exec: - futs = [ - exec.submit(worker, work_queue, tbar, progress == "tasks", verbose) - for _ in range(workers) - ] + num_workers = min(total, cores) + uvloop.install() + + loop = uvloop.new_event_loop() + res = loop.run_until_complete( + run_workers(work_queue, num_workers, tbar, progress != "none", verbose) + ) - total_bytes = 0 - for fut in as_completed(futs): - total_bytes += fut.result() + total_bytes = sum(res) tbar.clear() total_time = time.monotonic() - start diff --git a/latch_cli/services/cp/download/worker.py b/latch_cli/services/cp/download/worker.py index ac94497d4..ea232163d 100644 --- a/latch_cli/services/cp/download/worker.py +++ b/latch_cli/services/cp/download/worker.py @@ -2,7 +2,9 @@ import os import queue import shutil +import time from dataclasses import dataclass +from http import HTTPStatus from pathlib import Path from typing import Awaitable, List @@ -10,6 +12,8 @@ import tqdm import uvloop +from latch_cli.services.cp.utils import chunked + from ....constants import Units from ..http_utils import RetryClientSession @@ -37,8 +41,8 @@ async def download_chunk( pbar.update(os.pwrite(fd, content, start)) -async def work_loop( - work_queue: queue.Queue, +async def worker( + work_queue: asyncio.Queue[Work], tbar: tqdm.tqdm, show_task_progress: bool, print_file_on_completion: bool, @@ -46,32 +50,37 @@ async def work_loop( pbar = tqdm.tqdm( total=0, leave=False, + smoothing=0, unit="B", unit_scale=True, disable=not show_task_progress, ) - total_bytes = 0 - async with RetryClientSession(read_timeout=90, conn_timeout=10) as sess: - while True: - try: - work: Work = work_queue.get_nowait() - except queue.Empty: - break + try: + async with RetryClientSession(read_timeout=90, conn_timeout=10) as sess: + while True: + try: + work: Work = work_queue.get_nowait() + except asyncio.QueueEmpty: + break + + pbar.reset() + pbar.desc = work.dest.name - try: - if work.dest.exists() and work.dest.is_dir(): - shutil.rmtree(work.dest) + res = await sess.get(work.url, headers={"Range": "bytes=0-0"}) - async with sess.get(work.url) as res: - total_size = res.content_length - assert total_size is not None + # s3 throws a REQUESTED_RANGE_NOT_SATISFIABLE if the 
file is empty + if res.status == 416: + total_size = 0 + else: + content_range = res.headers["Content-Range"] + total_size = int(content_range.replace("bytes 0-0/", "")) - total_bytes += total_size + assert total_size is not None + total_bytes += total_size pbar.total = total_size - pbar.desc = work.dest.name chunk_size = work.chunk_size_mib * Units.MiB @@ -90,27 +99,23 @@ async def work_loop( await asyncio.gather(*coros) if print_file_on_completion: - pbar.write(work.dest.name) - - except Exception as e: - raise Exception(f"{work}: {e}") + pbar.write(str(work.dest)) - pbar.reset() - tbar.update(1) + tbar.update(1) - pbar.clear() - return total_bytes + return total_bytes + finally: + pbar.clear() -def worker( - work_queue: queue.Queue, +async def run_workers( + work_queue: asyncio.Queue[Work], + num_workers: int, tbar: tqdm.tqdm, show_task_progress: bool, print_file_on_completion: bool, -) -> int: - uvloop.install() - - loop = uvloop.new_event_loop() - return loop.run_until_complete( - work_loop(work_queue, tbar, show_task_progress, print_file_on_completion) - ) +) -> List[int]: + return await asyncio.gather(*[ + worker(work_queue, tbar, show_task_progress, print_file_on_completion) + for _ in range(num_workers) + ]) diff --git a/latch_cli/services/cp/http_utils.py b/latch_cli/services/cp/http_utils.py index c77b337e0..56570857d 100644 --- a/latch_cli/services/cp/http_utils.py +++ b/latch_cli/services/cp/http_utils.py @@ -1,8 +1,9 @@ import asyncio from http import HTTPStatus -from typing import Awaitable, Callable, List, Optional +from typing import Awaitable, Callable, Dict, List, Optional import aiohttp +import aiohttp.typedefs from typing_extensions import ParamSpec P = ParamSpec("P") @@ -39,14 +40,21 @@ def __init__( self.retries = retries self.backoff = backoff + self.semas: Dict[aiohttp.typedefs.StrOrURL, asyncio.BoundedSemaphore] = { + "https://nucleus.latch.bio/ldata/start-upload": asyncio.BoundedSemaphore(2), + "https://nucleus.latch.bio/ldata/end-upload": asyncio.BoundedSemaphore(2), + } + super().__init__(*args, **kwargs) - async def _with_retry( + async def _request( self, - f: Callable[P, Awaitable[aiohttp.ClientResponse]], - *args: P.args, - **kwargs: P.kwargs, + method: str, + str_or_url: aiohttp.typedefs.StrOrURL, + **kwargs, ) -> aiohttp.ClientResponse: + sema = self.semas.get(str_or_url) + error: Optional[Exception] = None last_res: Optional[aiohttp.ClientResponse] = None @@ -58,7 +66,12 @@ async def _with_retry( cur += 1 try: - res = await f(*args, **kwargs) + if sema is None: + res = await super()._request(method, str_or_url, **kwargs) + else: + async with sema: + res = await super()._request(method, str_or_url, **kwargs) + if res.status in self.status_list: last_res = res continue @@ -76,6 +89,3 @@ async def _with_retry( # we'll never get here but putting here anyway so the type checker is happy raise RetriesExhaustedException - - async def _request(self, *args, **kwargs) -> aiohttp.ClientResponse: - return await self._with_retry(super()._request, *args, **kwargs) diff --git a/latch_cli/services/cp/upload/main.py b/latch_cli/services/cp/upload/main.py index 4dc4e02ad..29f2326f9 100644 --- a/latch_cli/services/cp/upload/main.py +++ b/latch_cli/services/cp/upload/main.py @@ -1,19 +1,17 @@ import asyncio import os -import queue import time -from concurrent.futures import ThreadPoolExecutor, as_completed -from dataclasses import dataclass from pathlib import Path from textwrap import dedent from typing import List, Literal, Optional import click import tqdm +import uvloop 
from ....utils import human_readable_time, urljoins, with_si_suffix from ....utils.path import normalize_path -from .worker import Work, worker +from .worker import Work, run_workers def upload( @@ -36,7 +34,7 @@ def upload( from latch.ldata.path import _get_node_data from latch.ldata.type import LDataNodeType - dest_data = _get_node_data(dest).data[dest] + dest_data = _get_node_data(dest, allow_resolve_to_parent=True).data[dest] dest_is_dir = dest_data.type in { LDataNodeType.account_root, LDataNodeType.mount, @@ -45,7 +43,7 @@ def upload( LDataNodeType.dir, } - work_queue = queue.Queue() + work_queue = asyncio.Queue[Work]() total_bytes = 0 num_files = 0 @@ -56,7 +54,7 @@ def upload( normalized = normalize_path(dest) - if not dest_data.exists: + if not dest_data.exists(): root = normalized elif src_path.is_dir(): if not dest_is_dir: @@ -75,7 +73,7 @@ def upload( num_files += 1 total_bytes += src_path.resolve().stat().st_size - work_queue.put(Work(src_path, root, chunk_size_mib)) + work_queue.put_nowait(Work(src_path, root, chunk_size_mib)) else: for dir, _, file_names in os.walk(src_path, followlinks=True): @@ -91,7 +89,7 @@ def upload( num_files += 1 remote = urljoins(root, str(rel.relative_to(src_path))) - work_queue.put(Work(rel, remote, chunk_size_mib)) + work_queue.put_nowait(Work(rel, remote, chunk_size_mib)) total = tqdm.tqdm( total=num_files, @@ -103,15 +101,14 @@ def upload( disable=progress == "none", ) - workers = min(cores, num_files) - with ThreadPoolExecutor(workers) as exec: - futs = [ - exec.submit(worker, work_queue, total, progress == "tasks", verbose) - for _ in range(workers) - ] + num_workers = min(cores, num_files) - for f in as_completed(futs): - f.result() + uvloop.install() + + loop = uvloop.new_event_loop() + loop.run_until_complete( + run_workers(work_queue, num_workers, total, progress == "tasks", verbose) + ) total.clear() total_time = time.monotonic() - start diff --git a/latch_cli/services/cp/upload/worker.py b/latch_cli/services/cp/upload/worker.py index a452e5fb6..19cf58e81 100644 --- a/latch_cli/services/cp/upload/worker.py +++ b/latch_cli/services/cp/upload/worker.py @@ -6,7 +6,7 @@ import random from dataclasses import dataclass from pathlib import Path -from typing import List, TypedDict +from typing import Iterable, List, TypedDict, TypeVar import aiohttp import click @@ -17,6 +17,7 @@ from latch_cli.utils import get_auth_header, with_si_suffix from ..http_utils import RateLimitExceeded, RetryClientSession +from ..utils import chunked @dataclass @@ -67,9 +68,12 @@ async def upload_chunk( min_part_size = 5 * Units.MiB +start_upload_sema = asyncio.BoundedSemaphore(2) +end_upload_sema = asyncio.BoundedSemaphore(2) -async def work_loop( - work_queue: queue.Queue, + +async def worker( + work_queue: asyncio.Queue[Work], total_pbar: tqdm.tqdm, show_task_progress: bool, print_file_on_completion: bool, @@ -77,6 +81,7 @@ async def work_loop( pbar = tqdm.tqdm( total=0, leave=False, + smoothing=0, unit="B", unit_scale=True, disable=not show_task_progress, @@ -85,8 +90,8 @@ async def work_loop( async with RetryClientSession(read_timeout=90, conn_timeout=10) as sess: while True: try: - work: Work = work_queue.get_nowait() - except queue.Empty: + work = work_queue.get_nowait() + except asyncio.QueueEmpty: break resolved = work.src @@ -131,9 +136,6 @@ async def work_loop( pbar.desc = resolved.name pbar.total = file_size - # jitter to not dos nuc-data - await asyncio.sleep(0.1 * random.random()) - resp = await sess.post( "https://nucleus.latch.bio/ldata/start-upload", 
headers={"Authorization": get_auth_header()}, @@ -143,6 +145,7 @@ async def work_loop( "part_count": part_count, }, ) + if resp.status == 429: raise RateLimitExceeded( "The service is currently under load and could not complete your" @@ -159,16 +162,21 @@ async def work_loop( # file is empty - nothing to do continue + parts: List[CompletedPart] = [] try: - parts = await asyncio.gather(*[ - upload_chunk(sess, resolved, url, index, part_size, pbar) - for index, url in enumerate(data["urls"]) - ]) + for pairs in chunked(enumerate(data["urls"])): + parts.extend( + await asyncio.gather(*[ + upload_chunk(sess, resolved, url, index, part_size, pbar) + for index, url in pairs + ]) + ) except TimeoutError: - work_queue.put(Work(work.src, work.dest, work.chunk_size_mib // 2)) + await work_queue.put( + Work(work.src, work.dest, work.chunk_size_mib // 2) + ) continue - # exception handling resp = await sess.post( "https://nucleus.latch.bio/ldata/end-upload", headers={"Authorization": get_auth_header()}, @@ -184,6 +192,7 @@ async def work_loop( ], }, ) + if resp.status == 429: raise RateLimitExceeded( "The service is currently under load and could not complete your" @@ -201,15 +210,14 @@ async def work_loop( pbar.clear() -def worker( - work_queue: queue.Queue, +async def run_workers( + work_queue: asyncio.Queue[Work], + num_workers: int, total: tqdm.tqdm, show_task_progress: bool, print_file_on_completion: bool, ): - uvloop.install() - - loop = uvloop.new_event_loop() - loop.run_until_complete( - work_loop(work_queue, total, show_task_progress, print_file_on_completion) - ) + await asyncio.gather(*[ + worker(work_queue, total, show_task_progress, print_file_on_completion) + for _ in range(num_workers) + ]) diff --git a/latch_cli/services/cp/utils.py b/latch_cli/services/cp/utils.py index 3d6bc858e..6d16b64da 100644 --- a/latch_cli/services/cp/utils.py +++ b/latch_cli/services/cp/utils.py @@ -1,8 +1,9 @@ -from typing import List, TypedDict +import sys +from typing import Iterable, List, TypedDict, TypeVar -try: +if sys.version_info >= (3, 9): from functools import cache -except ImportError: +else: from functools import lru_cache as cache import gql @@ -162,3 +163,20 @@ def _get_known_domains_for_account() -> List[str]: res.extend(f"{x}.mount" for x in buckets) return res + + +chunk_batch_size = 3 + +T = TypeVar("T") + + +def chunked(iter: Iterable[T]) -> Iterable[List[T]]: + chunk = [] + for x in iter: + if len(chunk) == chunk_batch_size: + yield chunk + chunk = [] + + chunk.append(x) + + yield chunk From 158e1302270ef4b75e1c14851a5bcdef050eacda Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 10 Oct 2024 15:24:48 -0700 Subject: [PATCH 6/9] download fixies Signed-off-by: Ayush Kamat --- latch_cli/services/cp/download/main.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/latch_cli/services/cp/download/main.py b/latch_cli/services/cp/download/main.py index c40f452e8..afa0d23fc 100644 --- a/latch_cli/services/cp/download/main.py +++ b/latch_cli/services/cp/download/main.py @@ -159,6 +159,13 @@ def download( try: res.parent.mkdir(exist_ok=True, parents=True) + if res.is_dir(): + click.echo( + f"Cannot write file to {work_dest / rel} - directory" + " exists." 
+ ) + continue + work_queue.put_nowait(Work(url, work_dest / rel, chunk_size_mib)) except (NotADirectoryError, FileExistsError): click.echo( From af0af19ba564b517ebdaf41b1d965044fe6d2a92 Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Thu, 10 Oct 2024 15:41:11 -0700 Subject: [PATCH 7/9] remove all top level latch.ldata imports Signed-off-by: Ayush Kamat --- latch_cli/main.py | 8 ++-- latch_cli/services/cp/download/main.py | 16 ++++--- latch_cli/services/cp/main.py | 59 +++++++++----------------- latch_cli/services/cp/upload/main.py | 11 +++-- 4 files changed, 39 insertions(+), 55 deletions(-) diff --git a/latch_cli/main.py b/latch_cli/main.py index 458d0853d..9070becfc 100644 --- a/latch_cli/main.py +++ b/latch_cli/main.py @@ -4,15 +4,13 @@ import sys from pathlib import Path from textwrap import dedent -from typing import Callable, List, Optional, Tuple, TypeVar, Union +from typing import Callable, List, Literal, Optional, Tuple, TypeVar, Union import click from packaging.version import parse as parse_version from typing_extensions import ParamSpec import latch_cli.click_utils -from latch.ldata._transfer.progress import Progress as _Progress -from latch_cli.click_utils import EnumChoice from latch_cli.exceptions.handler import CrashHandler from latch_cli.services.cp.autocomplete import complete as cp_complete from latch_cli.services.cp.autocomplete import remote_complete @@ -693,7 +691,7 @@ def get_executions(): @click.option( "--progress", help="Type of progress information to show while copying", - type=EnumChoice(_Progress, case_sensitive=False), + type=click.Choice(["none", "total", "tasks"]), default="tasks", show_default=True, ) @@ -735,7 +733,7 @@ def get_executions(): def cp( src: List[str], dest: str, - progress: _Progress, + progress: Literal["none", "total", "tasks"], verbose: bool, force: bool, no_glob: bool, diff --git a/latch_cli/services/cp/download/main.py b/latch_cli/services/cp/download/main.py index afa0d23fc..40e4f2d8e 100644 --- a/latch_cli/services/cp/download/main.py +++ b/latch_cli/services/cp/download/main.py @@ -127,13 +127,15 @@ def download( " may be overwritten. Proceed?" ) ): - return + continue try: work_dest.unlink(missing_ok=True) work_queue.put_nowait(Work(gsud["url"], work_dest, chunk_size_mib)) except OSError: - click.echo(f"Cannot write file to {work_dest} - directory exists.") + click.secho( + f"Cannot write file to {work_dest} - directory exists.", fg="red" + ) else: gsurd: GetSignedUrlsRecursiveData = json["data"] @@ -160,17 +162,19 @@ def download( try: res.parent.mkdir(exist_ok=True, parents=True) if res.is_dir(): - click.echo( + click.secho( f"Cannot write file to {work_dest / rel} - directory" - " exists." + " exists.", + fg="red", ) continue work_queue.put_nowait(Work(url, work_dest / rel, chunk_size_mib)) except (NotADirectoryError, FileExistsError): - click.echo( + click.secho( f"Cannot write file to {work_dest / rel} - upstream file" - " exists." 
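
The commit above removes every module-scope latch.ldata import from the CLI entry points; as the cp/main.py hunk below shows, those imports reappear inside the functions that need them, so cheap invocations (--help, shell completion) no longer pay for the full SDK import chain. The pattern in miniature, mirroring the diff:

def _copy_and_print(src: str, dst: str) -> None:
    # deferred import: the heavy latch.ldata module loads on first call,
    # not when this module is imported
    from latch.ldata._transfer.remote_copy import remote_copy as _remote_copy

    _remote_copy(src, dst)

After the first call the module is cached in sys.modules, so the function-local import costs only a dictionary lookup on subsequent calls.
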
+ " exists.", + fg="red", ) tbar = tqdm.tqdm( diff --git a/latch_cli/services/cp/main.py b/latch_cli/services/cp/main.py index cfd1d00b8..37e911808 100644 --- a/latch_cli/services/cp/main.py +++ b/latch_cli/services/cp/main.py @@ -1,43 +1,29 @@ from pathlib import Path from textwrap import dedent -from typing import List, Optional +from typing import List, Literal, Optional import click -import gql -from latch_sdk_gql.execute import execute -from latch.ldata._transfer.download import download as _download -from latch.ldata._transfer.progress import Progress -from latch.ldata._transfer.remote_copy import remote_copy as _remote_copy -from latch.ldata._transfer.upload import upload as _upload -from latch.ldata.type import LatchPathError from latch_cli.services.cp.glob import expand_pattern -from latch_cli.utils import human_readable_time, with_si_suffix from latch_cli.utils.path import get_path_error, is_remote_path from .download.main import download from .upload.main import upload -def _copy_and_print(src: str, dst: str, progress: Progress) -> None: +def _copy_and_print( + src: str, dst: str, progress: Literal["none", "total", "tasks"] +) -> None: + from latch.ldata._transfer.remote_copy import remote_copy as _remote_copy + _remote_copy(src, dst) - if progress != Progress.none: - click.echo(dedent(f""" + + if progress != "none": + click.echo(dedent(f"""\ {click.style("Copy Requested.", fg="green")} {click.style("Source: ", fg="blue")}{(src)} - {click.style("Destination: ", fg="blue")}{(dst)}""")) - - -def _download_and_print(src: str, dst: Path, progress: Progress, verbose: bool) -> None: - if progress != Progress.none: - click.secho(f"Downloading {dst.name}", fg="blue") - res = _download(src, dst, progress, verbose) - if progress != Progress.none: - click.echo(dedent(f""" - {click.style("Download Complete", fg="green")} - {click.style("Time Elapsed: ", fg="blue")}{human_readable_time(res.total_time)} - {click.style("Files Downloaded: ", fg="blue")}{res.num_files} ({with_si_suffix(res.total_bytes)}) - """)) + {click.style("Destination: ", fg="blue")}{(dst)}\ + """)) # todo(ayush): come up with a better behavior scheme than unix cp @@ -45,13 +31,15 @@ def cp( srcs: List[str], dest: str, *, - progress: Progress, + progress: Literal["none", "total", "tasks"], verbose: bool, force: bool, expand_globs: bool, cores: Optional[int] = None, chunk_size_mib: Optional[int] = None, ): + from latch.ldata.type import LatchPathError + if chunk_size_mib is not None and chunk_size_mib < 5: click.secho( "The chunk size specified by --chunk-size-mib must be at least 5. 
You" @@ -60,17 +48,6 @@ def cp( ) raise click.exceptions.Exit(1) - # todo(ayush): make this a global thats computed in requires_auth() - acc_info = execute(gql.gql(""" - query AccountInfo { - accountInfoCurrent { - id - } - } - """))["accountInfoCurrent"] - - acc_id = acc_info["id"] - dest_is_remote = is_remote_path(dest) srcs_are_remote = [is_remote_path(src) for src in srcs] @@ -79,7 +56,7 @@ def cp( download( srcs, Path(dest), - progress.name, + progress, verbose, force, expand_globs, @@ -90,7 +67,7 @@ def cp( upload( srcs, dest, - progress.name, + progress, verbose, cores, chunk_size_mib, @@ -115,7 +92,9 @@ def cp( raise click.exceptions.Exit(1) except LatchPathError as e: - click.secho(get_path_error(e.remote_path, e.message, acc_id), fg="red") + if e.acc_id is not None: + click.secho(get_path_error(e.remote_path, e.message, e.acc_id), fg="red") + raise click.exceptions.Exit(1) from e except Exception as e: click.secho(str(e), fg="red") diff --git a/latch_cli/services/cp/upload/main.py b/latch_cli/services/cp/upload/main.py index 29f2326f9..40605fe63 100644 --- a/latch_cli/services/cp/upload/main.py +++ b/latch_cli/services/cp/upload/main.py @@ -22,8 +22,6 @@ def upload( cores: Optional[int], chunk_size_mib: Optional[int], ): - from latch.ldata.type import LatchPathError - if cores is None: cores = 4 if chunk_size_mib is None: @@ -58,7 +56,12 @@ def upload( root = normalized elif src_path.is_dir(): if not dest_is_dir: - raise LatchPathError("not a directory", dest) + click.secho( + f"Failed to upload directory {src_path}: destination {dest} is not" + " a directory", + fg="red", + ) + continue if src.endswith("/"): root = normalized else: @@ -83,7 +86,7 @@ def upload( try: total_bytes += rel.resolve().stat().st_size except FileNotFoundError: - print(f"WARNING: file {rel} not found, skipping...") + click.secho(f"File {rel} not found, skipping...", fg="yellow") continue num_files += 1 From cae8968642d15f965f926304152e0ff934a70edc Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Fri, 18 Oct 2024 12:03:42 -0700 Subject: [PATCH 8/9] dev version Signed-off-by: Ayush Kamat --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a2cee0492..181d7e912 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name="latch", - version="v2.53.7", + version="v2.53.7.dev1", author_email="kenny@latch.bio", description="The Latch SDK", packages=find_packages(), From 4a87bc706bbe755d320cba8fea33141d8e5aef9c Mon Sep 17 00:00:00 2001 From: Ayush Kamat Date: Fri, 18 Oct 2024 12:06:04 -0700 Subject: [PATCH 9/9] deps Signed-off-by: Ayush Kamat --- setup.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 181d7e912..9b252570d 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name="latch", - version="v2.53.7.dev1", + version="v2.53.7.dev2", author_email="kenny@latch.bio", description="The Latch SDK", packages=find_packages(), @@ -53,6 +53,9 @@ "asyncssh==2.13.2", "websockets==11.0.3", "watchfiles==0.19.0", + # new latch cp + "uvloop==0.19.0", + "aiohttp==3.9.5", ], extras_require={ "snakemake": ["snakemake>=7.18.0, <7.30.2", "pulp >=2.0, <2.8"],
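
Taken together, cp now reduces to a dispatch on where the endpoints live. A condensed sketch of that decision; the branch order and the mixed-source error are assumptions, since the hunks above show only the download and upload arms, and is_remote_path here is a simplified stand-in for latch_cli.utils.path.is_remote_path:

from typing import List


def is_remote_path(path: str) -> bool:
    return path.startswith("latch://")  # simplified stand-in


def cp(srcs: List[str], dest: str) -> None:
    dest_is_remote = is_remote_path(dest)
    srcs_are_remote = [is_remote_path(src) for src in srcs]

    if not dest_is_remote and all(srcs_are_remote):
        print("download: remote sources -> local destination")
    elif dest_is_remote and not any(srcs_are_remote):
        print("upload: local sources -> remote destination")
    elif dest_is_remote and all(srcs_are_remote):
        print("remote copy within Latch")
    else:
        raise ValueError(
            "local -> local copies and mixed local/remote sources are not"
            " supported here"
        )


cp(["latch:///welcome/data.txt"], ".")  # would take the download arm
cp(["./data.txt"], "latch:///welcome/")  # would take the upload arm
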