
Slow single chunk download using s3fs storage option. #2662

Open
JoshCu opened this issue Jan 6, 2025 · 2 comments

JoshCu commented Jan 6, 2025

When using s3fs with zarr, xarray, and dask, I found that if the data subset being fetched fell entirely within a single chunk, the download was single-threaded (as far as I can tell) and, as a result, quite slow.
For larger subsets spanning multiple chunks this isn't an issue, since the transfer quickly becomes limited by network bandwidth, except at the tail end of a download, where you're left waiting for the last few threads to finish while bandwidth sits idle.
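For example, a request along these lines (dimension names and indices are illustrative; ds is the dataset opened by the function further down) only touches one chunk per variable, so only a single GET is in flight:

# Illustrative single-chunk read: a short time slice at one grid cell typically
# falls inside a single zarr chunk, so the whole chunk is fetched in one request.
subset = ds["t2d"].isel(time=slice(0, 24), y=2000, x=2000)
subset.load()  # effectively a single-threaded download of that one chunk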

The s3fs cat_file function does accept range parameters, but as far as I could tell they weren't being used.
For my use case, I overrode the s3fs method so that it always checks the size of the object being downloaded and splits the download into range requests if it is over a certain size. Even with this extra logic slowing down all the other requests (metadata, etc.), it cut the ~5 minute download down to about 20 seconds.
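Roughly, the override looks like this (a minimal sketch of the idea rather than my exact code; the 16 MB threshold/range size is just an illustrative value):

import asyncio

import s3fs


class S3ParallelFileSystem(s3fs.S3FileSystem):
    """Sketch: split large whole-object reads into concurrent ranged GETs."""

    RANGE_SIZE = 16 * 1024 ** 2  # illustrative threshold / range size

    async def _cat_file(self, path, version_id=None, start=None, end=None):
        # Explicit ranged reads go through the normal path unchanged.
        if start is not None or end is not None:
            return await super()._cat_file(path, version_id=version_id, start=start, end=end)
        size = (await self._info(path))["size"]
        # Small objects (metadata, small chunks) also go through the normal path.
        if size <= self.RANGE_SIZE:
            return await super()._cat_file(path, version_id=version_id)
        # Large object: issue concurrent range requests and stitch the pieces together.
        ranges = [(o, min(o + self.RANGE_SIZE, size)) for o in range(0, size, self.RANGE_SIZE)]
        parts = await asyncio.gather(
            *(
                super(S3ParallelFileSystem, self)._cat_file(path, version_id=version_id, start=s, end=e)
                for s, e in ranges
            )
        )
        return b"".join(parts)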

I initialized an mfdataset like this (link):

import s3fs
import xarray as xr
from dask.distributed import Client, LocalCluster


def load_zarr_datasets(forcing_vars: list[str] = None) -> xr.Dataset:
    if not forcing_vars:
        forcing_vars = ["lwdown", "precip", "psfc", "q2d", "swdown", "t2d", "u2d", "v2d"]
    # if a LocalCluster is not already running, start one
    try:
        client = Client.current()
    except ValueError:
        cluster = LocalCluster()
        client = Client(cluster)
    s3_urls = [
        f"s3://noaa-nwm-retrospective-3-0-pds/CONUS/zarr/forcing/{var}.zarr"
        for var in forcing_vars
    ]
    # the default cache is readahead, which is detrimental to performance in this case
    # S3ParallelFileSystem is the custom S3FileSystem subclass described above
    fs = S3ParallelFileSystem(anon=True, default_cache_type="none")  # default_block_size
    s3_stores = [s3fs.S3Map(url, s3=fs) for url in s3_urls]
    dataset = xr.open_mfdataset(s3_stores, parallel=True, engine="zarr", cache=True)
    return dataset

and saved it like this:

from dask.distributed import Client, progress

client = Client.current()
# temp_path is the local output path for the netcdf file
future = client.compute(dataset.to_netcdf(temp_path, compute=False))
# Display progress bar
progress(future)
future.result()

This may just be a workaround for the real solution, which would be rechunking the data into smaller pieces, but I'm not really in a position to do that. At the very least, I hope this saves someone else some time if they come across the same issue.
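For reference, the rechunking route would look something like this (destination bucket, chunk sizes, and dimension names are all hypothetical, and it needs a writable location for the copy):

# Hypothetical sketch of rechunking one forcing variable into smaller chunks
# and writing the copy to a bucket you control.
ds = load_zarr_datasets(["t2d"])
ds.chunk({"time": 24, "y": 256, "x": 256}).to_zarr(
    "s3://my-writable-bucket/forcing/t2d_rechunked.zarr",
    mode="w",
    storage_options={"anon": False},
)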

Happy to expand on this with more info, tests, and examples if it's useful.

d-v-b (Contributor) commented Jan 7, 2025

@JoshCu that's very interesting. In the main branch (and in the upcoming zarr-python 3.0 release) we made the entire IO layer async, with the intention of facilitating exactly these kinds of performance optimizations. I wonder what your improvements would look like on the zarr-python 3.0 version of FSStore, which we are currently calling FSSpecStore. Would you be interested in opening a PR?

JoshCu (Author) commented Jan 7, 2025

Gladly! It might take me a few days to update everything to the new versions and to get test cases working for standalone zarr as well as the dask + xarray + zarr stack, but I'll definitely look into it.
