From 11ac3d9993bbf520c55e93774405bb5a6092829f Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Mon, 6 Jan 2025 14:38:18 -0500 Subject: [PATCH 01/13] Implement asynchronous directory deletion in FsspecStore - override Store.delete_dir default method, which deletes keys one by one, to support bulk deletion for fsspec implementations that support a list of paths in the fs._rm method. - This can greatly reduce the number of requests to S3, which reduces likelihood of running into throttling errors and improves delete performance. - Currently, only s3fs is supported. --- src/zarr/storage/_fsspec.py | 43 +++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 89d80320dd..afaf22e660 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -261,6 +261,49 @@ async def delete(self, key: str) -> None: except self.allowed_exceptions: pass + async def delete_dir(self, prefix: str) -> None: + """ + Remove all keys and prefixes in the store that begin with a given prefix. + """ + if not self.supports_deletes: + raise NotImplementedError + if not self.supports_listing: + raise NotImplementedError + self._check_writable() + + if prefix and not prefix.endswith("/"): + prefix += "/" + + paths_to_delete = [] + async for key in self.list_prefix(prefix): + paths_to_delete.append(_dereference_path(self.path, key)) + + if not paths_to_delete: + return + + try: + import s3fs + except ImportError: + s3fs = None + + # If s3fs is installed and our filesystem is S3FileSystem, do a bulk delete + if s3fs and isinstance(self.fs, s3fs.S3FileSystem): + try: + await self.fs._rm(paths_to_delete) + except FileNotFoundError: + pass + except self.allowed_exceptions: + pass + else: + # Otherwise, delete one by one + for path in paths_to_delete: + try: + await self.fs._rm(path) + except FileNotFoundError: + pass + except self.allowed_exceptions: + pass + async def exists(self, key: str) -> bool: # docstring inherited path = _dereference_path(self.path, key) From bfb4397b5b41f7a15ae5fdaaf41a1e3b169b50be Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Wed, 8 Jan 2025 12:54:50 -0500 Subject: [PATCH 02/13] Use async batched _rm() for FsspecStore.delete_dir() --- src/zarr/storage/_fsspec.py | 40 ++++++++----------------------------- 1 file changed, 8 insertions(+), 32 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index afaf22e660..ce4c3f03fa 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -1,6 +1,7 @@ from __future__ import annotations import warnings +import inspect from typing import TYPE_CHECKING, Any from zarr.abc.store import ByteRangeRequest, Store @@ -262,47 +263,22 @@ async def delete(self, key: str) -> None: pass async def delete_dir(self, prefix: str) -> None: - """ - Remove all keys and prefixes in the store that begin with a given prefix. - """ + # docstring inherited if not self.supports_deletes: - raise NotImplementedError + raise NotImplementedError('This method is only available for stores that support deletes.') if not self.supports_listing: - raise NotImplementedError + raise NotImplementedError('This method is only available for stores that support directory listing.') self._check_writable() - if prefix and not prefix.endswith("/"): - prefix += "/" - - paths_to_delete = [] - async for key in self.list_prefix(prefix): - paths_to_delete.append(_dereference_path(self.path, key)) + path_to_delete = _dereference_path(self.path, prefix) - if not paths_to_delete: - return - - try: - import s3fs - except ImportError: - s3fs = None - - # If s3fs is installed and our filesystem is S3FileSystem, do a bulk delete - if s3fs and isinstance(self.fs, s3fs.S3FileSystem): + if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm): try: - await self.fs._rm(paths_to_delete) - except FileNotFoundError: - pass + await self.fs._rm(path_to_delete, recursive=True) except self.allowed_exceptions: pass else: - # Otherwise, delete one by one - for path in paths_to_delete: - try: - await self.fs._rm(path) - except FileNotFoundError: - pass - except self.allowed_exceptions: - pass + raise NotImplementedError("The store does not support async deletes") async def exists(self, key: str) -> bool: # docstring inherited From f21f1c2ab68b27bbccfd1689926b19e36fe15c67 Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Wed, 8 Jan 2025 14:52:24 -0500 Subject: [PATCH 03/13] Suppress allowed exceptions instead of try-except-pass --- src/zarr/storage/_fsspec.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index ce4c3f03fa..4bdad2731c 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -2,6 +2,7 @@ import warnings import inspect +from contextlib import suppress from typing import TYPE_CHECKING, Any from zarr.abc.store import ByteRangeRequest, Store @@ -273,10 +274,8 @@ async def delete_dir(self, prefix: str) -> None: path_to_delete = _dereference_path(self.path, prefix) if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm): - try: + with suppress(self.allowed_exceptions): await self.fs._rm(path_to_delete, recursive=True) - except self.allowed_exceptions: - pass else: raise NotImplementedError("The store does not support async deletes") From 220c28b76efa91ab64930a6ed69d0c238faaeaab Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Wed, 8 Jan 2025 15:03:27 -0500 Subject: [PATCH 04/13] Adds note on possibly redundant condition in FsspecStore.delete_dir() --- src/zarr/storage/_fsspec.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 4bdad2731c..777dc85e2e 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -273,6 +273,7 @@ async def delete_dir(self, prefix: str) -> None: path_to_delete = _dereference_path(self.path, prefix) + # this is probably the same condition as `if self.fs.async_impl` if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm): with suppress(self.allowed_exceptions): await self.fs._rm(path_to_delete, recursive=True) From ece0f0ec73ab46f9cfbe4c7897dd4cd07c0274c3 Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Wed, 8 Jan 2025 15:09:05 -0500 Subject: [PATCH 05/13] Fix: unpack allowed arguments list --- src/zarr/storage/_fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 777dc85e2e..5aad22fe5c 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -275,7 +275,7 @@ async def delete_dir(self, prefix: str) -> None: # this is probably the same condition as `if self.fs.async_impl` if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm): - with suppress(self.allowed_exceptions): + with suppress(*self.allowed_exceptions): await self.fs._rm(path_to_delete, recursive=True) else: raise NotImplementedError("The store does not support async deletes") From 08dc4f9c7fef20c598299f92303dd4cca8111a54 Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Fri, 24 Jan 2025 17:47:26 -0500 Subject: [PATCH 06/13] Adds tests for FsspecStore.delete_dir --- tests/test_store/test_fsspec.py | 38 +++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index a560ca02e8..fe084eae2a 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -217,6 +217,20 @@ async def test_empty_nonexistent_path(self, store_kwargs) -> None: store = await self.store_cls.open(**store_kwargs) assert await store.is_empty("") + async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None: + store.supports_deletes = False + with pytest.raises( + NotImplementedError, match=".*only available for stores that support deletes." + ): + await store.delete_dir("test_prefix") + + async def test_delete_dir_unsupported_listing(self, store: FsspecStore) -> None: + store.supports_listing = False + with pytest.raises( + NotImplementedError, match=".*only available for stores that support directory listing." + ): + await store.delete_dir("test_prefix") + @pytest.mark.skipif( parse_version(fsspec.__version__) < parse_version("2024.12.0"), @@ -244,3 +258,27 @@ def test_no_wrap_async_filesystem(): assert not isinstance(store.fs, AsyncFileSystemWrapper) assert store.fs.async_impl + + +@pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) +async def test_delete_dir_wrapped_filesystem(tmpdir) -> None: + """The local fs is not async so we should expect it to be wrapped automatically""" + from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + + store = FsspecStore.from_url(tmpdir / "test/path", storage_options={"auto_mkdir": True}) + + assert isinstance(store.fs, AsyncFileSystemWrapper) + assert store.fs.async_impl + + await store.set("zarr.json", cpu.Buffer.from_bytes(b"root")) + await store.set("foo-bar/zarr.json", cpu.Buffer.from_bytes(b"root")) + await store.set("foo/zarr.json", cpu.Buffer.from_bytes(b"bar")) + await store.set("foo/c/0", cpu.Buffer.from_bytes(b"chunk")) + await store.delete_dir("foo") + assert await store.exists("zarr.json") + assert await store.exists("foo-bar/zarr.json") + assert not await store.exists("foo/zarr.json") + assert not await store.exists("foo/c/0") From 6ad9a0ace5972cdabbb7a42036da2bd336e2fab1 Mon Sep 17 00:00:00 2001 From: Cameron Arshadi <44130022+carshadi@users.noreply.github.com> Date: Wed, 29 Jan 2025 10:45:08 -0500 Subject: [PATCH 07/13] Update src/zarr/storage/_fsspec.py Co-authored-by: Joe Hamman --- src/zarr/storage/_fsspec.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index a04c7d59c4..be2abf380b 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -293,12 +293,8 @@ async def delete_dir(self, prefix: str) -> None: path_to_delete = _dereference_path(self.path, prefix) - # this is probably the same condition as `if self.fs.async_impl` - if hasattr(self.fs, "_rm") and inspect.iscoroutinefunction(self.fs._rm): - with suppress(*self.allowed_exceptions): - await self.fs._rm(path_to_delete, recursive=True) - else: - raise NotImplementedError("The store does not support async deletes") + with suppress(*self.allowed_exceptions): + await self.fs._rm(path_to_delete, recursive=True) async def exists(self, key: str) -> bool: # docstring inherited From 89e1b5f40cfab1dd90304e1dd9baa0f8d400abba Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Wed, 29 Jan 2025 14:28:50 -0500 Subject: [PATCH 08/13] Remove supports_listing condition from FsspecStore.delete_dir --- src/zarr/storage/_fsspec.py | 2 -- tests/test_store/test_fsspec.py | 9 +-------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index be2abf380b..bb7ea079d8 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -287,8 +287,6 @@ async def delete_dir(self, prefix: str) -> None: # docstring inherited if not self.supports_deletes: raise NotImplementedError('This method is only available for stores that support deletes.') - if not self.supports_listing: - raise NotImplementedError('This method is only available for stores that support directory listing.') self._check_writable() path_to_delete = _dereference_path(self.path, prefix) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index fe084eae2a..52871145d1 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -220,14 +220,7 @@ async def test_empty_nonexistent_path(self, store_kwargs) -> None: async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None: store.supports_deletes = False with pytest.raises( - NotImplementedError, match=".*only available for stores that support deletes." - ): - await store.delete_dir("test_prefix") - - async def test_delete_dir_unsupported_listing(self, store: FsspecStore) -> None: - store.supports_listing = False - with pytest.raises( - NotImplementedError, match=".*only available for stores that support directory listing." + NotImplementedError, match="This method is only available for stores that support deletes." ): await store.delete_dir("test_prefix") From 5e675660c718520df48c681aa8b1743f7b3ad018 Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Wed, 29 Jan 2025 14:37:12 -0500 Subject: [PATCH 09/13] use f-string for url formatting --- tests/test_store/test_fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 52871145d1..f6283e455d 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -261,7 +261,7 @@ async def test_delete_dir_wrapped_filesystem(tmpdir) -> None: """The local fs is not async so we should expect it to be wrapped automatically""" from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper - store = FsspecStore.from_url(tmpdir / "test/path", storage_options={"auto_mkdir": True}) + store = FsspecStore.from_url(f"{tmpdir}/test/path", storage_options={"auto_mkdir": True}) assert isinstance(store.fs, AsyncFileSystemWrapper) assert store.fs.async_impl From a0214e48886c64335339180cffd7c0612c7a15f1 Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Wed, 29 Jan 2025 14:39:50 -0500 Subject: [PATCH 10/13] assert `store.fs.asynchronous` instead of `store.fs.async_impl` --- tests/test_store/test_fsspec.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index f6283e455d..cea32c0338 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -264,7 +264,7 @@ async def test_delete_dir_wrapped_filesystem(tmpdir) -> None: store = FsspecStore.from_url(f"{tmpdir}/test/path", storage_options={"auto_mkdir": True}) assert isinstance(store.fs, AsyncFileSystemWrapper) - assert store.fs.async_impl + assert store.fs.asynchronous await store.set("zarr.json", cpu.Buffer.from_bytes(b"root")) await store.set("foo-bar/zarr.json", cpu.Buffer.from_bytes(b"root")) From aa4a7ff6f2f72067daa7adc4f1e9c6c6b08ba913 Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Wed, 29 Jan 2025 14:49:08 -0500 Subject: [PATCH 11/13] updates release notes --- changes/2661.feature.1.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/2661.feature.1.rst diff --git a/changes/2661.feature.1.rst b/changes/2661.feature.1.rst new file mode 100644 index 0000000000..5d0209c581 --- /dev/null +++ b/changes/2661.feature.1.rst @@ -0,0 +1 @@ +Improves performance of FsspecStore.delete_dir for remote filesystems supporting concurrent/batched deletes, e.g., s3fs. \ No newline at end of file From 96fb2f6d900b0ead85ee7d8fc384c90acdc3b9a7 Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Fri, 14 Feb 2025 13:25:28 -0500 Subject: [PATCH 12/13] remove unused import --- src/zarr/storage/_fsspec.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 827047c9a6..1cc7039e68 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -1,7 +1,6 @@ from __future__ import annotations import warnings -import inspect from contextlib import suppress from typing import TYPE_CHECKING, Any @@ -291,7 +290,9 @@ async def delete(self, key: str) -> None: async def delete_dir(self, prefix: str) -> None: # docstring inherited if not self.supports_deletes: - raise NotImplementedError('This method is only available for stores that support deletes.') + raise NotImplementedError( + "This method is only available for stores that support deletes." + ) self._check_writable() path_to_delete = _dereference_path(self.path, prefix) From f1d90cc53f14aa317729b499364bb9d5dc70b6ad Mon Sep 17 00:00:00 2001 From: Cameron Arshadi Date: Fri, 14 Feb 2025 13:27:07 -0500 Subject: [PATCH 13/13] Explicitly construct wrapped local filesystem for test --- tests/test_store/test_fsspec.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/test_store/test_fsspec.py b/tests/test_store/test_fsspec.py index 3b06f9c08d..a710b9e22b 100644 --- a/tests/test_store/test_fsspec.py +++ b/tests/test_store/test_fsspec.py @@ -220,7 +220,8 @@ async def test_empty_nonexistent_path(self, store_kwargs) -> None: async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None: store.supports_deletes = False with pytest.raises( - NotImplementedError, match="This method is only available for stores that support deletes." + NotImplementedError, + match="This method is only available for stores that support deletes.", ): await store.delete_dir("test_prefix") @@ -258,10 +259,11 @@ def test_no_wrap_async_filesystem(): reason="No AsyncFileSystemWrapper", ) async def test_delete_dir_wrapped_filesystem(tmpdir) -> None: - """The local fs is not async so we should expect it to be wrapped automatically""" from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper + from fsspec.implementations.local import LocalFileSystem - store = FsspecStore.from_url(f"{tmpdir}/test/path", storage_options={"auto_mkdir": True}) + wrapped_fs = AsyncFileSystemWrapper(LocalFileSystem(auto_mkdir=True)) + store = FsspecStore(wrapped_fs, read_only=False, path=f"{tmpdir}/test/path") assert isinstance(store.fs, AsyncFileSystemWrapper) assert store.fs.asynchronous