Skip to content

Commit 01e510d

Browse files
committed
Ensure shuffle split operations are blacklisted from work stealing
If shuffle split tasks are not blacklisted from work stealing, this can have catastrophic effects on performance. See also dask#4962
1 parent 9f4165a commit 01e510d

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

distributed/stealing.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -452,4 +452,4 @@ def _can_steal(thief, ts, victim):
452452
return True
453453

454454

455-
fast_tasks = {"shuffle-split"}
455+
fast_tasks = {"split-shuffle"}

distributed/tests/test_steal.py

+35
Original file line numberDiff line numberDiff line change
@@ -827,3 +827,38 @@ async def test_balance_with_longer_task(c, s, a, b):
827827
) # a task after y, suggesting a, but open to b
828828
await z
829829
assert z.key in b.data
830+
831+
832+
@gen_cluster(client=True)
async def test_blacklist_shuffle_split(c, s, a, b):
    """Check that blacklisted shuffle-split tasks never become stealable.

    A task-based dataframe shuffle is submitted, and while it runs the
    stealing extension's ``stealable`` collections are polled to verify
    that no task whose prefix is listed in ``fast_tasks`` (or whose prefix
    contains ``"split"``) shows up there.
    """
    pd = pytest.importorskip("pandas")
    dd = pytest.importorskip("dask.dataframe")

    n_partitions = 10
    frame = pd.DataFrame({"A": range(100), "B": 1})
    ddf = dd.from_pandas(frame, npartitions=n_partitions)
    shuffled = ddf.shuffle(
        "A",
        shuffle="tasks",
        # If we don't have enough partitions, we'll fall back to a simple shuffle
        max_branch=n_partitions - 1,
    )
    future = c.compute(shuffled.sum())

    # Wait until the scheduler has registered at least one task.
    while not s.tasks:
        await asyncio.sleep(0.005)
    seen_prefixes = set(s.task_prefixes)

    from distributed.stealing import fast_tasks

    # The graph must actually contain a blacklisted "split" prefix,
    # otherwise the checks below would pass vacuously.
    blacklisted_seen = fast_tasks & seen_prefixes
    assert blacklisted_seen
    assert any("split" in name for name in blacklisted_seen)

    stealable = s.extensions["stealing"].stealable
    while not future.done():
        for levels in stealable.values():
            for level_tasks in levels:
                for ts in level_tasks:
                    prefix_name = ts.prefix.name
                    assert prefix_name not in fast_tasks
                    assert "split" not in prefix_name
        # Yield to the event loop so the computation can make progress.
        await asyncio.sleep(0.001)
    await future

0 commit comments

Comments
 (0)