From 49163e067f99fc9eab9264f6140593cf417c0ae7 Mon Sep 17 00:00:00 2001 From: Gregory Lee Date: Tue, 30 Nov 2021 23:04:01 -0500 Subject: [PATCH] add StoreV3 support to most convenience routines consolidated metadata functions haven't been updated yet --- zarr/convenience.py | 191 +++++++++++++++++++++------------ zarr/tests/test_convenience.py | 190 +++++++++++++++++++++++++++----- 2 files changed, 290 insertions(+), 91 deletions(-) diff --git a/zarr/convenience.py b/zarr/convenience.py index 60fa5fe176..018af8b9d9 100644 --- a/zarr/convenience.py +++ b/zarr/convenience.py @@ -7,13 +7,13 @@ from zarr.core import Array from zarr.creation import array as _create_array -from zarr.creation import normalize_store_arg, open_array +from zarr.creation import open_array from zarr.errors import CopyError, PathNotFoundError from zarr.hierarchy import Group from zarr.hierarchy import group as _create_group from zarr.hierarchy import open_group from zarr.meta import json_dumps, json_loads -from zarr.storage import contains_array, contains_group, BaseStore +from zarr.storage import contains_array, contains_group, normalize_store_arg, BaseStore from zarr.util import TreeViewer, buffer_size, normalize_storage_path from typing import Union @@ -21,8 +21,14 @@ StoreLike = Union[BaseStore, MutableMapping, str, None] +def _check_and_update_path(store: BaseStore, path): + if getattr(store, '_store_version', 2) > 2 and not path: + raise ValueError("path must be provided for v3 stores") + return normalize_storage_path(path) + + # noinspection PyShadowingBuiltins -def open(store: StoreLike = None, mode: str = "a", **kwargs): +def open(store: StoreLike = None, mode: str = "a", *, zarr_version=2, path=None, **kwargs): """Convenience function to open a group or array using file-mode-like semantics. Parameters @@ -34,6 +40,10 @@ def open(store: StoreLike = None, mode: str = "a", **kwargs): read/write (must exist); 'a' means read/write (create if doesn't exist); 'w' means create (overwrite if exists); 'w-' means create (fail if exists). + zarr_version : {2, 3} + The zarr protocol version to use. + path : str + The path within the store to open. **kwargs Additional parameters are passed through to :func:`zarr.creation.open_array` or :func:`zarr.hierarchy.open_group`. @@ -75,15 +85,16 @@ def open(store: StoreLike = None, mode: str = "a", **kwargs): """ - path = kwargs.get('path', None) # handle polymorphic store arg clobber = mode == 'w' # we pass storage options explicitly, since normalize_store_arg might construct # a store if the input is a fsspec-compatible URL _store: BaseStore = normalize_store_arg( - store, clobber=clobber, storage_options=kwargs.pop("storage_options", {}) + store, clobber=clobber, storage_options=kwargs.pop("storage_options", {}), + zarr_version=zarr_version, ) - path = normalize_storage_path(path) + path = _check_and_update_path(_store, path) + kwargs['path'] = path if mode in {'w', 'w-', 'x'}: if 'shape' in kwargs: @@ -110,7 +121,7 @@ def _might_close(path): return isinstance(path, (str, os.PathLike)) -def save_array(store: StoreLike, arr, **kwargs): +def save_array(store: StoreLike, arr, *, zarr_version=2, path=None, **kwargs): """Convenience function to save a NumPy array to the local file system, following a similar API to the NumPy save() function. @@ -120,6 +131,10 @@ def save_array(store: StoreLike, arr, **kwargs): Store or path to directory in file system or name of zip file. arr : ndarray NumPy array with data to save. + zarr_version : {2, 3} + The zarr protocol version to use when saving. + path : str + The path within the store where the array will be saved. kwargs Passed through to :func:`create`, e.g., compressor. @@ -142,16 +157,18 @@ def save_array(store: StoreLike, arr, **kwargs): """ may_need_closing = _might_close(store) - _store: BaseStore = normalize_store_arg(store, clobber=True) + _store: BaseStore = normalize_store_arg(store, clobber=True, zarr_version=zarr_version) + path = _check_and_update_path(_store, path) try: - _create_array(arr, store=_store, overwrite=True, **kwargs) + _create_array(arr, store=_store, overwrite=True, zarr_version=zarr_version, path=path, + **kwargs) finally: if may_need_closing: # needed to ensure zip file records are written _store.close() -def save_group(store: StoreLike, *args, **kwargs): +def save_group(store: StoreLike, *args, zarr_version=2, path=None, **kwargs): """Convenience function to save several NumPy arrays to the local file system, following a similar API to the NumPy savez()/savez_compressed() functions. @@ -161,6 +178,10 @@ def save_group(store: StoreLike, *args, **kwargs): Store or path to directory in file system or name of zip file. args : ndarray NumPy arrays with data to save. + zarr_version : {2, 3} + The zarr protocol version to use when saving. + path : str + Path within the store where the group will be saved. kwargs NumPy arrays with data to save. @@ -213,21 +234,22 @@ def save_group(store: StoreLike, *args, **kwargs): raise ValueError('at least one array must be provided') # handle polymorphic store arg may_need_closing = _might_close(store) - _store: BaseStore = normalize_store_arg(store, clobber=True) + _store: BaseStore = normalize_store_arg(store, clobber=True, zarr_version=zarr_version) + path = _check_and_update_path(_store, path) try: - grp = _create_group(_store, overwrite=True) + grp = _create_group(_store, path=path, overwrite=True, zarr_version=zarr_version) for i, arr in enumerate(args): k = 'arr_{}'.format(i) - grp.create_dataset(k, data=arr, overwrite=True) + grp.create_dataset(k, data=arr, overwrite=True, zarr_version=zarr_version) for k, arr in kwargs.items(): - grp.create_dataset(k, data=arr, overwrite=True) + grp.create_dataset(k, data=arr, overwrite=True, zarr_version=zarr_version) finally: if may_need_closing: # needed to ensure zip file records are written _store.close() -def save(store: StoreLike, *args, **kwargs): +def save(store: StoreLike, *args, zarr_version=2, path=None, **kwargs): """Convenience function to save an array or group of arrays to the local file system. Parameters @@ -236,6 +258,10 @@ def save(store: StoreLike, *args, **kwargs): Store or path to directory in file system or name of zip file. args : ndarray NumPy arrays with data to save. + zarr_version : {2, 3} + The zarr protocol version to use when saving. + path : str + The path within the group where the arrays will be saved. kwargs NumPy arrays with data to save. @@ -302,9 +328,10 @@ def save(store: StoreLike, *args, **kwargs): if len(args) == 0 and len(kwargs) == 0: raise ValueError('at least one array must be provided') if len(args) == 1 and len(kwargs) == 0: - save_array(store, args[0]) + save_array(store, args[0], zarr_version=zarr_version, path=path) else: - save_group(store, *args, **kwargs) + save_group(store, *args, zarr_version=zarr_version, path=path, + **kwargs) class LazyLoader(Mapping): @@ -337,7 +364,7 @@ def __repr__(self): return r -def load(store: StoreLike): +def load(store: StoreLike, zarr_version=2, path=None): """Load data from an array or group into memory. Parameters @@ -363,11 +390,12 @@ def load(store: StoreLike): """ # handle polymorphic store arg - _store = normalize_store_arg(store) - if contains_array(_store, path=None): - return Array(store=_store, path=None)[...] - elif contains_group(_store, path=None): - grp = Group(store=_store, path=None) + _store = normalize_store_arg(store, zarr_version=zarr_version) + path = _check_and_update_path(_store, path) + if contains_array(_store, path=path): + return Array(store=_store, path=path)[...] + elif contains_group(_store, path=path): + grp = Group(store=_store, path=path) return LazyLoader(grp) @@ -601,6 +629,15 @@ def copy_store(source, dest, source_path='', dest_path='', excludes=None, # setup counting variables n_copied = n_skipped = n_bytes_copied = 0 + source_store_version = getattr(source, '_store_version', 2) + dest_store_version = getattr(dest, '_store_version', 2) + if source_store_version != dest_store_version: + raise ValueError("zarr stores must share the same protocol version") + if source_store_version > 2: + if not source_path or not dest_path: + raise ValueError("v3 stores require specifying a non-empty " + "source_path and dest_path") + # setup logging with _LogWriter(log) as log: @@ -608,52 +645,63 @@ def copy_store(source, dest, source_path='', dest_path='', excludes=None, for source_key in sorted(source.keys()): # filter to keys under source path - if source_key.startswith(source_path): + if source_store_version == 2: + if not source_key.startswith(source_path): + continue + elif source_store_version == 3: + # 'meta/root/' or 'data/root/' have length 10 + if not source_key[10:].startswith(source_path): + continue - # process excludes and includes - exclude = False - for prog in excludes: + # process excludes and includes + exclude = False + for prog in excludes: + if prog.search(source_key): + exclude = True + break + if exclude: + for prog in includes: if prog.search(source_key): - exclude = True + exclude = False break - if exclude: - for prog in includes: - if prog.search(source_key): - exclude = False - break - if exclude: - continue + if exclude: + continue - # map key to destination path + # map key to destination path + if source_store_version == 2: key_suffix = source_key[len(source_path):] dest_key = dest_path + key_suffix - - # create a descriptive label for this operation - descr = source_key - if dest_key != source_key: - descr = descr + ' -> ' + dest_key - - # decide what to do - do_copy = True - if if_exists != 'replace': - if dest_key in dest: - if if_exists == 'raise': - raise CopyError('key {!r} exists in destination' - .format(dest_key)) - elif if_exists == 'skip': - do_copy = False - - # take action - if do_copy: - log('copy {}'.format(descr)) - if not dry_run: - data = source[source_key] - n_bytes_copied += buffer_size(data) - dest[dest_key] = data - n_copied += 1 - else: - log('skip {}'.format(descr)) - n_skipped += 1 + elif source_store_version == 3: + # 10 is length of 'meta/root/' or 'data/root/' + key_suffix = source_key[10 + len(source_path):] + dest_key = source_key[:10] + dest_path + key_suffix + + # create a descriptive label for this operation + descr = source_key + if dest_key != source_key: + descr = descr + ' -> ' + dest_key + + # decide what to do + do_copy = True + if if_exists != 'replace': + if dest_key in dest: + if if_exists == 'raise': + raise CopyError('key {!r} exists in destination' + .format(dest_key)) + elif if_exists == 'skip': + do_copy = False + + # take action + if do_copy: + log('copy {}'.format(descr)) + if not dry_run: + data = source[source_key] + n_bytes_copied += buffer_size(data) + dest[dest_key] = data + n_copied += 1 + else: + log('skip {}'.format(descr)) + n_skipped += 1 # log a final message with a summary of what happened _log_copy_summary(log, dry_run, n_copied, n_skipped, n_bytes_copied) @@ -908,7 +956,15 @@ def _copy(log, source, dest, name, root, shallow, without_attrs, if_exists, # copy attributes if not without_attrs: - ds.attrs.update(source.attrs) + if dest_h5py and 'filters' in source.attrs: + # No filters key in v3 metadata so it was stored in the + # attributes instead. We cannot copy this key to + # HDF5 attrs, though! + source_attrs = source.attrs.asdict().copy() + source_attrs.pop('filters', None) + else: + source_attrs = source.attrs + ds.attrs.update(source_attrs) n_copied += 1 @@ -1064,6 +1120,8 @@ def copy_all(source, dest, shallow=False, without_attrs=False, log=None, # setup counting variables n_copied = n_skipped = n_bytes_copied = 0 + zarr_version = getattr(source, '_version', 2) + # setup logging with _LogWriter(log) as log: @@ -1075,7 +1133,8 @@ def copy_all(source, dest, shallow=False, without_attrs=False, log=None, n_copied += c n_skipped += s n_bytes_copied += b - dest.attrs.update(**source.attrs) + if zarr_version == 2: + dest.attrs.update(**source.attrs) # log a final message with a summary of what happened _log_copy_summary(log, dry_run, n_copied, n_skipped, n_bytes_copied) @@ -1083,7 +1142,7 @@ def copy_all(source, dest, shallow=False, without_attrs=False, log=None, return n_copied, n_skipped, n_bytes_copied -def consolidate_metadata(store: StoreLike, metadata_key=".zmetadata"): +def consolidate_metadata(store: BaseStore, metadata_key=".zmetadata"): """ Consolidate all metadata for groups and arrays within the given store into a single resource and put it under the given key. diff --git a/zarr/tests/test_convenience.py b/zarr/tests/test_convenience.py index e5ccbd494d..f253ec5d05 100644 --- a/zarr/tests/test_convenience.py +++ b/zarr/tests/test_convenience.py @@ -27,29 +27,44 @@ from zarr.storage import ( ConsolidatedMetadataStore, MemoryStore, + MemoryStoreV3, atexit_rmtree, getsize, ) -def test_open_array(path_type): +def _init_creation_kwargs(zarr_version): + kwargs = {'zarr_version': zarr_version} + if zarr_version == 3: + kwargs['path'] = 'dataset' + return kwargs + + +@pytest.mark.parametrize('zarr_version', [2, 3]) +def test_open_array(path_type, zarr_version): store = tempfile.mkdtemp() atexit.register(atexit_rmtree, store) store = path_type(store) + kwargs = _init_creation_kwargs(zarr_version) # open array, create if doesn't exist - z = open(store, mode='a', shape=100) + z = open(store, mode='a', shape=100, **kwargs) assert isinstance(z, Array) assert z.shape == (100,) # open array, overwrite - z = open(store, mode='w', shape=200) + z = open(store, mode='w', shape=200, **kwargs) assert isinstance(z, Array) assert z.shape == (200,) + if zarr_version == 3: + # cannot open a v3 array without path + with pytest.raises(ValueError): + open(store, mode='w', shape=200, zarr_version=3) + # open array, read-only - z = open(store, mode='r') + z = open(store, mode='r', **kwargs) assert isinstance(z, Array) assert z.shape == (200,) assert z.read_only @@ -59,44 +74,70 @@ def test_open_array(path_type): open('doesnotexist', mode='r') -def test_open_group(path_type): +@pytest.mark.parametrize("zarr_version", [2, 3]) +def test_open_group(path_type, zarr_version): store = tempfile.mkdtemp() atexit.register(atexit_rmtree, store) store = path_type(store) + kwargs = _init_creation_kwargs(zarr_version) # open group, create if doesn't exist - g = open(store, mode='a') + g = open(store, mode='a', **kwargs) g.create_group('foo') assert isinstance(g, Group) assert 'foo' in g # open group, overwrite - g = open(store, mode='w') + g = open(store, mode='w', **kwargs) assert isinstance(g, Group) assert 'foo' not in g + if zarr_version == 3: + # cannot open a v3 group without path + with pytest.raises(ValueError): + open(store, mode='w', zarr_version=3) + # open group, read-only - g = open(store, mode='r') + g = open(store, mode='r', **kwargs) assert isinstance(g, Group) assert g.read_only -def test_save_errors(): +@pytest.mark.parametrize("zarr_version", [2, 3]) +def test_save_errors(zarr_version): with pytest.raises(ValueError): # no arrays provided - save_group('data/group.zarr') + save_group('data/group.zarr', zarr_version=zarr_version) + with pytest.raises(TypeError): + # no array provided + save_array('data/group.zarr', zarr_version=zarr_version) with pytest.raises(ValueError): # no arrays provided - save('data/group.zarr') + save('data/group.zarr', zarr_version=zarr_version) + + +def test_zarr_v3_save_errors(): + x = np.ones(8) + with pytest.raises(ValueError): + # no path provided + save_group('data/group.zr3', x, zarr_version=3) + with pytest.raises(ValueError): + # no path provided + save_array('data/group.zr3', x, zarr_version=3) + with pytest.raises(ValueError): + # no path provided + save('data/group.zr3', x, zarr_version=3) -def test_lazy_loader(): +@pytest.mark.parametrize("zarr_version", [2, 3]) +def test_lazy_loader(zarr_version): foo = np.arange(100) bar = np.arange(100, 0, -1) - store = 'data/group.zarr' - save(store, foo=foo, bar=bar) - loader = load(store) + store = 'data/group.zarr' if zarr_version == 2 else 'data/group.zr3' + kwargs = _init_creation_kwargs(zarr_version) + save(store, foo=foo, bar=bar, **kwargs) + loader = load(store, **kwargs) assert 'foo' in loader assert 'bar' in loader assert 'baz' not in loader @@ -106,6 +147,8 @@ def test_lazy_loader(): assert_array_equal(bar, loader['bar']) +# TODO: consolidated metadata currently only supported for v2 + def test_consolidate_metadata(): # setup initial data @@ -250,9 +293,12 @@ def setUp(self): source['bar/qux'] = b'zzz' self.source = source + def _get_dest_store(self): + return dict() + def test_no_paths(self): source = self.source - dest = dict() + dest = self._get_dest_store() copy_store(source, dest) assert len(source) == len(dest) for key in source: @@ -262,7 +308,7 @@ def test_source_path(self): source = self.source # paths should be normalized for source_path in 'bar', 'bar/', '/bar', '/bar/': - dest = dict() + dest = self._get_dest_store() copy_store(source, dest, source_path=source_path) assert 2 == len(dest) for key in source: @@ -276,7 +322,7 @@ def test_dest_path(self): source = self.source # paths should be normalized for dest_path in 'new', 'new/', '/new', '/new/': - dest = dict() + dest = self._get_dest_store() copy_store(source, dest, dest_path=dest_path) assert len(source) == len(dest) for key in source: @@ -288,7 +334,7 @@ def test_source_dest_path(self): # paths should be normalized for source_path in 'bar', 'bar/', '/bar', '/bar/': for dest_path in 'new', 'new/', '/new', '/new/': - dest = dict() + dest = self._get_dest_store() copy_store(source, dest, source_path=source_path, dest_path=dest_path) assert 2 == len(dest) @@ -304,14 +350,14 @@ def test_excludes_includes(self): source = self.source # single excludes - dest = dict() + dest = self._get_dest_store() excludes = 'f.*' copy_store(source, dest, excludes=excludes) assert len(dest) == 2 assert 'foo' not in dest # multiple excludes - dest = dict() + dest = self._get_dest_store() excludes = 'b.z', '.*x' copy_store(source, dest, excludes=excludes) assert len(dest) == 1 @@ -320,7 +366,7 @@ def test_excludes_includes(self): assert 'bar/qux' not in dest # excludes and includes - dest = dict() + dest = self._get_dest_store() excludes = 'b.*' includes = '.*x' copy_store(source, dest, excludes=excludes, includes=includes) @@ -331,13 +377,13 @@ def test_excludes_includes(self): def test_dry_run(self): source = self.source - dest = dict() + dest = self._get_dest_store() copy_store(source, dest, dry_run=True) assert 0 == len(dest) def test_if_exists(self): source = self.source - dest = dict() + dest = self._get_dest_store() dest['bar/baz'] = b'mmm' # default ('raise') @@ -415,7 +461,14 @@ def check_copied_array(original, copied, without_attrs=False, for k in original.attrs.keys(): assert k not in copied.attrs else: - assert sorted(original.attrs.items()) == sorted(copied.attrs.items()) + if dest_h5py and 'filters' in original.attrs: + # special case in v3 (storing filters metadata under attributes) + # we explicitly do not copy this info over to HDF5 + original_attrs = original.attrs.asdict().copy() + original_attrs.pop('filters') + else: + original_attrs = original.attrs + assert sorted(original_attrs.items()) == sorted(copied.attrs.items()) def check_copied_group(original, copied, without_attrs=False, expect_props=None, @@ -469,10 +522,32 @@ def test_copy_all(): dry_run=False, ) + assert 'subgroup' in destination_group assert destination_group.attrs["info"] == "group attrs" assert destination_group.subgroup.attrs["info"] == "sub attrs" +def test_copy_all_v3(): + """ + https://github.com/zarr-developers/zarr-python/issues/269 + + copy_all used to not copy attributes as `.keys()` + + """ + original_group = zarr.group(store=MemoryStoreV3(), path='group1', overwrite=True) + original_group.create_group("subgroup") + + destination_group = zarr.group(store=MemoryStoreV3(), path='group2', overwrite=True) + + # copy from memory to directory store + copy_all( + original_group, + destination_group, + dry_run=False, + ) + assert 'subgroup' in destination_group + + class TestCopy: @pytest.fixture(params=[False, True], ids=['zarr', 'hdf5']) def source(self, request, tmpdir): @@ -715,3 +790,68 @@ def test_logging(self, source, dest, tmpdir): # bad option with pytest.raises(TypeError): copy(source['foo'], dest, dry_run=True, log=True) + + +class TestCopyV3(TestCopy): + + @pytest.fixture(params=[False, True], ids=['zarr', 'hdf5']) + def source(self, request, tmpdir): + def prep_source(source): + foo = source.create_group('foo') + foo.attrs['experiment'] = 'weird science' + baz = foo.create_dataset('bar/baz', data=np.arange(100), chunks=(50,)) + baz.attrs['units'] = 'metres' + if request.param: + extra_kws = dict(compression='gzip', compression_opts=3, fillvalue=84, + shuffle=True, fletcher32=True) + else: + extra_kws = dict(compressor=Zlib(3), order='F', fill_value=42, filters=[Adler32()]) + source.create_dataset('spam', data=np.arange(100, 200).reshape(20, 5), + chunks=(10, 2), dtype='i2', **extra_kws) + return source + + if request.param: + h5py = pytest.importorskip('h5py') + fn = tmpdir.join('source.h5') + with h5py.File(str(fn), mode='w') as h5f: + yield prep_source(h5f) + else: + yield prep_source(group(path='group1', zarr_version=3)) + + @pytest.fixture(params=[False, True], ids=['zarr', 'hdf5']) + def dest(self, request, tmpdir): + if request.param: + h5py = pytest.importorskip('h5py') + fn = tmpdir.join('dest.h5') + with h5py.File(str(fn), mode='w') as h5f: + yield h5f + else: + yield group(path='group2', zarr_version=3) + + def test_copy_array_create_options(self, source, dest): + dest_h5py = dest.__module__.startswith('h5py.') + + # copy array, provide creation options + compressor = Zlib(9) + create_kws = dict(chunks=(10,)) + if dest_h5py: + create_kws.update(compression='gzip', compression_opts=9, + shuffle=True, fletcher32=True, fillvalue=42) + else: + # v3 case has no filters argument in zarr create_kws + create_kws.update(compressor=compressor, fill_value=42, order='F') + copy(source['foo/bar/baz'], dest, without_attrs=True, **create_kws) + check_copied_array(source['foo/bar/baz'], dest['baz'], + without_attrs=True, expect_props=create_kws) + + def test_copy_group_no_name(self, source, dest): + if source.__module__.startswith('h5py'): + with pytest.raises(TypeError): + copy(source, dest) + else: + # For v3, dest.name will be inferred from source.name + copy(source, dest) + check_copied_group(source, dest[source.name.lstrip('/')]) + + copy(source, dest, name='root') + check_copied_group(source, dest['root'])