Skip to content

Commit 96fc953

Browse files
committed
Coelescing doesn't work for every method yet
coalescing only preserves apply results, not messages where the content matters This means several things don't work: - scatter/gather - execute replies with errors - parallel datapub Some of these can be fixed, but not all without a substantial change to how coalescing works
1 parent 1f3a8ae commit 96fc953

File tree

7 files changed

+151
-38
lines changed

7 files changed

+151
-38
lines changed

ipyparallel/client/client.py

+26-6
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,12 @@ def _extract_metadata(self, msg):
833833

834834
if md['engine_uuid'] is not None:
835835
md['engine_id'] = self._engines.get(md['engine_uuid'], None)
836+
837+
if md['is_coalescing']:
838+
# get destinations from target metadata
839+
targets = msg_meta.get("broadcast_targets", [])
840+
md['engine_uuid'], md['engine_id'] = map(list, zip(*targets))
841+
836842
if 'date' in parent:
837843
md['submitted'] = parent['date']
838844
if 'started' in msg_meta:
@@ -917,9 +923,16 @@ def _handle_execute_reply(self, msg):
917923
md = self.metadata[msg_id]
918924
md.update(self._extract_metadata(msg))
919925

920-
e_outstanding = self._outstanding_dict[md['engine_uuid']]
921-
if msg_id in e_outstanding:
922-
e_outstanding.remove(msg_id)
926+
if md['is_coalescing']:
927+
engine_uuids = md['engine_uuid'] or []
928+
else:
929+
engine_uuids = [md['engine_uuid']]
930+
931+
for engine_uuid in engine_uuids:
932+
if engine_uuid is not None:
933+
e_outstanding = self._outstanding_dict[engine_uuid]
934+
if msg_id in e_outstanding:
935+
e_outstanding.remove(msg_id)
923936

924937
# construct result:
925938
if content['status'] == 'ok':
@@ -972,9 +985,16 @@ def _handle_apply_reply(self, msg):
972985
md = self.metadata[msg_id]
973986
md.update(self._extract_metadata(msg))
974987

975-
e_outstanding = self._outstanding_dict[md['engine_uuid']]
976-
if msg_id in e_outstanding:
977-
e_outstanding.remove(msg_id)
988+
if md['is_coalescing']:
989+
engine_uuids = md['engine_uuid'] or []
990+
else:
991+
engine_uuids = [md['engine_uuid']]
992+
993+
for engine_uuid in engine_uuids:
994+
if engine_uuid is not None:
995+
e_outstanding = self._outstanding_dict[engine_uuid]
996+
if msg_id in e_outstanding:
997+
e_outstanding.remove(msg_id)
978998

979999
# construct result:
9801000
if content['status'] == 'ok':

ipyparallel/client/view.py

+16
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,17 @@ def activate(self, suffix=''):
868868
ip.magics_manager.register(M)
869869

870870

871+
@decorator
872+
def _not_coalescing(method, self, *args, **kwargs):
873+
"""Decorator for broadcast methods that can't use reply coalescing"""
874+
is_coalescing = self.is_coalescing
875+
try:
876+
self.is_coalescing = False
877+
return method(self, *args, **kwargs)
878+
finally:
879+
self.is_coalescing = is_coalescing
880+
881+
871882
class BroadcastView(DirectView):
872883
is_coalescing = Bool(False)
873884

@@ -962,6 +973,7 @@ def make_asyncresult(message_future):
962973

963974
@sync_results
964975
@save_ids
976+
@_not_coalescing
965977
def execute(self, code, silent=True, targets=None, block=None):
966978
"""Executes `code` on `targets` in blocking or nonblocking manner.
967979
@@ -1010,6 +1022,10 @@ def make_asyncresult(message_future):
10101022
def map(self, f, *sequences, **kwargs):
10111023
raise NotImplementedError("BroadcastView.map not yet implemented")
10121024

1025+
# scatter/gather cannot be coalescing yet
1026+
scatter = _not_coalescing(DirectView.scatter)
1027+
gather = _not_coalescing(DirectView.gather)
1028+
10131029

10141030
class LoadBalancedView(View):
10151031
"""An load-balancing View that only executes via the Task scheduler.

