Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nexus SyncOperation context propagation #395

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions nexus-context-propagation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ go run ./starter \

which should result in:
```
2025/02/27 12:57:40 Started workflow WorkflowID nexus_hello_caller_workflow_20240723195740 RunID c9789128-2fcd-4083-829d-95e43279f6d7
2025/02/27 12:57:40 Workflow result: ¡Hola! Nexus, caller-id: samples-go 👋
2025/02/28 14:54:29 Started workflow WorkflowID nexus_hello_caller_workflow_20250228145225 RunID 01954ec2-d1d2-72b7-a6cc-a83de5421754
2025/02/28 14:54:30 Workflow result: Nexus Echo 👋, caller-id: samples-go
2025/02/28 14:54:30 Started workflow WorkflowID nexus_hello_caller_workflow_20250228145430 RunID 01954ec4-bc4d-7dac-a311-d4ceacacfa9a
2025/02/28 14:54:31 Workflow result: ¡Hola! Nexus, caller-id: samples-go 👋
```
16 changes: 11 additions & 5 deletions nexus-context-propagation/caller/starter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,22 @@ func main() {
}
defer c.Close()
ctx := context.Background()
workflowOptions := client.StartWorkflowOptions{
ID: "nexus_hello_caller_workflow_" + time.Now().Format("20060102150405"),
TaskQueue: caller.TaskQueue,
}

ctx = context.WithValue(ctx, ctxpropagation.PropagateKey, ctxpropagation.Values{
Key: "caller-id",
Value: "samples-go",
})
wr, err := c.ExecuteWorkflow(ctx, workflowOptions, caller.HelloCallerWorkflow, "Nexus", service.ES)

runWorkflow(ctx, c, caller.EchoCallerWorkflow, "Nexus Echo 👋")
runWorkflow(ctx, c, caller.HelloCallerWorkflow, "Nexus", service.ES)
}

func runWorkflow(ctx context.Context, c client.Client, workflow interface{}, args ...interface{}) {
workflowOptions := client.StartWorkflowOptions{
ID: "nexus_hello_caller_workflow_" + time.Now().Format("20060102150405"),
TaskQueue: caller.TaskQueue,
}
wr, err := c.ExecuteWorkflow(ctx, workflowOptions, workflow, args...)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
Expand Down
1 change: 1 addition & 0 deletions nexus-context-propagation/caller/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func main() {
})

w.RegisterWorkflow(caller.HelloCallerWorkflow)
w.RegisterWorkflow(caller.EchoCallerWorkflow)

err = w.Run(worker.InterruptCh())
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions nexus-context-propagation/handler/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,26 @@ import (
"github.com/temporalio/samples-go/nexus/service"
)

// NewSyncOperation is a meant for exposing simple RPC handlers.
var EchoOperation = nexus.NewSyncOperation(service.EchoOperationName, func(ctx context.Context, input service.EchoInput, options nexus.StartOperationOptions) (service.EchoOutput, error) {
// Values may be extracted from the context in the Operation handler body.
values, ok := ctx.Value(ctxpropagation.PropagateKey).(ctxpropagation.Values)
if ok {
input.Message += ", " + values.Key + ": " + values.Value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could just log the context value, we generally using headers to pass input

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion, do you think it's worth blocking the PR for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are samples should reflect what we recommend users do in production. Maxim has called out our samples that do things we wouldn't recommend users do in production so yes I think we should change.

}
// Use temporalnexus.GetClient to get the client that the worker was initialized with to perform client calls
// such as signaling, querying, and listing workflows. Implementations are free to make arbitrary calls to other
// services or databases, or perform simple computations such as this one.
return service.EchoOutput(input), nil
})

// Use the NewWorkflowRunOperation constructor, which is the easiest way to expose a workflow as an operation.
// See alternatives at https://pkg.go.dev/go.temporal.io/sdk/temporalnexus.
var HelloOperation = temporalnexus.NewWorkflowRunOperation(service.HelloOperationName, HelloHandlerWorkflow, func(ctx context.Context, input service.HelloInput, options nexus.StartOperationOptions) (client.StartWorkflowOptions, error) {
// Values may be extracted from the context in the Operation handler body if necessary, this sample propagates
// the context to the handler workflow.
// values, ok := ctx.Value(ctxpropagation.PropagateKey).(ctxpropagation.Values)

return client.StartWorkflowOptions{
// Workflow IDs should typically be business meaningful IDs and are used to dedupe workflow starts.
// For this example, we're using the request ID allocated by Temporal when the caller workflow schedules
Expand All @@ -27,6 +44,7 @@ var HelloOperation = temporalnexus.NewWorkflowRunOperation(service.HelloOperatio
})

func HelloHandlerWorkflow(ctx workflow.Context, input service.HelloInput) (service.HelloOutput, error) {
// Values may be extracted from the handler workflow context.
values, ok := ctx.Value(ctxpropagation.PropagateKey).(ctxpropagation.Values)
if ok {
input.Name += ", " + values.Key + ": " + values.Value
Expand Down
2 changes: 1 addition & 1 deletion nexus-context-propagation/handler/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
},
})
service := nexus.NewService(service.HelloServiceName)
err = service.Register(handler.HelloOperation)
err = service.Register(handler.HelloOperation, handler.EchoOperation)
if err != nil {
log.Fatalln("Unable to register operations", err)
}
Expand Down