diff --git a/.github/workflows/build-validation.yml b/.github/workflows/build-validation.yml index 9262f86d..559cacd9 100644 --- a/.github/workflows/build-validation.yml +++ b/.github/workflows/build-validation.yml @@ -57,14 +57,23 @@ jobs: - name: Integration Tests with Gradle run: ./gradlew integrationTest + - name: Stop Durable Task Sidecar + run: docker stop durabletask-sidecar + + - name: Initialize Durable Task GO + run: docker run --name durabletask-go -p 4001:4001 -d kaibocai/durabletask-go:latest + + - name: Integration GO Tests with Gradle + run: ./gradlew integrationGoTest + - name: Archive test report - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: Integration test report path: client/build/reports/tests/integrationTest - name: Upload JAR output - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: Package path: client/build/libs @@ -105,7 +114,7 @@ jobs: arguments: endToEndTest - name: Archive test report - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: Integration test report path: client/build/reports/tests/endToEndTest @@ -146,7 +155,7 @@ jobs: arguments: sampleTest - name: Archive test report - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4 with: name: Integration test report path: client/build/reports/tests/endToEndTest \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index f7c68d11..df8cf466 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,24 +1,45 @@ -## placeholder +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## v1.6.0 (unreleased) + +### New + +* Support orchestration id reuse policy ([#188](https://github.com/microsoft/durabletask-java/pull/188)) + +### Updates + * Fix infinite loop when use continueasnew after wait external event ([#183](https://github.com/microsoft/durabletask-java/pull/183)) * Fix the issue "Deserialize Exception got swallowed when use anyOf with external event." ([#185](https://github.com/microsoft/durabletask-java/pull/185)) ## v1.5.0 + +### Updates + * Fix exception type issue when using `RetriableTask` in fan in/out pattern ([#174](https://github.com/microsoft/durabletask-java/pull/174)) * Add implementation to generate name-based deterministic UUID ([#176](https://github.com/microsoft/durabletask-java/pull/176)) * Update dependencies to resolve CVEs ([#177](https://github.com/microsoft/durabletask-java/pull/177)) - ## v1.4.0 ### Updates + * Refactor `createTimer` to be non-blocking ([#161](https://github.com/microsoft/durabletask-java/pull/161)) ## v1.3.0 + +### Updates + * Refactor `RetriableTask` and add new `CompoundTask`, fixing Fan-out/Fan-in stuck when using `RetriableTask` ([#157](https://github.com/microsoft/durabletask-java/pull/157)) ## v1.2.0 ### Updates + * Add `thenAccept` and `thenApply` to `Task` interface ([#148](https://github.com/microsoft/durabletask-java/pull/148)) * Support Suspend and Resume Client APIs ([#151](https://github.com/microsoft/durabletask-java/pull/151)) * Support restartInstance and pass restartPostUri in HttpManagementPayload ([#108](https://github.com/microsoft/durabletask-java/issues/108)) @@ -27,11 +48,13 @@ ## v1.1.1 ### Updates + * Fix exception occurring when invoking the `TaskOrchestrationContext#continueAsNew` method ([#118](https://github.com/microsoft/durabletask-java/issues/118)) ## v1.1.0 ### Updates + * Fix the potential NPE issue of `DurableTaskClient#terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104)) * Add waitForCompletionOrCreateCheckStatusResponse client API ([#115](https://github.com/microsoft/durabletask-java/pull/115)) * Support long timers by breaking up into smaller timers ([#114](https://github.com/microsoft/durabletask-java/issues/114)) diff --git a/client/build.gradle b/client/build.gradle index 3588b3ac..d48fcf3d 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -80,6 +80,7 @@ test { // Skip tests tagged as "integration" since those are slower // and require external dependencies. excludeTags "integration" + excludeTags "integration-go" } } @@ -96,6 +97,16 @@ task integrationTest(type: Test) { testLogging.showStandardStreams = true } +// integration-go runs against sidecar durabletask-go +task integrationGoTest(type: Test) { + useJUnitPlatform { + includeTags 'integration-go' + } + dependsOn build + shouldRunAfter test + testLogging.showStandardStreams = true +} + publishing { repositories { maven { diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 95bb984a..ebb0d294 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -4,6 +4,7 @@ import com.google.protobuf.StringValue; import com.google.protobuf.Timestamp; +import com.microsoft.durabletask.client.InstanceIdReuseAction; import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; @@ -87,6 +88,17 @@ public String scheduleNewOrchestrationInstance( CreateInstanceRequest.Builder builder = CreateInstanceRequest.newBuilder(); builder.setName(orchestratorName); + // build orchestration ID reuse policy + OrchestrationIdReusePolicy.Builder reuseIdPolicyBuilder = OrchestrationIdReusePolicy.newBuilder(); + // if options.getInstanceIdReuseAction() is null, default value will be ERROR + if (options.getInstanceIdReuseAction() != null) { + reuseIdPolicyBuilder.setAction(InstanceIdReuseAction.toProtobuf(options.getInstanceIdReuseAction())); + } + for (OrchestrationRuntimeStatus targetStatus : options.getTargetStatuses()) { + reuseIdPolicyBuilder.addOperationStatus(OrchestrationRuntimeStatus.toProtobuf(targetStatus)); + } + builder.setOrchestrationIdReusePolicy(reuseIdPolicyBuilder); + String instanceId = options.getInstanceId(); if (instanceId == null) { instanceId = UUID.randomUUID().toString(); diff --git a/client/src/main/java/com/microsoft/durabletask/NewOrchestrationInstanceOptions.java b/client/src/main/java/com/microsoft/durabletask/NewOrchestrationInstanceOptions.java index 01a466fa..34651b44 100644 --- a/client/src/main/java/com/microsoft/durabletask/NewOrchestrationInstanceOptions.java +++ b/client/src/main/java/com/microsoft/durabletask/NewOrchestrationInstanceOptions.java @@ -2,7 +2,12 @@ // Licensed under the MIT License. package com.microsoft.durabletask; +import com.microsoft.durabletask.client.InstanceIdReuseAction; + import java.time.Instant; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** * Options for starting a new instance of an orchestration. @@ -12,6 +17,8 @@ public final class NewOrchestrationInstanceOptions { private String instanceId; private Object input; private Instant startTime; + private final Set targetStatuses = new HashSet<>(); + private InstanceIdReuseAction instanceIdReuseAction; /** * Default constructor for the {@link NewOrchestrationInstanceOptions} class. @@ -71,6 +78,112 @@ public NewOrchestrationInstanceOptions setStartTime(Instant startTime) { return this; } + /** + * Sets the target statuses for the reuse orchestration ID policy of the new orchestration instance. + * This method allows specifying the desired statuses for orchestrations with the same ID + * when configuring the orchestration ID reuse policy. + * + *

