Skip to content

Commit 2089e79

Browse files
authored
Refactor task_groups & task_prefixes (#4452)
* Refactor `task_groups` & `task_prefixes`: Moves `task_groups` and `task_prefixes` to `SchedulerState`, where they are type annotated, then uses them through `parent` within `Scheduler`. This allows Cython to recognize that these are Python `dict`s and optimize calls and operations on them.
* Use `.get(...)` instead of `try...except...`: Instead of using `try...except...` to catch and handle `KeyError`s, just use `.get(...)`, which returns the value for the key or `None` if the key is missing. This has less overhead. Also, the following `None` check is a quick pointer comparison. Otherwise the code is unchanged.
* Assign `TaskGroup` before `if`: Since it is used in the check as well, go ahead and assign it beforehand for simplicity.
1 parent 777d48e commit 2089e79

File tree

1 file changed

+23
-13
lines changed

1 file changed

+23
-13
lines changed

distributed/scheduler.py

+23-13
Original file line numberDiff line numberDiff line change
@@ -1585,6 +1585,8 @@ class SchedulerState:
15851585
_resources: object
15861586
_saturated: set
15871587
_tasks: dict
1588+
_task_groups: dict
1589+
_task_prefixes: dict
15881590
_task_metadata: dict
15891591
_total_nthreads: Py_ssize_t
15901592
_total_occupancy: double
@@ -1635,6 +1637,8 @@ def __init__(
16351637
self._tasks = tasks
16361638
else:
16371639
self._tasks = dict()
1640+
self._task_groups = dict()
1641+
self._task_prefixes = dict()
16381642
self._task_metadata = dict()
16391643
self._total_nthreads = 0
16401644
self._total_occupancy = 0
@@ -1691,6 +1695,14 @@ def saturated(self):
16911695
def tasks(self):
16921696
return self._tasks
16931697

1698+
@property
1699+
def task_groups(self):
1700+
return self._task_groups
1701+
1702+
@property
1703+
def task_prefixes(self):
1704+
return self._task_prefixes
1705+
16941706
@property
16951707
def task_metadata(self):
16961708
return self._task_metadata
@@ -1738,6 +1750,8 @@ def __pdict__(self):
17381750
"unknown_durations": self._unknown_durations,
17391751
"validate": self._validate,
17401752
"tasks": self._tasks,
1753+
"task_groups": self._task_groups,
1754+
"task_prefixes": self._task_prefixes,
17411755
"total_nthreads": self._total_nthreads,
17421756
"total_occupancy": self._total_occupancy,
17431757
"extensions": self._extensions,
@@ -2926,8 +2940,6 @@ def __init__(
29262940

29272941
# Task state
29282942
tasks = dict()
2929-
self.task_groups = dict()
2930-
self.task_prefixes = dict()
29312943
for old_attr, new_attr, wrap in [
29322944
("priority", "priority", None),
29332945
("dependencies", "dependencies", _legacy_task_key_set),
@@ -3919,17 +3931,15 @@ def new_task(self, key, spec, state):
39193931
tg: TaskGroup
39203932
ts._state = state
39213933
prefix_key = key_split(key)
3922-
try:
3923-
tp = self.task_prefixes[prefix_key]
3924-
except KeyError:
3925-
self.task_prefixes[prefix_key] = tp = TaskPrefix(prefix_key)
3934+
tp = parent._task_prefixes.get(prefix_key)
3935+
if tp is None:
3936+
parent._task_prefixes[prefix_key] = tp = TaskPrefix(prefix_key)
39263937
ts._prefix = tp
39273938

39283939
group_key = ts._group_key
3929-
try:
3930-
tg = self.task_groups[group_key]
3931-
except KeyError:
3932-
self.task_groups[group_key] = tg = TaskGroup(group_key)
3940+
tg = parent._task_groups.get(group_key)
3941+
if tg is None:
3942+
parent._task_groups[group_key] = tg = TaskGroup(group_key)
39333943
tg._prefix = tp
39343944
tp._groups.append(tg)
39353945
tg.add(ts)
@@ -5891,12 +5901,12 @@ def transition(self, key, finish, *args, **kwargs):
58915901
if ts._state == "forgotten":
58925902
del parent._tasks[ts._key]
58935903

5894-
if ts._state == "forgotten" and ts._group._name in self.task_groups:
5904+
tg: TaskGroup = ts._group
5905+
if ts._state == "forgotten" and tg._name in parent._task_groups:
58955906
# Remove TaskGroup if all tasks are in the forgotten state
5896-
tg: TaskGroup = ts._group
58975907
if not any([tg._states.get(s) for s in ALL_TASK_STATES]):
58985908
ts._prefix._groups.remove(tg)
5899-
del self.task_groups[tg._name]
5909+
del parent._task_groups[tg._name]
59005910

59015911
return recommendations
59025912
except Exception as e:

0 commit comments

Comments
 (0)