Skip to content

Commit c65b05e

Browse files
committed
Introduce versioning into the Java SDK
This commit adds the versioning feature from the dotnet SDK (durabletask-dotnet). This allows for the version to be passed via the context and for the worker to reject/fail orchestrations based on the provided version. Signed-off-by: Hal Spang <[email protected]>
1 parent 402b88e commit c65b05e

15 files changed

+507
-141
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## Unreleased
2+
* Add versioning support for client and worker ([#224](https://github.com/microsoft/durabletask-java/pull/224))
3+
14
## v1.5.2
25
* Add distributed tracing support for Azure Functions client scenarios ([#211](https://github.com/microsoft/durabletask-java/pull/211))
36

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@ public final class DurableTaskGrpcClient extends DurableTaskClient {
3232
private final DataConverter dataConverter;
3333
private final ManagedChannel managedSidecarChannel;
3434
private final TaskHubSidecarServiceBlockingStub sidecarClient;
35+
private final String defaultVersion;
3536

3637
DurableTaskGrpcClient(DurableTaskGrpcClientBuilder builder) {
3738
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
39+
this.defaultVersion = builder.defaultVersion;
3840

3941
Channel sidecarGrpcChannel;
4042
if (builder.channel != null) {
@@ -100,6 +102,8 @@ public String scheduleNewOrchestrationInstance(
100102
String version = options.getVersion();
101103
if (version != null) {
102104
builder.setVersion(StringValue.of(version));
105+
} else if (this.defaultVersion != null) {
106+
builder.setVersion(StringValue.of(this.defaultVersion));
103107
}
104108

105109
Object input = options.getInput();

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ public final class DurableTaskGrpcClientBuilder {
1212
DataConverter dataConverter;
1313
int port;
1414
Channel channel;
15+
String defaultVersion;
1516

1617
/**
1718
* Sets the {@link DataConverter} to use for converting serializable data payloads.
@@ -53,6 +54,17 @@ public DurableTaskGrpcClientBuilder port(int port) {
5354
return this;
5455
}
5556

57+
/**
58+
* Sets the default version that orchestrations will be created with.
59+
*
60+
* @param defaultVersion the default version to create orchestrations with
61+
* @return this builder object
62+
*/
63+
public DurableTaskGrpcClientBuilder defaultVersion(String defaultVersion) {
64+
this.defaultVersion = defaultVersion;
65+
return this;
66+
}
67+
5668
/**
5769
* Initializes a new {@link DurableTaskClient} object with the settings specified in the current builder object.
5870
* @return a new {@link DurableTaskClient} object

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 77 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*;
99
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase;
1010
import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*;
11+
import com.microsoft.durabletask.util.VersionUtils;
1112

1213
import io.grpc.*;
1314

@@ -16,6 +17,7 @@
1617
import java.util.concurrent.TimeUnit;
1718
import java.util.logging.Level;
1819
import java.util.logging.Logger;
20+
import java.util.stream.Stream;
1921

2022
/**
2123
* Task hub worker that connects to a sidecar process over gRPC to execute orchestrator and activity events.
@@ -31,6 +33,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
3133
private final ManagedChannel managedSidecarChannel;
3234
private final DataConverter dataConverter;
3335
private final Duration maximumTimerInterval;
36+
private final DurableTaskGrpcWorkerVersioningOptions versioningOptions;
3437

3538
private final TaskHubSidecarServiceBlockingStub sidecarClient;
3639

@@ -61,6 +64,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
6164
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
6265
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
6366
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
67+
this.versioningOptions = builder.versioningOptions;
6468
}
6569

6670
/**
@@ -130,20 +134,85 @@ public void startAndBlock() {
130134
if (requestType == RequestCase.ORCHESTRATORREQUEST) {
131135
OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
132136

137+
// If versioning is set, process it first to see if the orchestration should be executed.
138+
boolean versioningFailed = false;
139+
if (versioningOptions != null && versioningOptions.getVersion() != null) {
140+
String version = Stream.concat(orchestratorRequest.getPastEventsList().stream(), orchestratorRequest.getNewEventsList().stream())
141+
.filter(event -> event.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONSTARTED)
142+
.map(event -> event.getExecutionStarted().getVersion().getValue())
143+
.findFirst()
144+
.orElse(null);
145+
146+
logger.log(Level.INFO, String.format("Version from orchestration events: '%s'", version));
147+
148+
if (version != null) {
149+
int comparison = VersionUtils.compareVersions(version, versioningOptions.getVersion());
150+
151+
switch (versioningOptions.getMatchStrategy()) {
152+
case NONE:
153+
break;
154+
case STRICT:
155+
if (comparison != 0) {
156+
logger.log(Level.WARNING, String.format("The orchestration version '%s' does not match the worker version '%s'.", version, versioningOptions.getVersion()));
157+
versioningFailed = true;
158+
}
159+
break;
160+
case CURRENTOROLDER:
161+
if (comparison > 0) {
162+
logger.log(Level.WARNING, String.format("The orchestration version '%s' is greater than the worker version '%s'.", version, versioningOptions.getVersion()));
163+
versioningFailed = true;
164+
}
165+
break;
166+
}
167+
}
168+
}
169+
133170
// TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava
134171
// TODO: Error handling
135-
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
172+
if (!versioningFailed) {
173+
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
136174
orchestratorRequest.getPastEventsList(),
137175
orchestratorRequest.getNewEventsList());
138176

139-
OrchestratorResponse response = OrchestratorResponse.newBuilder()
140-
.setInstanceId(orchestratorRequest.getInstanceId())
141-
.addAllActions(taskOrchestratorResult.getActions())
142-
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
143-
.setCompletionToken(workItem.getCompletionToken())
144-
.build();
177+
OrchestratorResponse response = OrchestratorResponse.newBuilder()
178+
.setInstanceId(orchestratorRequest.getInstanceId())
179+
.addAllActions(taskOrchestratorResult.getActions())
180+
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
181+
.setCompletionToken(workItem.getCompletionToken())
182+
.build();
183+
184+
this.sidecarClient.completeOrchestratorTask(response);
185+
} else {
186+
switch(versioningOptions.getFailureStrategy()) {
187+
case FAIL:
188+
CompleteOrchestrationAction completeAction = CompleteOrchestrationAction.newBuilder()
189+
.setOrchestrationStatus(OrchestrationStatus.ORCHESTRATION_STATUS_FAILED)
190+
.setFailureDetails(TaskFailureDetails.newBuilder()
191+
.setErrorType("VersionMismatch")
192+
.setErrorMessage("The orchestration version does not match the worker version.")
193+
.build())
194+
.build();
195+
196+
OrchestratorAction action = OrchestratorAction.newBuilder()
197+
.setCompleteOrchestration(completeAction)
198+
.build();
199+
200+
OrchestratorResponse response = OrchestratorResponse.newBuilder()
201+
.setInstanceId(orchestratorRequest.getInstanceId())
202+
.setCompletionToken(workItem.getCompletionToken())
203+
.addActions(action)
204+
.build();
145205

146-
this.sidecarClient.completeOrchestratorTask(response);
206+
this.sidecarClient.completeOrchestratorTask(response);
207+
break;
208+
// Reject and default share the same behavior as it does not change the orchestration to a terminal state.
209+
case REJECT:
210+
default:
211+
this.sidecarClient.abandonTaskOrchestratorWorkItem(AbandonOrchestrationTaskRequest.newBuilder()
212+
.setCompletionToken(workItem.getCompletionToken())
213+
.build());
214+
}
215+
}
147216
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
148217
ActivityRequest activityRequest = workItem.getActivityRequest();
149218

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public final class DurableTaskGrpcWorkerBuilder {
1717
Channel channel;
1818
DataConverter dataConverter;
1919
Duration maximumTimerInterval;
20+
DurableTaskGrpcWorkerVersioningOptions versioningOptions;
2021

2122
/**
2223
* Adds an orchestration factory to be used by the constructed {@link DurableTaskGrpcWorker}.
@@ -113,6 +114,17 @@ public DurableTaskGrpcWorkerBuilder maximumTimerInterval(Duration maximumTimerIn
113114
return this;
114115
}
115116

117+
/**
118+
* Sets the versioning options for this worker.
119+
*
120+
* @param options the versioning options to use
121+
* @return this builder object
122+
*/
123+
public DurableTaskGrpcWorkerBuilder useVersioning(DurableTaskGrpcWorkerVersioningOptions options) {
124+
this.versioningOptions = options;
125+
return this;
126+
}
127+
116128
/**
117129
* Initializes a new {@link DurableTaskGrpcWorker} object with the settings specified in the current builder object.
118130
* @return a new {@link DurableTaskGrpcWorker} object
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask;
4+
5+
/**
6+
* Options for configuring versioning behavior in the DurableTaskGrpcWorker.
7+
*/
8+
public final class DurableTaskGrpcWorkerVersioningOptions {
9+
10+
/**
11+
* Strategy for matching versions.
12+
* NONE: No version matching is performed.
13+
* STRICT: The version must match exactly.
14+
* CURRENTOROLDER: The version must be the current version or older.
15+
*/
16+
public enum VersionMatchStrategy {
17+
NONE,
18+
STRICT,
19+
CURRENTOROLDER;
20+
}
21+
22+
/**
23+
* Strategy for handling version mismatches.
24+
* REJECT: Reject the orchestration if the version does not match. The orchestration will be retried later.
25+
* FAIL: Fail the orchestration if the version does not match.
26+
*/
27+
public enum VersionFailureStrategy {
28+
REJECT,
29+
FAIL;
30+
}
31+
32+
private final String version;
33+
private final String defaultVersion;
34+
private final VersionMatchStrategy matchStrategy;
35+
private final VersionFailureStrategy failureStrategy;
36+
37+
/**
38+
* Constructor for DurableTaskGrpcWorkerVersioningOptions.
39+
* @param version the version that is matched against orchestrations
40+
* @param defaultVersion the default version used when starting sub orchestrations from this worker
41+
* @param matchStrategy the strategy for matching versions
42+
* @param failureStrategy the strategy for handling version mismatches
43+
*/
44+
public DurableTaskGrpcWorkerVersioningOptions(String version, String defaultVersion, VersionMatchStrategy matchStrategy, VersionFailureStrategy failureStrategy) {
45+
this.version = version;
46+
this.defaultVersion = defaultVersion;
47+
this.matchStrategy = matchStrategy;
48+
this.failureStrategy = failureStrategy;
49+
}
50+
51+
/**
52+
* Gets the version that is matched against orchestrations.
53+
* @return the version that is matched against orchestrations
54+
*/
55+
public String getVersion() {
56+
return version;
57+
}
58+
59+
/**
60+
* Gets the default version used when starting sub orchestrations from this worker.
61+
* @return the default version used when starting sub orchestrations from this worker
62+
*/
63+
public String getDefaultVersion() {
64+
return defaultVersion;
65+
}
66+
67+
/**
68+
* Gets the strategy for matching versions.
69+
* @return the strategy for matching versions
70+
*/
71+
public VersionMatchStrategy getMatchStrategy() {
72+
return matchStrategy;
73+
}
74+
75+
/**
76+
* Gets the strategy for handling version mismatches.
77+
* @return the strategy for handling version mismatches
78+
*/
79+
public VersionFailureStrategy getFailureStrategy() {
80+
return failureStrategy;
81+
}
82+
}

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationContext.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ public interface TaskOrchestrationContext {
5959
*/
6060
boolean getIsReplaying();
6161

62+
/**
63+
* Gets the version of the orchestration that this context represents.
64+
*
65+
* @return the version of the orchestration
66+
*/
67+
String getVersion();
68+
6269
/**
6370
* Returns a new {@code Task} that is completed when all tasks in {@code tasks} completes.
6471
* See {@link #allOf(Task[])} for more detailed information.

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ private class ContextImplTask implements TaskOrchestrationContext {
8181
private boolean isSuspended;
8282
private boolean isReplaying = true;
8383
private int newUUIDCounter;
84+
private String version;
8485

8586
// LinkedHashMap to maintain insertion order when returning the list of pending actions
8687
private final LinkedHashMap<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
@@ -172,6 +173,15 @@ private void setDoneReplaying() {
172173
this.isReplaying = false;
173174
}
174175

176+
@Override
177+
public String getVersion() {
178+
return this.version;
179+
}
180+
181+
private void setVersion(String version) {
182+
this.version = version;
183+
}
184+
175185
public <V> Task<V> completedTask(V value) {
176186
CompletableTask<V> task = new CompletableTask<>();
177187
task.complete(value);
@@ -839,6 +849,8 @@ private void processEvent(HistoryEvent e) {
839849
this.setInstanceId(instanceId);
840850
String input = startedEvent.getInput().getValue();
841851
this.setInput(input);
852+
String version = startedEvent.getVersion().getValue();
853+
this.setVersion(version);
842854
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
843855
if (factory == null) {
844856
// Try getting the default orchestrator
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask.util;
4+
5+
public class VersionUtils {
6+
7+
/**
8+
* Compares two version strings. We manually attempt to parse the version strings as a Version object
9+
* was not introduced until Java 9 and we compile as far back as Java 8.
10+
*/
11+
public static int compareVersions(String v1, String v2) {
12+
if (v1 == null && v2 == null) {
13+
return 0;
14+
}
15+
if (v1 == null) {
16+
return -1;
17+
}
18+
if (v2 == null) {
19+
return 1;
20+
}
21+
22+
String[] parts1 = v1.split("\\.");
23+
String[] parts2 = v2.split("\\.");
24+
25+
int length = Math.max(parts1.length, parts2.length);
26+
for (int i = 0; i < length; i++) {
27+
int p1 = i < parts1.length ? parseVersionPart(parts1[i]) : 0;
28+
int p2 = i < parts2.length ? parseVersionPart(parts2[i]) : 0;
29+
if (p1 != p2) {
30+
return p1 - p2;
31+
}
32+
}
33+
34+
// As a final comparison, compare the raw strings. We're either equal here or dealing with a non-numeric version.
35+
return v1.compareTo(v2);
36+
}
37+
38+
private static int parseVersionPart(String part) {
39+
try {
40+
return Integer.parseInt(part);
41+
} catch (NumberFormatException e) {
42+
return 0; // fallback for non-numeric parts
43+
}
44+
}
45+
}

0 commit comments

Comments
 (0)