diff --git a/changes/2661.feature.1.rst b/changes/2661.feature.1.rst new file mode 100644 index 0000000000..5d0209c581 --- /dev/null +++ b/changes/2661.feature.1.rst @@ -0,0 +1 @@ +Improves performance of FsspecStore.delete_dir for remote filesystems supporting concurrent/batched deletes, e.g., s3fs. \ No newline at end of file diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 92c14fcc76..1cc7039e68 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -1,6 +1,7 @@ from __future__ import annotations import warnings +from contextlib import suppress from typing import TYPE_CHECKING, Any from zarr.abc.store import ( @@ -286,6 +287,19 @@ async def delete(self, key: str) -> None: except self.allowed_exceptions: pass + async def delete_dir(self, prefix: str) -> None: + # docstring inherited + if not self.supports_deletes: + raise NotImplementedError( + "This method is only available for stores that support deletes." + ) + self._check_writable() + + path_to_delete = _dereference_path(self.path, prefix) + + with suppress(*self.allowed_exceptions): + await self.fs._rm(path_to_delete, recursive=True) + async def exists(self, key: str) -> bool: # docstring inherited path = _dereference_path(self.path, key) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 929de37869..a710b9e22b 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -217,6 +217,14 @@ async def test_empty_nonexistent_path(self, store_kwargs) -> None: store = await self.store_cls.open(**store_kwargs) assert await store.is_empty("") + async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None: + store.supports_deletes = False + with pytest.raises( + NotImplementedError, + match="This method is only available for stores that support deletes.", + ): + await store.delete_dir("test_prefix") + @pytest.mark.skipif( parse_version(fsspec.__version__) < parse_version("2024.12.0"), @@ -244,3 +252,28 @@ def test_no_wrap_async_filesystem(): assert not isinstance(store.fs, AsyncFileSystemWrapper) assert store.fs.async_impl + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +async def test_delete_dir_wrapped_filesystem(tmpdir) -> None: + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + from fsspec.implementations.local import LocalFileSystem + + wrapped_fs = AsyncFileSystemWrapper(LocalFileSystem(auto_mkdir=True)) + store = FsspecStore(wrapped_fs, read_only=False, path=f"{tmpdir}/test/path") + + assert isinstance(store.fs, AsyncFileSystemWrapper) + assert store.fs.asynchronous + + await store.set("zarr.json", cpu.Buffer.from_bytes(b"root")) + await store.set("foo-bar/zarr.json", cpu.Buffer.from_bytes(b"root")) + await store.set("foo/zarr.json", cpu.Buffer.from_bytes(b"bar")) + await store.set("foo/c/0", cpu.Buffer.from_bytes(b"chunk")) + await store.delete_dir("foo") + assert await store.exists("zarr.json") + assert await store.exists("foo-bar/zarr.json") + assert not await store.exists("foo/zarr.json") + assert not await store.exists("foo/c/0")