Skip to content

Commit 8598f6b

Browse files
authored
Update durabletask protos, set custom status (#31)
Signed-off-by: Fabian Martinez <[email protected]>
1 parent a51257f commit 8598f6b

10 files changed

+1347
-923
lines changed

CHANGELOG.md

+10
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## v0.2.0 (Unreleased)
9+
10+
### New
11+
12+
- Support for orchestration custom status ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)
13+
14+
### Updates
15+
16+
- Updated `durabletask-protobuf` submodule reference to latest
17+
818
## v0.1.1a1
919

1020
### New

durabletask/client.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ def __init__(self, *,
104104
def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
105105
input: Union[TInput, None] = None,
106106
instance_id: Union[str, None] = None,
107-
start_at: Union[datetime, None] = None) -> str:
107+
start_at: Union[datetime, None] = None,
108+
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None) -> str:
108109

109110
name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)
110111

@@ -113,7 +114,9 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
113114
instanceId=instance_id if instance_id else uuid.uuid4().hex,
114115
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
115116
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
116-
version=wrappers_pb2.StringValue(value=""))
117+
version=wrappers_pb2.StringValue(value=""),
118+
orchestrationIdReusePolicy=reuse_id_policy,
119+
)
117120

118121
self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
119122
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)

durabletask/internal/orchestrator_service_pb2.py

+203-182
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

durabletask/internal/orchestrator_service_pb2.pyi

+650-594
Large diffs are not rendered by default.

durabletask/internal/orchestrator_service_pb2_grpc.py

+330-88
Large diffs are not rendered by default.

durabletask/task.py

+6
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ def is_replaying(self) -> bool:
7171
"""
7272
pass
7373

74+
@abstractmethod
75+
def set_custom_status(self, custom_status: str) -> None:
76+
"""Set the custom status.
77+
"""
78+
pass
79+
7480
@abstractmethod
7581
def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
7682
"""Create a Timer Task to fire after at the specified deadline.

durabletask/worker.py

+17-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
TypeVar, Union)
1111

1212
import grpc
13-
from google.protobuf import empty_pb2
13+
from google.protobuf import empty_pb2, wrappers_pb2
1414

1515
import durabletask.internal.helpers as ph
1616
import durabletask.internal.helpers as pbh
@@ -188,8 +188,8 @@ def stop(self):
188188
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub):
189189
try:
190190
executor = _OrchestrationExecutor(self._registry, self._logger)
191-
actions = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
192-
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions)
191+
result = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
192+
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=result.actions, customStatus=wrappers_pb2.StringValue(value=result.custom_status))
193193
except Exception as ex:
194194
self._logger.exception(f"An error occurred while trying to execute instance '{req.instanceId}': {ex}")
195195
failure_details = pbh.new_failure_details(ex)
@@ -242,6 +242,7 @@ def __init__(self, instance_id: str):
242242
self._pending_events: Dict[str, List[task.CompletableTask]] = {}
243243
self._new_input: Optional[Any] = None
244244
self._save_events = False
245+
self._custom_status: str = ""
245246

246247
def run(self, generator: Generator[task.Task, Any, Any]):
247248
self._generator = generator
@@ -355,6 +356,9 @@ def is_replaying(self) -> bool:
355356
def current_utc_datetime(self, value: datetime):
356357
self._current_utc_datetime = value
357358

359+
def set_custom_status(self, custom_status: str) -> None:
360+
self._custom_status = custom_status
361+
358362
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
359363
return self.create_timer_internal(fire_at)
360364

@@ -457,6 +461,14 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
457461
self.set_continued_as_new(new_input, save_events)
458462

459463

464+
class ExecutionResults:
465+
actions: List[pb.OrchestratorAction]
466+
custom_status: str
467+
468+
def __init__(self, actions: List[pb.OrchestratorAction], custom_status: str):
469+
self.actions = actions
470+
self.custom_status = custom_status
471+
460472
class _OrchestrationExecutor:
461473
_generator: Optional[task.Orchestrator] = None
462474

@@ -466,7 +478,7 @@ def __init__(self, registry: _Registry, logger: logging.Logger):
466478
self._is_suspended = False
467479
self._suspended_events: List[pb.HistoryEvent] = []
468480

469-
def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> List[pb.OrchestratorAction]:
481+
def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> ExecutionResults:
470482
if not new_events:
471483
raise task.OrchestrationStateError("The new history event list must have at least one event in it.")
472484

@@ -501,7 +513,7 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e
501513
actions = ctx.get_actions()
502514
if self._logger.level <= logging.DEBUG:
503515
self._logger.debug(f"{instance_id}: Returning {len(actions)} action(s): {_get_action_summary(actions)}")
504-
return actions
516+
return ExecutionResults(actions=actions, custom_status=ctx._custom_status)
505517

506518
def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None:
507519
if self._is_suspended and _is_suspendable(event):

tests/test_orchestration_e2e.py

+23
Original file line numberDiff line numberDiff line change
@@ -441,3 +441,26 @@ def throw_activity(ctx: task.ActivityContext, _):
441441
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
442442
assert state.failure_details.stack_trace is not None
443443
assert throw_activity_counter == 4
444+
445+
def test_custom_status():
446+
447+
def empty_orchestrator(ctx: task.OrchestrationContext, _):
448+
ctx.set_custom_status("foobaz")
449+
450+
# Start a worker, which will connect to the sidecar in a background thread
451+
with worker.TaskHubGrpcWorker() as w:
452+
w.add_orchestrator(empty_orchestrator)
453+
w.start()
454+
455+
c = client.TaskHubGrpcClient()
456+
id = c.schedule_new_orchestration(empty_orchestrator)
457+
state = c.wait_for_orchestration_completion(id, timeout=30)
458+
459+
assert state is not None
460+
assert state.name == task.get_name(empty_orchestrator)
461+
assert state.instance_id == id
462+
assert state.failure_details is None
463+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
464+
assert state.serialized_input is None
465+
assert state.serialized_output is None
466+
assert state.serialized_custom_status is "\"foobaz\""

0 commit comments

Comments
 (0)