Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to tensorstore 0.1.72 and drop Python 3.9 #1277

Merged
merged 21 commits into from
Apr 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
max-parallel: 4
matrix:
executors: [multiprocessing, slurm, kubernetes, dask]
python-version: ["3.13", "3.12", "3.11", "3.10", "3.9"]
python-version: ["3.13", "3.12", "3.11", "3.10"]
defaults:
run:
working-directory: cluster_tools
Expand Down Expand Up @@ -157,7 +157,7 @@ jobs:
strategy:
max-parallel: 4
matrix:
python-version: ["3.12", "3.13", "3.11", "3.10", "3.9"]
python-version: ["3.12", "3.13", "3.11", "3.10"]
group: [1, 2, 3]
fail-fast: false
defaults:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
strategy:
max-parallel: 4
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13"]
group: [1, 2, 3]
fail-fast: false
defaults:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:

- uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: "3.12"
architecture: 'x64'

- name: Setup git config
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/verify_published.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.13", "3.12", "3.11", "3.10", "3.9"]
python-version: ["3.13", "3.12", "3.11", "3.10"]
extras: ["", "[all]"]
steps:
- name: Set up Python ${{ matrix.python-version }}
Expand Down
2 changes: 1 addition & 1 deletion cluster_tools/cluster_tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,4 @@ def get_executor(environment: str, **kwargs: Any) -> "Executor":
return SequentialPickleExecutor(**kwargs)
elif environment == "multiprocessing_with_pickling":
return MultiprocessingPickleExecutor(**kwargs)
raise Exception("Unknown executor: {}".format(environment))
raise Exception(f"Unknown executor: {environment}")
11 changes: 3 additions & 8 deletions cluster_tools/cluster_tools/_utils/call.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import subprocess
from typing import Optional, Tuple


def call(command: str, stdin: Optional[str] = None) -> Tuple[str, str, int]:
def call(command: str, stdin: str | None = None) -> tuple[str, str, int]:
"""Invokes a shell command as a subprocess, optionally with some
data sent to the standard input. Returns the standard output data,
the standard error, and the return code.
Expand Down Expand Up @@ -31,14 +30,10 @@ def __init__(self, command: str, code: int, stderr: str):
self.stderr = stderr

def __str__(self) -> str:
return "%s exited with status %i: %s" % (
repr(self.command),
self.code,
repr(self.stderr),
)
return f"{self.command!r} exited with status {self.code}: {self.stderr!r}"


def chcall(command: str, stdin: Optional[str] = None) -> Tuple[str, str]:
def chcall(command: str, stdin: str | None = None) -> tuple[str, str]:
"""Like ``call`` but raises an exception when the return code is
nonzero. Only returns the stdout and stderr data.
"""
Expand Down
17 changes: 6 additions & 11 deletions cluster_tools/cluster_tools/_utils/file_wait_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import os
import threading
import time
from typing import TYPE_CHECKING, Callable, Dict
from collections.abc import Callable
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from cluster_tools.schedulers.cluster_executor import ClusterExecutor
Expand All @@ -28,8 +29,8 @@ def __init__(
threading.Thread.__init__(self)
self.callback = callback
self.interval = interval
self.waiting: Dict[str, str] = {}
self.retryMap: Dict[str, int] = {}
self.waiting: dict[str, str] = {}
self.retryMap: dict[str, int] = {}
self.lock = threading.Lock()
self.shutdown = False
self.executor = executor
Expand Down Expand Up @@ -85,17 +86,11 @@ def handle_completed_job(
if self.retryMap[filename] <= FileWaitThread.MAX_RETRY:
# Retry by looping again
logging.warning(
"Job state is completed, but {} couldn't be found. Retrying {}/{}".format(
filename,
self.retryMap[filename],
FileWaitThread.MAX_RETRY,
)
f"Job state is completed, but {filename} couldn't be found. Retrying {self.retryMap[filename]}/{FileWaitThread.MAX_RETRY}"
)
else:
logging.error(
"Job state is completed, but {} couldn't be found.".format(
filename
)
f"Job state is completed, but {filename} couldn't be found."
)
handle_completed_job(job_id, filename, True)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import threading
import traceback
import warnings
from collections.abc import Callable, Sequence
from logging import getLogger
from logging.handlers import QueueHandler
from queue import Empty as QueueEmpty
from queue import Queue
from typing import Any, Callable, List
from typing import Any

# Inspired by https://stackoverflow.com/a/894284

Expand Down Expand Up @@ -106,7 +107,7 @@ def close(self) -> None:


def _setup_logging_multiprocessing(
queues: List[Queue], levels: List[int], filters: List[Any]
queues: list[Queue], levels: list[int], filters: Sequence[Any]
) -> None:
"""Re-setup logging in a multiprocessing context (only needed if a start_method other than
fork is used) by setting up QueueHandler loggers for each queue and level
Expand Down
14 changes: 7 additions & 7 deletions cluster_tools/cluster_tools/_utils/pickling.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pickle
import sys
from typing import Any, BinaryIO, Optional
from typing import Any, BinaryIO

from cluster_tools._utils.warning import warn_after

Expand All @@ -26,25 +26,25 @@ def dump(*args: Any, **kwargs: Any) -> None:

