Skip to content

Commit 8e3cf67

Browse files
Add deployment version features (#611)
1 parent 1a033a7 commit 8e3cf67

File tree

18 files changed

+615
-37
lines changed

18 files changed

+615
-37
lines changed

dockerfiles/dynamicconfig/docker.yaml

+9
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,18 @@ system.enableActivityEagerExecution:
77
system.enableEagerWorkflowStart:
88
- value: true
99
constraints: {}
10+
system.enableDeployments:
11+
- value: true
12+
constraints: {}
13+
system.enableDeploymentVersions:
14+
- value: true
15+
constraints: {}
1016
frontend.enableUpdateWorkflowExecution:
1117
- value: true
1218
constraints: {}
1319
frontend.enableUpdateWorkflowExecutionAsyncAccepted:
1420
- value: true
1521
constraints: {}
22+
frontend.workerVersioningWorkflowAPIs:
23+
- value: true
24+
constraints: {}
+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package deployment_versioning
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/temporalio/features/harness/go/harness"
8+
enumspb "go.temporal.io/api/enums/v1"
9+
"go.temporal.io/sdk/client"
10+
"go.temporal.io/sdk/worker"
11+
"go.temporal.io/sdk/workflow"
12+
)
13+
14+
func StartWorker(ctx context.Context, r *harness.Runner, version string, versioningBehavior workflow.VersioningBehavior, waitForSignal func(workflow.Context) (string, error)) worker.Worker {
15+
w := worker.New(r.Client, r.TaskQueue, worker.Options{
16+
DeploymentOptions: worker.DeploymentOptions{
17+
UseVersioning: true,
18+
Version: version,
19+
DefaultVersioningBehavior: versioningBehavior,
20+
},
21+
})
22+
w.RegisterWorkflowWithOptions(waitForSignal, workflow.RegisterOptions{
23+
Name: "WaitForSignal",
24+
})
25+
return w
26+
}
27+
28+
func WaitForDeploymentVersion(r *harness.Runner, ctx context.Context, dHandle client.WorkerDeploymentHandle, version string) error {
29+
return r.DoUntilEventually(ctx, 300*time.Millisecond, 10*time.Second,
30+
func() bool {
31+
d, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
32+
if err != nil {
33+
return false
34+
}
35+
for _, v := range d.Info.VersionSummaries {
36+
if v.Version == version {
37+
return true
38+
}
39+
}
40+
return false
41+
})
42+
}
43+
44+
func WaitForDeployment(r *harness.Runner, ctx context.Context, dHandle client.WorkerDeploymentHandle) error {
45+
return r.DoUntilEventually(ctx, 300*time.Millisecond, 10*time.Second,
46+
func() bool {
47+
_, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
48+
return err == nil
49+
})
50+
}
51+
52+
func WaitForWorkflowRunning(r *harness.Runner, ctx context.Context, handle client.WorkflowRun) error {
53+
return r.DoUntilEventually(ctx, 300*time.Millisecond, 10*time.Second,
54+
func() bool {
55+
describeResp, err := r.Client.DescribeWorkflowExecution(ctx, handle.GetID(), handle.GetRunID())
56+
if err != nil {
57+
return false
58+
}
59+
status := describeResp.WorkflowExecutionInfo.Status
60+
return enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING == status
61+
})
62+
}
63+
64+
func SetCurrent(r *harness.Runner, ctx context.Context, deploymentName string, version string) error {
65+
dHandle := r.Client.WorkerDeploymentClient().GetHandle(deploymentName)
66+
67+
if err := WaitForDeployment(r, ctx, dHandle); err != nil {
68+
return err
69+
}
70+
71+
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
72+
if err != nil {
73+
return err
74+
}
75+
76+
if err := WaitForDeploymentVersion(r, ctx, dHandle, version); err != nil {
77+
return err
78+
}
79+
80+
_, err = dHandle.SetCurrentVersion(ctx, client.WorkerDeploymentSetCurrentVersionOptions{
81+
Version: version,
82+
ConflictToken: response1.ConflictToken,
83+
})
84+
85+
return err
86+
}
87+
88+
func SetRamp(r *harness.Runner, ctx context.Context, deploymentName string, version string, percentage float32) error {
89+
dHandle := r.Client.WorkerDeploymentClient().GetHandle(deploymentName)
90+
91+
if err := WaitForDeployment(r, ctx, dHandle); err != nil {
92+
return err
93+
}
94+
95+
response1, err := dHandle.Describe(ctx, client.WorkerDeploymentDescribeOptions{})
96+
if err != nil {
97+
return err
98+
}
99+
100+
if err := WaitForDeploymentVersion(r, ctx, dHandle, version); err != nil {
101+
return err
102+
}
103+
104+
_, err = dHandle.SetRampingVersion(ctx, client.WorkerDeploymentSetRampingVersionOptions{
105+
Version: version,
106+
ConflictToken: response1.ConflictToken,
107+
Percentage: float32(100.0),
108+
})
109+
110+
return err
111+
}
112+
113+
func ServerSupportsDeployments(ctx context.Context, r *harness.Runner) bool {
114+
// No system capability, only dynamic config in namespace, need to just try...
115+
iter, err := r.Client.WorkerDeploymentClient().List(ctx, client.WorkerDeploymentListOptions{})
116+
if err != nil {
117+
return false
118+
}
119+
// Need to call `HasNext` to contact the server
120+
for iter.HasNext() {
121+
_, err := iter.Next()
122+
if err != nil {
123+
return false
124+
}
125+
}
126+
return true
127+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Deployment Versioning: Routing AutoUpgrade
2+
3+
When the Current Version for a Deployment changes, already started workflows
4+
will transition to the new version if they are `AutoUpgrade`.
5+
6+
# Detailed spec
7+
8+
* Create a random deployment name `deployment_name`
9+
* Start a `deployment_name.1-0` worker, register workflow type `WaitForSignal` as `AutoUpgrade`, the implementation of that workflow should end returning `prefix_v1`.
10+
* Start a `deployment_name.2-0` worker, register workflow type `WaitForSignal` as `AutoUpgrade`, the implementation of that workflow should end returning `prefix_v2`.
11+
* Set Current version for `deployment_name` to `deployment_name.1-0`
12+
* Start `workflow_1` of type `WaitForSignal`, it should start AutoUpgrade and with version `deployment_name.1-0`
13+
* Set Current version for `deployment_name` to `deployment_name.2-0`
14+
* Signal workflow. The workflow (pinned) should exit returning `prefix_v2`.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package routing_auto_upgrade
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/temporalio/features/features/deployment_versioning"
10+
"github.com/temporalio/features/harness/go/harness"
11+
"go.temporal.io/sdk/client"
12+
"go.temporal.io/sdk/worker"
13+
"go.temporal.io/sdk/workflow"
14+
)
15+
16+
var deploymentName = uuid.NewString()
17+
18+
func WaitForSignalOne(ctx workflow.Context) (string, error) {
19+
var value string
20+
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
21+
return value + "_v1", nil
22+
}
23+
24+
func WaitForSignalTwo(ctx workflow.Context) (string, error) {
25+
var value string
26+
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
27+
return value + "_v2", nil
28+
}
29+
30+
var Feature = harness.Feature{
31+
Workflows: []interface{}{
32+
harness.WorkflowWithOptions{
33+
Workflow: WaitForSignalOne,
34+
Options: workflow.RegisterOptions{
35+
Name: "WaitForSignal",
36+
VersioningBehavior: workflow.VersioningBehaviorAutoUpgrade,
37+
},
38+
},
39+
},
40+
Execute: Execute,
41+
WorkerOptions: worker.Options{
42+
DeploymentOptions: worker.DeploymentOptions{
43+
UseVersioning: true,
44+
Version: deploymentName + ".1.0",
45+
},
46+
},
47+
CheckHistory: CheckHistory,
48+
ExpectRunResult: "prefix_v2",
49+
}
50+
51+
var worker2 worker.Worker
52+
53+
func Execute(ctx context.Context, r *harness.Runner) (client.WorkflowRun, error) {
54+
if supported := deployment_versioning.ServerSupportsDeployments(ctx, r); !supported {
55+
return nil, r.Skip(fmt.Sprintf("server does not support deployment versioning"))
56+
}
57+
58+
worker2 = deployment_versioning.StartWorker(ctx, r, deploymentName+".2.0",
59+
workflow.VersioningBehaviorAutoUpgrade, WaitForSignalTwo)
60+
if err := worker2.Start(); err != nil {
61+
return nil, err
62+
}
63+
64+
if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".1.0"); err != nil {
65+
return nil, err
66+
}
67+
68+
run, err := r.Client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
69+
TaskQueue: r.TaskQueue,
70+
ID: "workflow_1",
71+
WorkflowExecutionTimeout: 1 * time.Minute,
72+
}, "WaitForSignal")
73+
74+
if err != nil {
75+
return nil, err
76+
}
77+
78+
if err := deployment_versioning.WaitForWorkflowRunning(r, ctx, run); err != nil {
79+
return nil, err
80+
}
81+
82+
if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".2.0"); err != nil {
83+
return nil, err
84+
}
85+
86+
if err := r.Client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "start-signal", "prefix"); err != nil {
87+
return nil, err
88+
}
89+
90+
return run, nil
91+
}
92+
93+
func CheckHistory(ctx context.Context, r *harness.Runner, run client.WorkflowRun) error {
94+
// Shut down the 2.0 worker
95+
worker2.Stop()
96+
return r.CheckHistoryDefault(ctx, run)
97+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Deployment Versioning: Version Routing Pinned
2+
3+
When the Current Version for a Deployment changes, already started workflows
4+
will never transition to the new version if they are `Pinned`.
5+
6+
# Detailed spec
7+
8+
* Create a random deployment name `deployment_name`
9+
* Start a `deployment_name.1-0` worker, register workflow type `WaitForSignal` as `Pinned`, the implementation of that workflow should end returning `prefix_v1`.
10+
* Start a `deployment_name.2-0` worker, register workflow type `WaitForSignal` as `AutoUpgrade`, the implementation of that workflow should end returning `prefix_v2`.
11+
* Set Current version for `deployment_name` to `deployment_name.1-0`
12+
* Start `workflow_1` of type `WaitForSignal`, it should start pinned and with version `deployment_name.1-0`
13+
* Set Current version for `deployment_name` to `deployment_name.2-0`
14+
* Signal workflow. The workflow (pinned) should exit returning `prefix_v1`.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package routing_pinned
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/temporalio/features/features/deployment_versioning"
10+
"github.com/temporalio/features/harness/go/harness"
11+
"go.temporal.io/sdk/client"
12+
"go.temporal.io/sdk/worker"
13+
"go.temporal.io/sdk/workflow"
14+
)
15+
16+
var deploymentName = uuid.NewString()
17+
18+
func WaitForSignalOne(ctx workflow.Context) (string, error) {
19+
var value string
20+
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
21+
return value + "_v1", nil
22+
}
23+
24+
func WaitForSignalTwo(ctx workflow.Context) (string, error) {
25+
var value string
26+
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
27+
return value + "_v2", nil
28+
}
29+
30+
var Feature = harness.Feature{
31+
Workflows: []interface{}{
32+
harness.WorkflowWithOptions{
33+
Workflow: WaitForSignalOne,
34+
Options: workflow.RegisterOptions{
35+
Name: "WaitForSignal",
36+
VersioningBehavior: workflow.VersioningBehaviorPinned,
37+
},
38+
},
39+
},
40+
Execute: Execute,
41+
WorkerOptions: worker.Options{
42+
DeploymentOptions: worker.DeploymentOptions{
43+
UseVersioning: true,
44+
Version: deploymentName + ".1.0",
45+
},
46+
},
47+
CheckHistory: CheckHistory,
48+
ExpectRunResult: "prefix_v1",
49+
}
50+
var worker2 worker.Worker
51+
52+
func Execute(ctx context.Context, r *harness.Runner) (client.WorkflowRun, error) {
53+
if supported := deployment_versioning.ServerSupportsDeployments(ctx, r); !supported {
54+
return nil, r.Skip(fmt.Sprintf("server does not support deployment versioning"))
55+
}
56+
57+
worker2 = deployment_versioning.StartWorker(ctx, r, deploymentName+".2.0",
58+
workflow.VersioningBehaviorAutoUpgrade, WaitForSignalTwo)
59+
if err := worker2.Start(); err != nil {
60+
return nil, err
61+
}
62+
63+
if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".1.0"); err != nil {
64+
return nil, err
65+
}
66+
67+
run, err := r.Client.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
68+
TaskQueue: r.TaskQueue,
69+
ID: "workflow_1",
70+
WorkflowExecutionTimeout: 1 * time.Minute,
71+
}, "WaitForSignal")
72+
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
if err := deployment_versioning.WaitForWorkflowRunning(r, ctx, run); err != nil {
78+
return nil, err
79+
}
80+
81+
if err := deployment_versioning.SetCurrent(r, ctx, deploymentName, deploymentName+".2.0"); err != nil {
82+
return nil, err
83+
}
84+
85+
if err := r.Client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "start-signal", "prefix"); err != nil {
86+
return nil, err
87+
}
88+
89+
return run, nil
90+
}
91+
92+
func CheckHistory(ctx context.Context, r *harness.Runner, run client.WorkflowRun) error {
93+
// Shut down the 2.0 worker
94+
worker2.Stop()
95+
return r.CheckHistoryDefault(ctx, run)
96+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Deployment Versioning: Routing with Override
2+
3+
It is possible to override the Version of a new workflow with Start Workflow
4+
Options, so that it is pinned to a Version different from the Current one in that Deployment.
5+
6+
7+
# Detailed spec
8+
9+
* Create a random deployment name `deployment_name`
10+
* Start a `deployment_name.1-0` worker, register workflow type `WaitForSignal` as `Pinned`, the implementation of that workflow should end returning `prefix_v1`.
11+
* Start a `deployment_name.2-0` worker, register workflow type `WaitForSignal` as `AutoUpgrade`, the implementation of that workflow should end returning `prefix_v2`.
12+
* Set Current version for `deployment_name` to `deployment_name.2-0`
13+
* Start `workflow_1` of type `WaitForSignal`, and override for `Pinned` to `deployment_name.1.0`. It should start Pinned and with version `deployment_name.1-0`.
14+
* Signal workflow. The workflow (pinned) should exit returning `prefix_v1`.

0 commit comments

Comments
 (0)