Skip to content

Implement missing send_event capability for orchestration context #52

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
16 changes: 16 additions & 0 deletions durabletask/internal/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ def new_event_raised_event(name: str, encoded_input: Optional[str] = None) -> pb
)


def new_event_sent_event(instance_id: str, name: str, encoded_input: Optional[str] = None) -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
timestamp=timestamp_pb2.Timestamp(),
eventSent=pb.EventSentEvent(instanceId=instance_id, name=name, input=get_string_value(encoded_input))
)


def new_suspend_event() -> pb.HistoryEvent:
return pb.HistoryEvent(
eventId=-1,
Expand Down Expand Up @@ -203,6 +211,14 @@ def new_create_sub_orchestration_action(
))


def new_send_event_action(id: int, instance_id: str, event_name: str, encoded_data: Optional[str]) -> pb.OrchestratorAction:
return pb.OrchestratorAction(id=id, sendEvent=pb.SendEventAction(
instance=pb.OrchestrationInstance(instanceId=instance_id),
name=event_name,
data=get_string_value(encoded_data)
))


def is_empty(v: wrappers_pb2.StringValue):
return v is None or v.value == ''

Expand Down
16 changes: 16 additions & 0 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,22 @@ def wait_for_external_event(self, name: str) -> Task:
"""
pass

@abstractmethod
def send_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None) -> None:
"""Send an event to another orchestration instance.

Parameters
----------
instance_id : str
The ID of the orchestration instance to send the event to.
event_name : str
The name of the event to send.
data : Optional[Any]
The optional JSON-serializable data to include with the event.
"""
pass

@abstractmethod
def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
"""Continue the orchestration execution as a new instance.
Expand Down
26 changes: 21 additions & 5 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,13 +629,16 @@ def set_complete(
result: Any,
status: pb.OrchestrationStatus,
is_result_encoded: bool = False,
preserve_actions: bool = False,
):
if self._is_complete:
return

self._is_complete = True
self._completion_status = status
self._pending_actions.clear() # Cancel any pending actions

if not preserve_actions:
self._pending_actions.clear() # Cancel any pending actions

self._result = result
result_json: Optional[str] = None
Expand Down Expand Up @@ -852,6 +855,18 @@ def wait_for_external_event(self, name: str) -> task.Task:
task_list.append(external_event_task)
return external_event_task

def send_event(self, instance_id: str, event_name: str, *,
data: Optional[Any] = None) -> None:
if not instance_id:
raise ValueError("instance_id cannot be None or empty")
if not event_name:
raise ValueError("event_name cannot be None or empty")

id = self.next_sequence_number()
encoded_data = shared.to_json(data) if data is not None else None
action = ph.new_send_event_action(id, instance_id, event_name, encoded_data)
self._pending_actions[id] = action

def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
if self._is_complete:
return
Expand Down Expand Up @@ -973,8 +988,9 @@ def process_event(
# Start the orchestrator's generator function
ctx.run(result)
else:
# This is an orchestrator that doesn't schedule any tasks
ctx.set_complete(result, pb.ORCHESTRATION_STATUS_COMPLETED)
# This is an orchestrator that doesn't use generators (async tasks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert this comment

# but it may have scheduled actions like send_event
ctx.set_complete(result, pb.ORCHESTRATION_STATUS_COMPLETED, preserve_actions=True)
elif event.HasField("timerCreated"):
# This history event confirms that the timer was successfully scheduled.
# Remove the timerCreated event from the pending action list so we don't schedule it again.
Expand Down Expand Up @@ -1304,8 +1320,8 @@ def _get_method_name_for_action(action: pb.OrchestratorAction) -> str:
return task.get_name(task.OrchestrationContext.create_timer)
elif action_type == "createSubOrchestration":
return task.get_name(task.OrchestrationContext.call_sub_orchestrator)
# elif action_type == "sendEvent":
# return task.get_name(task.OrchestrationContext.send_event)
elif action_type == "sendEvent":
return task.get_name(task.OrchestrationContext.send_event)
else:
raise NotImplementedError(f"Action type '{action_type}' not supported!")

Expand Down
Loading