+ * By default, the {@code targetStatuses} is empty. If an orchestration with the same instance ID + * already exists, an error will be thrown, indicating a duplicate orchestration instance. + * You can customize the orchestration ID reuse policy by setting the {@code targetStatuses} + * and {@code instanceIdReuseAction}. + * + *

+ * For example, the following options will terminate an existing orchestration instance with the same instance ID + * if it's in RUNNING, FAILED, or COMPLETED runtime status: + *

{@code
+     * NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions();
+     * options.addTargetStatus(OrchestrationRuntimeStatus.RUNNING, OrchestrationRuntimeStatus.FAILED,
+     *                         OrchestrationRuntimeStatus.COMPLETED);
+     * options.setInstanceIdReuseAction(InstanceIdReuseAction.TERMINATE);
+     * }
+ * + * @param statuses The target statuses for the reuse orchestration ID policy when creating the new orchestration instance. + * @return This {@link NewOrchestrationInstanceOptions} object. + */ + public NewOrchestrationInstanceOptions addTargetStatus(OrchestrationRuntimeStatus... statuses) { + for (OrchestrationRuntimeStatus status : statuses) { + this.addTargetStatus(status); + } + return this; + } + + /** + * Sets the target statuses for the reuse orchestration ID policy of the new orchestration instance. + * This method allows specifying the desired statuses for orchestrations with the same ID + * when configuring the orchestration ID reuse policy. + * + *

+ * By default, the {@code targetStatuses} is empty. If an orchestration with the same instance ID + * already exists, an error will be thrown, indicating a duplicate orchestration instance. + * You can customize the orchestration ID reuse policy by setting the {@code targetStatuses} + * and {@code instanceIdReuseAction}. + * + *

+ * For example, the following options will terminate an existing orchestration instance with the same instance ID + * if it's in RUNNING, FAILED, or COMPLETED runtime status: + *

{@code
+     * NewOrchestrationInstanceOptions option = new NewOrchestrationInstanceOptions();
+     * List statuses = new ArrayList<>();
+     * statuses.add(RUNNING);
+     * statuses.add(FAILED);
+     * statuses.add(COMPLETED);
+     * option.setTargetStatus(statuses);
+     * option.setInstanceIdReuseAction(TERMINATE);
+     * }
+     *
+ * @param statuses A list of target statuses for the reuse orchestration ID policy of creating the new orchestration instance. + * @return this {@link NewOrchestrationInstanceOptions} object + */ + public NewOrchestrationInstanceOptions setTargetStatus(List statuses) { + for (OrchestrationRuntimeStatus status : statuses) { + this.addTargetStatus(status); + } + return this; + } + + private void addTargetStatus(OrchestrationRuntimeStatus status) { + this.targetStatuses.add(status); + } + + /** + * Sets the target action for the reuse orchestration ID policy of the new orchestration instance. + * This method allows specifying the desired action for orchestrations with the same ID + * when configuring the orchestration ID reuse policy. + * + *

+ * By default, the {@code instanceIdReuseAction} is {@code InstanceIdReuseAction.ERROR}. If an orchestration with the same instance ID + * already exists, an error will be thrown, indicating a duplicate orchestration instance. + * You can customize the orchestration ID reuse policy by setting the {@code targetStatuses} + * and {@code instanceIdReuseAction}. + * + *

+ * + *

+ * For example, the following options will terminate an existing orchestration instance with the same instance ID + * if it's in RUNNING, FAILED, or COMPLETED runtime status: + *

{@code
+     * NewOrchestrationInstanceOptions options = new NewOrchestrationInstanceOptions();
+     * options.addTargetStatus(OrchestrationRuntimeStatus.RUNNING, OrchestrationRuntimeStatus.FAILED,
+     *                         OrchestrationRuntimeStatus.COMPLETED);
+     * options.setInstanceIdReuseAction(InstanceIdReuseAction.TERMINATE);
+     * }
+ * + * @param instanceIdReuseAction The target action for the reuse orchestration ID policy when creating the new orchestration instance. + * @return This {@link NewOrchestrationInstanceOptions} object. + */ + public NewOrchestrationInstanceOptions setInstanceIdReuseAction(InstanceIdReuseAction instanceIdReuseAction) { + this.instanceIdReuseAction = instanceIdReuseAction; + return this; + } + /** * Gets the user-specified version of the new orchestration. * @@ -106,4 +219,22 @@ public Object getInput() { public Instant getStartTime() { return this.startTime; } + + /** + * Gets the target statuses for the reuse orchestration ID policy of the new orchestration instance. + * + * @return The target statuses for the reuse orchestration ID policy when creating the new orchestration instance. + */ + public Set getTargetStatuses() { + return this.targetStatuses; + } + + /** + * Gets the target action for the reuse orchestration ID policy of the new orchestration instance. + * + * @return The target action for the reuse orchestration ID policy when creating the new orchestration instance. + */ + public InstanceIdReuseAction getInstanceIdReuseAction() { + return this.instanceIdReuseAction; + } } diff --git a/client/src/main/java/com/microsoft/durabletask/client/InstanceIdReuseAction.java b/client/src/main/java/com/microsoft/durabletask/client/InstanceIdReuseAction.java new file mode 100644 index 00000000..a546dcfd --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/client/InstanceIdReuseAction.java @@ -0,0 +1,23 @@ +package com.microsoft.durabletask.client; + +import com.microsoft.durabletask.implementation.protobuf.OrchestratorService; + +public enum InstanceIdReuseAction { + ERROR, + IGNORE, + TERMINATE; + + public static OrchestratorService.CreateOrchestrationAction toProtobuf( + InstanceIdReuseAction action) { + switch (action) { + case ERROR: + return OrchestratorService.CreateOrchestrationAction.ERROR; + case IGNORE: + return OrchestratorService.CreateOrchestrationAction.IGNORE; + case TERMINATE: + return OrchestratorService.CreateOrchestrationAction.TERMINATE; + default: + throw new IllegalArgumentException(String.format("Unknown action value: %s", action)); + } + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationGoTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationGoTests.java new file mode 100644 index 00000000..7c492873 --- /dev/null +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationGoTests.java @@ -0,0 +1,185 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +import com.microsoft.durabletask.client.InstanceIdReuseAction; +import io.grpc.StatusRuntimeException; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * These integration tests are designed to exercise the core, high-level features of + * the Durable Task programming model. + *

+ * These tests currently require a sidecar process to be + * running on the local machine (the sidecar is what accepts the client operations and + * sends invocation instructions to the DurableTaskWorker). + */ +@Tag("integration-go") +public class IntegrationGoTests extends IntegrationTestBase { + static final Duration defaultTimeout = Duration.ofSeconds(100); + // All tests that create a server should save it to this variable for proper shutdown + private DurableTaskGrpcWorker server; + + @AfterEach + private void shutdown() throws InterruptedException { + if (this.server != null) { + this.server.stop(); + } + } + + @Test + void singleActivityIgnore() throws TimeoutException { + final String orchestratorName = "SingleActivity"; + final String activityName = "Echo"; + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String activityInput = ctx.getInput(String.class); + ctx.createTimer(Duration.ofSeconds(2)); + String output = ctx.callActivity(activityName, activityInput, String.class).await(); + ctx.complete(output); + }) + .addActivity(activityName, ctx -> { + return String.format("Hello, %s!", ctx.getInput(String.class)); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + final String instanceID = "SKIP_IF_RUNNING_OR_COMPLETED"; + NewOrchestrationInstanceOptions instanceOptions = new NewOrchestrationInstanceOptions(); + instanceOptions + .setInstanceId(instanceID) + .setInput("World") + .addTargetStatus(OrchestrationRuntimeStatus.RUNNING, OrchestrationRuntimeStatus.COMPLETED, OrchestrationRuntimeStatus.PENDING) + .setInstanceIdReuseAction(InstanceIdReuseAction.IGNORE); + + client.scheduleNewOrchestrationInstance(orchestratorName, "GO", instanceID); + client.waitForInstanceStart(instanceID, defaultTimeout); + long pivotTime = Instant.now().getEpochSecond(); + client.scheduleNewOrchestrationInstance(orchestratorName, instanceOptions); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceID, + defaultTimeout, + true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + String output = instance.readOutputAs(String.class); + String expected = "Hello, GO!"; + assertEquals(expected, output); + + // Verify that the delay actually happened + long expectedCompletionSecond = instance.getCreatedAt().getEpochSecond(); + assertTrue(expectedCompletionSecond <= pivotTime); + } + } + + @Test + void singleActivityTerminate() throws TimeoutException { + final String orchestratorName = "SingleActivity"; + final String activityName = "Echo"; + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String activityInput = ctx.getInput(String.class); + ctx.createTimer(Duration.ofSeconds(2)); + String output = ctx.callActivity(activityName, activityInput, String.class).await(); + ctx.complete(output); + }) + .addActivity(activityName, ctx -> { + return String.format("Hello, %s!", ctx.getInput(String.class)); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + final String instanceID = "TERMINATE_IF_RUNNING_OR_COMPLETED"; + NewOrchestrationInstanceOptions instanceOptions = new NewOrchestrationInstanceOptions(); + instanceOptions + .setInstanceId(instanceID) + .setInput("World") + .addTargetStatus(OrchestrationRuntimeStatus.RUNNING, OrchestrationRuntimeStatus.COMPLETED, OrchestrationRuntimeStatus.PENDING) + .setInstanceIdReuseAction(InstanceIdReuseAction.TERMINATE); + + client.scheduleNewOrchestrationInstance(orchestratorName, "GO", instanceID); + client.waitForInstanceStart(instanceID, defaultTimeout); + long pivotTime = Instant.now().getEpochSecond(); + client.scheduleNewOrchestrationInstance(orchestratorName, instanceOptions); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceID, + defaultTimeout, + true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + String output = instance.readOutputAs(String.class); + String expected = "Hello, World!"; + assertEquals(expected, output); + + // Verify that the delay actually happened + long expectedCompletionSecond = instance.getCreatedAt().getEpochSecond(); + assertTrue(pivotTime <= expectedCompletionSecond); + } + } + + @Test + void singleActivityError() throws TimeoutException { + final String orchestratorName = "SingleActivity"; + final String activityName = "Echo"; + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String activityInput = ctx.getInput(String.class); + ctx.createTimer(Duration.ofSeconds(2)); + String output = ctx.callActivity(activityName, activityInput, String.class).await(); + ctx.complete(output); + }) + .addActivity(activityName, ctx -> { + return String.format("Hello, %s!", ctx.getInput(String.class)); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + final String instanceID = "ERROR_IF_RUNNING_OR_COMPLETED"; + + client.scheduleNewOrchestrationInstance(orchestratorName, "GO", instanceID); + assertThrows( + StatusRuntimeException.class, + () -> client.scheduleNewOrchestrationInstance(orchestratorName, "World", instanceID) + ); + } + } +} \ No newline at end of file diff --git a/submodules/durabletask-protobuf b/submodules/durabletask-protobuf index 7d682688..f9b2d7e5 160000 --- a/submodules/durabletask-protobuf +++ b/submodules/durabletask-protobuf @@ -1 +1 @@ -Subproject commit 7d6826889eb9b104592ab1020c648517a155ba79 +Subproject commit f9b2d7e5e904e80feb552c5d8b524408ca4db2a4