Skip to content

Commit 765f990

Browse files
committed
Try resolve missing items in cache
1 parent 4bea67e commit 765f990

File tree

3 files changed

+78
-6
lines changed

3 files changed

+78
-6
lines changed

mars/services/scheduling/utils.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import asyncio
1616
import contextlib
1717
import sys
18+
import time
19+
from collections import OrderedDict
20+
from typing import Dict, Mapping, Optional, TypeVar, Iterator
1821

1922
from ... import oscar as mo
2023
from ...lib.aio import alru_cache
@@ -58,3 +61,59 @@ async def redirect_subtask_errors(actor: mo.Actor, subtasks):
5861
)
5962
await asyncio.wait(coros)
6063
raise
64+
65+
66+
ResultType = TypeVar("ResultType")


class ResultCache(Mapping[str, ResultType]):
    """Mapping whose entries expire a fixed time after they were stored.

    Values live in ``_cache``; the wall-clock time at which each key was
    last stored lives in ``_cache_time``, an ``OrderedDict`` kept in
    oldest-first order.  Every public access first purges entries older
    than ``duration`` seconds, so expired keys behave as if they were
    never stored.
    """

    _cache: Dict[str, ResultType]
    # Actually an OrderedDict: insertion order doubles as age order.
    _cache_time: Dict[str, float]
    _duration: float

    def __init__(self, duration: float = 120):
        """Create an empty cache.

        Parameters
        ----------
        duration
            Number of seconds an entry stays available after being stored.
        """
        self._cache = dict()
        self._cache_time = OrderedDict()
        self._duration = duration

    def __getitem__(self, item: str):
        """Return the live value stored under ``item``.

        Raises
        ------
        KeyError
            If ``item`` was never stored or has already expired.
        """
        self._del_expired_items()
        return self._cache[item]

    def get(
        self, key: str, default: Optional[ResultType] = None
    ) -> Optional[ResultType]:
        """Return the live value for ``key``, or ``default`` if absent/expired."""
        self._del_expired_items()
        return self._cache.get(key, default)

    def _del_expired_items(self):
        """Drop every entry stored more than ``_duration`` seconds ago."""
        expire_time = time.time() - self._duration
        expired_keys = []
        for key, store_time in self._cache_time.items():
            # Entries are ordered oldest first, so the first entry that is
            # still fresh means all the remaining ones are fresh as well.
            if store_time >= expire_time:
                break
            expired_keys.append(key)
        # Delete after the scan: the dict must not change while iterated.
        for key in expired_keys:
            self._delitem(key)

    def __setitem__(self, key: str, value):
        """Store ``value`` under ``key`` and restart its expiry timer."""
        self._del_expired_items()
        self._cache[key] = value
        self._cache_time[key] = time.time()
        # Re-assigning an existing key keeps its old position in an
        # OrderedDict; move it to the end to preserve oldest-first order.
        self._cache_time.move_to_end(key)

    def _delitem(self, key: str):
        """Remove ``key`` from both tables; KeyError if it holds no value."""
        del self._cache[key]
        self._cache_time.pop(key, None)

    def __delitem__(self, key: str):
        """Delete ``key`` and then purge expired entries.

        Raises
        ------
        KeyError
            If ``key`` is not present in the cache.
        """
        self._delitem(key)
        self._del_expired_items()

    def __len__(self) -> int:
        """Return the number of live (non-expired) entries."""
        self._del_expired_items()
        return len(self._cache)

    def __iter__(self) -> Iterator[str]:
        """Iterate over the keys of live (non-expired) entries."""
        self._del_expired_items()
        return iter(self._cache)
119+

mars/services/scheduling/worker/execution/actor.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from ....cluster import ClusterAPI
2727
from ....core import ActorCallback
2828
from ....subtask import Subtask, SubtaskAPI, SubtaskResult, SubtaskStatus
29+
from ...utils import ResultCache
2930
from ..queues import SubtaskPrepareQueueActor, SubtaskExecutionQueueActor
3031
from ..quota import QuotaActor
3132
from ..slotmanager import SlotManagerActor
@@ -49,18 +50,21 @@ class SubtaskExecutionActor(mo.Actor):
4950

5051
_subtask_api: SubtaskAPI
5152
_subtask_preparer: SubtaskPreparer
53+
_subtask_result_cache: ResultCache[SubtaskResult]
5254

5355
def __init__(
5456
self,
5557
subtask_max_retries: int = None,
5658
enable_kill_slot: bool = True,
5759
):
5860
self._pred_key_mapping_dag = DAG()
59-
self._subtask_caches = dict()
60-
self._subtask_executions = dict()
6161
self._prepare_queue_ref = None
6262
self._execution_queue_ref = None
6363

64+
self._subtask_caches = dict()
65+
self._subtask_executions = dict()
66+
self._subtask_result_cache = ResultCache()
67+
6468
self._subtask_max_retries = subtask_max_retries or DEFAULT_SUBTASK_MAX_RETRIES
6569
self._enable_kill_slot = enable_kill_slot
6670

@@ -222,6 +226,7 @@ async def submit_subtasks(
222226
priorities: List[Tuple],
223227
supervisor_address: str,
224228
band_name: str,
229+
rerun_when_fail: bool = False,
225230
):
226231
assert len(subtasks) == len(priorities)
227232
logger.debug("%d subtasks submitted to SubtaskExecutionActor", len(subtasks))
@@ -249,7 +254,6 @@ async def submit_subtasks(
249254
supervisor_address=supervisor_address,
250255
band_name=band_name,
251256
)
252-
self._subtask_caches.pop(subtask.subtask_id, None)
253257
self._subtask_executions[subtask.subtask_id] = subtask_info
254258
put_delays.append(
255259
self._prepare_queue_ref.put.delay(
@@ -322,7 +326,7 @@ async def cancel_subtasks(
322326
continue
323327
if not subtask_info.result.status.is_done:
324328
self._fill_result_with_exc(subtask_info, exc_cls=asyncio.CancelledError)
325-
infos_to_report.append(subtask_info)
329+
infos_to_report.append(subtask_info)
326330
await self._report_subtask_results(infos_to_report)
327331

328332
async def wait_subtasks(self, subtask_ids: List[str]):

mars/services/subtask/worker/tests/subtask_processor.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,19 @@ def _execute_operand(self, ctx: Dict[str, Any], op: OperandType):
5050
continue
5151
self.assert_object_consistent(out, ctx[out.key])
5252

53-
async def done(self):
54-
await super().done()
53+
def _unregister_executors(self):
5554
for op in self._operand_executors:
5655
try:
5756
op.unregister_executor()
5857
except KeyError:
5958
pass
59+
60+
async def _release_scheduling(self):
61+
# once the operand stops running, the slot may be reused immediately
62+
# thus executors must be cleaned in time
63+
self._unregister_executors()
64+
await super()._release_scheduling()
65+
66+
async def done(self):
67+
await super().done()
68+
self._unregister_executors()

0 commit comments

Comments
 (0)