Skip to content

Introduce versioning into the Java SDK #224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## Unreleased
* Add versioning support for client and worker ([#224](https://github.com/microsoft/durabletask-java/pull/224))

## v1.5.2
* Add distributed tracing support for Azure Functions client scenarios ([#211](https://github.com/microsoft/durabletask-java/pull/211))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
private final DataConverter dataConverter;
private final ManagedChannel managedSidecarChannel;
private final TaskHubSidecarServiceBlockingStub sidecarClient;
private final String defaultVersion;

DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.defaultVersion = builder.defaultVersion;

Channel sidecarGrpcChannel;
if (builder.channel != null) {
Expand Down Expand Up @@ -100,6 +102,8 @@ public String scheduleNewOrchestrationInstance(
String version = options.getVersion();
if (version != null) {
builder.setVersion(StringValue.of(version));
} else if (this.defaultVersion != null) {
builder.setVersion(StringValue.of(this.defaultVersion));
}

Object input = options.getInput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public final class DurableTaskGrpcClientBuilder {
DataConverter dataConverter;
int port;
Channel channel;
String defaultVersion;

/**
* Sets the {@link DataConverter} to use for converting serializable data payloads.
Expand Down Expand Up @@ -53,6 +54,17 @@ public DurableTaskGrpcClientBuilder port(int port) {
return this;
}

/**
* Sets the default version that orchestrations will be created with.
*
* @param defaultVersion the default version to create orchestrations with
* @return this builder object
*/
public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) {
this.defaultVersion = defaultVersion;
return this;
}

/**
* Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskClient} object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*;
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase;
import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;
import com.microsoft.durabletask.util.VersionUtils;

import io.grpc.*;

Expand All @@ -16,6 +17,7 @@
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

/**
* Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
Expand All @@ -31,6 +33,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final ManagedChannel managedSidecarChannel;
private final DataConverter dataConverter;
private final Duration maximumTimerInterval;
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;

private final TaskHubSidecarServiceBlockingStub sidecarClient;

Expand Down Expand Up @@ -61,6 +64,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.versioningOptions = builder.versioningOptions;
}

/**
Expand Down Expand Up @@ -130,20 +134,85 @@ public void startAndBlock() {
if (requestType == RequestCase.ORCHESTRATORREQUEST) {
OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();

// If versioning is set, process it first to see if the orchestration should be executed.
boolean versioningFailed = false;
if (versioningOptions != null && versioningOptions.getVersion() != null) {
String version = Stream.concat(orchestratorRequest.getPastEventsList().stream(), orchestratorRequest.getNewEventsList().stream())
.filter(event -> event.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONSTARTED)
.map(event -> event.getExecutionStarted().getVersion().getValue())
.findFirst()
.orElse(null);

logger.log(Level.INFO, String.format("Version from orchestration events: '%s'", version));

if (version != null) {
int comparison = VersionUtils.compareVersions(version, versioningOptions.getVersion());

switch (versioningOptions.getMatchStrategy()) {
case NONE:
break;
case STRICT:
if (comparison != 0) {
logger.log(Level.WARNING, String.format("The orchestration version '%s' does not match the worker version '%s'.", version, versioningOptions.getVersion()));
versioningFailed = true;
}
break;
case CURRENTOROLDER:
if (comparison > 0) {
logger.log(Level.WARNING, String.format("The orchestration version '%s' is greater than the worker version '%s'.", version, versioningOptions.getVersion()));
versioningFailed = true;
}
break;
}
}
}

// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
// TODO: Error handling
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
if (!versioningFailed) {
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());

OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.setCompletionToken(workItem.getCompletionToken())
.build();
OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.setCompletionToken(workItem.getCompletionToken())
.build();

this.sidecarClient.completeOrchestratorTask(response);
} else {
switch(versioningOptions.getFailureStrategy()) {
case FAIL:
CompleteOrchestrationAction completeAction = CompleteOrchestrationAction.newBuilder()
.setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED)
.setFailureDetails(TaskFailureDetails.newBuilder()
.setErrorType("VersionMismatch")
.setErrorMessage("The orchestration version does not match the worker version.")
.build())
.build();

OrchestratorAction action = OrchestratorAction.newBuilder()
.setCompleteOrchestration(completeAction)
.build();

OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.setCompletionToken(workItem.getCompletionToken())
.addActions(action)
.build();

this.sidecarClient.completeOrchestratorTask(response);
this.sidecarClient.completeOrchestratorTask(response);
break;
// Reject and default share the same behavior as it does not change the orchestration to a terminal state.
case REJECT:
default:
this.sidecarClient.abandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest.newBuilder()
.setCompletionToken(workItem.getCompletionToken())
.build());
}
}
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
ActivityRequest activityRequest = workItem.getActivityRequest();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public final class DurableTaskGrpcWorkerBuilder {
Channel channel;
DataConverter dataConverter;
Duration maximumTimerInterval;
DurableTaskGrpcWorkerVersioningOptions versioningOptions;

/**
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
Expand Down Expand Up @@ -113,6 +114,17 @@ public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerIn
return this;
}

/**
* Sets the versioning options for this worker.
*
* @param options the versioning options to use
* @return this builder object
*/
public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersioningOptions options) {
this.versioningOptions = options;
return this;
}

/**
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
* @return a new {@link DurableTaskGrpcWorker} object
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.durabletask;

/**
* Options for configuring versioning behavior in the DurableTaskGrpcWorker.
*/
public final class DurableTaskGrpcWorkerVersioningOptions {

/**
* Strategy for matching versions.
* NONE: No version matching is performed.
* STRICT: The version must match exactly.
* CURRENTOROLDER: The version must be the current version or older.
*/
public enum VersionMatchStrategy {
NONE,
STRICT,
CURRENTOROLDER;
}

/**
* Strategy for handling version mismatches.
* REJECT: Reject the orchestration if the version does not match. The orchestration will be retried later.
* FAIL: Fail the orchestration if the version does not match.
*/
public enum VersionFailureStrategy {
REJECT,
FAIL;
}

private final String version;
private final String defaultVersion;
private final VersionMatchStrategy matchStrategy;
private final VersionFailureStrategy failureStrategy;

/**
* Constructor for DurableTaskGrpcWorkerVersioningOptions.
* @param version the version that is matched against orchestrations
* @param defaultVersion the default version used when starting sub orchestrations from this worker
* @param matchStrategy the strategy for matching versions
* @param failureStrategy the strategy for handling version mismatches
*/
public DurableTaskGrpcWorkerVersioningOptions(String version, String defaultVersion, VersionMatchStrategy matchStrategy, VersionFailureStrategy failureStrategy) {
this.version = version;
this.defaultVersion = defaultVersion;
this.matchStrategy = matchStrategy;
this.failureStrategy = failureStrategy;
}

/**
* Gets the version that is matched against orchestrations.
* @return the version that is matched against orchestrations
*/
public String getVersion() {
return version;
}

/**
* Gets the default version used when starting sub orchestrations from this worker.
* @return the default version used when starting sub orchestrations from this worker
*/
public String getDefaultVersion() {
return defaultVersion;
}

/**
* Gets the strategy for matching versions.
* @return the strategy for matching versions
*/
public VersionMatchStrategy getMatchStrategy() {
return matchStrategy;
}

/**
* Gets the strategy for handling version mismatches.
* @return the strategy for handling version mismatches
*/
public VersionFailureStrategy getFailureStrategy() {
return failureStrategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ public interface TaskOrchestrationContext {
*/
boolean getIsReplaying();

/**
* Gets the version of the orchestration that this context represents.
*
* @return the version of the orchestration
*/
String getVersion();

/**
* Returns a new {@code Task} that is completed when all tasks in {@code tasks} completes.
* See {@link #allOf(Task[])} for more detailed information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ private class ContextImplTask implements TaskOrchestrationContext {
private boolean isSuspended;
private boolean isReplaying = true;
private int newUUIDCounter;
private String version;

// LinkedHashMap to maintain insertion order when returning the list of pending actions
private final LinkedHashMap<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
Expand Down Expand Up @@ -172,6 +173,15 @@ private void setDoneReplaying() {
this.isReplaying = false;
}

@Override
public String getVersion() {
return this.version;
}

private void setVersion(String version) {
this.version = version;
}

public <V> Task<V> completedTask(V value) {
CompletableTask<V> task = new CompletableTask<>();
task.complete(value);
Expand Down Expand Up @@ -839,6 +849,8 @@ private void processEvent(HistoryEvent e) {
this.setInstanceId(instanceId);
String input = startedEvent.getInput().getValue();
this.setInput(input);
String version = startedEvent.getVersion().getValue();
this.setVersion(version);
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
if (factory == null) {
// Try getting the default orchestrator
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.durabletask.util;

public class VersionUtils {

/**
* Compares two version strings. We manually attempt to parse the version strings as a Version object
* was not introduced until Java 9 and we compile as far back as Java 8.
*/
public static int compareVersions(String v1, String v2) {
if (v1 == null && v2 == null) {
return 0;
}
if (v1 == null) {
return -1;
}
if (v2 == null) {
return 1;
}

String[] parts1 = v1.split("\\.");
String[] parts2 = v2.split("\\.");

int length = Math.max(parts1.length, parts2.length);
for (int i = 0; i < length; i++) {
int p1 = i < parts1.length ? parseVersionPart(parts1[i]) : 0;
int p2 = i < parts2.length ? parseVersionPart(parts2[i]) : 0;
if (p1 != p2) {
return p1 - p2;
}
}

// As a final comparison, compare the raw strings. We're either equal here or dealing with a non-numeric version.
return v1.compareTo(v2);
}

private static int parseVersionPart(String part) {
try {
return Integer.parseInt(part);
} catch (NumberFormatException e) {
return 0; // fallback for non-numeric parts
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,10 @@ public <R> IntegrationTests.TestDurableTaskWorkerBuilder addActivity(
});
return this;
}

public TestDurableTaskWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersioningOptions options) {
this.innerBuilder.useVersioning(options);
return this;
}
}
}
Loading
Loading