Skip to content

Commit 68a99ed

Browse files
committed
CABI: rebase async onto Thread abstraction (no change)
1 parent 1e45dec commit 68a99ed

File tree

2 files changed

+286
-254
lines changed

2 files changed

+286
-254
lines changed

design/mvp/canonical-abi/definitions.py

Lines changed: 106 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,10 @@ class CanonicalOptions(LiftLowerOptions):
214214

215215
### Runtime State
216216

217-
scheduler = asyncio.Lock()
218-
219217
#### Component Instance State
220218

221219
class ComponentInstance:
220+
store: Store
222221
table: Table
223222
may_leave: bool
224223
backpressure: bool
@@ -227,7 +226,8 @@ class ComponentInstance:
227226
pending_tasks: list[tuple[Task, asyncio.Future]]
228227
starting_pending_task: bool
229228

230-
def __init__(self):
229+
def __init__(self, store):
230+
self.store = store
231231
self.table = Table()
232232
self.may_leave = True
233233
self.backpressure = False
@@ -461,10 +461,6 @@ class Cancelled(IntEnum):
461461
FALSE = 0
462462
TRUE = 1
463463

464-
OnStart = Callable[[], list[any]]
465-
OnResolve = Callable[[Optional[list[any]]], None]
466-
OnBlock = Callable[[Awaitable], Awaitable[Cancelled]]
467-
468464
class Task:
469465
class State(Enum):
470466
INITIAL = 1
@@ -477,29 +473,28 @@ class State(Enum):
477473
inst: ComponentInstance
478474
ft: FuncType
479475
supertask: Optional[Task]
480-
on_resolve: OnResolve
481-
on_block: OnBlock
476+
thread: Thread
477+
on_resolve: Callable[[Optional[list[any]]], None]
482478
num_borrows: int
483479
context: ContextLocalStorage
484480

485-
def __init__(self, opts, inst, ft, supertask, on_resolve, on_block):
481+
def __init__(self, opts, inst, ft, supertask, thread, on_resolve):
486482
self.state = Task.State.INITIAL
487483
self.opts = opts
488484
self.inst = inst
489485
self.ft = ft
490486
self.supertask = supertask
487+
self.thread = thread
491488
self.on_resolve = on_resolve
492-
self.on_block = on_block
493489
self.num_borrows = 0
494490
self.context = ContextLocalStorage()
495491

496492
async def enter(self):
497-
assert(scheduler.locked())
498493
self.trap_if_on_the_stack(self.inst)
499494
if not self.may_enter(self) or self.inst.pending_tasks:
500495
f = asyncio.Future()
501496
self.inst.pending_tasks.append((self, f))
502-
if await self.on_block(f) == Cancelled.TRUE:
497+
if await self.thread.suspend(f) == Cancelled.TRUE:
503498
[i] = [i for i,(t,_) in enumerate(self.inst.pending_tasks) if t == self]
504499
self.inst.pending_tasks.pop(i)
505500
self.on_resolve(None)
@@ -537,37 +532,24 @@ async def sync_wait(self, awaitable) -> None:
537532
return
538533
assert(self.inst.unblocked.is_set())
539534
self.inst.unblocked.clear()
540-
if await self.on_block(awaitable) == Cancelled.TRUE:
535+
if await self.thread.suspend(awaitable) == Cancelled.TRUE:
541536
assert(self.state == Task.State.INITIAL)
542537
self.state = Task.State.PENDING_CANCEL
543-
assert(await self.on_block(awaitable) == Cancelled.FALSE)
538+
assert(await self.thread.suspend(awaitable) == Cancelled.FALSE)
544539
self.inst.unblocked.set()
545540

546541
async def async_wait(self, awaitable) -> Cancelled:
547542
self.maybe_start_pending_task()
548543
awaitable = asyncio.ensure_future(awaitable)
549544
if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0,1):
550545
return Cancelled.FALSE
551-
cancelled = await self.on_block(awaitable)
546+
cancelled = await self.thread.suspend(awaitable)
552547
while not self.inst.unblocked.is_set():
553-
cancelled |= await self.on_block(self.inst.unblocked.wait())
548+
cancelled |= await self.thread.suspend(self.inst.unblocked.wait())
554549
if cancelled:
555550
awaitable.cancel()
556551
return cancelled
557552

