Concurrent data downloading and appending to zarr store #2532
-
I'm trying to speed up the download and processing of many netcdf files by running concurrent processes.
So I start with

```python
dates = pd.date_range(DATE_START, DATE_END, freq="1D")

# Write the first file to make sure we start with something
download_file(dates[0])
# then proceed with the others
process_map(download_file, dates, max_workers=MAX_WORKERS, chunksize=CHUNKSIZE)
```

The `download_file` function essentially does the following for each date's url:

```python
with fsspec.open(url, mode="rb") as f_remote:
    with gzip.GzipFile(fileobj=f_remote) as f_decompressed:
        with io.BytesIO(f_decompressed.read()) as f_buffer:
            ds = xr.open_dataset(f_buffer).squeeze()
            subset = ds.sel(x=slice(x_min, x_max), y=slice(y_min, y_max))
            subset = subset.expand_dims("time").drop_vars('projection')
            if not os.path.exists(OUTPUT_ZARR):
                subset.to_zarr(OUTPUT_ZARR, mode="w", consolidated=True)
            else:
                # Append to the existing Zarr dataset
                subset.to_zarr(OUTPUT_ZARR, mode="a", append_dim="time", consolidated=True)
```

This works when I run the script, but when I try to open the resulting dataset afterwards it is not what I expect, even though if I look into the folder I can indeed see that the variable's chunks have been written. Surely there must be a smarter way of downloading and appending to a zarr in parallel.
-
Thanks for sharing! It seems like there is a logical problem with what you're trying to do here. Appending over the time dimension has to be done in the correct order: you want the first file to be first, then append the second file, then the third, etc., so that the Zarr dataset has the correct temporal order. Because it is sequential, this operation can't be parallelized! Your concurrent workers end up appending in whatever order they happen to finish.

However, there is another approach you can use here that is parallelizable. Since you know the shape and coordinates of the final dataset in advance, you can pre-allocate the entire domain and then write each time slice into its own region in parallel, instead of appending.

Finally, when doing complex distributed writes to Zarr, it can be really useful to have a store that supports transactional updates and rollbacks in case something goes wrong. Have a look at our new Icechunk project if you're curious about that.
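A minimal sketch of this pre-allocate-then-fill pattern (not from the original thread), assuming a hypothetical `load_one_day(date)` helper that returns one day of data on a fixed `y`/`x` grid; all names, sizes, and paths here are illustrative:

```python
import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr

dates = pd.date_range("2020-01-01", "2020-01-10")  # illustrative time axis
ny, nx = 100, 200                                  # illustrative grid size

# Pre-allocate the full domain: lazy zeros chunked one time step per chunk,
# written with compute=False so only metadata and coordinates are stored.
template = xr.Dataset(
    {"var": (("time", "y", "x"), da.zeros((len(dates), ny, nx), chunks=(1, ny, nx)))},
    coords={"time": dates, "y": np.arange(ny), "x": np.arange(nx)},
)
template.to_zarr("out.zarr", mode="w", compute=False)

def write_one(date):
    # Each worker fills in its own time slice; every write touches a different
    # chunk along "time", so the writes are independent of each other and of order.
    ds = load_one_day(date).expand_dims("time").assign_coords(time=[date])  # hypothetical loader
    ds.to_zarr("out.zarr", region="auto")  # recent xarray infers the region from the coordinates
```

With older xarray versions you would pass an explicit `region={"time": slice(i, i + 1)}` instead, and may need to drop coordinate variables that do not span the region dimension.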
-
Cannot believe it was THAT easy.
Thanks to the suggestions of @rabernat I was able to create the following example, which does exactly what it's supposed to do by only writing the region specific to a certain time (which is automatically detected by xarray).
The result is that I'm able to download and process about 20 years of daily data in about 7 minutes, and this could be improved by using even more workers (but then the network would probably become the bottleneck).
I'm leaving this here in case someone has the same problem.

```python
import xarray as xr
import fsspec
import gzip
import io
import pyproj
import pandas as pd
from tqdm.contrib.concurrent import process_map
import dask.array as da
import logging

# Define lat/lon box boundaries
lat_min, lat_max = 65, 25  # Replace with your values
lon_min, lon_max = -40, 40  # Replace with your values

# Create a transformer for polar stereographic projection
transformer = pyproj.Transformer.from_crs(
    "EPSG:4326",
    "+proj=stere +lat_0=90 +lat_ts=60 +lon_0=-80 +k=1 +x_0=0 +y_0=0 +a=6378137 +b=6356257 +units=m +no_defs",
    always_xy=True,
)

# Transform the lat/lon boundaries to x/y boundaries
x_min, y_min = transformer.transform(lon_min, lat_min)
x_max, y_max = transformer.transform(lon_max, lat_max)

RESOLUTION = "4km"
OUTPUT_ZARR = f"/Users/guidocioni/Downloads/data_{RESOLUTION}.zarr"
DATE_START = "2005-01-01"
DATE_END = "2024-12-03"
MAX_WORKERS = 10
CHUNKSIZE = 10


# Helper function to generate the URL for a given date
def get_url_for_date(date):
    if RESOLUTION == "1km":
        version = "v1.3"
    else:
        if date < pd.to_datetime("2014-12-03"):
            version = "v1.2"
        else:
            version = "v1.3"
    return f"https://noaadata.apps.nsidc.org/NOAA/G02156/netcdf/{RESOLUTION}/{date.year}/ims{date.year}{date.day_of_year:03d}_{RESOLUTION}_{version}.nc.gz"


# Parallel version
def main():
    # Build the full range of daily dates to process
    dates = pd.date_range(DATE_START, DATE_END, freq="1D")
    # Initialize the zarr store from the first file
    url = get_url_for_date(dates[0])
    with fsspec.open(url, mode="rb") as f_remote:
        with gzip.GzipFile(fileobj=f_remote) as f_decompressed:
            with io.BytesIO(f_decompressed.read()) as f_buffer:
                ds = xr.open_dataset(f_buffer).squeeze()
                subset = ds.sel(x=slice(x_min, x_max), y=slice(y_min, y_max)).drop_vars('projection').expand_dims("time")
                # Initialize the Zarr store with the correct dimensions
                shape = (len(dates), subset.sizes['y'], subset.sizes['x'])
                chunks = (1, subset.sizes['y'], subset.sizes['x'])  # Write 1 time step at a time
                template = xr.Dataset(
                    data_vars={
                        "IMS_Surface_Values": (
                            ('time', 'y', 'x'),
                            da.zeros(shape, chunks=chunks, dtype=subset.IMS_Surface_Values.dtype)
                        )
                    },
                    coords={
                        'time': dates,
                        'y': subset['y'],
                        'x': subset['x']
                    }
                )
                # Write only the metadata and coordinates (the lazy zeros are not computed)
                template.to_zarr(OUTPUT_ZARR, mode="w", compute=False)
                # Now write the first time step that we already downloaded
                subset.to_zarr(OUTPUT_ZARR, region="auto")
    # Download and write the remaining time steps in parallel
    process_map(download_time_step, dates[1:], max_workers=MAX_WORKERS, chunksize=CHUNKSIZE)


def download_time_step(date):
    url = get_url_for_date(date)
    try:
        with fsspec.open(url, mode="rb") as f_remote:
            with gzip.GzipFile(fileobj=f_remote) as f_decompressed:
                with io.BytesIO(f_decompressed.read()) as f_buffer:
                    ds = xr.open_dataset(f_buffer).squeeze()
                    subset = ds.sel(x=slice(x_min, x_max), y=slice(y_min, y_max)).drop_vars('projection').expand_dims("time")
                    # Write this time step into its pre-allocated region
                    subset.to_zarr(OUTPUT_ZARR, region="auto")
    except Exception as e:
        logging.error(
            f"Could not process {url}: {type(e).__name__} at line {e.__traceback__.tb_lineno} of {__file__}: {e}")


if __name__ == "__main__":
    main()
```
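Not part of the original script, but as a quick sanity check the finished store can be opened lazily to confirm the time axis is complete and ordered (this assumes the `OUTPUT_ZARR`, `DATE_START` and `DATE_END` constants defined above):

```python
import pandas as pd
import xarray as xr

ds = xr.open_zarr(OUTPUT_ZARR)
print(ds)
# The time axis should be ordered and cover the whole requested range
assert ds.time.to_index().is_monotonic_increasing
assert ds.sizes["time"] == len(pd.date_range(DATE_START, DATE_END, freq="1D"))
```

Note that dates whose download failed are only logged by `download_time_step`, so their time slices are simply never written into the pre-allocated store.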
-
Hey @rabernat, just out of curiosity. Of course the serial approach (appending one instant after another sequentially) would still be available, but this could become slow in case there are many time steps to append. Thanks again
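For reference, the serial fallback mentioned above is just the ordered append loop from the original question; a minimal sketch, again assuming a hypothetical `load_one_day(date)` helper:

```python
import pandas as pd

dates = pd.date_range("2020-01-01", "2020-01-10")  # illustrative

for i, date in enumerate(dates):
    ds = load_one_day(date).expand_dims("time").assign_coords(time=[date])  # hypothetical loader
    if i == 0:
        # First write creates the store
        ds.to_zarr("out.zarr", mode="w", consolidated=True)
    else:
        # Subsequent writes append along "time", strictly in order
        ds.to_zarr("out.zarr", mode="a", append_dim="time", consolidated=True)
```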