Skip to content

Commit 0ac9abd

Browse files
author
Xuye (Chris) Qin
authored
[BACKPORT] Fix hang when start sub pool fails (#2468) (#2476)
1 parent 9999a53 commit 0ac9abd

File tree

5 files changed

+80
-17
lines changed

5 files changed

+80
-17
lines changed

mars/oscar/backends/mars/pool.py

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import os
2020
import signal
2121
import sys
22+
from dataclasses import dataclass
23+
from types import TracebackType
2224
from typing import List
2325

24-
from ....utils import get_next_port
26+
from ....utils import get_next_port, dataslots
2527
from ..config import ActorPoolConfig
2628
from ..message import CreateActorMessage
2729
from ..pool import MainActorPoolBase, SubActorPoolBase, _register_message_handler
@@ -58,6 +60,15 @@ def _mp_kill(self):
5860
logger = logging.getLogger(__name__)
5961

6062

63+
@dataslots
64+
@dataclass
65+
class SubpoolStatus:
66+
# for status, 0 is succeeded, 1 is failed
67+
status: int = None
68+
error: BaseException = None
69+
traceback: TracebackType = None
70+
71+
6172
@_register_message_handler
6273
class MainActorPool(MainActorPoolBase):
6374

@@ -107,29 +118,41 @@ async def start_sub_pool(
107118

108119
def start_pool_in_process():
109120
ctx = multiprocessing.get_context(method=start_method)
110-
started = ctx.Event()
121+
status_queue = ctx.Queue()
111122
process = ctx.Process(
112123
target=cls._start_sub_pool,
113-
args=(actor_pool_config, process_index, started),
124+
args=(actor_pool_config, process_index, status_queue),
114125
name=f'MarsActorPool{process_index}',
115126
)
116127
process.daemon = True
117128
process.start()
118129
# wait for sub actor pool to finish starting
119-
started.wait()
120-
return process
130+
process_status = status_queue.get()
131+
return process, process_status
121132

122133
loop = asyncio.get_running_loop()
123134
executor = futures.ThreadPoolExecutor(1)
124135
create_pool_task = loop.run_in_executor(executor, start_pool_in_process)
125136
return await create_pool_task
126137

138+
@classmethod
139+
async def wait_sub_pools_ready(cls,
140+
create_pool_tasks: List[asyncio.Task]):
141+
processes = []
142+
for task in create_pool_tasks:
143+
process, status = await task
144+
if status.status == 1:
145+
# start sub pool failed
146+
raise status.error.with_traceback(status.traceback)
147+
processes.append(process)
148+
return processes
149+
127150
@classmethod
128151
def _start_sub_pool(
129152
cls,
130153
actor_config: ActorPoolConfig,
131154
process_index: int,
132-
started: multiprocessing.Event):
155+
status_queue: multiprocessing.Queue):
133156
if not _is_windows:
134157
try:
135158
# register coverage hooks on SIGTERM
@@ -159,15 +182,16 @@ def _start_sub_pool(
159182
else:
160183
asyncio.set_event_loop(asyncio.new_event_loop())
161184

162-
coro = cls._create_sub_pool(actor_config, process_index, started)
185+
coro = cls._create_sub_pool(actor_config, process_index, status_queue)
163186
asyncio.run(coro)
164187

165188
@classmethod
166189
async def _create_sub_pool(
167190
cls,
168191
actor_config: ActorPoolConfig,
169192
process_index: int,
170-
started: multiprocessing.Event):
193+
status_queue: multiprocessing.Queue):
194+
process_status = None
171195
try:
172196
env = actor_config.get_pool_config(process_index)['env']
173197
if env:
@@ -176,9 +200,14 @@ async def _create_sub_pool(
176200
'actor_pool_config': actor_config,
177201
'process_index': process_index
178202
})
203+
process_status = SubpoolStatus(status=0)
179204
await pool.start()
205+
except: # noqa: E722 # nosec # pylint: disable=bare-except
206+
_, error, tb = sys.exc_info()
207+
process_status = SubpoolStatus(status=1, error=error, traceback=tb)
208+
raise
180209
finally:
181-
started.set()
210+
status_queue.put(process_status)
182211
await pool.join()
183212

