Skip to content

Commit 57f1efc

Browse files
committed
PARTIAL add worker scheduling
1 parent a969a17 commit 57f1efc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2517
-2910
lines changed

mars/core/graph/core.pyx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ cdef class DirectedGraph:
143143
try:
144144
return list(self._successors[n])
145145
except KeyError:
146-
return KeyError(f'Node {n} does not exist in the directed graph')
146+
raise KeyError(f'Node {n} does not exist in the directed graph')
147147

148148
def iter_predecessors(self, n):
149149
try:

mars/oscar/backends/ray/tests/test_ray_pool.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030

3131

3232
class TestActor(mo.Actor):
33+
__test__ = False
34+
3335
async def kill(self, address, uid):
3436
actor_ref = await mo.actor_ref(address, uid)
3537
task = asyncio.create_task(actor_ref.crash())

mars/oscar/backends/test/pool.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323

2424

2525
class TestMainActorPool(MainActorPool):
26+
__test__ = False
27+
2628
@classmethod
2729
def get_external_addresses(
2830
cls, address: str, n_process: int = None, ports: List[int] = None

mars/oscar/core.pyx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ cdef class ActorRefMethod:
111111
"""
112112
Wrapper for an Actor method at client
113113
"""
114-
cdef ActorRef ref
115-
cdef object method_name
114+
cdef public ActorRef ref
115+
cdef public object method_name
116116
cdef object _options
117117

118118
def __init__(self, ref, method_name, options=None):

mars/remote/core.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ def __init__(
5757
n_output=None,
5858
**kw,
5959
):
60+
function_args = function_args or []
61+
function_kwargs = function_kwargs or {}
6062
super().__init__(
6163
_function=function,
6264
_function_args=function_args,

mars/services/cluster/api/oscar.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ async def get_mars_versions(self) -> List[str]:
211211
node_info_ref = await self._get_node_info_ref()
212212
return await node_info_ref.get_mars_versions()
213213

214-
async def get_bands(self) -> Dict:
214+
async def get_bands(self) -> Dict[BandType, int]:
215215
"""
216216
Get bands that can be used for computation on current node.
217217

mars/services/core.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@
2020
import warnings
2121
from typing import Dict, Iterable, List, Union
2222

23+
from .. import oscar as mo
24+
from ..lib.aio import alru_cache
25+
from ..serialization.serializables import (
26+
Serializable,
27+
BytesField,
28+
StringField,
29+
TupleField,
30+
DictField,
31+
)
32+
2333
_ModulesType = Union[List, str, None]
2434

2535

@@ -28,6 +38,35 @@ class NodeRole(enum.Enum):
2838
WORKER = 1
2939

3040

41+
class ActorCallback(Serializable):
42+
actor_uid: bytes = BytesField("actor_uid")
43+
actor_address: str = StringField("actor_address")
44+
actor_method: str = StringField("actor_method")
45+
args: tuple = TupleField("args")
46+
kwargs: dict = DictField("kwargs")
47+
48+
def __init__(self, ref_method=None, **kw):
49+
if ref_method is not None:
50+
kw["actor_uid"] = ref_method.ref.uid
51+
kw["actor_address"] = ref_method.ref.address
52+
kw["actor_method"] = ref_method.method_name
53+
kw["args"] = kw.get("args") or ()
54+
kw["kwargs"] = kw.get("kwargs") or {}
55+
super().__init__(**kw)
56+
57+
@classmethod
58+
@alru_cache(cache_exceptions=False)
59+
async def _get_ref(cls, actor_uid: bytes, actor_address: str):
60+
return await mo.actor_ref(actor_uid, address=actor_address)
61+
62+
async def __call__(self, *args, **kwargs):
63+
ref = await self._get_ref(self.actor_uid, self.actor_address)
64+
args = self.args + args
65+
kw = self.kwargs.copy()
66+
kw.update(kwargs)
67+
return await getattr(ref, self.actor_method)(*args, **kw)
68+
69+
3170
class AbstractService(abc.ABC):
3271
_instances = dict()
3372

mars/services/scheduling/api/oscar.py

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,11 @@
2424

2525

2626
class SchedulingAPI(AbstractSchedulingAPI):
27-
def __init__(
28-
self, session_id: str, address: str, manager_ref=None, queueing_ref=None
29-
):
27+
def __init__(self, session_id: str, address: str, manager_ref=None):
3028
self._session_id = session_id
3129
self._address = address
3230

3331
self._manager_ref = manager_ref
34-
self._queueing_ref = queueing_ref
3532

3633
@classmethod
3734
@alru_cache
@@ -41,20 +38,30 @@ async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
4138
manager_ref = await mo.actor_ref(
4239
SubtaskManagerActor.gen_uid(session_id), address=address
4340
)
44-
from ..supervisor.queueing import SubtaskQueueingActor
45-
46-
queueing_ref = await mo.actor_ref(
47-
SubtaskQueueingActor.gen_uid(session_id), address=address
48-
)
4941

50-
scheduling_api = SchedulingAPI(session_id, address, manager_ref, queueing_ref)
42+
scheduling_api = SchedulingAPI(session_id, address, manager_ref)
5143
return scheduling_api
5244

5345
async def get_subtask_schedule_summaries(
5446
self, task_id: Optional[str] = None
5547
) -> List[SubtaskScheduleSummary]:
5648
return await self._manager_ref.get_schedule_summaries(task_id)
5749

50+
async def cache_subtasks(
51+
self, subtasks: List[Subtask], priorities: Optional[List[Tuple]] = None
52+
):
53+
"""
54+
Add subtask graph to cache for fast forwarding
55+
56+
Parameters
57+
----------
58+
subtasks
59+
list of subtasks to be submitted to service
60+
priorities
61+
list of priorities of subtasks
62+
"""
63+
await self._manager_ref.cache_subtasks(subtasks, priorities)
64+
5865
async def add_subtasks(
5966
self, subtasks: List[Subtask], priorities: Optional[List[Tuple]] = None
6067
):
@@ -88,12 +95,12 @@ async def update_subtask_priority(self, subtask_id: str, priority: Tuple):
8895

8996
@update_subtask_priority.batch
9097
async def update_subtask_priority(self, args_list, kwargs_list):
91-
await self._queueing_ref.update_subtask_priority.batch(
92-
*(
93-
self._queueing_ref.update_subtask_priority.delay(*args, **kwargs)
94-
for args, kwargs in zip(args_list, kwargs_list)
95-
)
96-
)
98+
subtask_ids, priorities = [], []
99+
for args, kwargs in zip(args_list, kwargs_list):
100+
subtask_id, priority = self.update_subtask_priority.bind(*args, **kwargs)
101+
subtask_ids.append(subtask_id)
102+
priorities.append(priority)
103+
await self._manager_ref.update_subtask_priorities(subtask_ids, priorities)
97104

98105
async def cancel_subtasks(
99106
self, subtask_ids: List[str], kill_timeout: Union[float, int] = 5
@@ -128,33 +135,39 @@ async def finish_subtasks(self, subtask_ids: List[str], schedule_next: bool = Tr
128135
class MockSchedulingAPI(SchedulingAPI):
129136
@classmethod
130137
async def create(cls: Type[APIType], session_id: str, address: str) -> APIType:
131-
from ..supervisor import GlobalSlotManagerActor, AutoscalerActor
132-
133-
await mo.create_actor(
134-
GlobalSlotManagerActor,
135-
uid=GlobalSlotManagerActor.default_uid(),
136-
address=address,
137-
)
138-
await mo.create_actor(
139-
AutoscalerActor, {}, uid=AutoscalerActor.default_uid(), address=address
140-
)
138+
# from ..supervisor import AutoscalerActor
139+
# await mo.create_actor(
140+
# AutoscalerActor, {}, uid=AutoscalerActor.default_uid(), address=address
141+
# )
141142

142143
from .... import resource as mars_resource
143144
from ..worker import (
144145
SubtaskExecutionActor,
145-
WorkerSlotManagerActor,
146+
SubtaskPrepareQueueActor,
147+
SubtaskExecutionQueueActor,
146148
WorkerQuotaManagerActor,
149+
SlotManagerActor,
147150
)
148151

149152
await mo.create_actor(
150-
SubtaskExecutionActor,
151-
subtask_max_retries=0,
152-
uid=SubtaskExecutionActor.default_uid(),
153+
SlotManagerActor,
154+
uid=SlotManagerActor.default_uid(),
153155
address=address,
154156
)
155157
await mo.create_actor(
156-
WorkerSlotManagerActor,
157-
uid=WorkerSlotManagerActor.default_uid(),
158+
SubtaskPrepareQueueActor,
159+
uid=SubtaskPrepareQueueActor.default_uid(),
160+
address=address,
161+
)
162+
await mo.create_actor(
163+
SubtaskExecutionQueueActor,
164+
uid=SubtaskExecutionQueueActor.default_uid(),
165+
address=address,
166+
)
167+
await mo.create_actor(
168+
SubtaskExecutionActor,
169+
subtask_max_retries=0,
170+
uid=SubtaskExecutionActor.default_uid(),
158171
address=address,
159172
)
160173
await mo.create_actor(

mars/services/scheduling/supervisor/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,5 @@
1414

1515
from .assigner import AssignerActor
1616
from .autoscale import AutoscalerActor
17-
from .globalslot import GlobalSlotManagerActor
1817
from .manager import SubtaskManagerActor
19-
from .queueing import SubtaskQueueingActor
2018
from .service import SchedulingSupervisorService

mars/services/scheduling/supervisor/autoscale.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ async def __post_create__(self):
4646
strategy_cls = getattr(importlib.import_module(module), name)
4747
else:
4848
strategy_cls = PendingTaskBacklogStrategy
49-
from ..supervisor import GlobalSlotManagerActor
50-
51-
self.global_slot_ref = await mo.actor_ref(
52-
GlobalSlotManagerActor.default_uid(), address=self.address
53-
)
49+
# from ..supervisor import GlobalSlotManagerActor
50+
#
51+
# self.global_slot_ref = await mo.actor_ref(
52+
# GlobalSlotManagerActor.default_uid(), address=self.address
53+
# )
5454
self._cluster_api = await ClusterAPI.create(self.address)
5555
self._strategy = await strategy_cls.create(self._autoscale_conf, self)
5656
if self._enabled:
@@ -62,11 +62,12 @@ async def __pre_destroy__(self):
6262
await self._strategy.stop()
6363

6464
async def register_session(self, session_id: str, address: str):
65-
from .queueing import SubtaskQueueingActor
66-
67-
self.queueing_refs[session_id] = await mo.actor_ref(
68-
SubtaskQueueingActor.gen_uid(session_id), address=address
69-
)
65+
pass
66+
# from .queueing import SubtaskQueueingActor
67+
#
68+
# self.queueing_refs[session_id] = await mo.actor_ref(
69+
# SubtaskQueueingActor.gen_uid(session_id), address=address
70+
# )
7071

7172
async def unregister_session(self, session_id: str):
7273
self.queueing_refs.pop(session_id, None)

0 commit comments

Comments
 (0)