Skip to content

Commit a7af633

Browse files
committed
Do renaming, implement lowlevel Outcome sending
As was listed in the many todos, this changes the `.start_soon()` impl to instead (manually) `.send()` into the user defined `@task_scope_manager` an `Outcome` from the spawned task. In this case the task manager wraps that in a user defined (and renamed) `TaskOutcome` and delivers that + a containing `trio.CancelScope` to the `.start_soon()` caller. Here the user defined `TaskOutcome` defines a `.wait_for_result()` method that can be used to await the task's exit and handle it's underlying returned value or raised error; the implementation could be different and subject to the user's own whims. Note that by default, if this was added to `trio`'s core, the `@task_scope_manager` would simply be implemented as either a `None` yielding single-yield-generator but more likely just entirely ignored by the runtime (as in no manual task outcome collecting, generator calling and sending is done at all) by default if the user does not provide the `task_scope_manager` to the nursery at open time.
1 parent 3f1bf47 commit a7af633

File tree

1 file changed

+90
-61
lines changed

1 file changed

+90
-61
lines changed

tractor/trionics/_supervisor.py

+90-61
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,17 @@
3939
Nursery,
4040
)
4141

42-
class MaybeOutcome(Struct):
4342

44-
_ready: Event = trio.Event()
45-
_outcome: Outcome | None = None
46-
_result: Any | None = None
43+
class TaskOutcome(Struct):
44+
'''
45+
The outcome of a scheduled ``trio`` task which includes an interface
46+
for synchronizing to the completion of the task's runtime and access
47+
to the eventual boxed result/value or raised exception.
48+
49+
'''
50+
_exited: Event = trio.Event() # as per `trio.Runner.task_exited()`
51+
_outcome: Outcome | None = None # as per `outcome.Outcome`
52+
_result: Any | None = None # the eventual maybe-returned-value
4753

4854
@property
4955
def result(self) -> Any:
@@ -55,27 +61,35 @@ def result(self) -> Any:
5561
raise RuntimeError(
5662
# f'Task {task.name} is not complete.\n'
5763
f'Outcome is not complete.\n'
58-
'wait on `await MaybeOutcome.unwrap()` first!'
64+
'wait on `await TaskOutcome.wait_for_result()` first!'
5965
)
6066
return self._result
6167

6268
def _set_outcome(
6369
self,
6470
outcome: Outcome,
6571
):
72+
'''
73+
Set the ``Outcome`` for this task.
74+
75+
This method should only ever be called by the task's supervising
76+
nursery implemenation.
77+
78+
'''
6679
self._outcome = outcome
6780
self._result = outcome.unwrap()
68-
self._ready.set()
69-
70-
# TODO: maybe a better name like,
71-
# - .wait_and_unwrap()
72-
# - .wait_unwrap()
73-
# - .aunwrap() ?
74-
async def unwrap(self) -> Any:
75-
if self._ready.is_set():
81+
self._exited.set()
82+
83+
async def wait_for_result(self) -> Any:
84+
'''
85+
Unwind the underlying task's ``Outcome`` by async waiting for
86+
the task to first complete and then unwrap it's result-value.
87+
88+
'''
89+
if self._exited.is_set():
7690
return self._result
7791

78-
await self._ready.wait()
92+
await self._exited.wait()
7993

8094
out = self._outcome
8195
if out is None:
@@ -84,13 +98,6 @@ async def unwrap(self) -> Any:
8498
return self.result
8599

86100

