What happened:
I have a graph for a very common physical science use case which currently performs extremely poorly on the distributed scheduler.
I would like to:
(a) Get this running somehow as soon as I can,
(b) Propose this as a benchmark example for the distributed scheduler.
This workload is the generalization @gjoseph92 mentioned in #6360 (comment).
Why this example is interesting:
After talking to @gjoseph92, I believe this example is useful because, whilst minimal, it still demonstrates three big scheduling problems at once:
- "Root task overproduction" (see Ease memory pressure by deprioritizing root tasks? #6360),
- "Widely-shared dependencies" (see Ignore widely-shared dependencies in
decide_worker
#5325), - "Root task co-location". (This was fixed in Co-assign root-ish tasks #4967 but any solution to (1) and (2) should retain this feature, else this example will still behave poorly).
By making minor adjustments to the full example, one can easily test the performance of any combination of these three considerations.
It also represents a simple-to-describe and commonly-desired geospatial-type calculation. I expect that making this example work better would immediately improve the distributed scheduler's performance for a great many pangeo- and xarray-type problems, perhaps even for the overall majority of users in those communities. (I think @rabernat and @jbusecke would concur.) Some performance issues raised by pangeo users in previous issues (especially #2602) can be replicated much more easily using this type of workload.
What you expected to happen:
I expected this example to perform well with the distributed scheduler: this workload came about after an extensive refactor of xGCM to pipe all our operations through `xarray.apply_ufunc`, and the resulting graph can't really be simplified much further. Prior to raising this I also experimented a lot with fusing task chains, but that doesn't really help.
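For context, the pattern is roughly the following (a hypothetical sketch, not xGCM's actual code; the variable names are illustrative and assume dask-backed DataArrays):

```python
import xarray as xr

# Illustrative only: u, v, dx, dy are assumed to be dask-backed DataArrays
def vort(u, v, dx, dy):
    return dx * u - dy * v

# apply_ufunc broadcasts the inputs by dimension name and applies vort
# blockwise over the dask chunks, producing the kind of graph described below
vorticity = xr.apply_ufunc(
    vort, u, v, dx, dy,
    dask="parallelized",
    output_dtypes=[u.dtype],
)
```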
Minimal Complete Verifiable Example:
The essential part of the problem is basically just this:
```python
def vort(u, v, dx, dy):
    return dx * u - dy * v

vort(u, v, dx, dy).mean()
```
where `u` and `v` have dimensions `(x, y, time)` and are chunked along `time`, whilst `dx` and `dy` have dimensions `(x, y)` and are not chunked, so get broadcast against all the time chunks.
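A minimal runnable version in pure dask might look like the following (the shapes and chunk sizes are made up for illustration; note that outside xarray the broadcasting has to be spelled out explicitly):

```python
import dask.array as da

# Hypothetical sizes, chosen only for illustration
nx, ny, nt = 1000, 1000, 100

# u and v are chunked along time only
u = da.random.random((nx, ny, nt), chunks=(nx, ny, 1))
v = da.random.random((nx, ny, nt), chunks=(nx, ny, 1))

# dx and dy are single-chunk 2D "metrics" arrays, so every time chunk
# of the result depends on them
dx = da.random.random((nx, ny), chunks=(nx, ny))
dy = da.random.random((nx, ny), chunks=(nx, ny))

def vort(u, v, dx, dy):
    # add a trailing axis so (x, y) broadcasts against (x, y, time)
    return dx[..., None] * u - dy[..., None] * v

result = vort(u, v, dx, dy).mean()
result.compute()
```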
I've gone into much more detail in this notebook though.
Anything else we need to know?:
The example is pure-dask, only inspired by xGCM (which uses xarray to perform basically all its dask operations).
How can I get this running in the meantime?
Before a full solution to (1) and (2) above is available, is there a way I can get this running smoothly in the meantime?
My ideas include:
- Inlining the creation of `dx` and `dy` to remove all the cross-dependencies causing (2),
- Scattering the metrics data onto all workers to remove all the cross-dependencies causing (2),
- Performing the calculation on subsets of the data using `map_blocks` instead (see the sketch after this list).
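As a rough sketch of what the second and third ideas might look like together (all names and shapes here are hypothetical, and I haven't verified this is the best way to do it):

```python
import numpy as np
import dask.array as da
from distributed import Client

client = Client()  # assumes a running local or remote cluster

nx, ny, nt = 1000, 1000, 100
u = da.random.random((nx, ny, nt), chunks=(nx, ny, 1))
v = da.random.random((nx, ny, nt), chunks=(nx, ny, 1))

# Scatter the metrics to every worker up front, so each time chunk
# depends on a local copy rather than on a widely-shared root task
dx = client.scatter(np.random.random((nx, ny)), broadcast=True)
dy = client.scatter(np.random.random((nx, ny)), broadcast=True)

def vort_block(u_block, v_block, dx, dy):
    # the scattered futures are resolved to numpy arrays before this runs
    return dx[..., None] * u_block - dy[..., None] * v_block

vorticity = da.map_blocks(vort_block, u, v, dx=dx, dy=dy, dtype=u.dtype)
result = vorticity.mean().compute()
```

For the inlining idea, I suspect `da.from_array(..., inline_array=True)` might help when the metrics come from an on-disk array, though I haven't tested that here.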
But any input would be appreciated.