Skip to content

Commit f836cef

Browse files
committed
Don't sample dict result of a shuffle group when calculating its size
The size calculation for shuffle group results is very sensitive to sampling since there may be empty splits skewing the result. See also dask/distributed#4962
1 parent 0f2ba09 commit f836cef

File tree

2 files changed

+41
-1
lines changed

2 files changed

+41
-1
lines changed

dask/dataframe/backends.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
union_categoricals,
1313
)
1414

15+
from dask.sizeof import SimpleSizeof, sizeof
16+
1517
from ..utils import is_arraylike, typename
1618
from ._compat import PANDAS_GT_100
1719
from .core import DataFrame, Index, Scalar, Series, _Frame
@@ -341,6 +343,23 @@ def hash_object_pandas(
341343
)
342344

343345

346+
class ShuffleGroupResult(SimpleSizeof, dict):
347+
def __sizeof__(self) -> int:
348+
"""
349+
The result of the shuffle split are typically small dictionaries
350+
(#keys << 100; typically <= 32) The splits are often non-uniformly
351+
distributed. Some of the splits may even be empty. Sampling the
352+
dictionary for size estimation can cause severe errors.
353+
354+
See also https://github.com/dask/distributed/issues/4962
355+
"""
356+
total_size = super().__sizeof__()
357+
for k, df in self.items():
358+
total_size += sizeof(k)
359+
total_size += sizeof(df)
360+
return total_size
361+
362+
344363
@group_split_dispatch.register((pd.DataFrame, pd.Series, pd.Index))
345364
def group_split_pandas(df, c, k, ignore_index=False):
346365
indexer, locations = pd._libs.algos.groupsort_indexer(
@@ -352,7 +371,7 @@ def group_split_pandas(df, c, k, ignore_index=False):
352371
df2.iloc[a:b].reset_index(drop=True) if ignore_index else df2.iloc[a:b]
353372
for a, b in zip(locations[:-1], locations[1:])
354373
]
355-
return dict(zip(range(k), parts))
374+
return ShuffleGroupResult(zip(range(k), parts))
356375

357376

358377
@concat_dispatch.register((pd.DataFrame, pd.Series, pd.Index))

dask/sizeof.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,27 @@ def sizeof_python_collection(seq):
5959
return getsizeof(seq) + sum(map(sizeof, seq))
6060

6161

62+
class SimpleSizeof:
63+
"""Sentinel class to mark a class to be skipped by the dispatcher. This only
64+
works if this sentinel mixin is first in the mro.
65+
66+
Examples
67+
--------
68+
69+
>>> class TheAnswer(SimpleSizeof):
70+
... def __sizeof__(self):
71+
... return 42
72+
73+
>>> assert sizeof(TheAnswer()) == 42
74+
75+
"""
76+
77+
78+
@sizeof.register(SimpleSizeof)
79+
def sizeof_blocked(d):
80+
return getsizeof(d)
81+
82+
6283
@sizeof.register(dict)
6384
def sizeof_python_dict(d):
6485
return (

0 commit comments

Comments
 (0)