558-
async def call_sync(self, callee, on_start, on_return):
559-
async def sync_on_block(awaitable):
560-
if await self.on_block(awaitable) == Cancelled.TRUE:
561-
assert(self.state == Task.State.INITIAL)
562-
self.state = Task.State.PENDING_CANCEL
563-
assert(await self.on_block(awaitable) == Cancelled.FALSE)
564-
return Cancelled.FALSE
565-
566-
assert(self.inst.unblocked.is_set())
567-
self.inst.unblocked.clear()
568-
await callee(self, on_start, on_return, sync_on_block)
569-
self.inst.unblocked.set()
570-
571553
async def wait_for_event(self, wset, sync) -> EventTuple:
572554
if self.state == Task.State.PENDING_CANCEL:
573555
self.state = Task.State.CANCEL_DELIVERED
@@ -626,7 +608,6 @@ def cancel(self):
626608
self.state = Task.State.RESOLVED
627609

628610
def exit(self):
629-
assert(scheduler.locked())
630611
trap_if(self.state != Task.State.RESOLVED)
631612
assert(self.num_borrows == 0)
632613
if self.opts.sync:
@@ -646,17 +627,17 @@ class State(IntEnum):
646627

647628
state: State
648629
task: Task
630+
thread: Optional[Thread]
649631
lenders: Optional[list[ResourceHandle]]
650-
request_cancel_begin: asyncio.Future
651-
request_cancel_end: asyncio.Future
632+
cancellation_requested: bool
652633

653634
def __init__(self, task):
654635
Waitable.__init__(self)
655636
self.state = Subtask.State.STARTING
656637
self.task = task
638+
self.thread = None
657639
self.lenders = []
658-
self.request_cancel_begin = asyncio.Future()
659-
self.request_cancel_end = asyncio.Future()
640+
self.cancellation_requested = False
660641

661642
def resolved(self):
662643
match self.state:
@@ -668,44 +649,6 @@ def resolved(self):
668649
Subtask.State.CANCELLED_BEFORE_RETURNED):
669650
return True
670651

671-
async def request_cancel(self):
672-
assert(not self.cancellation_requested() and not self.resolved())
673-
self.request_cancel_begin.set_result(None)
674-
await self.request_cancel_end
675-
676-
def cancellation_requested(self):
677-
return self.request_cancel_begin.done()
678-
679-
async def call_async(self, callee, on_start, on_resolve):
680-
async def do_call():
681-
await callee(self.task, on_start, on_resolve, async_on_block)
682-
relinquish_control()
683-
684-
async def async_on_block(awaitable):
685-
relinquish_control()
686-
if not self.request_cancel_end.done():
687-
await asyncio.wait([awaitable, self.request_cancel_begin],
688-
return_when = asyncio.FIRST_COMPLETED)
689-
if self.request_cancel_begin.done():
690-
return Cancelled.TRUE
691-
else:
692-
await awaitable
693-
assert(awaitable.done())
694-
await scheduler.acquire()
695-
return Cancelled.FALSE
696-
697-
def relinquish_control():
698-
if not ret.done():
699-
ret.set_result(None)
700-
elif self.request_cancel_begin.done() and not self.request_cancel_end.done():
701-
self.request_cancel_end.set_result(None)
702-
else:
703-
scheduler.release()
704-
705-
ret = asyncio.Future()
706-
asyncio.create_task(do_call())
707-
await ret
708-
709652
def add_lender(self, lending_handle):
710653
assert(not self.resolve_delivered() and not self.resolved())
711654
lending_handle.num_lends += 1
@@ -946,6 +889,82 @@ def drop(self):
946889
trap_if(not self.done)
947890
FutureEnd.drop(self)
948891