ipyparallel/controller/app.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,10 @@ def get_python_scheduler_args(
974974
in_addr=None,
975975
out_addr=None,
976976
):
977+
if identity is not None:
978+
logname = f"{scheduler_name}-{identity}"
979+
else:
980+
logname = scheduler_name
977981
return {
978982
'scheduler_class': scheduler_class,
979983
'in_addr': in_addr or self.client_url(scheduler_name),
@@ -984,7 +988,7 @@ def get_python_scheduler_args(
984988
'identity': identity
985989
if identity is not None
986990
else bytes(scheduler_name, 'utf8'),
987-
'logname': 'scheduler',
991+
'logname': logname,
988992
'loglevel': self.log_level,
989993
'log_url': self.log_url,
990994
'config': dict(self.config),

ipyparallel/controller/broadcast_scheduler.py

+50-28
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from traitlets import Bytes
66
from traitlets import Integer
77
from traitlets import List
8+
from traitlets import Unicode
89

910
from ipyparallel import util
1011
from ipyparallel.controller.scheduler import get_common_scheduler_streams
@@ -15,11 +16,13 @@
1516
class BroadcastScheduler(Scheduler):
1617
port_name = 'broadcast'
1718
accumulated_replies = {}
19+
accumulated_targets = {}
1820
is_leaf = Bool(False)
1921
connected_sub_scheduler_ids = List(Bytes())
2022
outgoing_streams = List()
2123
depth = Integer()
2224
max_depth = Integer()
25+
name = Unicode()
2326

2427
def start(self):
2528
self.client_stream.on_recv(self.dispatch_submission, copy=False)
@@ -28,12 +31,14 @@ def start(self):
2831
else:
2932
for outgoing_stream in self.outgoing_streams:
3033
outgoing_stream.on_recv(self.dispatch_result, copy=False)
34+
self.log.info(f"BroadcastScheduler {self.name} started")
3135

3236
def send_to_targets(self, msg, original_msg_id, targets, idents, is_coalescing):
3337
if is_coalescing:
3438
self.accumulated_replies[original_msg_id] = {
35-
bytes(target, 'utf8'): None for target in targets
39+
target.encode('utf8'): None for target in targets
3640
}
41+
self.accumulated_targets[original_msg_id] = targets
3742

3843
for target in targets:
3944
new_msg = self.append_new_msg_id_to_msg(
@@ -44,11 +49,6 @@ def send_to_targets(self, msg, original_msg_id, targets, idents, is_coalescing):
4449
def send_to_sub_schedulers(
4550
self, msg, original_msg_id, targets, idents, is_coalescing
4651
):
47-
if is_coalescing:
48-
self.accumulated_replies[original_msg_id] = {
49-
scheduler_id: None for scheduler_id in self.connected_sub_scheduler_ids
50-
}
51-
5252
trunc = 2 ** self.max_depth
5353
fmt = f"0{self.max_depth + 1}b"
5454

@@ -62,10 +62,21 @@ def send_to_sub_schedulers(
6262
next_idx = int(path[self.depth + 1]) # 0 or 1
6363
targets_by_scheduler[next_idx].append(target_tuple)
6464

65+
if is_coalescing:
66+
self.accumulated_replies[original_msg_id] = {
67+
scheduler_id: None for scheduler_id in self.connected_sub_scheduler_ids
68+
}
69+
self.accumulated_targets[original_msg_id] = {}
70+
6571
for i, scheduler_id in enumerate(self.connected_sub_scheduler_ids):
6672
targets_for_scheduler = targets_by_scheduler[i]
67-
if not targets_for_scheduler and is_coalescing:
68-
del self.accumulated_replies[original_msg_id][scheduler_id]
73+
if is_coalescing:
74+
if targets_for_scheduler:
75+
self.accumulated_targets[original_msg_id][
76+
scheduler_id
77+
] = targets_for_scheduler
78+
else:
79+
del self.accumulated_replies[original_msg_id][scheduler_id]
6980
msg['metadata']['targets'] = targets_for_scheduler
7081

7182
new_msg = self.append_new_msg_id_to_msg(
@@ -76,28 +87,36 @@ def send_to_sub_schedulers(
7687
)
7788
self.outgoing_streams[i].send_multipart(new_msg, copy=False)
7889

79-
def coalescing_reply(self, raw_msg, msg, original_msg_id, outgoing_id):
90+
def coalescing_reply(self, raw_msg, msg, original_msg_id, outgoing_id, idents):
91+
# accumulate buffers
92+
self.accumulated_replies[original_msg_id][outgoing_id] = msg['buffers']
8093
if all(
81-
msg is not None or stored_outgoing_id == outgoing_id
82-
for stored_outgoing_id, msg in self.accumulated_replies[
83-
original_msg_id
84-
].items()
94+
msg_buffers is not None
95+
for msg_buffers in self.accumulated_replies[original_msg_id].values()
8596
):
86-
new_msg = raw_msg[1:]
87-
new_msg.extend(
88-
[
89-
buffer
90-
for msg_buffers in self.accumulated_replies[
91-
original_msg_id
92-
].values()
93-
if msg_buffers
94-
for buffer in msg_buffers
95-
]
97+
replies = self.accumulated_replies.pop(original_msg_id)
98+
self.log.debug(f"Coalescing {len(replies)} reply to {original_msg_id}")
99+
targets = self.accumulated_targets.pop(original_msg_id)
100+
101+
new_msg = msg.copy()
102+
# begin rebuilding message
103+
# metadata['targets']
104+
if self.is_leaf:
105+
new_msg['metadata']['broadcast_targets'] = targets
106+
else:
107+
new_msg['metadata']['broadcast_targets'] = []
108+
109+
# avoid duplicated msg buffers
110+
buffers = []
111+
for sub_target, msg_buffers in replies.items():
112+
buffers.extend(msg_buffers)
113+
if not self.is_leaf:
114+
new_msg['metadata']['broadcast_targets'].extend(targets[sub_target])
115+
116+
new_raw_msg = self.session.serialize(new_msg)
117+
self.client_stream.send_multipart(
118+
idents + new_raw_msg + buffers, copy=False
96119
)
97-
self.client_stream.send_multipart(new_msg, copy=False)
98-
del self.accumulated_replies[original_msg_id]
99-
else:
100-
self.accumulated_replies[original_msg_id][outgoing_id] = msg['buffers']
101120

102121
@util.log_errors
103122
def dispatch_submission(self, raw_msg):
@@ -144,7 +163,9 @@ def dispatch_result(self, raw_msg):
144163
original_msg_id = msg['metadata']['original_msg_id']
145164
is_coalescing = msg['metadata']['is_coalescing']
146165
if is_coalescing:
147-
self.coalescing_reply(raw_msg, msg, original_msg_id, outgoing_id)
166+
self.coalescing_reply(
167+
raw_msg, msg, original_msg_id, outgoing_id, idents[1:]
168+
)
148169
else:
149170
self.client_stream.send_multipart(raw_msg[1:], copy=False)
150171

@@ -223,6 +244,7 @@ def launch_broadcast_scheduler(
223244
config=config,
224245
depth=depth,
225246
max_depth=max_depth,
247+
name=identity,
226248
)
227249
if is_leaf:
228250
scheduler_args.update(engine_stream=outgoing_streams[0], is_leaf=True)

ipyparallel/controller/task_scheduler.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ def start(self):
215215
registration_notification=self._register_engine,
216216
unregistration_notification=self._unregister_engine,
217217
)
218-
self.log.info("Scheduler started [%s]" % self.scheme_name)
218+
self.log.info("Task scheduler started [%s]" % self.scheme_name)
219219
self.notifier_stream.on_recv(self.dispatch_notification)
220220

221221
# -----------------------------------------------------------------------

ipyparallel/tests/test_view_broadcast.py

+52-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ def setUp(self):
1515
super().setUp()
1616
self._broadcast_view_used = False
1717
# use broadcast view for direct API
18-
real_direct_view = self.client.direct_view
18+
real_direct_view = self.client.real_direct_view = self.client.direct_view
1919

2020
def broadcast_or_direct(targets):
2121
if isinstance(targets, int):
@@ -65,3 +65,54 @@ def test_scatter_tracked(self):
6565

6666
class TestBroadcastViewCoalescing(TestBroadcastView):
6767
is_coalescing = True
68+
69+
@pytest.mark.xfail(reason="coalescing view doesn't preserve target order")
70+
def test_target_ordering(self):
71+
self.minimum_engines(4)
72+
ids_in_order = self.client.ids
73+
dv = self.client.real_direct_view(ids_in_order)
74+
75+
dv.scatter('rank', ids_in_order, flatten=True, block=True)
76+
assert dv['rank'] == ids_in_order
77+
78+
view = self.client.broadcast_view(ids_in_order, is_coalescing=True)
79+
assert view['rank'] == ids_in_order
80+
81+
view = self.client.broadcast_view(ids_in_order[::-1], is_coalescing=True)
82+
assert view['rank'] == ids_in_order[::-1]
83+
84+
view = self.client.broadcast_view(ids_in_order[::2], is_coalescing=True)
85+
assert view['rank'] == ids_in_order[::2]
86+
87+
view = self.client.broadcast_view(ids_in_order[::-2], is_coalescing=True)
88+
assert view['rank'] == ids_in_order[::-2]
89+
90+
def test_engine_metadata(self):
91+
self.minimum_engines(4)
92+
ids_in_order = sorted(self.client.ids)
93+
dv = self.client.real_direct_view(ids_in_order)
94+
dv.scatter('rank', ids_in_order, flatten=True, block=True)
95+
view = self.client.broadcast_view(ids_in_order, is_coalescing=True)
96+
ar = view.pull('rank', block=False)
97+
result = ar.get(timeout=10)
98+
assert isinstance(ar.engine_id, list)
99+
assert isinstance(ar.engine_uuid, list)
100+
assert result == ar.engine_id
101+
assert sorted(ar.engine_id) == ids_in_order
102+
103+
even_ids = ids_in_order[::-2]
104+
view = self.client.broadcast_view(even_ids, is_coalescing=True)
105+
ar = view.pull('rank', block=False)
106+
result = ar.get(timeout=10)
107+
assert isinstance(ar.engine_id, list)
108+
assert isinstance(ar.engine_uuid, list)
109+
assert result == ar.engine_id
110+
assert sorted(ar.engine_id) == sorted(even_ids)
111+
112+
@pytest.mark.xfail(reason="displaypub ordering not preserved")
113+
def test_apply_displaypub(self):
114+
pass
115+
116+
117+
# FIXME
118+
del TestBroadcastView

ipyparallel/util.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ def local_logger(logname, loglevel=logging.DEBUG):
427427
handler = logging.StreamHandler()
428428
handler.setLevel(loglevel)
429429
formatter = logging.Formatter(
430-
"%(asctime)s.%(msecs).03d [%(levelname)1.1s %(name)s] %(message)s",
430+
"%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
431431
datefmt="%Y-%m-%d %H:%M:%S",
432432
)
433433
handler.setFormatter(formatter)

0 commit comments

Comments
 (0)