From f74e53aca5311ec077da71585dd962c4af7b8a11 Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Thu, 14 Nov 2024 13:43:07 -0600 Subject: [PATCH] Added Store.getsize (#2426) * Added Store.getsize Closes https://github.com/zarr-developers/zarr-python/issues/2420 * fixups * lint * wip * Use prefix * fixup * Maybe fixup * lint * revert buffer changes * fixup * fixup * Remove AsyncIterable support * fixup --------- Co-authored-by: Davis Bennett --- src/zarr/abc/store.py | 68 ++++++++++++++++++++++++ src/zarr/core/array.py | 13 +++++ src/zarr/core/common.py | 4 +- src/zarr/storage/local.py | 9 +++- src/zarr/storage/logging.py | 8 +++ src/zarr/storage/remote.py | 13 +++++ src/zarr/testing/store.py | 23 ++++++++ tests/test_array.py | 24 +++++++++ tests/test_indexing.py | 1 + tests/test_metadata/test_consolidated.py | 2 +- tests/test_v2.py | 1 + 11 files changed, 163 insertions(+), 3 deletions(-) diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index fa3c7f3bdf..bd0a7ad503 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -5,6 +5,10 @@ from itertools import starmap from typing import TYPE_CHECKING, Protocol, runtime_checkable +from zarr.core.buffer.core import default_buffer_prototype +from zarr.core.common import concurrent_map +from zarr.core.config import config + if TYPE_CHECKING: from collections.abc import AsyncGenerator, AsyncIterator, Iterable from types import TracebackType @@ -344,6 +348,70 @@ async def _get_many( for req in requests: yield (req[0], await self.get(*req)) + async def getsize(self, key: str) -> int: + """ + Return the size, in bytes, of a value in a Store. + + Parameters + ---------- + key : str + + Returns + ------- + nbytes : int + The size of the value (in bytes). + + Raises + ------ + FileNotFoundError + When the given key does not exist in the store. + """ + # Note to implementers: this default implementation is very inefficient since + # it requires reading the entire object. 
Many systems will have ways to get the + # size of an object without reading it. + value = await self.get(key, prototype=default_buffer_prototype()) + if value is None: + raise FileNotFoundError(key) + return len(value) + + async def getsize_prefix(self, prefix: str) -> int: + """ + Return the size, in bytes, of all values under a prefix. + + Parameters + ---------- + prefix : str + The prefix of the directory to measure. + + Returns + ------- + nbytes : int + The sum of the sizes of the values in the directory (in bytes). + + See Also + -------- + zarr.Array.nbytes_stored + Store.getsize + + Notes + ----- + ``getsize_prefix`` is just provided as a potentially faster alternative to + listing all the keys under a prefix and calling :meth:`Store.getsize` on each. + + In general, ``prefix`` should be the path of an Array or Group in the Store. + Implementations may differ on the behavior when some other ``prefix`` + is provided. + """ + # TODO: Overlap listing keys with getsize calls. + # Currently, we load the list of keys into memory and only then move + # on to getting sizes. Ideally we would overlap those two, which should + # improve tail latency and might reduce memory pressure (since not all keys + # would be in memory at once). 
+ keys = [(x,) async for x in self.list_prefix(prefix)] + limit = config.get("async.concurrency") + sizes = await concurrent_map(keys, self.getsize, limit=limit) + return sum(sizes) + @runtime_checkable class ByteGetter(Protocol): diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index eeff8378ae..249168723f 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -889,6 +889,9 @@ async def nchunks_initialized(self) -> int: """ return len(await chunks_initialized(self)) + async def nbytes_stored(self) -> int: + return await self.store_path.store.getsize_prefix(self.store_path.path) + def _iter_chunk_coords( self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None ) -> Iterator[ChunkCoords]: @@ -1727,6 +1730,16 @@ def nchunks_initialized(self) -> int: """ return sync(self._async_array.nchunks_initialized()) + def nbytes_stored(self) -> int: + """ + Determine the size, in bytes, of the array actually written to the store. + + Returns + ------- + size : int + """ + return sync(self._async_array.nbytes_stored()) + def _iter_chunk_keys( self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None ) -> Iterator[str]: diff --git a/src/zarr/core/common.py b/src/zarr/core/common.py index f3f49b0d5d..e76ddd030d 100644 --- a/src/zarr/core/common.py +++ b/src/zarr/core/common.py @@ -50,7 +50,9 @@ def product(tup: ChunkCoords) -> int: async def concurrent_map( - items: Iterable[T], func: Callable[..., Awaitable[V]], limit: int | None = None + items: Iterable[T], + func: Callable[..., Awaitable[V]], + limit: int | None = None, ) -> list[V]: if limit is None: return await asyncio.gather(*list(starmap(func, items))) diff --git a/src/zarr/storage/local.py b/src/zarr/storage/local.py index f6e94ae479..f9b1747c31 100644 --- a/src/zarr/storage/local.py +++ b/src/zarr/storage/local.py @@ -2,12 +2,14 @@ import asyncio import io +import os import shutil from pathlib import Path from typing import 
TYPE_CHECKING from zarr.abc.store import ByteRangeRequest, Store from zarr.core.buffer import Buffer +from zarr.core.buffer.core import default_buffer_prototype from zarr.core.common import concurrent_map if TYPE_CHECKING: @@ -124,10 +126,12 @@ def __eq__(self, other: object) -> bool: async def get( self, key: str, - prototype: BufferPrototype, + prototype: BufferPrototype | None = None, byte_range: tuple[int | None, int | None] | None = None, ) -> Buffer | None: # docstring inherited + if prototype is None: + prototype = default_buffer_prototype() if not self._is_open: await self._open() assert isinstance(key, str) @@ -222,3 +226,6 @@ async def list_dir(self, prefix: str) -> AsyncIterator[str]: yield key.relative_to(base).as_posix() except (FileNotFoundError, NotADirectoryError): pass + + async def getsize(self, key: str) -> int: + return os.path.getsize(self.root / key) diff --git a/src/zarr/storage/logging.py b/src/zarr/storage/logging.py index 3b11ddbba7..bc90b4f30f 100644 --- a/src/zarr/storage/logging.py +++ b/src/zarr/storage/logging.py @@ -225,3 +225,11 @@ async def delete_dir(self, prefix: str) -> None: # docstring inherited with self.log(prefix): await self._store.delete_dir(prefix=prefix) + + async def getsize(self, key: str) -> int: + with self.log(key): + return await self._store.getsize(key) + + async def getsize_prefix(self, prefix: str) -> int: + with self.log(prefix): + return await self._store.getsize_prefix(prefix) diff --git a/src/zarr/storage/remote.py b/src/zarr/storage/remote.py index c08963bfaa..2b8329c9fa 100644 --- a/src/zarr/storage/remote.py +++ b/src/zarr/storage/remote.py @@ -325,3 +325,16 @@ async def list_prefix(self, prefix: str) -> AsyncIterator[str]: f"{self.path}/{prefix}", detail=False, maxdepth=None, withdirs=False ): yield onefile.removeprefix(f"{self.path}/") + + async def getsize(self, key: str) -> int: + path = _dereference_path(self.path, key) + info = await self.fs._info(path) + + size = info.get("size") + + if size is 
None: + # Not all filesystems support size. Fall back to reading the entire object + return await super().getsize(key) + else: + # fsspec doesn't have typing. We'll need to assume or verify this is true + return int(size) diff --git a/src/zarr/testing/store.py b/src/zarr/testing/store.py index b544bf87e2..d26d83e566 100644 --- a/src/zarr/testing/store.py +++ b/src/zarr/testing/store.py @@ -318,3 +318,26 @@ async def test_set_if_not_exists(self, store: S) -> None: result = await store.get("k2", default_buffer_prototype()) assert result == new + + async def test_getsize(self, store: S) -> None: + key = "k" + data = self.buffer_cls.from_bytes(b"0" * 10) + await self.set(store, key, data) + + result = await store.getsize(key) + assert isinstance(result, int) + assert result > 0 + + async def test_getsize_raises(self, store: S) -> None: + with pytest.raises(FileNotFoundError): + await store.getsize("not-a-real-key") + + async def test_getsize_prefix(self, store: S) -> None: + prefix = "array/c/" + for i in range(10): + data = self.buffer_cls.from_bytes(b"0" * 10) + await self.set(store, f"{prefix}/{i}", data) + + result = await store.getsize_prefix(prefix) + assert isinstance(result, int) + assert result > 0 diff --git a/tests/test_array.py b/tests/test_array.py index 3948896186..975873053d 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -372,6 +372,30 @@ async def test_chunks_initialized() -> None: assert observed == expected +def test_nbytes_stored() -> None: + arr = zarr.create(shape=(100,), chunks=(10,), dtype="i4") + result = arr.nbytes_stored() + assert result == 366 # the size of the metadata document. This is a fragile test. + arr[:50] = 1 + result = arr.nbytes_stored() + assert result == 566 # the size with 5 chunks filled. + arr[50:] = 2 + result = arr.nbytes_stored() + assert result == 766 # the size with all chunks filled. 
+ + +async def test_nbytes_stored_async() -> None: + arr = await zarr.api.asynchronous.create(shape=(100,), chunks=(10,), dtype="i4") + result = await arr.nbytes_stored() + assert result == 366 # the size of the metadata document. This is a fragile test. + await arr.setitem(slice(50), 1) + result = await arr.nbytes_stored() + assert result == 566 # the size with 5 chunks filled. + await arr.setitem(slice(50, 100), 2) + result = await arr.nbytes_stored() + assert result == 766 # the size with all chunks filled. + + def test_default_fill_values() -> None: a = Array.create(MemoryStore(), shape=5, chunk_shape=5, dtype="