Skip to content

Commit 8e291a8

Browse files
authored
Support sub-orchestration default version (#241)
Sub orchestrations run through Durable Functions (as opposed to isolated with DTS) have a slightly different path for sub orchestration versioning. This change loads the default version provided by the properties field to use as the default sub orchestration version. Signed-off-by: Hal Spang <[email protected]>
1 parent f457df9 commit 8e291a8

File tree

8 files changed

+190
-4
lines changed

8 files changed

+190
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
## Unreleased
2+
* Add support for default versions in Durable Function sub-orchestrations ([#241](https://github.com/microsoft/durabletask-java/pull/241))
23

34
## v1.6.0
45
* Add support for tags when creating new orchestrations ([#231](https://github.com/microsoft/durabletask-java/pull/230))

client/src/main/java/com/microsoft/durabletask/NewSubOrchestrationInstanceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@ public class NewSubOrchestrationInstanceOptions extends TaskOptions {
88
private String instanceId;
99
private String version;
1010

11+
/**
12+
* Creates default options for the sub-orchestration. Useful for chaining
13+
* when a RetryPolicy or RetryHandler is not needed.
14+
*/
15+
public NewSubOrchestrationInstanceOptions() {
16+
super((RetryPolicy) null);
17+
}
18+
1119
/**
1220
* Creates options with a retry policy for the sub-orchestration.
1321
* @param retryPolicy The retry policy to use for the sub-orchestration.

client/src/main/java/com/microsoft/durabletask/OrchestrationRunner.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import com.google.protobuf.InvalidProtocolBufferException;
66
import com.google.protobuf.StringValue;
7+
import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionFailureStrategy;
8+
import com.microsoft.durabletask.DurableTaskGrpcWorkerVersioningOptions.VersionMatchStrategy;
79
import com.microsoft.durabletask.implementation.protobuf.OrchestratorService;
810

911
import java.time.Duration;
@@ -125,12 +127,24 @@ public TaskOrchestration create() {
125127
}
126128
});
127129

130+
DurableTaskGrpcWorkerVersioningOptions versioningOptions = null;
131+
if (orchestratorRequest.getPropertiesMap().containsKey("defaultVersion")) {
132+
// If a default version is found, add it to the versioning options so it can be used in the execution flow.
133+
// It is safe to construct this here as we do not provide a client version nor a match/failure strategy that
134+
// would take effect. This is only used in the creation of sub-orchestrations.
135+
versioningOptions = new DurableTaskGrpcWorkerVersioningOptions(
136+
null,
137+
orchestratorRequest.getPropertiesMap().get("defaultVersion").getStringValue(),
138+
VersionMatchStrategy.NONE,
139+
VersionFailureStrategy.REJECT);
140+
}
141+
128142
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
129143
orchestrationFactories,
130144
new JacksonDataConverter(),
131145
DEFAULT_MAXIMUM_TIMER_INTERVAL,
132146
logger,
133-
null);
147+
versioningOptions);
134148

135149
// TODO: Error handling
136150
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(

endtoendtests/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ dependencies {
1919
implementation project(':client')
2020
implementation project(':azurefunctions')
2121

22-
implementation 'com.microsoft.azure.functions:azure-functions-java-library:3.0.0'
22+
implementation 'com.microsoft.azure.functions:azure-functions-java-library:3.1.0'
2323
testImplementation 'org.junit.jupiter:junit-jupiter:5.6.2'
2424
testImplementation 'io.rest-assured:rest-assured:5.3.0'
2525
testImplementation 'io.rest-assured:json-path:5.3.0'

endtoendtests/e2e-test-setup.ps1

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ if ($NoSetup -eq $false) {
2929
docker run --name $ContainerName -p 8080:80 -it --add-host=host.docker.internal:host-gateway -d `
3030
--env 'AzureWebJobsStorage=UseDevelopmentStorage=true;DevelopmentStorageProxyUri=http://host.docker.internal' `
3131
--env 'WEBSITE_HOSTNAME=localhost:8080' `
32+
--env 'FUNCTIONS_EXTENSIONBUNDLE_SOURCE_URI=https://functionscdnstaging.azureedge.net/public' `
3233
$ImageName
3334
}
3435

endtoendtests/host.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313
},
1414
"extensions": {
1515
"durableTask": {
16-
"hubName": "DFJavaSmokeTest"
16+
"hubName": "DFJavaSmokeTest",
17+
"defaultVersion": "1.0"
1718
}
1819
},
1920
"extensionBundle": {
2021
"id": "Microsoft.Azure.Functions.ExtensionBundle",
21-
"version": "[4.*, 5.0.0)"
22+
"version": "[4.26.0, 5.0.0)"
2223
}
2324
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package com.functions;
2+
3+
import java.util.Optional;
4+
5+
import com.microsoft.azure.functions.ExecutionContext;
6+
import com.microsoft.azure.functions.HttpMethod;
7+
import com.microsoft.azure.functions.HttpRequestMessage;
8+
import com.microsoft.azure.functions.HttpResponseMessage;
9+
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
10+
import com.microsoft.azure.functions.annotation.FunctionName;
11+
import com.microsoft.azure.functions.annotation.HttpTrigger;
12+
import com.microsoft.durabletask.DurableTaskClient;
13+
import com.microsoft.durabletask.NewOrchestrationInstanceOptions;
14+
import com.microsoft.durabletask.NewSubOrchestrationInstanceOptions;
15+
import com.microsoft.durabletask.TaskOrchestrationContext;
16+
import com.microsoft.durabletask.azurefunctions.DurableActivityTrigger;
17+
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
18+
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
19+
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
20+
21+
public class Versioning {
22+
/**
23+
* This HTTP-triggered function starts the orchestration.
24+
*/
25+
@FunctionName("StartVersionedOrchestration")
26+
public HttpResponseMessage startOrchestration(
27+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
28+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
29+
final ExecutionContext context) {
30+
context.getLogger().info("Java HTTP trigger processed a request.");
31+
32+
DurableTaskClient client = durableContext.getClient();
33+
String queryVersion = request.getQueryParameters().getOrDefault("version", "");
34+
context.getLogger().info(String.format("Received version '%s' from the query string", queryVersion));
35+
String instanceId = null;
36+
if (queryVersion.equals("default")) {
37+
instanceId = client.scheduleNewOrchestrationInstance("VersionedOrchestrator");
38+
} else {
39+
instanceId = client.scheduleNewOrchestrationInstance("VersionedOrchestrator", new NewOrchestrationInstanceOptions().setVersion(queryVersion));
40+
}
41+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
42+
return durableContext.createCheckStatusResponse(request, instanceId);
43+
}
44+
45+
/**
46+
* This HTTP-triggered function starts the orchestration.
47+
*/
48+
@FunctionName("StartVersionedSubOrchestration")
49+
public HttpResponseMessage startSubOrchestration(
50+
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
51+
@DurableClientInput(name = "durableContext") DurableClientContext durableContext,
52+
final ExecutionContext context) {
53+
context.getLogger().info("Java HTTP trigger processed a request.");
54+
55+
DurableTaskClient client = durableContext.getClient();
56+
String queryVersion = request.getQueryParameters().getOrDefault("version", "");
57+
String instanceId = null;
58+
if (queryVersion.equals("default")) {
59+
instanceId = client.scheduleNewOrchestrationInstance("VersionedSubOrchestrator");
60+
} else {
61+
instanceId = client.scheduleNewOrchestrationInstance("VersionedSubOrchestrator", queryVersion);
62+
}
63+
context.getLogger().info("Created new Java orchestration with instance ID = " + instanceId);
64+
return durableContext.createCheckStatusResponse(request, instanceId);
65+
}
66+
67+
@FunctionName("VersionedOrchestrator")
68+
public String versionedOrchestrator(
69+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
70+
return ctx.callActivity("SayVersion", ctx.getVersion(), String.class).await();
71+
}
72+
73+
@FunctionName("VersionedSubOrchestrator")
74+
public String versionedSubOrchestrator(
75+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
76+
String subVersion = ctx.getInput(String.class);
77+
NewSubOrchestrationInstanceOptions options = new NewSubOrchestrationInstanceOptions();
78+
if (subVersion != null) {
79+
options.setVersion(subVersion);
80+
}
81+
return ctx.callSubOrchestrator("SubOrchestrator", null, null, options, String.class).await();
82+
}
83+
84+
@FunctionName("SubOrchestrator")
85+
public String subOrchestrator(
86+
@DurableOrchestrationTrigger(name = "ctx") TaskOrchestrationContext ctx) {
87+
return ctx.callActivity("SayVersion", ctx.getVersion(), String.class).await();
88+
}
89+
90+
@FunctionName("SayVersion")
91+
public String sayVersion(
92+
@DurableActivityTrigger(name = "version") String version,
93+
final ExecutionContext context) {
94+
version = version.replaceAll("\"", "");
95+
context.getLogger().info(String.format("Called with version: '%s'", version));
96+
return String.format("Version: '%s'", version);
97+
}
98+
}

endtoendtests/src/test/java/com/functions/EndToEndTests.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.junit.jupiter.api.Assertions.assertEquals;
2222
import static org.junit.jupiter.api.Assertions.assertNotEquals;
2323
import static org.junit.jupiter.api.Assertions.assertTrue;
24+
import static org.junit.jupiter.api.Assertions.fail;
2425

2526
@Tag("e2e")
2627
public class EndToEndTests {
@@ -262,6 +263,68 @@ public void DeserializeFail(String functionName) throws InterruptedException {
262263
assertTrue(errorMessage.contains("Failed to deserialize the JSON text"));
263264
}
264265

266+
@ParameterizedTest
267+
@ValueSource(strings = {
268+
"default",
269+
"",
270+
"0.9",
271+
"1.0"
272+
})
273+
public void VersionedOrchestrationTests(String version) throws InterruptedException {
274+
Response response = post("api/StartVersionedOrchestration?version=" + version);
275+
String statusQueryGetUri = null;
276+
try {
277+
278+
JsonPath jsonPath = response.jsonPath();
279+
// assert orchestration status
280+
statusQueryGetUri = jsonPath.get("statusQueryGetUri");
281+
} catch (Exception e) {
282+
fail("Failed to parse response: " + response.asString());
283+
}
284+
boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(10));
285+
assertTrue(completed);
286+
287+
// assert exception message
288+
Response resp = get(statusQueryGetUri);
289+
String output = resp.jsonPath().get("output");
290+
if (version.equals("default")) {
291+
assertTrue(output.contains("Version: '1.0'"), "Expected default version (1.0), got: " + output);
292+
} else {
293+
assertTrue(output.contains(String.format("Version: '%s'", version)), "Expected version (" + version + "), got: " + output);
294+
}
295+
}
296+
297+
@ParameterizedTest
298+
@ValueSource(strings = {
299+
"default",
300+
"",
301+
"0.9",
302+
"1.0"
303+
})
304+
public void VersionedSubOrchestrationTests(String version) throws InterruptedException {
305+
Response response = post("api/StartVersionedSubOrchestration?version=" + version);
306+
String statusQueryGetUri = null;
307+
try {
308+
309+
JsonPath jsonPath = response.jsonPath();
310+
// assert orchestration status
311+
statusQueryGetUri = jsonPath.get("statusQueryGetUri");
312+
} catch (Exception e) {
313+
fail("Failed to parse response: " + response.asString());
314+
}
315+
boolean completed = pollingCheck(statusQueryGetUri, "Completed", null, Duration.ofSeconds(10));
316+
assertTrue(completed);
317+
318+
// assert exception message
319+
Response resp = get(statusQueryGetUri);
320+
String output = resp.jsonPath().get("output");
321+
if (version.equals("default")) {
322+
assertTrue(output.contains(String.format("Version: '%s'", "1.0")), "Expected default version (1.0), got: " + output);
323+
} else {
324+
assertTrue(output.contains(String.format("Version: '%s'", version)), "Expected version (" + version + "), got: " + output);
325+
}
326+
}
327+
265328
private boolean pollingCheck(String statusQueryGetUri,
266329
String expectedState,
267330
Set<String> continueStates,

0 commit comments

Comments
 (0)