@warn_after("pickle.loads", WARNING_TIMEOUT)
def loads(*args: Any, **kwargs: Any) -> Any:
assert (
"custom_main_path" not in kwargs
), "loads does not implement support for the argument custom_main_path"
assert "custom_main_path" not in kwargs, (
"loads does not implement support for the argument custom_main_path"
)
return pickle.loads(*args, **kwargs)


class _RenameUnpickler(pickle.Unpickler):
custom_main_path: Optional[str]
custom_main_path: str | None

def find_class(self, module: str, name: str) -> Any:
renamed_module = module
if module == "__main__" and self.custom_main_path is not None:
renamed_module = self.custom_main_path

return super(_RenameUnpickler, self).find_class(renamed_module, name)
return super().find_class(renamed_module, name)


@warn_after("pickle.load", WARNING_TIMEOUT)
def load(f: BinaryIO, custom_main_path: Optional[str] = None) -> Any:
def load(f: BinaryIO, custom_main_path: str | None = None) -> Any:
unpickler = _RenameUnpickler(f)
unpickler.custom_main_path = custom_main_path
return unpickler.load()
2 changes: 1 addition & 1 deletion cluster_tools/cluster_tools/_utils/reflection.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
from typing import Callable
from collections.abc import Callable

WARNING_TIMEOUT = 10 * 60 # seconds

Expand Down
9 changes: 5 additions & 4 deletions cluster_tools/cluster_tools/_utils/tailf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import os
import sys
import time
from typing import Any, Callable
from collections.abc import Callable
from typing import Any


class Tail:
Expand Down Expand Up @@ -58,11 +59,11 @@ def register_callback(self, func: Callable[[str], Any]) -> None:
def check_file_validity(self, file_: str) -> None:
"""Check whether the a given file exists, readable and is a file"""
if not os.access(file_, os.F_OK):
raise TailError("File '%s' does not exist" % (file_))
raise TailError(f"File '{file_}' does not exist")
if not os.access(file_, os.R_OK):
raise TailError("File '%s' not readable" % (file_))
raise TailError(f"File '{file_}' not readable")
if os.path.isdir(file_):
raise TailError("File '%s' is a directory" % (file_))
raise TailError(f"File '{file_}' is a directory")


class TailError(Exception):
Expand Down
15 changes: 5 additions & 10 deletions cluster_tools/cluster_tools/_utils/warning.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
import threading
import time
from collections.abc import Callable
from concurrent.futures import Future
from typing import Callable, TypeVar
from typing import TypeVar

from typing_extensions import ParamSpec

Expand All @@ -24,9 +25,7 @@ def inner(*args: _P.args, **kwargs: _P.kwargs) -> _T:

def warn_function() -> None:
logging.warning(
"Function {} is taking suspiciously long (longer than {} seconds)".format(
job, seconds
)
f"Function {job} is taking suspiciously long (longer than {seconds} seconds)"
)
exceeded_timeout[0] = True

Expand All @@ -38,9 +37,7 @@ def warn_function() -> None:
if exceeded_timeout[0]:
end_time = time.time()
logging.warning(
"Function {} succeeded after all (took {} seconds)".format(
job, int(end_time - start_time)
)
f"Function {job} succeeded after all (took {int(end_time - start_time)} seconds)"
)
finally:
timer.cancel()
Expand All @@ -56,9 +53,7 @@ def warn_on_exception(future: Future) -> None:
maybe_exception = future.exception()
if maybe_exception is not None:
logging.error(
"A future crashed with an exception: {}. Future: {}".format(
maybe_exception, future
)
f"A future crashed with an exception: {maybe_exception}. Future: {future}"
)

if not hasattr(f, "is_wrapped_by_cluster_tools"):
Expand Down
20 changes: 8 additions & 12 deletions cluster_tools/cluster_tools/executor_protocol.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
from collections.abc import Callable, Iterable, Iterator
from concurrent.futures import Future
from contextlib import AbstractContextManager
from os import PathLike
from typing import (
Callable,
ContextManager,
Iterable,
Iterator,
List,
Optional,
Protocol,
TypeVar,
)
Expand All @@ -18,9 +14,9 @@
_S = TypeVar("_S")


class Executor(Protocol, ContextManager["Executor"]):
class Executor(Protocol, AbstractContextManager["Executor"]):
@classmethod
def as_completed(cls, futures: List[Future[_T]]) -> Iterator[Future[_T]]: ...
def as_completed(cls, futures: list[Future[_T]]) -> Iterator[Future[_T]]: ...

def submit(
self,
Expand All @@ -34,15 +30,15 @@ def map_to_futures(
self,
fn: Callable[[_S], _T],
args: Iterable[_S],
output_pickle_path_getter: Optional[Callable[[_S], PathLike]] = None,
) -> List[Future[_T]]: ...
output_pickle_path_getter: Callable[[_S], PathLike] | None = None,
) -> list[Future[_T]]: ...

def map(
self,
fn: Callable[[_S], _T],
iterables: Iterable[_S],
timeout: Optional[float] = None,
chunksize: Optional[int] = None,
timeout: float | None = None,
chunksize: int | None = None,
) -> Iterator[_T]: ...

def forward_log(self, fut: Future[_T]) -> _T: ...
Expand Down
Loading