87-
class TaskHandle(Struct):
88-
task: Task
89-
cs: CancelScope
90-
exited: Event | None = None
91-
_outcome: Outcome | None = None
92-
93-
94101
class ScopePerTaskNursery(Struct):
95102
_n: Nursery
96103
_scopes: dict[
@@ -122,73 +129,96 @@ async def start_soon(
122129
cs = CancelScope()
123130
new_task: Task | None = None
124131
to_return: tuple[Any] | None = None
125-
maybe_outcome = MaybeOutcome()
126132

127133
sm = self.scope_manager
128134
if sm is None:
129135
mngr = nullcontext([cs])
130136
else:
131-
mngr = sm(
132-
nursery=n,
133-
scope=cs,
134-
maybe_outcome=maybe_outcome,
135-
)
137+
# NOTE: what do we enforce as a signature for the
138+
# `@task_scope_manager` here?
139+
mngr = sm(nursery=n, scope=cs)
136140

137141
async def _start_wrapped_in_scope(
138142
task_status: TaskStatus[
139143
tuple[CancelScope, Task]
140144
] = trio.TASK_STATUS_IGNORED,
141145

142146
) -> None:
143-
nonlocal maybe_outcome
144-
nonlocal to_return
147+
148+
# TODO: this was working before?!
149+
# nonlocal to_return
145150

146151
with cs:
147152

148153
task = trio.lowlevel.current_task()
149154
self._scopes[cs] = task
150155

151-
# TODO: instead we should probably just use
152-
# `Outcome.send(mngr)` here no and inside a custom
153-
# decorator `@trio.cancel_scope_manager` enforce
154-
# that it's a single yield generator?
155-
with mngr as to_return:
156-
157-
# TODO: relay through whatever the
158-
# started task passes back via `.started()` ?
159-
# seems like that won't work with also returning
160-
# a "task handle"?
161-
task_status.started()
156+
# execute up to the first yield
157+
try:
158+
to_return: tuple[Any] = next(mngr)
159+
except StopIteration:
160+
raise RuntimeError("task manager didn't yield") from None
161+
162+
# TODO: how do we support `.start()` style?
163+
# - relay through whatever the
164+
# started task passes back via `.started()` ?
165+
# seems like that won't work with also returning
166+
# a "task handle"?
167+
# - we were previously binding-out this `to_return` to
168+
# the parent's lexical scope, why isn't that working
169+
# now?
170+
task_status.started(to_return)
171+
172+
# invoke underlying func now that cs is entered.
173+
outcome = await acapture(async_fn, *args)
174+
175+
# execute from the 1st yield to return and expect
176+
# generator-mngr `@task_scope_manager` thinger to
177+
# terminate!
178+
try:
179+
mngr.send(outcome)
180+
181+
# NOTE: this will instead send the underlying
182+
# `.value`? Not sure if that's better or not?
183+
# I would presume it's better to have a handle to
184+
# the `Outcome` entirely? This method sends *into*
185+
# the mngr this `Outcome.value`; seems like kinda
186+
# weird semantics for our purposes?
187+
# outcome.send(mngr)
188+
189+
except StopIteration:
190+
return
191+
else:
192+
raise RuntimeError(f"{mngr} didn't stop!")
193+
194+
to_return = await n.start(_start_wrapped_in_scope)
195+
assert to_return is not None
162196

163-
# invoke underlying func now that cs is entered.
164-
outcome = await acapture(async_fn, *args)
197+
# TODO: use the fancy type-check-time type signature stuff from
198+
# mypy i guess..to like, relay the type of whatever the
199+
# generator yielded through? betcha that'll be un-grokable XD
200+
return to_return
165201

166-
# TODO: instead, mngr.send(outcome) so that we don't
167-
# tie this `.start_soon()` impl to the
168-
# `MaybeOutcome` type? Use `Outcome.send(mngr)`
169-
# right?
170-
maybe_outcome._set_outcome(outcome)
171202

172-
await n.start(_start_wrapped_in_scope)
173-
assert to_return is not None
174203

175-
# TODO: better way to concat the values delivered by the user
176-
# provided `.scope_manager` and the outcome?
177-
return tuple([maybe_outcome] + to_return)
204+
# TODO: you could wrap your output task handle in this?
205+
# class TaskHandle(Struct):
206+
# task: Task
207+
# cs: CancelScope
208+
# outcome: TaskOutcome
178209

179210

180211
# TODO: maybe just make this a generator with a single yield that also
181212
# delivers a value (of some std type) from the yield expression?
182-
# @trio.cancel_scope_manager
183-
@cm
213+
# @trio.task_scope_manager
184214
def add_task_handle_and_crash_handling(
185215
nursery: Nursery,
186216
scope: CancelScope,
187-
maybe_outcome: MaybeOutcome,
188217

189218
) -> Generator[None, list[Any]]:
190219

191220
cs: CancelScope = CancelScope()
221+
task_outcome = TaskOutcome()
192222

193223
# if you need it you can ask trio for the task obj
194224
task: Task = trio.lowlevel.current_task()
@@ -197,12 +227,11 @@ def add_task_handle_and_crash_handling(
197227
try:
198228
# yields back when task is terminated, cancelled, returns?
199229
with cs:
200-
# the yielded values here are what are returned to the
201-
# nursery's `.start_soon()` caller
202230

203-
# TODO: actually make this work so that `MaybeOutcome` isn't
204-
# tied to the impl of `.start_soon()` on our custom nursery!
205-
task_outcome: Outcome = yield [cs]
231+
# the yielded value(s) here are what are returned to the
232+
# nursery's `.start_soon()` caller B)
233+
lowlevel_outcome: Outcome = yield (task_outcome, cs)
234+
task_outcome._set_outcome(lowlevel_outcome)
206235

207236
except Exception as err:
208237
# Adds "crash handling" from `pdbp` by entering
@@ -247,7 +276,7 @@ async def main():
247276

248277
val: str = 'yoyoyo'
249278
val_outcome, cs = await sn.start_soon(sleep_then_return_val, val)
250-
res = await val_outcome.unwrap()
279+
res = await val_outcome.wait_for_result()
251280
assert res == val
252281
print(f'GOT EXPECTED TASK VALUE: {res}')
253282

0 commit comments

Comments
 (0)