892+
#### Thread State
893+
894+
class Thread:
895+
store: Store
896+
awaitable: Optional[Awaitable]
897+
on_resume: Optional[asyncio.Future]
898+
on_suspend_or_exit: Optional[asyncio.Future]
899+
900+
def __init__(self, store, lifted_func, caller, on_start, on_resolve):
901+
self.store = store
902+
self.awaitable = asyncio.Future()
903+
self.on_resume = asyncio.Future()
904+
self.on_suspend_or_exit = None
905+
async def coro():
906+
assert(await self.on_resume == Cancelled.FALSE)
907+
self.on_resume = None
908+
await lifted_func(caller, self, on_start, on_resolve)
909+
self.on_suspend_or_exit.set_result(None)
910+
asyncio.create_task(coro())
911+
store.waiting.append(self)
912+
self.awaitable.set_result(None)
913+
914+
async def resume(self, cancelled = Cancelled.FALSE):
915+
assert(cancelled or self.awaitable.done())
916+
self.awaitable = None
917+
self.store.waiting.remove(self)
918+
self.on_resume.set_result(cancelled)
919+
assert(not self.on_suspend_or_exit)
920+
self.on_suspend_or_exit = asyncio.Future()
921+
await self.on_suspend_or_exit
922+
self.on_suspend_or_exit = None
923+
if self.awaitable:
924+
self.store.waiting.append(self)
925+
926+
async def suspend(self, awaitable) -> Cancelled:
927+
assert(not self.awaitable)
928+
self.awaitable = awaitable
929+
self.on_suspend_or_exit.set_result(None)
930+
self.on_suspend_or_exit = None
931+
assert(not self.on_resume)
932+
self.on_resume = asyncio.Future()
933+
result = await self.on_resume
934+
self.on_resume = None
935+
return result
936+
937+
#### Store State / Embedding API
938+
939+
class Store:
940+
loop: asyncio.AbstractEventLoop
941+
waiting: list[Thread]
942+
943+
def __init__(self):
944+
self.loop = asyncio.new_event_loop()
945+
self.waiting = []
946+
947+
ExportCall = Thread
948+
949+
def start_export_call(self, lifted_func, on_start, on_resolve) -> ExportCall:
950+
async def coro():
951+
caller = None
952+
thread = Thread(self, lifted_func, caller, on_start, on_resolve)
953+
await thread.resume()
954+
return thread
955+
return self.loop.run_until_complete(coro())
956+
957+
def tick(self):
958+
if not DETERMINISTIC_PROFILE:
959+
random.shuffle(self.waiting)
960+
for thread in self.waiting:
961+
if thread.awaitable.done():
962+
self.loop.run_until_complete(thread.resume())
963+
return
964+
965+
def export_call_finished(self, export_call: ExportCall):
966+
return export_call.awaitable is None
967+
949968
### Despecialization
950969

951970
def despecialize(t):
@@ -1901,8 +1920,8 @@ def lower_flat_values(cx, max_flat, vs, ts, out_param = None):
19011920

19021921
### `canon lift`
19031922

1904-
async def canon_lift(opts, inst, ft, callee, caller, on_start, on_resolve, on_block):
1905-
task = Task(opts, inst, ft, caller, on_resolve, on_block)
1923+
async def canon_lift(opts, inst, ft, callee, caller, thread, on_start, on_resolve):
1924+
task = Task(opts, inst, ft, caller, thread, on_resolve)
19061925
if not await task.enter():
19071926
return
19081927

@@ -2001,7 +2020,7 @@ def on_start():
20012020
def on_resolve(result):
20022021
on_progress()
20032022
if result is None:
2004-
assert(subtask.cancellation_requested())
2023+
assert(subtask.cancellation_requested)
20052024
if subtask.state == Subtask.State.STARTING:
20062025
subtask.state = Subtask.State.CANCELLED_BEFORE_STARTED
20072026
else:
@@ -2013,13 +2032,19 @@ def on_resolve(result):
20132032
nonlocal flat_results
20142033
flat_results = lower_flat_values(cx, max_flat_results, result, ft.result_type(), flat_args)
20152034

2035+
subtask.thread = Thread(task.inst.store, callee, task, on_start, on_resolve)
2036+
await subtask.thread.resume()
2037+
20162038
if opts.sync:
2017-
await task.call_sync(callee, on_start, on_resolve)
2039+
if not subtask.resolved():
2040+
done = asyncio.Event()
2041+
def on_progress():
2042+
done.set()
2043+
await task.sync_wait(done.wait())
20182044
assert(types_match_values(flat_ft.results, flat_results))
20192045
subtask.deliver_resolve()
20202046
return flat_results
20212047
else:
2022-
await subtask.call_async(callee, on_start, on_resolve)
20232048
if subtask.resolved():
20242049
assert(flat_results == [])
20252050
subtask.deliver_resolve()
@@ -2196,11 +2221,12 @@ async def canon_subtask_cancel(sync, task, i):
21962221
subtask = task.inst.table.get(i)
21972222
trap_if(not isinstance(subtask, Subtask))
21982223
trap_if(subtask.resolve_delivered())
2199-
trap_if(subtask.cancellation_requested())
2224+
trap_if(subtask.cancellation_requested)
22002225
if subtask.resolved():
22012226
assert(subtask.has_pending_event())
22022227
else:
2203-
await subtask.request_cancel()
2228+
subtask.cancellation_requested = True
2229+
await subtask.thread.resume(Cancelled.TRUE)
22042230
if sync:
22052231
while not subtask.resolved():
22062232
if subtask.has_pending_event():

0 commit comments

Comments
 (0)