184213
async def kill_sub_pool(self, process: multiprocessing.Process,
@@ -203,8 +232,9 @@ async def recover_sub_pool(self, address: str):
203232
process_index = self._config.get_process_index(address)
204233
# process dead, restart it
205234
# remember always use spawn to recover sub pool
206-
self.sub_processes[address] = await self.__class__.start_sub_pool(
207-
self._config, process_index, 'spawn')
235+
task = asyncio.create_task(self.start_sub_pool(
236+
self._config, process_index, 'spawn'))
237+
self.sub_processes[address] = (await self.wait_sub_pools_ready([task]))[0]
208238

209239
if self._auto_recover == 'actor':
210240
# need to recover all created actors

mars/oscar/backends/mars/tests/test_pool.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,23 @@ async def test_sub_actor_pool(notify_main_pool):
227227
await pool.stop()
228228

229229

230+
@pytest.mark.asyncio
231+
async def test_fail_when_create_subpool():
232+
config = ActorPoolConfig()
233+
my_label = 'computation'
234+
main_address = f'127.0.0.1:{get_next_port()}'
235+
port = get_next_port()
236+
_add_pool_conf(config, 0, 'main', 'unixsocket:///0', main_address)
237+
238+
# use the same port for sub pools, will raise `OSError` with "address already in use"
239+
_add_pool_conf(config, 1, my_label, 'unixsocket:///1', f'127.0.0.1:{port}',
240+
env={'my_env': '1'})
241+
_add_pool_conf(config, 2, my_label, 'unixsocket:///2', f'127.0.0.1:{port}')
242+
243+
with pytest.raises(OSError):
244+
await MainActorPool.create({'actor_pool_config': config})
245+
246+
230247
@pytest.mark.asyncio
231248
async def test_main_actor_pool():
232249
config = ActorPoolConfig()

mars/oscar/backends/pool.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -912,7 +912,7 @@ async def create(cls, config: Dict) -> MainActorPoolType:
912912
# await create_pool_task
913913
tasks.append(create_pool_task)
914914

915-
processes = [await t for t in tasks]
915+
processes = await cls.wait_sub_pools_ready(tasks)
916916
# create main actor pool
917917
pool: MainActorPoolType = await super().create(config)
918918
addresses = actor_pool_config.get_external_addresses()[1:]
@@ -950,6 +950,12 @@ async def start_sub_pool(
950950
start_method: str = None):
951951
"""Start a sub actor pool"""
952952

953+
@classmethod
954+
@abstractmethod
955+
async def wait_sub_pools_ready(cls,
956+
create_pool_tasks: List[asyncio.Task]):
957+
"""Wait all sub pools ready """
958+
953959
def attach_sub_process(self,
954960
external_address: str,
955961
process: SubProcessHandle):

mars/oscar/backends/ray/pool.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ async def start_sub_pool(
8484
await actor_handle.start.remote(actor_pool_config, process_index)
8585
return actor_handle
8686

87+
@classmethod
88+
async def wait_sub_pools_ready(cls,
89+
create_pool_tasks: List[asyncio.Task]):
90+
return [await t for t in create_pool_tasks]
91+
8792
async def recover_sub_pool(self, address: str):
8893
process = self.sub_processes[address]
8994
process_index = self._config.get_process_index(address)

mars/oscar/backends/test/pool.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from ..config import ActorPoolConfig
2020
from ..communication import gen_local_address, get_server_type, DummyServer
21-
from ..mars.pool import MainActorPool, SubActorPool
21+
from ..mars.pool import MainActorPool, SubActorPool, SubpoolStatus
2222
from ..pool import ActorPoolType
2323

2424

@@ -41,22 +41,27 @@ async def start_sub_pool(
4141
actor_pool_config: ActorPoolConfig,
4242
process_index: int,
4343
start_method: str = None):
44-
started = multiprocessing.Event()
44+
status_queue = multiprocessing.Queue()
4545
return asyncio.create_task(
46-
cls._create_sub_pool(actor_pool_config, process_index, started))
46+
cls._create_sub_pool(actor_pool_config, process_index, status_queue))
47+
48+
@classmethod
49+
async def wait_sub_pools_ready(cls,
50+
create_pool_tasks: List[asyncio.Task]):
51+
return [await t for t in create_pool_tasks]
4752

4853
@classmethod
4954
async def _create_sub_pool(
5055
cls,
5156
actor_config: ActorPoolConfig,
5257
process_index: int,
53-
started: multiprocessing.Event):
58+
status_queue: multiprocessing.Queue):
5459
pool = await TestSubActorPool.create({
5560
'actor_pool_config': actor_config,
5661
'process_index': process_index
5762
})
5863
await pool.start()
59-
started.set()
64+
status_queue.put(SubpoolStatus(0))
6065
await pool.join()
6166

6267
async def kill_sub_pool(self, process: multiprocessing.Process,

0 commit comments

Comments
 (0)