Skip to content

Commit

Permalink
Merge pull request #10 from elena-kolevska/fix/terminate-recursive-test
Browse files Browse the repository at this point in the history
Extend the recursive termination test to match the test in durabletask-go
  • Loading branch information
elena-kolevska authored Feb 12, 2025
2 parents a47e15b + 16ca420 commit 61a8492
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 30 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,11 @@ make test-unit

### Running E2E tests

The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following `docker` command:
The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following command:

```sh
docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator
go install github.com/dapr/durabletask-go@main
durabletask-go --port 4001
```

To run the E2E tests, run the following command from the project root:
Expand Down
73 changes: 45 additions & 28 deletions tests/test_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,39 +279,56 @@ def orchestrator(ctx: task.OrchestrationContext, _):
assert state.serialized_output == json.dumps("some reason for termination")

def test_terminate_recursive():
def root(ctx: task.OrchestrationContext, _):
result = yield ctx.call_sub_orchestrator(child)
return result
def child(ctx: task.OrchestrationContext, _):
result = yield ctx.wait_for_external_event("my_event")
return result
thread_lock = threading.Lock()
activity_counter = 0
delay_time = 4 # seconds

# Start a worker, which will connect to the sidecar in a background thread
with worker.TaskHubGrpcWorker() as w:
w.add_orchestrator(root)
w.add_orchestrator(child)
w.start()
def increment(ctx, _):
with thread_lock:
nonlocal activity_counter
activity_counter += 1
raise Exception("Failed: Should not have executed the activity")

task_hub_client = client.TaskHubGrpcClient()
id = task_hub_client.schedule_new_orchestration(root)
state = task_hub_client.wait_for_orchestration_start(id, timeout=30)
assert state is not None
assert state.runtime_status == client.OrchestrationStatus.RUNNING
def orchestrator_child(ctx: task.OrchestrationContext, activity_count: int):
due_time = ctx.current_utc_datetime + timedelta(seconds=delay_time)
yield ctx.create_timer(due_time)
yield ctx.call_activity(increment)

# Terminate root orchestration(recursive set to True by default)
task_hub_client.terminate_orchestration(id, output="some reason for termination")
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.runtime_status == client.OrchestrationStatus.TERMINATED
def parent_orchestrator(ctx: task.OrchestrationContext, count: int):
tasks = []
for _ in range(count):
tasks.append(ctx.call_sub_orchestrator(orchestrator_child, input=count))
yield task.when_all(tasks)

# Verify that child orchestration is also terminated
c = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.runtime_status == client.OrchestrationStatus.TERMINATED
for recurse in [True, False]:
with worker.TaskHubGrpcWorker() as w:
w.add_activity(increment)
w.add_orchestrator(orchestrator_child)
w.add_orchestrator(parent_orchestrator)
w.start()

task_hub_client = client.TaskHubGrpcClient()
instance_id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=5)

time.sleep(2)

output = "Recursive termination = {recurse}"
task_hub_client.terminate_orchestration(instance_id, output=output, recursive=recurse)


metadata = task_hub_client.wait_for_orchestration_completion(instance_id, timeout=30)

assert metadata is not None
assert metadata.runtime_status == client.OrchestrationStatus.TERMINATED
assert metadata.serialized_output == f'"{output}"'

time.sleep(delay_time)

if recurse:
assert activity_counter == 0, "Activity should not have executed with recursive termination"
else:
assert activity_counter == 5, "Activity should have executed without recursive termination"

task_hub_client.purge_orchestration(id)
state = task_hub_client.get_orchestration_state(id)
assert state is None


def test_continue_as_new():
Expand Down

0 comments on commit 61a8492

Please sign in to comment.