diff --git a/CHANGELOG.md b/CHANGELOG.md index e26a9828..74a18f1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## Unreleased +* Add versioning support for client and worker ([#224](https://github.com/microsoft/durabletask-java/pull/224)) + ## v1.5.2 * Add distributed tracing support for Azure Functions client scenarios ([#211](https://github.com/microsoft/durabletask-java/pull/211)) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 20fc3a05..9421b3de 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -32,9 +32,11 @@ public final class DurableTaskGrpcClient extends DurableTaskClient { private final DataConverter dataConverter; private final ManagedChannel managedSidecarChannel; private final TaskHubSidecarServiceBlockingStub sidecarClient; + private final String defaultVersion; DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) { this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); + this.defaultVersion = builder.defaultVersion; Channel sidecarGrpcChannel; if (builder.channel != null) { @@ -100,6 +102,8 @@ public String scheduleNewOrchestrationInstance( String version = options.getVersion(); if (version != null) { builder.setVersion(StringValue.of(version)); + } else if (this.defaultVersion != null) { + builder.setVersion(StringValue.of(this.defaultVersion)); } Object input = options.getInput(); diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java index 4ccd8b02..1a1cb6f2 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java @@ -12,6 +12,7 @@ public final class DurableTaskGrpcClientBuilder { DataConverter dataConverter; int port; Channel channel; + String defaultVersion; /** * Sets the {@link DataConverter} to use for converting serializable data payloads. @@ -53,6 +54,17 @@ public DurableTaskGrpcClientBuilder port(int port) { return this; } + /** + * Sets the default version that orchestrations will be created with. + * + * @param defaultVersion the default version to create orchestrations with + * @return this builder object + */ + public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) { + this.defaultVersion = defaultVersion; + return this; + } + /** * Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object. * @return a new {@link DurableTaskClient} object diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 2c4b472d..76c946fd 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -8,6 +8,7 @@ import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase; import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; +import com.microsoft.durabletask.util.VersionUtils; import io.grpc.*; @@ -16,6 +17,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Stream; /** * Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events. @@ -31,6 +33,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { private final ManagedChannel managedSidecarChannel; private final DataConverter dataConverter; private final Duration maximumTimerInterval; + private final DurableTaskGrpcWorkerVersioningOptions versioningOptions; private final TaskHubSidecarServiceBlockingStub sidecarClient; @@ -61,6 +64,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable { this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel); this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter(); this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL; + this.versioningOptions = builder.versioningOptions; } /** @@ -130,20 +134,85 @@ public void startAndBlock() { if (requestType == RequestCase.ORCHESTRATORREQUEST) { OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest(); + // If versioning is set, process it first to see if the orchestration should be executed. + boolean versioningFailed = false; + if (versioningOptions != null && versioningOptions.getVersion() != null) { + String version = Stream.concat(orchestratorRequest.getPastEventsList().stream(), orchestratorRequest.getNewEventsList().stream()) + .filter(event -> event.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONSTARTED) + .map(event -> event.getExecutionStarted().getVersion().getValue()) + .findFirst() + .orElse(null); + + logger.log(Level.INFO, String.format("Version from orchestration events: '%s'", version)); + + if (version != null) { + int comparison = VersionUtils.compareVersions(version, versioningOptions.getVersion()); + + switch (versioningOptions.getMatchStrategy()) { + case NONE: + break; + case STRICT: + if (comparison != 0) { + logger.log(Level.WARNING, String.format("The orchestration version '%s' does not match the worker version '%s'.", version, versioningOptions.getVersion())); + versioningFailed = true; + } + break; + case CURRENTOROLDER: + if (comparison > 0) { + logger.log(Level.WARNING, String.format("The orchestration version '%s' is greater than the worker version '%s'.", version, versioningOptions.getVersion())); + versioningFailed = true; + } + break; + } + } + } + // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava // TODO: Error handling - TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( + if (!versioningFailed) { + TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( orchestratorRequest.getPastEventsList(), orchestratorRequest.getNewEventsList()); - OrchestratorResponse response = OrchestratorResponse.newBuilder() - .setInstanceId(orchestratorRequest.getInstanceId()) - .addAllActions(taskOrchestratorResult.getActions()) - .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) - .setCompletionToken(workItem.getCompletionToken()) - .build(); + OrchestratorResponse response = OrchestratorResponse.newBuilder() + .setInstanceId(orchestratorRequest.getInstanceId()) + .addAllActions(taskOrchestratorResult.getActions()) + .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus())) + .setCompletionToken(workItem.getCompletionToken()) + .build(); + + this.sidecarClient.completeOrchestratorTask(response); + } else { + switch(versioningOptions.getFailureStrategy()) { + case FAIL: + CompleteOrchestrationAction completeAction = CompleteOrchestrationAction.newBuilder() + .setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED) + .setFailureDetails(TaskFailureDetails.newBuilder() + .setErrorType("VersionMismatch") + .setErrorMessage("The orchestration version does not match the worker version.") + .build()) + .build(); + + OrchestratorAction action = OrchestratorAction.newBuilder() + .setCompleteOrchestration(completeAction) + .build(); + + OrchestratorResponse response = OrchestratorResponse.newBuilder() + .setInstanceId(orchestratorRequest.getInstanceId()) + .setCompletionToken(workItem.getCompletionToken()) + .addActions(action) + .build(); - this.sidecarClient.completeOrchestratorTask(response); + this.sidecarClient.completeOrchestratorTask(response); + break; + // Reject and default share the same behavior as it does not change the orchestration to a terminal state. + case REJECT: + default: + this.sidecarClient.abandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest.newBuilder() + .setCompletionToken(workItem.getCompletionToken()) + .build()); + } + } } else if (requestType == RequestCase.ACTIVITYREQUEST) { ActivityRequest activityRequest = workItem.getActivityRequest(); diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java index 3076a7b4..ec39fee2 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java @@ -17,6 +17,7 @@ public final class DurableTaskGrpcWorkerBuilder { Channel channel; DataConverter dataConverter; Duration maximumTimerInterval; + DurableTaskGrpcWorkerVersioningOptions versioningOptions; /** * Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}. @@ -113,6 +114,17 @@ public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerIn return this; } + /** + * Sets the versioning options for this worker. + * + * @param options the versioning options to use + * @return this builder object + */ + public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersioningOptions options) { + this.versioningOptions = options; + return this; + } + /** * Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object. * @return a new {@link DurableTaskGrpcWorker} object diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerVersioningOptions.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerVersioningOptions.java new file mode 100644 index 00000000..d6259251 --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerVersioningOptions.java @@ -0,0 +1,82 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask; + +/** + * Options for configuring versioning behavior in the DurableTaskGrpcWorker. + */ +public final class DurableTaskGrpcWorkerVersioningOptions { + + /** + * Strategy for matching versions. + * NONE: No version matching is performed. + * STRICT: The version must match exactly. + * CURRENTOROLDER: The version must be the current version or older. + */ + public enum VersionMatchStrategy { + NONE, + STRICT, + CURRENTOROLDER; + } + + /** + * Strategy for handling version mismatches. + * REJECT: Reject the orchestration if the version does not match. The orchestration will be retried later. + * FAIL: Fail the orchestration if the version does not match. + */ + public enum VersionFailureStrategy { + REJECT, + FAIL; + } + + private final String version; + private final String defaultVersion; + private final VersionMatchStrategy matchStrategy; + private final VersionFailureStrategy failureStrategy; + + /** + * Constructor for DurableTaskGrpcWorkerVersioningOptions. + * @param version the version that is matched against orchestrations + * @param defaultVersion the default version used when starting sub orchestrations from this worker + * @param matchStrategy the strategy for matching versions + * @param failureStrategy the strategy for handling version mismatches + */ + public DurableTaskGrpcWorkerVersioningOptions(String version, String defaultVersion, VersionMatchStrategy matchStrategy, VersionFailureStrategy failureStrategy) { + this.version = version; + this.defaultVersion = defaultVersion; + this.matchStrategy = matchStrategy; + this.failureStrategy = failureStrategy; + } + + /** + * Gets the version that is matched against orchestrations. + * @return the version that is matched against orchestrations + */ + public String getVersion() { + return version; + } + + /** + * Gets the default version used when starting sub orchestrations from this worker. + * @return the default version used when starting sub orchestrations from this worker + */ + public String getDefaultVersion() { + return defaultVersion; + } + + /** + * Gets the strategy for matching versions. + * @return the strategy for matching versions + */ + public VersionMatchStrategy getMatchStrategy() { + return matchStrategy; + } + + /** + * Gets the strategy for handling version mismatches. + * @return the strategy for handling version mismatches + */ + public VersionFailureStrategy getFailureStrategy() { + return failureStrategy; + } +} \ No newline at end of file diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java index 88adf875..370c4b66 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java @@ -59,6 +59,13 @@ public interface TaskOrchestrationContext { */ boolean getIsReplaying(); + /** + * Gets the version of the orchestration that this context represents. + * + * @return the version of the orchestration + */ + String getVersion(); + /** * Returns a new {@code Task} that is completed when all tasks in {@code tasks} completes. * See {@link #allOf(Task[])} for more detailed information. diff --git a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java index 4b55bd41..878cbb84 100644 --- a/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java +++ b/client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java @@ -81,6 +81,7 @@ private class ContextImplTask implements TaskOrchestrationContext { private boolean isSuspended; private boolean isReplaying = true; private int newUUIDCounter; + private String version; // LinkedHashMap to maintain insertion order when returning the list of pending actions private final LinkedHashMap pendingActions = new LinkedHashMap<>(); @@ -172,6 +173,15 @@ private void setDoneReplaying() { this.isReplaying = false; } + @Override + public String getVersion() { + return this.version; + } + + private void setVersion(String version) { + this.version = version; + } + public Task completedTask(V value) { CompletableTask task = new CompletableTask<>(); task.complete(value); @@ -839,6 +849,8 @@ private void processEvent(HistoryEvent e) { this.setInstanceId(instanceId); String input = startedEvent.getInput().getValue(); this.setInput(input); + String version = startedEvent.getVersion().getValue(); + this.setVersion(version); TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name); if (factory == null) { // Try getting the default orchestrator diff --git a/client/src/main/java/com/microsoft/durabletask/util/VersionUtils.java b/client/src/main/java/com/microsoft/durabletask/util/VersionUtils.java new file mode 100644 index 00000000..94efd6d8 --- /dev/null +++ b/client/src/main/java/com/microsoft/durabletask/util/VersionUtils.java @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.microsoft.durabletask.util; + +public class VersionUtils { + + /** + * Compares two version strings. We manually attempt to parse the version strings as a Version object + * was not introduced until Java 9 and we compile as far back as Java 8. + */ + public static int compareVersions(String v1, String v2) { + if (v1 == null && v2 == null) { + return 0; + } + if (v1 == null) { + return -1; + } + if (v2 == null) { + return 1; + } + + String[] parts1 = v1.split("\\."); + String[] parts2 = v2.split("\\."); + + int length = Math.max(parts1.length, parts2.length); + for (int i = 0; i < length; i++) { + int p1 = i < parts1.length ? parseVersionPart(parts1[i]) : 0; + int p2 = i < parts2.length ? parseVersionPart(parts2[i]) : 0; + if (p1 != p2) { + return p1 - p2; + } + } + + // As a final comparison, compare the raw strings. We're either equal here or dealing with a non-numeric version. + return v1.compareTo(v2); + } + + private static int parseVersionPart(String part) { + try { + return Integer.parseInt(part); + } catch (NumberFormatException e) { + return 0; // fallback for non-numeric parts + } + } +} diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTestBase.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTestBase.java index ce64f215..f640a9ad 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTestBase.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTestBase.java @@ -69,5 +69,10 @@ public IntegrationTests.TestDurableTaskWorkerBuilder addActivity( }); return this; } + + public TestDurableTaskWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersioningOptions options) { + this.innerBuilder.useVersioning(options); + return this; + } } } diff --git a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java index 38ae865e..505a09e7 100644 --- a/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java +++ b/client/src/test/java/com/microsoft/durabletask/IntegrationTests.java @@ -1565,4 +1565,182 @@ public void newUUIDTest() { throw new RuntimeException(e); } } + + @Test + public void defaultVersionPassedThroughToContext() { + final String orchestratorName = "VersionOrchestration"; + final String activityName = "SayVersion"; + final String defaultVersion = "1.0"; + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String version = ctx.getVersion(); + String output = ctx.callActivity(activityName, version, String.class).await(); + ctx.complete(output); + }) + .addActivity(activityName, ctx -> { + return String.format("Version: %s", ctx.getInput(String.class)); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder() + .defaultVersion(defaultVersion) + .build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, + defaultTimeout, + true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + String output = instance.readOutputAs(String.class); + String expected = String.format("Version: %s", defaultVersion); + assertEquals(expected, output); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + @Test + public void optionVersionOverridesDefault() { + final String orchestratorName = "VersionOrchestration"; + final String activityName = "SayVersion"; + final String defaultVersion = "1.0"; + final String overrideVersion = "2.0"; + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String version = ctx.getVersion(); + String output = ctx.callActivity(activityName, version, String.class).await(); + ctx.complete(output); + }) + .addActivity(activityName, ctx -> { + return String.format("Version: %s", ctx.getInput(String.class)); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder() + .defaultVersion(defaultVersion) + .build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, new NewOrchestrationInstanceOptions() + .setVersion(overrideVersion)); + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, + defaultTimeout, + true); + + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + String output = instance.readOutputAs(String.class); + String expected = String.format("Version: %s", overrideVersion); + assertEquals(expected, output); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + @ParameterizedTest + @ValueSource(strings = {"", "0.9", "1.0", "1.1"}) + void orchestrationVersionCurrentOrOlderMatchStrategy(String orchestrationVersion) { + final String orchestratorName = "VersionedOrchestrationFailTest"; + final String activityName = "SayVersion"; + final DurableTaskGrpcWorkerVersioningOptions versioningOptions = new DurableTaskGrpcWorkerVersioningOptions( + "1.0", + "1.0", + DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.CURRENTOROLDER, + DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy.FAIL);; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String version = ctx.getVersion(); + String output = ctx.callActivity(activityName, version, String.class).await(); + ctx.complete(output); + }) + .addActivity(activityName, ctx -> String.format("Version: %s", ctx.getInput(String.class))) + .useVersioning(versioningOptions) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance( + orchestratorName, + new NewOrchestrationInstanceOptions().setVersion(orchestrationVersion) + ); + if (orchestrationVersion.equals("1.1")) { + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, + defaultTimeout, + true + ); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); + } else { + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, + defaultTimeout, + true + ); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + String output = instance.readOutputAs(String.class); + String expected = String.format("Version: %s", orchestrationVersion); + assertEquals(expected, output); + } + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + @ParameterizedTest + @ValueSource(strings = {"", "0.9", "1.0", "1.1"}) + void orchestrationVersionStrictMatchStrategy(String orchestrationVersion) { + final String orchestratorName = "VersionedOrchestrationFailTestStrict"; + final String activityName = "SayVersion"; + final DurableTaskGrpcWorkerVersioningOptions versioningOptions = new DurableTaskGrpcWorkerVersioningOptions( + "1.0", + "1.0", + DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy.STRICT, + DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy.FAIL); + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + String version = ctx.getVersion(); + String output = ctx.callActivity(activityName, version, String.class).await(); + ctx.complete(output); + }) + .addActivity(activityName, ctx -> String.format("Version: %s", ctx.getInput(String.class))) + .useVersioning(versioningOptions) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance( + orchestratorName, + new NewOrchestrationInstanceOptions().setVersion(orchestrationVersion) + ); + if (orchestrationVersion.equals("1.0")) { + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, + defaultTimeout, + true + ); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + String output = instance.readOutputAs(String.class); + String expected = String.format("Version: %s", orchestrationVersion); + assertEquals(expected, output); + } else { + OrchestrationMetadata instance = client.waitForInstanceCompletion( + instanceId, + defaultTimeout, + true + ); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.FAILED, instance.getRuntimeStatus()); + } + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } } \ No newline at end of file