Skip to content

Commit ec21248

Browse files
committed
WIP: cooperative threads
1 parent 8b05478 commit ec21248

File tree

2 files changed

+117
-102
lines changed

2 files changed

+117
-102
lines changed

design/mvp/canonical-abi/definitions.py

Lines changed: 117 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,10 @@ class CanonicalOptions(LiftLowerOptions):
214214

215215
### Runtime State
216216

217-
scheduler = asyncio.Lock()
218-
219217
#### Component Instance State
220218

221219
class ComponentInstance:
220+
store: Store
222221
table: Table
223222
may_leave: bool
224223
backpressure: bool
@@ -455,6 +454,51 @@ def drop(self):
455454
trap_if(len(self.elems) > 0)
456455
trap_if(self.num_waiting > 0)
457456

457+
#### Thread State
458+
459+
class Cancelled(IntEnum):
460+
FALSE = 0
461+
TRUE = 1
462+
463+
class Thread:
464+
store: Store
465+
awaitable: Optional[Awaitable]
466+
on_resume: Optional[asyncio.Future]
467+
on_suspend_or_exit: Optional[asyncio.Future]
468+
469+
def __init__(self, store, callee, on_start, on_resolve):
470+
self.store = store
471+
self.awaitable = None
472+
self.on_resume = asyncio.Future()
473+
self.on_suspend_or_exit = None
474+
async def thread():
475+
assert(await self.on_resume == Cancelled.FALSE)
476+
callee(self, on_start, on_resolve)
477+
self.on_suspend_or_exit.set_result(None)
478+
asyncio.create_task(thread())
479+
store.waiting.append(self)
480+
481+
async def resume(self, cancelled = Cancelled.FALSE):
482+
assert(cancelled or (not self.awaitable or self.awaitable.done()))
483+
self.awaitable = None
484+
self.store.waiting.remove(self)
485+
self.on_resume.set_result(cancelled)
486+
self.on_resume = None
487+
assert(not self.on_suspend_or_exit)
488+
self.on_suspend_or_exit = asyncio.Future()
489+
await self.on_suspend_or_exit
490+
assert(self.awaitable)
491+
self.store.waiting.append(self)
492+
493+
async def suspend(self, awaitable) -> Cancelled:
494+
assert(not self.awaitable)
495+
self.awaitable = awaitable
496+
self.on_suspend_or_exit.set_result(None)
497+
self.on_suspend_or_exit = None
498+
assert(not self.on_resume)
499+
self.on_resume = asyncio.Future()
500+
return await thread.on_resume
501+
458502
#### Task State
459503

460504
class Cancelled(IntEnum):
@@ -477,29 +521,28 @@ class State(Enum):
477521
inst: ComponentInstance
478522
ft: FuncType
479523
supertask: Optional[Task]
524+
thread: Thread
480525
on_resolve: OnResolve
481-
on_block: OnBlock
482526
num_borrows: int
483527
context: ContextLocalStorage
484528

485-
def __init__(self, opts, inst, ft, supertask, on_resolve, on_block):
529+
def __init__(self, opts, inst, ft, supertask, thread, on_resolve):
486530
self.state = Task.State.INITIAL
487531
self.opts = opts
488532
self.inst = inst
489533
self.ft = ft
490534
self.supertask = supertask
535+
self.thread = thread
491536
self.on_resolve = on_resolve
492-
self.on_block = on_block
493537
self.num_borrows = 0
494538
self.context = ContextLocalStorage()
495539

496540
async def enter(self):
497-
assert(scheduler.locked())
498541
self.trap_if_on_the_stack(self.inst)
499542
if not self.may_enter(self) or self.inst.pending_tasks:
500543
f = asyncio.Future()
501544
self.inst.pending_tasks.append((self, f))
502-
if await self.on_block(f) == Cancelled.TRUE:
545+
if await self.thread.suspend(f) == Cancelled.TRUE:
503546
[i] = [i for i,(t,_) in enumerate(self.inst.pending_tasks) if t == self]
504547
self.inst.pending_tasks.pop(i)
505548
self.on_resolve(None)
@@ -537,35 +580,22 @@ async def sync_wait(self, awaitable) -> None:
537580
return
538581
assert(self.inst.unblocked.is_set())
539582
self.inst.unblocked.clear()
540-
if await self.on_block(awaitable) == Cancelled.TRUE:
583+
if await self.thread.suspend(awaitable) == Cancelled.TRUE:
541584
assert(self.state == Task.State.INITIAL)
542585
self.state = Task.State.PENDING_CANCEL
543-
assert(await self.on_block(awaitable) == Cancelled.FALSE)
586+
assert(await self.thread.suspend(awaitable) == Cancelled.FALSE)
544587
self.inst.unblocked.set()
545588

546589
async def async_wait(self, awaitable) -> Cancelled:
547590
self.maybe_start_pending_task()
548591
awaitable = asyncio.ensure_future(awaitable)
549592
if awaitable.done() and not DETERMINISTIC_PROFILE and random.randint(0,1):
550593
return
551-
cancelled = await self.on_block(awaitable)
594+
cancelled = await self.thread.suspend(awaitable)
552595
while not self.inst.unblocked.is_set():
553-
cancelled |= await self.on_block(self.inst.unblocked.wait())
596+
cancelled |= await self.thread.suspend(self.inst.unblocked.wait())
554597
return cancelled
555598

