Skip to content

Commit 460f5ef

Browse files
authored
Adding recursive option for terminate (#27)
Signed-off-by: Shivam Kumar <[email protected]>
1 parent cbd07a8 commit 460f5ef

File tree

2 files changed

+36
-2
lines changed

2 files changed

+36
-2
lines changed

durabletask/client.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,10 +177,12 @@ def raise_orchestration_event(self, instance_id: str, event_name: str, *,
177177
self._stub.RaiseEvent(req)
178178

179179
def terminate_orchestration(self, instance_id: str, *,
180-
output: Union[Any, None] = None):
180+
output: Union[Any, None] = None,
181+
recursive: bool = True):
181182
req = pb.TerminateRequest(
182183
instanceId=instance_id,
183-
output=wrappers_pb2.StringValue(value=shared.to_json(output)) if output else None)
184+
output=wrappers_pb2.StringValue(value=shared.to_json(output)) if output else None,
185+
recursive=recursive)
184186

185187
self._logger.info(f"Terminating instance '{instance_id}'.")
186188
self._stub.TerminateInstance(req)

tests/test_orchestration_e2e.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,38 @@ def orchestrator(ctx: task.OrchestrationContext, _):
278278
assert state.runtime_status == client.OrchestrationStatus.TERMINATED
279279
assert state.serialized_output == json.dumps("some reason for termination")
280280

281+
def test_terminate_recursive():
282+
def root(ctx: task.OrchestrationContext, _):
283+
result = yield ctx.call_sub_orchestrator(child)
284+
return result
285+
def child(ctx: task.OrchestrationContext, _):
286+
result = yield ctx.wait_for_external_event("my_event")
287+
return result
288+
289+
# Start a worker, which will connect to the sidecar in a background thread
290+
with worker.TaskHubGrpcWorker() as w:
291+
w.add_orchestrator(root)
292+
w.add_orchestrator(child)
293+
w.start()
294+
295+
task_hub_client = client.TaskHubGrpcClient()
296+
id = task_hub_client.schedule_new_orchestration(root)
297+
state = task_hub_client.wait_for_orchestration_start(id, timeout=30)
298+
assert state is not None
299+
assert state.runtime_status == client.OrchestrationStatus.RUNNING
300+
301+
# Terminate root orchestration(recursive set to True by default)
302+
task_hub_client.terminate_orchestration(id, output="some reason for termination")
303+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
304+
assert state is not None
305+
assert state.runtime_status == client.OrchestrationStatus.TERMINATED
306+
307+
# Verify that child orchestration is also terminated
308+
c = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
309+
assert state is not None
310+
assert state.runtime_status == client.OrchestrationStatus.TERMINATED
311+
312+
281313

282314
def test_continue_as_new():
283315
all_results = []

0 commit comments

Comments
 (0)