1111import temporalio .nexus ._operation_handlers
1212from temporalio import exceptions , nexus , workflow
1313from temporalio .api .enums .v1 import EventType
14- from temporalio .api .history .v1 import HistoryEvent
1514from temporalio .client import (
1615 WithStartWorkflowOperation ,
1716 WorkflowExecutionStatus ,
@@ -311,10 +310,11 @@ async def check_behavior_for_abandon(
311310 await caller_wf .signal (CallerWorkflow .release )
312311 await caller_wf .result ()
313312 await assert_event_subsequence (
313+ caller_wf ,
314314 [
315- ( caller_wf , EventType .EVENT_TYPE_WORKFLOW_EXECUTION_STARTED ) ,
316- ( caller_wf , EventType .EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED ) ,
317- ]
315+ EventType .EVENT_TYPE_WORKFLOW_EXECUTION_STARTED ,
316+ EventType .EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED ,
317+ ],
318318 )
319319 assert not await has_event (
320320 caller_wf ,
@@ -347,16 +347,17 @@ async def check_behavior_for_try_cancel(
347347 handler_status = (await handler_wf .describe ()).status
348348 assert handler_status == WorkflowExecutionStatus .CANCELED
349349 await assert_event_subsequence (
350+ caller_wf ,
350351 [
351- ( caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ) ,
352- ( caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCELED ) ,
353- ]
352+ EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ,
353+ EventType .EVENT_TYPE_NEXUS_OPERATION_CANCELED ,
354+ ],
354355 )
355356 op_cancel_requested_event = await get_event_time (
356357 caller_wf ,
357358 EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ,
358359 )
359- assert result .caller_op_future_resolved < op_cancel_requested_event
360+ assert result .caller_op_future_resolved <= op_cancel_requested_event
360361
361362
362363async def check_behavior_for_wait_cancellation_requested (
@@ -382,11 +383,12 @@ async def check_behavior_for_wait_cancellation_requested(
382383 handler_status = (await handler_wf .describe ()).status
383384 assert handler_status == WorkflowExecutionStatus .CANCELED
384385 await assert_event_subsequence (
386+ caller_wf ,
385387 [
386- ( caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ) ,
387- ( caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED ) ,
388- ( caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCELED ) ,
389- ]
388+ EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ,
389+ EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED ,
390+ EventType .EVENT_TYPE_NEXUS_OPERATION_CANCELED ,
391+ ],
390392 )
391393 op_cancel_request_completed = await get_event_time (
392394 caller_wf ,
@@ -396,7 +398,7 @@ async def check_behavior_for_wait_cancellation_requested(
396398 handler_wf ,
397399 EventType .EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED ,
398400 )
399- assert op_cancel_request_completed < result .caller_op_future_resolved < op_canceled
401+ assert op_cancel_request_completed <= result .caller_op_future_resolved < op_canceled
400402
401403
402404async def check_behavior_for_wait_cancellation_completed (
@@ -421,23 +423,22 @@ async def check_behavior_for_wait_cancellation_completed(
421423 result = await caller_wf .result ()
422424
423425 await assert_event_subsequence (
426+ caller_wf ,
424427 [
425- (caller_wf , EventType .EVENT_TYPE_WORKFLOW_EXECUTION_STARTED ),
426- (caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ),
427- (
428- handler_wf ,
429- EventType .EVENT_TYPE_WORKFLOW_EXECUTION_CANCEL_REQUESTED ,
430- ),
431- (handler_wf , EventType .EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED ),
432- (caller_wf , EventType .EVENT_TYPE_NEXUS_OPERATION_CANCELED ),
433- (caller_wf , EventType .EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED ),
434- ]
428+ EventType .EVENT_TYPE_WORKFLOW_EXECUTION_STARTED ,
429+ EventType .EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED ,
430+ EventType .EVENT_TYPE_NEXUS_OPERATION_CANCELED ,
431+ EventType .EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED ,
432+ ],
435433 )
436434 handler_wf_canceled_event = await get_event_time (
437435 handler_wf ,
438436 EventType .EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED ,
439437 )
440- assert handler_wf_canceled_event < result .caller_op_future_resolved
438+ assert handler_wf_canceled_event <= result .caller_op_future_resolved , (
439+ "expected caller op future resolved after handler workflow canceled, but got "
440+ f"{ result .caller_op_future_resolved } before { handler_wf_canceled_event } "
441+ )
441442
442443
443444async def has_event (wf_handle : WorkflowHandle , event_type : EventType .ValueType ):
@@ -459,46 +460,35 @@ async def get_event_time(
459460
460461
461462async def assert_event_subsequence (
462- expected_events : list [tuple [WorkflowHandle , EventType .ValueType ]],
463+ wf_handle : WorkflowHandle ,
464+ expected_events : list [EventType .ValueType ],
463465) -> None :
464466 """
465- Given a sequence of (WorkflowHandle, EventType) pairs , assert that the sorted sequence of events
466- from both workflows contains that subsequence.
467+ Given a workflow handle and a sequence of event types , assert that the workflow's history
468+ contains that subsequence of events in the order specified .
467469 """
468-
469- def _event_time (
470- item : tuple [WorkflowHandle , HistoryEvent ],
471- ) -> datetime :
472- return item [1 ].event_time .ToDatetime ()
473-
474470 all_events = []
475- handles = {h for h , _ in expected_events }
476- for h in handles :
477- async for e in h .fetch_history_events ():
478- all_events .append ((h , e ))
479- _all_events = iter (sorted (all_events , key = _event_time ))
471+ async for e in wf_handle .fetch_history_events ():
472+ all_events .append (e )
473+
474+ _all_events = iter (all_events )
480475 _expected_events = iter (expected_events )
481476
482- previous_expected_handle , previous_expected_event_type_name = None , None
483- for expected_handle , expected_event_type in _expected_events :
477+ previous_expected_event_type_name = None
478+ for expected_event_type in _expected_events :
484479 expected_event_type_name = EventType .Name (expected_event_type ).removeprefix (
485480 "EVENT_TYPE_"
486481 )
487482 has_expected = next (
488- (
489- (h , e )
490- for h , e in _all_events
491- if h == expected_handle and e .event_type == expected_event_type
492- ),
483+ (e for e in _all_events if e .event_type == expected_event_type ),
493484 None ,
494485 )
495486 if not has_expected :
496- if previous_expected_handle is not None :
497- prefix = f"After { previous_expected_event_type_name } in { previous_expected_handle . id } , "
487+ if previous_expected_event_type_name is not None :
488+ prefix = f"After { previous_expected_event_type_name } , "
498489 else :
499490 prefix = ""
500491 pytest .fail (
501- f"{ prefix } expected { expected_event_type_name } in { expected_handle .id } "
492+ f"{ prefix } expected { expected_event_type_name } in workflow { wf_handle .id } "
502493 )
503494 previous_expected_event_type_name = expected_event_type_name
504- previous_expected_handle = expected_handle
0 commit comments