556-
async def call_sync(self, callee, on_start, on_return):
557-
async def sync_on_block(awaitable):
558-
if await self.on_block(awaitable) == Cancelled.TRUE:
559-
assert(self.state == Task.State.INITIAL)
560-
self.state = Task.State.PENDING_CANCEL
561-
assert(await self.on_block(awaitable) == Cancelled.FALSE)
562-
return False
563-
564-
assert(self.inst.unblocked.is_set())
565-
self.inst.unblocked.clear()
566-
await callee(self, on_start, on_return, sync_on_block)
567-
self.inst.unblocked.set()
568-
569599
async def wait_for_event(self, waitable_set, sync) -> EventTuple:
570600
if self.state == Task.State.PENDING_CANCEL:
571601
self.state = Task.State.CANCEL_DELIVERED
@@ -625,7 +655,6 @@ def cancel(self):
625655
self.state = Task.State.RESOLVED
626656

627657
def exit(self):
628-
assert(scheduler.locked())
629658
trap_if(self.state != Task.State.RESOLVED)
630659
assert(self.num_borrows == 0)
631660
if self.opts.sync:
@@ -645,34 +674,46 @@ class State(IntEnum):
645674

646675
state: State
647676
task: Task
677+
callee_thread: Optional[Thread]
648678
lenders: Optional[list[ResourceHandle]]
649-
request_cancel_begin: asyncio.Future
650-
request_cancel_end: asyncio.Future
679+
cancelled: bool
651680

652681
def __init__(self, task):
653682
Waitable.__init__(self)
654683
self.state = Subtask.State.STARTING
655684
self.task = task
685+
self.callee_thread = None
656686
self.lenders = []
657-
self.request_cancel_begin = asyncio.Future()
658-
self.request_cancel_end = asyncio.Future()
687+
self.cancelled = False
659688

660-
async def call_sync(self, callee, on_start, on_resolve):
661-
def sync_on_start():
689+
async def call(self, callee, on_start, on_resolve):
690+
def update_on_start():
662691
assert(self.state == Subtask.State.STARTING)
663692
self.state = Subtask.State.STARTED
664693
return on_start()
665694

666-
def sync_on_resolve(result):
667-
assert(result is not None)
695+
def update_on_resolve(result):
668696
assert(self.state == Subtask.State.STARTED)
669-
self.state = Subtask.State.RETURNED
697+
if result is None:
698+
assert(self.cancelled)
699+
if self.state == Subtask.State.STARTING:
700+
self.state = Subtask.State.CANCELLED_BEFORE_STARTED
701+
else:
702+
assert(self.state == Subtask.State.STARTED)
703+
self.state = Subtask.State.CANCELLED_BEFORE_RETURNED
704+
else:
705+
assert(self.state == Subtask.State.STARTED)
706+
self.state = Subtask.State.RETURNED
670707
on_resolve(result)
671708

672-
await Task.call_sync(self.task, callee, sync_on_start, sync_on_resolve)
709+
assert(not self.callee_thread)
710+
self.callee_thread = Thread(self.supertask.inst.store, callee, update_on_start, update_on_resolve)
711+
await self.callee_thread.resume()
673712

674-
def cancelled(self):
675-
return self.request_cancel_begin.done()
713+
async def wait_until_resolved(self):
714+
while not self.resolved():
715+
await self.supertask.sync_wait(self.callee_thread.awaitable)
716+
await self.callee_thread.resume()
676717

677718
def resolved(self):
678719
match self.state:
@@ -684,57 +725,13 @@ def resolved(self):
684725
Subtask.State.CANCELLED_BEFORE_RETURNED):
685726
return True
686727

687-
async def request_cancel(self):
688-
assert(not self.cancelled() and not self.resolved())
689-
self.request_cancel_begin.set_result(None)
690-
await self.request_cancel_end
691-
692-
async def call_async(self, callee, on_start, on_resolve):
693-
async def do_call():
694-
await callee(self.task, async_on_start, async_on_resolve, async_on_block)
695-
relinquish_control()
696-
697-
def async_on_start():
698-
assert(self.state == Subtask.State.STARTING)
699-
self.state = Subtask.State.STARTED
700-
return on_start()
701-
702-
def async_on_resolve(result):
703-
if result is None:
704-
if self.state == Subtask.State.STARTING:
705-
self.state = Subtask.State.CANCELLED_BEFORE_STARTED
706-
else:
707-
assert(self.state == Subtask.State.STARTED)
708-
self.state = Subtask.State.CANCELLED_BEFORE_RETURNED
709-
else:
710-
assert(self.state == Subtask.State.STARTED)
711-
self.state = Subtask.State.RETURNED
712-
on_resolve(result)
713-
714-
async def async_on_block(awaitable):
715-
relinquish_control()
716-
if not self.request_cancel_end.done():
717-
await asyncio.wait([awaitable, self.request_cancel_begin],
718-
return_when = asyncio.FIRST_COMPLETED)
719-
if self.request_cancel_begin.done():
720-
return True
721-
else:
722-
await awaitable
723-
assert(awaitable.done())
724-
await scheduler.acquire()
725-
return False
726-
727-
def relinquish_control():
728-
if not ret.done():
729-
ret.set_result(None)
730-
elif self.request_cancel_begin.done() and not self.request_cancel_end.done():
731-
self.request_cancel_end.set_result(None)
732-
else:
733-
scheduler.release()
728+
def cancellation_requested(self):
729+
return self.cancelled
734730

