Skip to content

Commit cca07fc

Browse files
authored
feat: add HTTP store support (#61)
1 parent c9b1d85 commit cca07fc

17 files changed

+473
-301
lines changed

.github/workflows/cd.yml

+4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ jobs:
3434
- { os: linux, manylinux: musllinux_1_2, target: armv7 }
3535
# windows
3636
- { os: windows, target: i686, python-architecture: x86 }
37+
exclude:
38+
# https://github.com/rust-cross/cargo-xwin/issues/76
39+
- os: windows
40+
target: aarch64
3741
runs-on: ${{ (matrix.os == 'linux' && 'ubuntu') || matrix.os }}-latest
3842
steps:
3943
- uses: actions/checkout@v4

Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ numpy = "0.23.0"
1919
unsafe_cell_slice = "0.2.0"
2020
serde_json = "1.0.128"
2121
pyo3-stub-gen = "0.6.1"
22+
opendal = { version = "0.50.2", features = ["services-http"] }
23+
tokio = { version = "1.41.1", features = ["rt-multi-thread"] }
24+
zarrs_opendal = "0.4.0"
2225

2326
[profile.release]
2427
lto = true

README.md

+8-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,14 @@ You can then use your `zarr` as normal (with some caveats)!
3030

3131
We export a `ZarrsCodecPipeline` class so that `zarr-python` can use the class but it is not meant to be instantiated and we do not guarantee the stability of its API beyond what is required so that `zarr-python` can use it. Therefore, it is not documented here. We also export two errors, `DiscontiguousArrayError` and `CollapsedDimensionError` that can be thrown in the process of converting to indexers that `zarrs` can understand (see below for more details).
3232

33-
At the moment, we only support local filesystems but intend to support more in the future: https://github.com/ilan-gold/zarrs-python/issues/44
33+
At the moment, we only support a subset of the `zarr-python` stores:
34+
35+
- [x] [LocalStore](https://zarr.readthedocs.io/en/main/_autoapi/zarr/storage/local/index.html) (FileSystem)
36+
- [RemoteStore](https://zarr.readthedocs.io/en/main/_autoapi/zarr/storage/remote/index.html)
37+
- [x] [HTTPFileSystem](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.http.HTTPFileSystem)
38+
39+
A `NotImplementedError` will be raised if a store is not supported.
40+
We intend to support more stores in the future: https://github.com/ilan-gold/zarrs-python/issues/44.
3441

3542
### Configuration
3643

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ dependencies = [
3030
'donfig',
3131
'pytest',
3232
'universal_pathlib>=0.2.0',
33-
'zarr>=3.0.0b2',
33+
'zarr>=3.0.0b2,<=3.0.0b3',
3434
]
3535

3636
[project.optional-dependencies]

python/zarrs/_internal.pyi

+28-18
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@
22
# ruff: noqa: E501, F401
33

44
import typing
5+
from enum import Enum, auto
56

67
import numpy
78
import numpy.typing
89

10+
class Basic:
11+
def __new__(cls, byte_interface: typing.Any, chunk_spec: typing.Any): ...
12+
...
13+
914
class CodecPipelineImpl:
1015
def __new__(
1116
cls,
@@ -19,29 +24,34 @@ class CodecPipelineImpl:
1924
): ...
2025
def retrieve_chunks_and_apply_index(
2126
self,
22-
chunk_descriptions: typing.Sequence[
23-
tuple[
24-
tuple[str, typing.Sequence[int], str, typing.Sequence[int]],
25-
typing.Sequence[slice],
26-
typing.Sequence[slice],
27-
]
28-
],
27+
chunk_descriptions: typing.Sequence[WithSubset],
2928
value: numpy.NDArray[typing.Any],
3029
) -> None: ...
3130
def retrieve_chunks(
32-
self,
33-
chunk_descriptions: typing.Sequence[
34-
tuple[str, typing.Sequence[int], str, typing.Sequence[int]]
35-
],
31+
self, chunk_descriptions: typing.Sequence[Basic]
3632
) -> list[numpy.typing.NDArray[numpy.uint8]]: ...
3733
def store_chunks_with_indices(
3834
self,
39-
chunk_descriptions: typing.Sequence[
40-
tuple[
41-
tuple[str, typing.Sequence[int], str, typing.Sequence[int]],
42-
typing.Sequence[slice],
43-
typing.Sequence[slice],
44-
]
45-
],
35+
chunk_descriptions: typing.Sequence[WithSubset],
4636
value: numpy.NDArray[typing.Any],
4737
) -> None: ...
38+
39+
class FilesystemStoreConfig:
40+
root: str
41+
42+
class HttpStoreConfig:
43+
endpoint: str
44+
45+
class WithSubset:
46+
def __new__(
47+
cls,
48+
item: Basic,
49+
chunk_subset: typing.Sequence[slice],
50+
subset: typing.Sequence[slice],
51+
shape: typing.Sequence[int],
52+
): ...
53+
...
54+
55+
class StoreConfig(Enum):
56+
Filesystem = auto()
57+
Http = auto()

python/zarrs/pipeline.py

+16-22
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,7 @@
66
from typing import TYPE_CHECKING, TypedDict
77

88
import numpy as np
9-
from zarr.abc.codec import (
10-
Codec,
11-
CodecPipeline,
12-
)
9+
from zarr.abc.codec import Codec, CodecPipeline
1310
from zarr.core.config import config
1411

1512
if TYPE_CHECKING:
@@ -18,7 +15,7 @@
1815

1916
from zarr.abc.store import ByteGetter, ByteSetter
2017
from zarr.core.array_spec import ArraySpec
21-
from zarr.core.buffer import Buffer, NDBuffer
18+
from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
2219
from zarr.core.chunk_grids import ChunkGrid
2320
from zarr.core.common import ChunkCoords
2421
from zarr.core.indexing import SelectorTuple
@@ -120,30 +117,28 @@ async def read(
120117
batch_info: Iterable[
121118
tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple]
122119
],
123-
out: NDBuffer,
120+
out: NDBuffer, # type: ignore
124121
drop_axes: tuple[int, ...] = (), # FIXME: unused
125122
) -> None:
126-
out = out.as_ndarray_like() # FIXME: Error if array is not in host memory
123+
# FIXME: Error if array is not in host memory
124+
out: NDArrayLike = out.as_ndarray_like()
127125
if not out.dtype.isnative:
128126
raise RuntimeError("Non-native byte order not supported")
129127
try:
130-
chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes)
131-
index_in_rust = True
128+
chunks_desc = make_chunk_info_for_rust_with_indices(
129+
batch_info, drop_axes, out.shape
130+
)
132131
except (DiscontiguousArrayError, CollapsedDimensionError):
133132
chunks_desc = make_chunk_info_for_rust(batch_info)
134-
index_in_rust = False
135-
if index_in_rust:
133+
else:
136134
await asyncio.to_thread(
137135
self.impl.retrieve_chunks_and_apply_index,
138136
chunks_desc,
139137
out,
140138
)
141139
return None
142140
chunks = await asyncio.to_thread(self.impl.retrieve_chunks, chunks_desc)
143-
for chunk, chunk_info in zip(chunks, batch_info):
144-
out_selection = chunk_info[3]
145-
selection = chunk_info[2]
146-
spec = chunk_info[1]
141+
for chunk, (_, spec, selection, out_selection) in zip(chunks, batch_info):
147142
chunk_reshaped = chunk.view(spec.dtype).reshape(spec.shape)
148143
chunk_selected = chunk_reshaped[selection]
149144
if drop_axes:
@@ -155,18 +150,17 @@ async def write(
155150
batch_info: Iterable[
156151
tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
157152
],
158-
value: NDBuffer,
153+
value: NDBuffer, # type: ignore
159154
drop_axes: tuple[int, ...] = (),
160155
) -> None:
161-
value = value.as_ndarray_like() # FIXME: Error if array is not in host memory
156+
# FIXME: Error if array is not in host memory
157+
value: NDArrayLike | np.ndarray = value.as_ndarray_like()
162158
if not value.dtype.isnative:
163159
value = np.ascontiguousarray(value, dtype=value.dtype.newbyteorder("="))
164160
elif not value.flags.c_contiguous:
165161
value = np.ascontiguousarray(value)
166-
chunks_desc = make_chunk_info_for_rust_with_indices(batch_info, drop_axes)
167-
await asyncio.to_thread(
168-
self.impl.store_chunks_with_indices,
169-
chunks_desc,
170-
value,
162+
chunks_desc = make_chunk_info_for_rust_with_indices(
163+
batch_info, drop_axes, value.shape
171164
)
165+
await asyncio.to_thread(self.impl.store_chunks_with_indices, chunks_desc, value)
172166
return None

python/zarrs/utils.py

+19-22
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@
33
import operator
44
import os
55
from functools import reduce
6-
from typing import TYPE_CHECKING, Any
6+
from typing import TYPE_CHECKING
77

88
import numpy as np
99
from zarr.core.indexing import SelectorTuple, is_integer
1010

11+
from zarrs._internal import Basic, WithSubset
12+
1113
if TYPE_CHECKING:
1214
from collections.abc import Iterable
1315
from types import EllipsisType
1416

1517
from zarr.abc.store import ByteGetter, ByteSetter
1618
from zarr.core.array_spec import ArraySpec
17-
from zarr.core.common import ChunkCoords
1819

1920

2021
# adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
@@ -62,17 +63,6 @@ def selector_tuple_to_slice_selection(selector_tuple: SelectorTuple) -> list[sli
6263
return make_slice_selection(selector_tuple)
6364

6465

65-
def convert_chunk_to_primitive(
66-
byte_getter: ByteGetter | ByteSetter, chunk_spec: ArraySpec
67-
) -> tuple[str, ChunkCoords, str, Any]:
68-
return (
69-
str(byte_getter),
70-
chunk_spec.shape,
71-
str(chunk_spec.dtype),
72-
chunk_spec.fill_value.tobytes(),
73-
)
74-
75-
7666
def resulting_shape_from_index(
7767
array_shape: tuple[int, ...],
7868
index_tuple: tuple[int | slice | EllipsisType | np.ndarray],
@@ -149,10 +139,12 @@ def make_chunk_info_for_rust_with_indices(
149139
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
150140
],
151141
drop_axes: tuple[int, ...],
152-
) -> list[tuple[tuple[str, ChunkCoords, str, Any], list[slice], list[slice]]]:
153-
chunk_info_with_indices = []
142+
shape: tuple[int, ...],
143+
) -> list[WithSubset]:
144+
shape = shape if shape else (1,) # constant array
145+
chunk_info_with_indices: list[WithSubset] = []
154146
for byte_getter, chunk_spec, chunk_selection, out_selection in batch_info:
155-
chunk_info = convert_chunk_to_primitive(byte_getter, chunk_spec)
147+
chunk_info = Basic(byte_getter, chunk_spec)
156148
out_selection_as_slices = selector_tuple_to_slice_selection(out_selection)
157149
chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection)
158150
shape_chunk_selection_slices = get_shape_for_selector(
@@ -169,7 +161,12 @@ def make_chunk_info_for_rust_with_indices(
169161
f"{shape_chunk_selection} != {shape_chunk_selection_slices}"
170162
)
171163
chunk_info_with_indices.append(
172-
(chunk_info, out_selection_as_slices, chunk_selection_as_slices)
164+
WithSubset(
165+
chunk_info,
166+
chunk_subset=chunk_selection_as_slices,
167+
subset=out_selection_as_slices,
168+
shape=shape,
169+
)
173170
)
174171
return chunk_info_with_indices
175172

@@ -178,8 +175,8 @@ def make_chunk_info_for_rust(
178175
batch_info: Iterable[
179176
tuple[ByteGetter | ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]
180177
],
181-
) -> list[tuple[str, ChunkCoords, str, Any]]:
182-
return list(
183-
convert_chunk_to_primitive(byte_getter, chunk_spec)
184-
for (byte_getter, chunk_spec, _, _) in batch_info
185-
)
178+
) -> list[Basic]:
179+
return [
180+
Basic(byte_interface, chunk_spec)
181+
for (byte_interface, chunk_spec, _, _) in batch_info
182+
]

0 commit comments

Comments
 (0)