Hi, reaching out because I am running into performance issues trying to run my workflow in an HPC environment with thousands of tasks. I might be doing this wrong, so I'm also seeking help.
To explain my workflow:
I am trying to use a zarr.DirectoryStore to store thousands of xarray.Datasets (>10,000) that are about 10-18 MB each. I start by scraping all the directories and generating a list of futures, each holding a Dataset built from one directory. I then want to use a zarr.DirectoryStore to sink all my data, with the caveat that I want to store it under a specific group name.
So the workflow looks something like this:
```python
import os

import fasteners
import zarr

# GenerateDataset, `client` (a dask.distributed Client), and `directories`
# are defined elsewhere in my setup.


def scrape_dir(dir_name):
    foo = GenerateDataset(dir_name)
    dataset = foo.run()
    return dataset


def write_to_store(dataset, file_name, mode_="w", append_dim=None):
    lock = fasteners.InterProcessLock(os.getcwd() + "/.lock")
    with lock:
        try:
            synchronizer = zarr.ProcessSynchronizer(os.getcwd() + "/zarr.sync")
            path = os.path.join(os.getcwd(), file_name)
            store = zarr.DirectoryStore(path=path)
            dataset.to_zarr(
                store=store,
                mode=mode_,
                synchronizer=synchronizer,
                group="MY_GROUP",
                encoding=None,
                compute=True,
                consolidated=True,
                append_dim=append_dim,
            )
            msg = "success"
        except Exception as e:
            msg = f"failed! reason: {e}"
    return msg


# generate datasets as futures
future_datasets = [client.submit(scrape_dir, dir_name) for dir_name in directories]

# get the first dataset to start the Zarr store
dataset_1 = future_datasets[0].result()

# initial write
write_to_store(dataset=dataset_1, file_name="myzarr_store.zarr", mode_="w", append_dim=None)

# write the rest of the datasets as futures
msgs = []
for ds in future_datasets[1:]:
    msg = client.submit(write_to_store, dataset=ds, file_name="myzarr_store.zarr", mode_="a", append_dim="case")
    msgs.append(msg)
```
As you can see, I had to add my own file locking because without it Zarr would just hang.
I've been experimenting with other approaches, such as using xr.concat on the futures while the Datasets are sitting on my Dask workers, but that concat takes forever and even kills my cluster.
I've also dumped my Datasets as netCDF files and later used xr.open_mfdataset to open them back up and store them as a Zarr store, but that defeats the purpose of writing to Zarr while I am still holding the Datasets on my workers.
I wish there were something like xr.open_mfdataset(path=futures) that let me pass futures and then save to Zarr without having to write to disk and load everything back up again, but I did not find anything like that.
So yeah, I'm open to suggestions on how I should go about this workflow. Many thanks in advance!
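For concreteness, here is a minimal sketch of what I mean by that concat attempt, assuming the Datasets are gathered back from the workers before concatenating and that "case" is the dimension I want to stack along:

```python
import xarray as xr

# pull every Dataset back from the workers (this gather is the expensive part),
# then stack them along the "case" dimension in one go
datasets = client.gather(future_datasets)
combined = xr.concat(datasets, dim="case")

# single write of the combined Dataset into the target group
combined.to_zarr("myzarr_store.zarr", group="MY_GROUP", mode="w", consolidated=True)
```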
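That netCDF fallback looks roughly like the sketch below; the file-naming pattern is just illustrative, and I'm assuming all files share the "case" dimension:

```python
import os
import xarray as xr

# dump each Dataset to its own netCDF file first (illustrative naming)
paths = []
for i, ds in enumerate(client.gather(future_datasets)):
    path = os.path.join(os.getcwd(), f"case_{i}.nc")
    ds.to_netcdf(path)
    paths.append(path)

# reopen them all lazily, concatenated along "case", and write one Zarr store
combined = xr.open_mfdataset(paths, combine="nested", concat_dim="case")
combined.to_zarr("myzarr_store.zarr", group="MY_GROUP", mode="w", consolidated=True)
```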