diff --git a/README.md b/README.md index b11fc29..4a45d9b 100644 --- a/README.md +++ b/README.md @@ -178,10 +178,11 @@ make test-unit ### Running E2E tests -The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following `docker` command: +The E2E (end-to-end) tests require a sidecar process to be running. You can use the Dapr sidecar for this or run a Durable Task test sidecar using the following command: ```sh -docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' --rm cgillum/durabletask-sidecar:latest start --backend Emulator +go install github.com/dapr/durabletask-go@main +durabletask-go --port 4001 ``` To run the E2E tests, run the following command from the project root: diff --git a/tests/test_orchestration_e2e.py b/tests/test_orchestration_e2e.py index d3d7f0b..bcb3d3c 100644 --- a/tests/test_orchestration_e2e.py +++ b/tests/test_orchestration_e2e.py @@ -279,39 +279,56 @@ def orchestrator(ctx: task.OrchestrationContext, _): assert state.serialized_output == json.dumps("some reason for termination") def test_terminate_recursive(): - def root(ctx: task.OrchestrationContext, _): - result = yield ctx.call_sub_orchestrator(child) - return result - def child(ctx: task.OrchestrationContext, _): - result = yield ctx.wait_for_external_event("my_event") - return result + thread_lock = threading.Lock() + activity_counter = 0 + delay_time = 4 # seconds - # Start a worker, which will connect to the sidecar in a background thread - with worker.TaskHubGrpcWorker() as w: - w.add_orchestrator(root) - w.add_orchestrator(child) - w.start() + def increment(ctx, _): + with thread_lock: + nonlocal activity_counter + activity_counter += 1 + raise Exception("Failed: Should not have executed the activity") - task_hub_client = client.TaskHubGrpcClient() - id = task_hub_client.schedule_new_orchestration(root) - state = task_hub_client.wait_for_orchestration_start(id, timeout=30) - assert state is not None - assert state.runtime_status == client.OrchestrationStatus.RUNNING + def orchestrator_child(ctx: task.OrchestrationContext, activity_count: int): + due_time = ctx.current_utc_datetime + timedelta(seconds=delay_time) + yield ctx.create_timer(due_time) + yield ctx.call_activity(increment) - # Terminate root orchestration(recursive set to True by default) - task_hub_client.terminate_orchestration(id, output="some reason for termination") - state = task_hub_client.wait_for_orchestration_completion(id, timeout=30) - assert state is not None - assert state.runtime_status == client.OrchestrationStatus.TERMINATED + def parent_orchestrator(ctx: task.OrchestrationContext, count: int): + tasks = [] + for _ in range(count): + tasks.append(ctx.call_sub_orchestrator(orchestrator_child, input=count)) + yield task.when_all(tasks) - # Verify that child orchestration is also terminated - c = task_hub_client.wait_for_orchestration_completion(id, timeout=30) - assert state is not None - assert state.runtime_status == client.OrchestrationStatus.TERMINATED + for recurse in [True, False]: + with worker.TaskHubGrpcWorker() as w: + w.add_activity(increment) + w.add_orchestrator(orchestrator_child) + w.add_orchestrator(parent_orchestrator) + w.start() + + task_hub_client = client.TaskHubGrpcClient() + instance_id = task_hub_client.schedule_new_orchestration(parent_orchestrator, input=5) + + time.sleep(2) + + output = "Recursive termination = {recurse}" + task_hub_client.terminate_orchestration(instance_id, output=output, recursive=recurse) + + + metadata = task_hub_client.wait_for_orchestration_completion(instance_id, timeout=30) + + assert metadata is not None + assert metadata.runtime_status == client.OrchestrationStatus.TERMINATED + assert metadata.serialized_output == f'"{output}"' + + time.sleep(delay_time) + + if recurse: + assert activity_counter == 0, "Activity should not have executed with recursive termination" + else: + assert activity_counter == 5, "Activity should have executed without recursive termination" - task_hub_client.purge_orchestration(id) - state = task_hub_client.get_orchestration_state(id) - assert state is None def test_continue_as_new():