735-
ret = asyncio.Future()
736-
asyncio.create_task(do_call())
737-
await ret
731+
async def request_cancel(self):
732+
assert(not self.cancellation_requested() and not self.resolved())
733+
self.cancelled = True
734+
await self.callee_thread.resume(Cancelled.TRUE)
738735

739736
def add_lender(self, lending_handle):
740737
assert(not self.resolve_delivered() and not self.resolved())
@@ -976,6 +973,27 @@ def drop(self):
976973
trap_if(not self.done)
977974
FutureEnd.drop(self)
978975

976+
### Store State
977+
978+
class Store:
979+
loop: asyncio.AbstractEventLoop
980+
waiting: list[Thread]
981+
982+
def __init__(self):
983+
self.loop = asyncio.new_event_loop()
984+
self.waiting = []
985+
986+
def call_export(self, callee, on_start, on_resolve):
987+
self.run_until_complete(Thread(self, callee, on_start, on_resolve).resume())
988+
989+
def tick(self, i):
990+
if not DETERMINISTIC_PROFILE:
991+
random.shuffle(self.waiting)
992+
for thread in self.waiting:
993+
if thread.awaitable.done():
994+
self.run_until_complete(thread.resume())
995+
return
996+
979997
### Despecialization
980998

981999
def despecialize(t):
@@ -1931,8 +1949,8 @@ def lower_flat_values(cx, max_flat, vs, ts, out_param = None):
19311949

19321950
### `canon lift`
19331951

1934-
async def canon_lift(opts, inst, ft, callee, caller, on_start, on_resolve, on_block):
1935-
task = Task(opts, inst, ft, caller, on_resolve, on_block)
1952+
async def canon_lift(opts, inst, ft, callee, caller, thread, on_start, on_resolve):
1953+
task = Task(opts, inst, ft, caller, thread, on_resolve)
19361954
if not await task.enter():
19371955
return
19381956

@@ -2021,7 +2039,9 @@ def on_resolve(result):
20212039
nonlocal flat_results
20222040
flat_results = lower_flat_values(cx, MAX_FLAT_RESULTS, result, ft.result_type(), flat_args)
20232041

2024-
await subtask.call_sync(callee, on_start, on_resolve)
2042+
await subtask.call(callee, on_start, on_resolve)
2043+
await subtask.wait_until_resolved()
2044+
20252045
assert(types_match_values(flat_ft.results, flat_results))
20262046
subtask.deliver_resolve()
20272047
return flat_results
@@ -2049,6 +2069,11 @@ def subtask_event():
20492069
subtask.deliver_resolve()
20502070
return [Subtask.State.RETURNED]
20512071

2072+
await subtask.call(callee, on_start, on_resolve)
2073+
if subtask.resolved():
2074+
subtask.deliver_resolve()
2075+
return [Subtask.State.RETURNED]
2076+
20522077
subtaski = task.inst.table.add(subtask)
20532078
assert(0 < subtaski <= Table.MAX_LENGTH < 2**28)
20542079
assert(0 <= subtask.state < 2**4)
@@ -2213,16 +2238,13 @@ async def canon_subtask_cancel(sync, task, i):
22132238
trap_if(not task.inst.may_leave)
22142239
subtask = task.inst.table.get(i)
22152240
trap_if(not isinstance(subtask, Subtask))
2216-
trap_if(subtask.resolve_delivered() or subtask.cancelled())
2241+
trap_if(subtask.resolve_delivered() or subtask.cancellation_requested())
22172242
if subtask.resolved():
22182243
assert(subtask.has_pending_event())
22192244
else:
22202245
await subtask.request_cancel()
22212246
if sync:
2222-
while not subtask.resolved():
2223-
if subtask.has_pending_event():
2224-
_ = subtask.get_event()
2225-
await task.sync_wait(subtask.wait_for_pending_event())
2247+
await subtask.wait_until_resolved()
22262248
else:
22272249
if not subtask.resolved():
22282250
return [BLOCKED]

design/mvp/canonical-abi/run_tests.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,6 @@
22
from definitions import *
33

44
definitions.DETERMINISTIC_PROFILE = True
5-
asyncio.run(scheduler.acquire())
6-
7-
async def host_on_block(a: Awaitable):
8-
scheduler.release()
9-
await a
10-
await scheduler.acquire()
11-
return False
125

136
def equal_modulo_string_encoding(s, t):
147
if s is None and t is None:

0 commit comments

Comments
 (0)