[v3] zarr.array from an existing zarr.Array #2410

Open
jhamman opened this issue Oct 18, 2024 · 1 comment · May be fixed by #2622
Labels
bug Potential issues with the zarr-python library


@jhamman
Member

jhamman commented Oct 18, 2024

Zarr version

3.0.0.beta

Numcodecs version

0.13

Python Version

3.11

Operating System

Mac

Installation

pip

Description

It used to be possible to create one Zarr array from another. This is currently broken, but it would be worth bringing back in a more optimized form that uses the underlying async array for streaming.
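The "more optimized form" could look roughly like the sketch below. It is a minimal, stdlib-only illustration under stated assumptions, not zarr's implementation: `read_region` and `write_region` are hypothetical async stand-ins for reading from the source array and writing to the destination, because the relevant `AsyncArray` methods are not shown in this issue. The copy walks the destination's chunk grid, awaits each read instead of going through the blocking `Array.__getitem__`, and caps the number of chunks in flight with a semaphore, much like the `concurrent_map` helper visible in the traceback.

```python
import asyncio
import itertools
from typing import Awaitable, Callable, Iterator

# A region is one slice per dimension.
Region = tuple[slice, ...]


def chunk_regions(shape: tuple[int, ...], chunks: tuple[int, ...]) -> Iterator[Region]:
    """Yield the region covered by each chunk of a regular chunk grid."""
    starts = [range(0, size, step) for size, step in zip(shape, chunks)]
    for origin in itertools.product(*starts):
        yield tuple(
            slice(start, min(start + step, size))
            for start, step, size in zip(origin, chunks, shape)
        )


async def stream_copy(
    read_region: Callable[[Region], Awaitable[list]],
    write_region: Callable[[Region, list], Awaitable[None]],
    shape: tuple[int, ...],
    chunks: tuple[int, ...],
    limit: int = 10,
) -> int:
    """Copy chunk by chunk with at most `limit` chunks in flight.

    `read_region` / `write_region` are hypothetical async callables, not
    zarr APIs. Returns the number of destination chunks copied.
    """
    sem = asyncio.Semaphore(limit)

    async def copy_one(region: Region) -> None:
        async with sem:
            data = await read_region(region)  # awaited, never wrapped in sync()
            await write_region(region, data)

    regions = list(chunk_regions(shape, chunks))
    await asyncio.gather(*(copy_one(r) for r in regions))
    return len(regions)


# Usage with in-memory 2-D lists as hypothetical source and destination.
src = [[r * 10 + c for c in range(7)] for r in range(5)]
dst = [[0] * 7 for _ in range(5)]


async def read_region(region: Region) -> list:
    rows, cols = region
    return [row[cols] for row in src[rows]]


async def write_region(region: Region, data: list) -> None:
    rows, cols = region
    for i, r in enumerate(range(rows.start, rows.stop)):
        dst[r][cols] = data[i]


n = asyncio.run(stream_copy(read_region, write_region, (5, 7), (2, 3)))
```

Iterating over the destination's chunks means each output chunk is written exactly once, even when source and destination chunk shapes differ, as with `(100, 100)` versus `(100, 200)` in the reproduction.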

Steps to reproduce

In [24]: a = zarr.zeros((10000, 10000), chunks=(100,100), dtype='uint16',
    ...: store='a.zarr')

In [25]: b = zarr.array(a, chunks=(100, 200), store='b.zarr')
--------------------------------------------------------------------------
SyncError                                Traceback (most recent call last)
Cell In[25], line 1
----> 1 b = zarr.array(a, chunks=(100, 200), store='b.zarr')

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/api/synchronous.py:164, in array(data, **kwargs)
    163 def array(data: NDArrayLike, **kwargs: Any) -> Array:
--> 164     return Array(sync(async_api.array(data=data, **kwargs)))

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/sync.py:141, in sync(coro, loop, timeout)
    138 return_result = next(iter(finished)).result()
    140 if isinstance(return_result, BaseException):
--> 141     raise return_result
    142 else:
    143     return return_result

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/sync.py:100, in _runner(coro)
     95 """
     96 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
     97 exception, the exception will be returned.
     98 """
     99 try:
--> 100     return await coro
    101 except Exception as ex:
    102     return ex

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/api/asynchronous.py:529, in array(data, **kwargs)
    526 z = await create(**kwargs)
    528 # fill with data
--> 529 await z.setitem(slice(None), data)
    531 return z

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/array.py:880, in AsyncArray.setitem(self, selection, value, prototype)
    874     prototype = default_buffer_prototype()
    875 indexer = BasicIndexer(
    876     selection,
    877     shape=self.metadata.shape,
    878     chunk_grid=self.metadata.chunk_grid,
    879 )
--> 880 return await self._set_selection(indexer, value, prototype=prototype)

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/array.py:853, in AsyncArray._set_selection(self, indexer, value, prototype, fields)
    850 value_buffer = prototype.nd_buffer.from_ndarray_like(value)
    852 # merging with existing data and encoding chunks
--> 853 await self.codec_pipeline.write(
    854     [
    855         (
    856             self.store_path / self.metadata.encode_chunk_key(chunk_coords),
    857             self.metadata.get_chunk_spec(chunk_coords, self.order, prototype),
    858             chunk_selection,
    859             out_selection,
    860         )
    861         for chunk_coords, chunk_selection, out_selection in indexer
    862     ],
    863     value_buffer,
    864     drop_axes=indexer.drop_axes,
    865 )

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/codecs/pipeline.py:456, in BatchedCodecPipeline.write(self, batch_info, value, drop_axes)
    450 async def write(
    451     self,
    452     batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple]],
    453     value: NDBuffer,
    454     drop_axes: tuple[int, ...] = (),
    455 ) -> None:
--> 456     await concurrent_map(
    457         [
    458             (single_batch_info, value, drop_axes)
    459             for single_batch_info in batched(batch_info, self.batch_size)
    460         ],
    461         self.write_batch,
    462         config.get("async.concurrency"),
    463     )

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/common.py:64, in concurrent_map(items, func, limit)
     61     async with sem:
     62         return await func(*item)
---> 64 return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/common.py:62, in concurrent_map.<locals>.run(item)
     60 async def run(item: tuple[Any]) -> V:
     61     async with sem:
---> 62         return await func(*item)

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/codecs/pipeline.py:374, in BatchedCodecPipeline.write_batch(self, batch_info, value, drop_axes)
    353 chunk_bytes_batch = await concurrent_map(
    354     [
    355         (
   (...)
    362     config.get("async.concurrency"),
    363 )
    364 chunk_array_batch = await self.decode_batch(
    365     [
    366         (chunk_bytes, chunk_spec)
   (...)
    370     ],
    371 )
    373 chunk_array_batch = [
--> 374     self._merge_chunk_array(
    375         chunk_array, value, out_selection, chunk_spec, chunk_selection, drop_axes
    376     )
    377     for chunk_array, (_, chunk_spec, chunk_selection, out_selection) in zip(
    378         chunk_array_batch, batch_info, strict=False
    379     )
    380 ]
    382 chunk_array_batch = [
    383     None
    384     if chunk_array is None or chunk_array.all_equal(chunk_spec.fill_value)
   (...)
    388     )
    389 ]
    391 chunk_bytes_batch = await self.encode_batch(
    392     [
    393         (chunk_array, chunk_spec)
   (...)
    397     ],
    398 )

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/codecs/pipeline.py:316, in BatchedCodecPipeline._merge_chunk_array(self, existing_chunk_array, value, out_selection, chunk_spec, chunk_selection, drop_axes)
    314     chunk_value = value
    315 else:
--> 316     chunk_value = value[out_selection]
    317     # handle missing singleton dimensions
    318     if drop_axes != ():

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/buffer/cpu.py:180, in NDBuffer.__getitem__(self, key)
    179 def __getitem__(self, key: Any) -> Self:
--> 180     return self.__class__(np.asanyarray(self._data.__getitem__(key)))

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/array.py:1327, in Array.__getitem__(self, selection)
   1325     return self.vindex[cast(CoordinateSelection | MaskSelection, selection)]
   1326 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
-> 1327     return self.get_orthogonal_selection(pure_selection, fields=fields)
   1328 else:
   1329     return self.get_basic_selection(cast(BasicSelection, pure_selection), fields=fields)

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/_compat.py:43, in _deprecate_positional_args.<locals>._inner_deprecate_positional_args.<locals>.inner_f(*args, **kwargs)
     41 extra_args = len(args) - len(all_args)
     42 if extra_args <= 0:
---> 43     return f(*args, **kwargs)
     45 # extra_args > 0
     46 args_msg = [
     47     f"{name}={arg}"
     48     for name, arg in zip(kwonly_args[:extra_args], args[-extra_args:], strict=False)
     49 ]

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/array.py:1769, in Array.get_orthogonal_selection(self, selection, out, fields, prototype)
   1767     prototype = default_buffer_prototype()
   1768 indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid)
-> 1769 return sync(
   1770     self._async_array._get_selection(
   1771         indexer=indexer, out=out, fields=fields, prototype=prototype
   1772     )
   1773 )

File ~/miniforge3/envs/icechunk-demo/lib/python3.12/site-packages/zarr/core/sync.py:128, in sync(coro, loop, timeout)
    126     loop0 = asyncio.events.get_running_loop()
    127     if loop0 is loop:
--> 128         raise SyncError("Calling sync() from within a running loop")
    129 except RuntimeError:
    130     pass

SyncError: Calling sync() from within a running loop
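The traceback points at the cause: `_merge_chunk_array` evaluates `value[out_selection]`, and because `value` wraps the source `zarr.Array`, that slice goes through `Array.__getitem__`, which calls `sync()` while the event loop that `sync()` drives is already running. The sketch below is a simplified, stdlib-only model of that guard. `sync`, `SyncError`, and `SourceArray` here are hypothetical re-implementations for illustration, not zarr's code. It shows that a blocking read made from inside a coroutine trips the check, while an awaited read does not.

```python
import asyncio
import threading


class SyncError(Exception):
    """Raised when a blocking call is made from inside the sync loop."""


# Simplified model: one dedicated event loop running in a background thread.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


def sync(coro):
    """Run `coro` on the background loop and block until it finishes."""
    try:
        running = asyncio.get_running_loop()
    except RuntimeError:
        running = None  # called from plain synchronous code
    if running is _loop:
        coro.close()  # avoid a "coroutine was never awaited" warning
        raise SyncError("Calling sync() from within a running loop")
    return asyncio.run_coroutine_threadsafe(coro, _loop).result()


class SourceArray:
    """Hypothetical array with an async getter and a blocking __getitem__."""

    def __init__(self, data):
        self._data = data

    async def get(self, key):
        await asyncio.sleep(0)  # stands in for async chunk I/O
        return self._data[key]

    def __getitem__(self, key):
        return sync(self.get(key))  # blocking wrapper, like Array.__getitem__


async def copy_blocking(src):
    return src[0:3]  # re-enters sync() while _loop is running: fails


async def copy_awaited(src):
    return await src.get(slice(0, 3))  # stays on the async path: works


src = SourceArray(list(range(10)))
direct = src[0:3]  # fine: called from outside any running loop
awaited = sync(copy_awaited(src))
error = None
try:
    sync(copy_blocking(src))
except SyncError as exc:
    error = str(exc)
```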

Additional output

No response

jhamman added the bug (Potential issues with the zarr-python library) label on Oct 18, 2024
jhamman added this to the 3.0.0 milestone on Oct 18, 2024
jhamman modified the milestones: 3.0.0, After 3.0.0 on Oct 18, 2024
@dstansby
Contributor

Looks like this breaking API change should be documented in #2596 (unless it has been fixed since?).

Projects
Status: Todo

2 participants