diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py index 6ab539bb0a..12255c234d 100644 --- a/src/zarr/storage/_common.py +++ b/src/zarr/storage/_common.py @@ -1,5 +1,7 @@ from __future__ import annotations +import importlib +import importlib.util import json from pathlib import Path from typing import TYPE_CHECKING, Any, Literal @@ -12,6 +14,10 @@ from zarr.storage._memory import MemoryStore from zarr.storage._utils import normalize_path +_has_fsspec = importlib.util.find_spec("fsspec") +if _has_fsspec: + from fsspec.mapping import FSMap + if TYPE_CHECKING: from zarr.core.buffer import BufferPrototype @@ -228,7 +234,7 @@ def __eq__(self, other: object) -> bool: async def make_store_path( - store_like: StoreLike | None, + store_like: StoreLike | FSMap | None, *, path: str | None = "", mode: AccessModeLiteral | None = None, @@ -311,9 +317,18 @@ async def make_store_path( # We deliberate only consider dict[str, Buffer] here, and not arbitrary mutable mappings. # By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate. store = await MemoryStore.open(store_dict=store_like, read_only=_read_only) + elif _has_fsspec: + if not isinstance(store_like, FSMap): + raise (TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'")) + if path: + raise ValueError("'path' was provided but is not used for FSMap store_like objects") + if storage_options: + raise ValueError( + "'storage_options was provided but is not used for FSMap store_like objects" + ) + store = FsspecStore.from_mapper(store_like, read_only=_read_only) else: - msg = f"Unsupported type for store_like: '{type(store_like).__name__}'" # type: ignore[unreachable] - raise TypeError(msg) + raise (TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'")) result = await StorePath.open(store, path=path_normalized, mode=mode) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index a4730a93d9..ddb8e1c9ed 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -17,7 +17,9 @@ if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterable + from fsspec import AbstractFileSystem from fsspec.asyn import AsyncFileSystem + from fsspec.mapping import FSMap from zarr.core.buffer import BufferPrototype from zarr.core.common import BytesLike @@ -30,6 +32,45 @@ ) +def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem: + """Convert a sync FSSpec filesystem to an async FFSpec filesystem + + If the filesystem class supports async operations, a new async instance is created + from the existing instance. + + If the filesystem class does not support async operations, the existing instance + is wrapped with AsyncFileSystemWrapper. + """ + import fsspec + from packaging.version import parse as parse_version + + fsspec_version = parse_version(fsspec.__version__) + if fs.async_impl and fs.asynchronous: + # Already an async instance of an async filesystem, nothing to do + return fs + if fs.async_impl: + # Convert sync instance of an async fs to an async instance + import json + + fs_dict = json.loads(fs.to_json()) + fs_dict["asynchronous"] = True + return fsspec.AbstractFileSystem.from_json(json.dumps(fs_dict)) + + # Wrap sync filesystems with the async wrapper + if type(fs) is fsspec.implementations.local.LocalFileSystem and not fs.auto_mkdir: + raise ValueError( + f"LocalFilesystem {fs} was created with auto_mkdir=False but Zarr requires the filesystem to automatically create directories" + ) + if fsspec_version < parse_version("2024.12.0"): + raise ImportError( + "The filesystem '{fs}' is synchronous, and the required " + "AsyncFileSystemWrapper is not available. Upgrade fsspec to version " + "2024.12.0 or later to enable this functionality." + ) + + return fsspec.implementations.asyn_wrapper.AsyncFileSystemWrapper(fs, asynchronous=True) + + class FsspecStore(Store): """ A remote Store based on FSSpec @@ -137,6 +178,39 @@ def from_upath( allowed_exceptions=allowed_exceptions, ) + @classmethod + def from_mapper( + cls, + fs_map: FSMap, + read_only: bool = False, + allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, + ) -> FsspecStore: + """ + Create a FsspecStore from a FSMap object. + + Parameters + ---------- + fs_map : FSMap + Fsspec mutable mapping object. + read_only : bool + Whether the store is read-only, defaults to False. + allowed_exceptions : tuple, optional + The exceptions that are allowed to be raised when accessing the + store. Defaults to ALLOWED_EXCEPTIONS. + + Returns + ------- + FsspecStore + """ + if not fs_map.fs.async_impl or not fs_map.fs.asynchronous: + fs_map.fs = _make_async(fs_map.fs) + return cls( + fs=fs_map.fs, + path=fs_map.root, + read_only=read_only, + allowed_exceptions=allowed_exceptions, + ) + @classmethod def from_url( cls, @@ -175,16 +249,7 @@ def from_url( fs, path = url_to_fs(url, **opts) if not fs.async_impl: - try: - from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - - fs = AsyncFileSystemWrapper(fs, asynchronous=True) - except ImportError as e: - raise ImportError( - f"The filesystem for URL '{url}' is synchronous, and the required " - "AsyncFileSystemWrapper is not available. Upgrade fsspec to version " - "2024.12.0 or later to enable this functionality." - ) from e + fs = _make_async(fs) # fsspec is not consistent about removing the scheme from the path, so check and strip it here # https://github.com/fsspec/filesystem_spec/issues/1722 diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 2e9620f29d..4f83312b02 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -4,17 +4,21 @@ import os from typing import TYPE_CHECKING +import numpy as np import pytest from packaging.version import parse as parse_version import zarr.api.asynchronous +from zarr import Array from zarr.abc.store import OffsetByteRequest from zarr.core.buffer import Buffer, cpu, default_buffer_prototype from zarr.core.sync import _collect_aiterator, sync from zarr.storage import FsspecStore +from zarr.storage._fsspec import _make_async from zarr.testing.store import StoreTests if TYPE_CHECKING: + import pathlib from collections.abc import Generator import botocore.client @@ -240,18 +244,45 @@ async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None: await store.delete_dir("test_prefix") +def array_roundtrip(store): + """ + Round trip an array using a Zarr store + + Args: + store: Store-Like object (e.g., FSMap) + """ + arr = zarr.open(store=store, mode="w", shape=(3, 3)) + assert isinstance(arr, Array) + # Set values + arr[:] = 1 + # Read set values + arr = zarr.open(store=store, mode="r", shape=(3, 3)) + assert isinstance(arr, Array) + np.testing.assert_array_equal(np.ones((3, 3)), arr[:]) + + @pytest.mark.skipif( parse_version(fsspec.__version__) < parse_version("2024.12.0"), reason="No AsyncFileSystemWrapper", ) -def test_wrap_sync_filesystem(): +def test_wrap_sync_filesystem(tmp_path): """The local fs is not async so we should expect it to be wrapped automatically""" from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - store = FsspecStore.from_url("local://test/path") - + store = FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True}) assert isinstance(store.fs, AsyncFileSystemWrapper) assert store.fs.async_impl + array_roundtrip(store) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) >= parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_wrap_sync_filesystem_raises(tmp_path): + """The local fs is not async so we should expect it to be wrapped automatically""" + with pytest.raises(ImportError, match="The filesystem .*"): + FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True}) @pytest.mark.skipif( @@ -259,13 +290,73 @@ def test_wrap_sync_filesystem(): reason="No AsyncFileSystemWrapper", ) def test_no_wrap_async_filesystem(): - """An async fs should not be wrapped automatically; fsspec's https filesystem is such an fs""" + """An async fs should not be wrapped automatically; fsspec's s3 filesystem is such an fs""" from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - store = FsspecStore.from_url("https://test/path") - + store = FsspecStore.from_url( + f"s3://{test_bucket_name}/foo/spam/", + storage_options={"endpoint_url": endpoint_url, "anon": False}, + ) assert not isinstance(store.fs, AsyncFileSystemWrapper) assert store.fs.async_impl + array_roundtrip(store) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_open_fsmap_file(tmp_path: pathlib.Path) -> None: + fsspec = pytest.importorskip("fsspec") + fs = fsspec.filesystem("file", auto_mkdir=True) + mapper = fs.get_mapper(tmp_path) + array_roundtrip(mapper) + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +def test_open_fsmap_file_raises(tmp_path: pathlib.Path) -> None: + fsspec = pytest.importorskip("fsspec.implementations.local") + fs = fsspec.LocalFileSystem(auto_mkdir=False) + mapper = fs.get_mapper(tmp_path) + with pytest.raises(ValueError, match="LocalFilesystem .*"): + array_roundtrip(mapper) + + +@pytest.mark.parametrize("asynchronous", [True, False]) +def test_open_fsmap_s3(asynchronous: bool) -> None: + s3_filesystem = s3fs.S3FileSystem( + asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False + ) + mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") + array_roundtrip(mapper) + + +def test_open_s3map_raises() -> None: + with pytest.raises(TypeError, match="Unsupported type for store_like:.*"): + zarr.open(store=0, mode="w", shape=(3, 3)) + s3_filesystem = s3fs.S3FileSystem(asynchronous=True, endpoint_url=endpoint_url, anon=False) + mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/") + with pytest.raises( + ValueError, match="'path' was provided but is not used for FSMap store_like objects" + ): + zarr.open(store=mapper, path="bar", mode="w", shape=(3, 3)) + with pytest.raises( + ValueError, + match="'storage_options was provided but is not used for FSMap store_like objects", + ): + zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3)) + + +@pytest.mark.parametrize("asynchronous", [True, False]) +def test_make_async(asynchronous: bool) -> None: + s3_filesystem = s3fs.S3FileSystem( + asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False + ) + fs = _make_async(s3_filesystem) + assert fs.asynchronous @pytest.mark.skipif(