diff --git a/kerchunk/open_meteo.py b/kerchunk/open_meteo.py
new file mode 100644
index 00000000..9b3f869f
--- /dev/null
+++ b/kerchunk/open_meteo.py
@@ -0,0 +1,411 @@
+import base64
+import datetime
+import io
+import json
+import logging
+import math
+import os
+import pathlib
+from dataclasses import dataclass
+from enum import Enum
+
+import fsspec.core
+import numcodecs
+import numcodecs.abc
+import numcodecs.compat
+import numpy as np
+
+from .utils import (
+    _encode_for_JSON,
+    dict_to_store,
+    translate_refs_serializable,
+)
+
+try:
+    import omfiles
+except ModuleNotFoundError:  # pragma: no cover
+    raise ImportError(
+        "omfiles is required for kerchunking Open-Meteo files. Please install with "
+        "`pip install omfiles`."
+    )
+
+logger = logging.getLogger("kerchunk.open_meteo")
+
+
+class SupportedDomain(Enum):
+    dwd_icon = "dwd_icon"
+
+    # Ideally all of this information would be read from the
+    # meta.json file under /data/{domain}/static/meta.json
+    def s3_chunk_size(self) -> int:
+        """Number of timesteps (at the native model dt) stored per om-file on AWS S3"""
+        if self == SupportedDomain.dwd_icon:
+            return 253
+        else:
+            raise ValueError(f"Unsupported domain {self}")
+
+
+@dataclass
+class StartLengthStepSize:
+    start: datetime.datetime
+    length: int
+    step_dt: int  # in seconds
+
+
+def chunk_number_to_start_time(domain: SupportedDomain, chunk_no: int) -> StartLengthStepSize:
+    """Map an om-file chunk number to its start time, length and timestep."""
+    meta_file = f"openmeteo/data/{domain.value}/static/meta.json"
+    # Load metadata from S3
+    fs = fsspec.filesystem(protocol="s3", anon=True)
+    with fs.open(meta_file, mode="r") as f:
+        metadata = json.load(f)
+
+    dt_seconds = metadata["temporal_resolution_seconds"]
+    om_file_length = domain.s3_chunk_size()
+
+    seconds_since_epoch = chunk_no * om_file_length * dt_seconds
+
+    epoch = datetime.datetime(1970, 1, 1)
+    chunk_start = epoch + datetime.timedelta(seconds=seconds_since_epoch)
+    logger.debug("chunk_start %s", chunk_start)
+    return StartLengthStepSize(start=chunk_start, length=om_file_length, step_dt=dt_seconds)
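+
+# Worked example (illustrative; assumes meta.json reports temporal_resolution_seconds == 3600
+# for dwd_icon, which together with s3_chunk_size() == 253 gives hourly steps):
+#   chunk_no = 1914  ->  1914 * 253 * 3600 s = 1_743_271_200 s since 1970-01-01,
+#   i.e. a file covering 253 hourly steps starting at 2025-03-29 18:00 UTC.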
+
+
+class Reshape(numcodecs.abc.Codec):
+    """Codec to reshape data between encoding and decoding.
+
+    This codec reshapes the data to a specific shape before passing it to the next
+    filter in the pipeline, which is particularly useful for filters like Delta2D
+    that expect 2D data.
+
+    Parameters
+    ----------
+    shape : tuple
+        Shape to reshape the data to during decoding
+    """
+
+    codec_id = 'reshape'
+
+    def __init__(self, shape):
+        self.shape = tuple(shape)
+
+    def encode(self, buf):
+        # For encoding, we flatten back to 1D
+        arr = numcodecs.compat.ensure_ndarray(buf)
+        return arr.reshape(-1)
+
+    def decode(self, buf, out=None):
+        # Reshape to the specified 2D shape for delta2d
+        arr = numcodecs.compat.ensure_ndarray(buf)
+
+        # Check that the total number of elements matches
+        expected_size = np.prod(self.shape)
+        if arr.size != expected_size:
+            raise ValueError(f"Buffer size {arr.size} doesn't match expected size {expected_size}")
+
+        return arr.reshape(self.shape)
+
+    def get_config(self):
+        return {'id': self.codec_id, 'shape': self.shape}
+
+    def __repr__(self):
+        return f'{type(self).__name__}(shape={self.shape!r})'
+
+
+class Delta2D(numcodecs.abc.Codec):
+    """Codec to encode data as the difference between adjacent rows in a 2D array."""
+
+    codec_id = 'delta2d'
+
+    def __init__(self, dtype, astype=None):
+        self.dtype = np.dtype(dtype)
+        if astype is None:
+            self.astype = self.dtype
+        else:
+            self.astype = np.dtype(astype)
+        if self.dtype == np.dtype(object) or self.astype == np.dtype(object):
+            raise ValueError('object arrays are not supported')
+
+    def encode(self, buf):
+        arr = numcodecs.compat.ensure_ndarray(buf).view(self.dtype)
+        if arr.ndim != 2:
+            raise ValueError("Delta2D only works with 2D arrays")
+        enc = arr.astype(self.astype, copy=True)
+        if enc.shape[0] > 1:
+            # Difference each row against the previous one, starting from the last row
+            for d0 in range(enc.shape[0] - 1, 0, -1):
+                enc[d0] -= enc[d0 - 1]
+        return enc
+
+    def decode(self, buf, out=None):
+        enc = numcodecs.compat.ensure_ndarray(buf).view(self.astype)
+        if enc.ndim != 2:
+            raise ValueError("Delta2D only works with 2D arrays")
+        if out is not None:
+            dec = out.view(self.dtype)
+            if dec.shape != enc.shape:
+                raise ValueError("Output array has wrong shape")
+        else:
+            dec = np.empty_like(enc, dtype=self.dtype)
+        dec[0] = enc[0]
+        if enc.shape[0] > 1:
+            # Cumulatively add the row differences back up
+            for d0 in range(1, enc.shape[0]):
+                dec[d0] = enc[d0] + dec[d0 - 1]
+        return dec if out is None else out
+
+    def get_config(self):
+        return {'id': self.codec_id, 'dtype': self.dtype.str, 'astype': self.astype.str}
+
+    def __repr__(self):
+        r = f'{type(self).__name__}(dtype={self.dtype.str!r}'
+        if self.astype != self.dtype:
+            r += f', astype={self.astype.str!r}'
+        r += ')'
+        return r
+
+
+# Register codecs
+# NOTE: TurboPfor is already registered as `turbo_pfor` by omfiles
+numcodecs.register_codec(Delta2D, "delta2d")
+numcodecs.register_codec(Reshape, "reshape")
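+
+# Round-trip sketch for the two codecs above (values purely illustrative):
+#
+#     codec = Delta2D(dtype="i2")
+#     block = np.array([[1, 2, 3], [4, 6, 8]], dtype="i2")
+#     enc = codec.encode(block)                            # rows become [[1, 2, 3], [3, 4, 5]]
+#     np.array_equal(codec.decode(enc), block)             # True
+#     Reshape(shape=(2, 3)).decode(enc.reshape(-1)).shape  # (2, 3), ready for Delta2D.decode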
+
+
+class SingleOmToZarr:
+    """Translate a .om file into Zarr metadata"""
+
+    def __init__(
+        self,
+        om_file,
+        url=None,
+        spec=1,
+        inline_threshold=500,
+        storage_options=None,
+        chunk_no=None,
+        domain=None,
+        reference_time=None,
+        time_step=3600,
+    ):
+        # Initialize a reader for the om file
+        if isinstance(om_file, (pathlib.Path, str)):
+            fs, path = fsspec.core.url_to_fs(om_file, **(storage_options or {}))
+            self.input_file = fs.open(path, "rb")
+            url = om_file
+            self.reader = omfiles.OmFilePyReader(self.input_file)
+        elif isinstance(om_file, io.IOBase):
+            self.input_file = om_file
+            self.reader = omfiles.OmFilePyReader(self.input_file)
+        else:
+            raise ValueError("type of input `om_file` not recognized")
+
+        self.url = url if url else om_file
+        self.spec = spec
+        self.inline = inline_threshold
+        self.store_dict = {}
+        self.store = dict_to_store(self.store_dict)
+        self.name = "data"  # FIXME: This should be the name from the om-variable
+
+        if domain is not None and chunk_no is not None:
+            start_step = chunk_number_to_start_time(domain=domain, chunk_no=chunk_no)
+            # Store time parameters derived from the domain and chunk number
+            self.reference_time = start_step.start
+            self.time_step = start_step.step_dt
+        else:
+            # Fall back to explicitly passed values (may be None)
+            self.reference_time = reference_time
+            self.time_step = time_step
+
+    def translate(self):
+        """Main method to create the kerchunk references"""
+        # 1. Extract metadata about shape, dtype, chunks, etc.
+        shape = self.reader.shape
+        dtype = self.reader.dtype
+        chunks = self.reader.chunk_dimensions
+        scale_factor = self.reader.scale_factor
+        add_offset = self.reader.add_offset
+        lut = self.reader.get_complete_lut()
+
+        # Get dimension names if available, otherwise use defaults
+        # FIXME: Currently we don't have dimension names exposed by the reader (or even necessarily in the file)
+        dim_names = getattr(self.reader, "dimension_names", ["x", "y", "time"])
+
+        # Calculate the number of chunks in each dimension
+        chunks_per_dim = [math.ceil(s / c) for s, c in zip(shape, chunks)]
+
+        # 2. Create Zarr array metadata (.zarray)
+        blocksize = chunks[0] * chunks[1] * chunks[2] if len(chunks) >= 3 else chunks[0] * chunks[1]
+
+        zarray = {
+            "zarr_format": 2,
+            "shape": shape,
+            "chunks": chunks,
+            "dtype": str(dtype),
+            "compressor": {"id": "turbo_pfor", "chunk_elements": blocksize},  # main compressor
+            "fill_value": None,
+            "order": "C",
+            "filters": [
+                {"id": "fixedscaleoffset", "scale": scale_factor, "offset": add_offset, "dtype": "f4", "astype": "i2"},
+                {"id": "delta2d", "dtype": "<i2"},
+                {"id": "reshape", "shape": [chunks[0] * chunks[1], chunks[2]]},
+            ],
+        }
+
+        # 3. Create array attributes (.zattrs) and write the group/array metadata
+        zattrs = {"_ARRAY_DIMENSIONS": dim_names}
+        self.store_dict[".zgroup"] = json.dumps({"zarr_format": 2})
+        self.store_dict[f"{self.name}/.zarray"] = json.dumps(zarray)
+        self.store_dict[f"{self.name}/.zattrs"] = json.dumps(zattrs)
+
+        # 4. Create a reference for every chunk from the om-file lookup table (LUT)
+        total_chunks = math.prod(chunks_per_dim)
+        file_size = self._get_file_size()
+        for chunk_idx in range(total_chunks):
+            chunk_coords = self._get_chunk_coords(chunk_idx, chunks_per_dim)
+            key = f"{self.name}/" + ".".join(map(str, chunk_coords))
+            # Consecutive LUT entries delimit the compressed byte range of a chunk
+            chunk_end = lut[chunk_idx + 1] if chunk_idx + 1 < len(lut) else file_size
+            chunk_size = chunk_end - lut[chunk_idx]
+            if self.inline > 0 and chunk_size < self.inline:
+                # Read the chunk data and inline it
+                self.input_file.seek(lut[chunk_idx])
+                data = self.input_file.read(chunk_size)
+                try:
+                    # Try to decode as ASCII
+                    self.store_dict[key] = data.decode('ascii')
+                except UnicodeDecodeError:
+                    # If not ASCII, encode as base64
+                    self.store_dict[key] = b"base64:" + base64.b64encode(data)
+            else:
+                # Otherwise store as a reference
+                self.store_dict[key] = [self.url, lut[chunk_idx], chunk_size]
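+
+        # The reference entries built above look roughly like this (illustrative values):
+        #   "data/.zarray" -> zarr array metadata (JSON)
+        #   "data/0.0.0"   -> [<url>, <offset>, <length>]  byte range into the .om file
+        #   "data/0.0.1"   -> "base64:..."  raw bytes, inlined when smaller than inline_threshold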
+
+        # 5. Create coordinate arrays. TODO: This needs to be improved
+        # Add coordinate arrays for ALL dimensions
+        for i, dim_name in enumerate(dim_names):
+            dim_size = shape[i]
+            if dim_name == "time":
+                # Special handling for the time dimension
+                self._add_time_coordinate(dim_size, i)
+            else:
+                logger.debug("No coordinates for dimension %s", dim_name)
+                continue
+
+        # Convert to the proper format for return
+        if self.spec < 1:
+            return self.store
+        else:
+            translate_refs_serializable(self.store_dict)
+            store = _encode_for_JSON(self.store_dict)
+            return {"version": 1, "refs": store}
+
+    def _add_time_coordinate(self, time_dim, time_axis=0):
+        """Add a time coordinate array following CF conventions"""
+
+        # Always use the standard CF epoch reference
+        units = "seconds since 1970-01-01T00:00:00Z"
+
+        # Compute integer time values relative to the epoch from reference_time
+        if self.reference_time is not None and self.time_step is not None:
+            ref_time = self.reference_time
+
+            if isinstance(ref_time, datetime.datetime):
+                # Calculate seconds since the epoch (1970-01-01)
+                epoch = datetime.datetime(1970, 1, 1, 0, 0, 0)
+                seconds_since_epoch = int((ref_time - epoch).total_seconds())
+
+                # Generate time values with integer seconds
+                time_values = np.arange(
+                    seconds_since_epoch,
+                    seconds_since_epoch + time_dim * self.time_step,
+                    self.time_step,
+                    dtype='i8'
+                )
+            else:
+                raise TypeError("expected a datetime.datetime object as self.reference_time")
+        else:
+            raise TypeError(
+                "reference_time and time_step are required to build the time coordinate; "
+                "pass `domain` and `chunk_no` (or `reference_time`) to SingleOmToZarr"
+            )
+
+        # Create time array metadata
+        time_zarray = {
+            "zarr_format": 2,
+            "shape": [time_dim],
+            "chunks": [time_dim],
+            "dtype": "<i8",
+            "compressor": None,
+            "fill_value": None,
+            "order": "C",
+            "filters": None,
+        }
+        time_zattrs = {"_ARRAY_DIMENSIONS": ["time"], "units": units, "standard_name": "time"}
+
+        # Store the metadata and inline the time values directly
+        self.store_dict["time/.zarray"] = json.dumps(time_zarray)
+        self.store_dict["time/.zattrs"] = json.dumps(time_zattrs)
+        self.store_dict["time/0"] = time_values.tobytes()
+
+        if len(time_values) > 0:
+            logger.debug(
+                "First timestamp: %s seconds since 1970-01-01, Last: %s",
+                time_values[0],
+                time_values[-1],
+            )
+
+    def _get_chunk_coords(self, idx, chunks_per_dim):
+        """Convert a linear chunk index to multidimensional chunk coordinates
+
+        Parameters
+        ----------
+        idx : int
+            Linear chunk index
+        chunks_per_dim : list
+            Number of chunks in each dimension
+
+        Returns
+        -------
+        list
+            Chunk coordinates in multidimensional space
+        """
+        coords = []
+        remaining = idx
+
+        # Start from the fastest-changing dimension (C-order)
+        for chunks_in_dim in reversed(chunks_per_dim):
+            coords.insert(0, remaining % chunks_in_dim)
+            remaining //= chunks_in_dim
+
+        return coords
+
+    def _get_file_size(self):
+        """Get the total file size in bytes"""
+        current_pos = self.input_file.tell()
+        self.input_file.seek(0, os.SEEK_END)
+        size = self.input_file.tell()
+        self.input_file.seek(current_pos)
+        return size
+
+    def close(self):
+        """Close the reader"""
+        if hasattr(self, 'reader'):
+            self.reader.close()
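+
+
+# Usage sketch (mirrors tests/test_open_meteo.py; the file name and chunk number are just
+# examples): build references for a local .om file and read them back through fsspec's
+# reference filesystem:
+#
+#     refs = SingleOmToZarr("tests/rh_icon_chunk1914.om", inline_threshold=0,
+#                           domain=SupportedDomain.dwd_icon, chunk_no=1914).translate()
+#     fs = fsspec.filesystem("reference", fo=refs)
+#     data = zarr.open(fs_as_store(fs), zarr_format=2)["data"]  # fs_as_store from kerchunk.utils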
diff --git a/pyproject.toml b/pyproject.toml
index b383f04e..b2e91b61 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -50,6 +50,7 @@ dev = [
     "scipy",
     "netcdf4",
     "pytest-subtests",
+    "omfiles @ git+https://github.com/open-meteo/python-omfiles.git@codecs"
 ]
 
 [project.urls]
diff --git a/tests/test_open_meteo.py b/tests/test_open_meteo.py
new file mode 100644
index 00000000..5a4d86a9
--- /dev/null
+++ b/tests/test_open_meteo.py
@@ -0,0 +1,187 @@
+import os
+
+import fsspec
+import numpy as np
+import omfiles
+import pytest
+import zarr
+
+from kerchunk.combine import MultiZarrToZarr
+from kerchunk.df import refs_to_dataframe
+from kerchunk.open_meteo import SingleOmToZarr, SupportedDomain
+from kerchunk.utils import fs_as_store
+
+zarr.config.config["async"]["concurrency"] = 1
+
+
+def test_single_om_to_zarr():
+    """Test creating references for a single OM file and reading them via Zarr"""
+    # Path to test file - adjust as needed
+    chunk_no = 1914
+    test_file = f"tests/rh_icon_chunk{chunk_no}.om"
+
+    # Create references using SingleOmToZarr (no inlining for testing)
+    om_to_zarr = SingleOmToZarr(test_file, inline_threshold=0, domain=SupportedDomain.dwd_icon, chunk_no=chunk_no)
+    references = om_to_zarr.translate()
+    om_to_zarr.close()
+
+    # Save references to JSON if desired. These can get very big...
+    # with open("om_refs.json", "w") as f:
+    #     json.dump(references, f)
+
+    # Save references to parquet
+    refs_to_dataframe(
+        fo=references,  # references dict
+        url="om_refs_parquet/",  # output directory
+        record_size=100000,  # records per file, adjust as needed
+        categorical_threshold=10,  # URL encoding efficiency
+    )
+
+    # Create a filesystem from the references
+    fs = fsspec.filesystem("reference", fo=references)
+    store = fs_as_store(fs)
+
+    # Open with zarr
+    group = zarr.open(store, zarr_format=2)
+    print("group['time']", group["time"][:])
+    z = group["data"]  # "data" is the placeholder variable name hardcoded in SingleOmToZarr
+
+    print("z.shape", z.shape)
+    print("z.chunks", z.chunks)
+
+    # Verify basic metadata matches the original file
+    reader = omfiles.OmFilePyReader(test_file)
+    assert list(z.shape) == reader.shape, f"Shape mismatch: {z.shape} vs {reader.shape}"
+    assert str(z.dtype) == str(reader.dtype), f"Dtype mismatch: {z.dtype} vs {reader.dtype}"
+    assert list(z.chunks) == reader.chunk_dimensions, f"Chunks mismatch: {z.chunks} vs {reader.chunk_dimensions}"
+
+    # TODO: Using the following chunk_index leads to a double free / corruption error,
+    # even with a concurrency of 1 (`zarr.config.config["async"]["concurrency"] = 1`).
+    # Most likely zarr and open-meteo treat partial chunks differently:
+    # om-files encode partial chunks with reduced dimensions, while zarr expects a full block of data.
+    # chunk_index = (slice(90, 100), 2878, ...)
+
+    # Test retrieving a specific chunk
+    chunk_index = (5, 5, ...)
+
+    # Get direct chunk data
+    direct_data = reader[chunk_index]
+
+    # Now get the same chunk via kerchunk/zarr
+    try:
+        print("z", z)
+        zarr_data = z[chunk_index]
+        print("zarr_data", zarr_data)
+
+        # Compare the data
+        print(f"Direct data shape: {direct_data.shape}, Zarr data shape: {zarr_data.shape}")
+        print(f"Direct data min/max: {np.min(direct_data)}/{np.max(direct_data)}")
+        print(f"Zarr data min/max: {np.min(zarr_data)}/{np.max(zarr_data)}")
+
+        # Assert the data is equivalent
+        np.testing.assert_allclose(zarr_data, direct_data, rtol=1e-5)
+
+        print("✓ Data from kerchunk matches direct access!")
+    except Exception as e:
+        pytest.fail(f"Failed to read kerchunked data through zarr: {e}")
+
+    # Clean up
+    reader.close()
+
+
+def test_multizarr_to_zarr():
+    """Test combining two OM files into a single kerchunked Zarr reference"""
+    chunk_no1 = 1914
+    chunk_no2 = 1915
+    file1 = f"tests/rh_icon_chunk{chunk_no1}.om"
+    file2 = f"tests/rh_icon_chunk{chunk_no2}.om"
+    assert os.path.exists(file1), f"{file1} not found"
+    assert os.path.exists(file2), f"{file2} not found"
+    refs1 = SingleOmToZarr(file1, inline_threshold=0, domain=SupportedDomain.dwd_icon, chunk_no=chunk_no1).translate()
+    refs2 = SingleOmToZarr(file2, inline_threshold=0, domain=SupportedDomain.dwd_icon, chunk_no=chunk_no2).translate()
+
+    # Combine using MultiZarrToZarr
+    mzz = MultiZarrToZarr(
+        [refs1, refs2],
+        concat_dims=["time"],
+        coo_map={"time": "data:time"},
+        identical_dims=["y", "x"],
+        remote_protocol=None,
+        remote_options=None,
+        target_options=None,
+        inline_threshold=0,
+        out=None,
+    )
+    combined_refs = mzz.translate()
+
+    # Save references to parquet
+    refs_to_dataframe(
+        fo=combined_refs,  # references dict
+        url="om_refs_mzz_parquet/",  # output directory
+        record_size=100000,  # records per file, adjust as needed
+        categorical_threshold=10,  # URL encoding efficiency
+    )
+
+    # Open with zarr
+    fs = fsspec.filesystem("reference", fo=combined_refs)
+    store = fs_as_store(fs)
+    group = zarr.open(store, zarr_format=2)
+    z = group["data"]
+
+    # Open both original files for comparison
+    reader1 = omfiles.OmFilePyReader(file1)
+    reader2 = omfiles.OmFilePyReader(file2)
+
+    # Check that the combined shape is the sum along the time axis
+    expected_shape = list(reader1.shape)
+    expected_shape[2] += reader2.shape[2]
+    assert list(z.shape) == expected_shape, f"Combined shape mismatch: {z.shape} vs {expected_shape}"
+
+    # Check that the first part matches file1 and the second part matches file2
+    # (assume 3D data with time as the last axis)
+    slc1 = (5, 5, slice(0, reader1.shape[2]))
+    slc2 = (5, 5, slice(reader1.shape[2], expected_shape[2]))
+    np.testing.assert_allclose(z[slc1], reader1[slc1], rtol=1e-5)
+    np.testing.assert_allclose(z[slc2], reader2[(5, 5, slice(0, reader2.shape[2]))], rtol=1e-5)
+
+    print("✓ MultiZarrToZarr combined data matches originals!")
+
+    reader1.close()
+    reader2.close()
+
+
+# def test_om_to_xarray():
+#     """Test opening a kerchunked OM file with xarray"""
+#     import xarray as xr
+
+#     # Create references using SingleOmToZarr
+#     om_to_zarr = SingleOmToZarr(test_file)
+#     references = om_to_zarr.translate()
+#     om_to_zarr.close()
+
+#     # Open with xarray
+#     store = refs_as_store(references)
+
+#     try:
+#         # Open dataset with xarray using the zarr engine
+#         ds = xr.open_zarr(store, consolidated=False)
+
+#         # Basic validation
+#         reader = omfiles.OmFilePyReader(test_file)
+#         assert ds.dims == dict(zip(["time", "y", "x"], reader.shape))
+
+#         # Get some data to verify the decompression pipeline
+#         data_sample = ds.isel(time=0, y=0, x=slice(0, 5)).data
+#         assert data_sample.shape == (5,)
+
+#         print(f"Successfully opened dataset with xarray: {ds}")
+#         print(f"Sample data: {data_sample}")
+
+#         reader.close()
+#     except Exception as e:
+#         pytest.fail(f"Failed to open with xarray: {e}")
+
+
+if __name__ == "__main__":
+    # Run tests directly if the file is executed
+    test_single_om_to_zarr()
+    test_multizarr_to_zarr()
+    # test_om_to_xarray()