Skip to content

Commit 175ffe4

Browse files
authored
Add support for SignalExternalWorkflow (coinbase#134)
* initial work to support SignalExternalWorkflow * define the serializer and hook it up * stub in what I think is the correct work for each event type * some fixes per antstorm advice * initial attempt at integration test * docs on testing and an improvement to existing test * encode the signal payload using correct helper * return a Future and fulfill it correctly upon completion * get the \*event_id from the right field in the command structure * modify test to verify the signal is only received once * test for failure to deliver a signal to external workflow * do not discard the failure command otherwise non-deterministic * simplify test workflow by eliminating unnecessary timer * oops, had double call to #schedule_command so signals were sent twice * edit description of example * split to separate files and improve test coverage * change method signature for consistency and a few other cleanups * oops, fix EventType name to match correct constant
1 parent e249d63 commit 175ffe4

11 files changed

+243
-11
lines changed

examples/README.md

+17-3
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,33 @@ To try these out you need to have a running Temporal service ([setup instruction
77
Install all the gem dependencies by running:
88

99
```sh
10-
> bundle install
10+
bundle install
1111
```
1212

1313
Modify the `init.rb` file to point to your Temporal cluster.
1414

1515
Start a worker process:
1616

1717
```sh
18-
> bin/worker
18+
bin/worker
1919
```
2020

2121
Use this command to trigger one of the example workflows from the `workflows` directory:
2222

2323
```sh
24-
> bin/trigger NAME_OF_THE_WORKFLOW [argument_1, argument_2, ...]
24+
bin/trigger NAME_OF_THE_WORKFLOW [argument_1, argument_2, ...]
2525
```
26+
## Testing
27+
28+
To run tests, make sure the temporal server and the worker process are already running:
29+
```shell
30+
docker-compose up
31+
bin/worker
32+
```
33+
To execute the tests, run:
34+
```shell
35+
bundle exec rspec
36+
```
37+
To add a new test that uses a new workflow or new activity, make sure to register those new
38+
workflows and activities by modifying the `bin/worker` file and adding them there. After any
39+
changes to that file, restart the worker process to pick up the new registrations.

examples/bin/worker

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ worker.register_workflow(SimpleTimerWorkflow)
4545
worker.register_workflow(TimeoutWorkflow)
4646
worker.register_workflow(TripBookingWorkflow)
4747
worker.register_workflow(WaitForWorkflow)
48+
worker.register_workflow(WaitForExternalSignalWorkflow)
49+
worker.register_workflow(SendSignalToExternalWorkflow)
4850

4951
worker.register_activity(AsyncActivity)
5052
worker.register_activity(EchoActivity)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
require 'workflows/wait_for_external_signal_workflow'
2+
require 'workflows/send_signal_to_external_workflow'
3+
4+
describe WaitForExternalSignalWorkflow do
5+
let(:signal_name) { "signal_name" }
6+
let(:receiver_workflow_id) { SecureRandom.uuid }
7+
let(:sender_workflow_id) { SecureRandom.uuid }
8+
9+
context 'when the workflows succeed then' do
10+
it 'receives signal from an external workflow only once' do
11+
run_id = Temporal.start_workflow(
12+
WaitForExternalSignalWorkflow,
13+
signal_name,
14+
options: {workflow_id: receiver_workflow_id}
15+
)
16+
17+
Temporal.start_workflow(
18+
SendSignalToExternalWorkflow,
19+
signal_name,
20+
receiver_workflow_id
21+
)
22+
23+
result = Temporal.await_workflow_result(
24+
WaitForExternalSignalWorkflow,
25+
workflow_id: receiver_workflow_id,
26+
run_id: run_id,
27+
)
28+
29+
expect(result).to eq(
30+
{
31+
received: {
32+
signal_name => ["arg1", "arg2"]
33+
},
34+
counts: {
35+
signal_name => 1
36+
}
37+
}
38+
)
39+
end
40+
41+
it 'returns :success to the sending workflow' do
42+
Temporal.start_workflow(
43+
WaitForExternalSignalWorkflow,
44+
signal_name,
45+
options: {workflow_id: receiver_workflow_id}
46+
)
47+
48+
run_id = Temporal.start_workflow(
49+
SendSignalToExternalWorkflow,
50+
signal_name,
51+
receiver_workflow_id,
52+
options: {workflow_id: sender_workflow_id}
53+
)
54+
55+
result = Temporal.await_workflow_result(
56+
SendSignalToExternalWorkflow,
57+
workflow_id: sender_workflow_id,
58+
run_id: run_id,
59+
)
60+
61+
expect(result).to eq(:success)
62+
end
63+
end
64+
65+
context 'when the workflows fail' do
66+
it 'correctly handles failure to deliver' do
67+
run_id = Temporal.start_workflow(
68+
SendSignalToExternalWorkflow,
69+
signal_name,
70+
receiver_workflow_id,
71+
options: {workflow_id: sender_workflow_id})
72+
73+
result = Temporal.await_workflow_result(
74+
SendSignalToExternalWorkflow,
75+
workflow_id: sender_workflow_id,
76+
run_id: run_id,
77+
)
78+
79+
expect(result).to eq(:failed)
80+
end
81+
end
82+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Sends +signal_name+ to the +target_workflow+ from within a workflow.
2+
# This is different than using the Client#send_signal method which is
3+
# for signaling a workflow *from outside* any workflow.
4+
#
5+
# Returns :success or :failed
6+
#
7+
class SendSignalToExternalWorkflow < Temporal::Workflow
8+
def execute(signal_name, target_workflow)
9+
logger.info("Send a signal to an external workflow")
10+
future = workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, target_workflow, nil, ["arg1", "arg2"])
11+
@status = nil
12+
future.done { @status = :success }
13+
future.failed { @status = :failed }
14+
future.get
15+
@status
16+
end
17+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# One workflow sends a signal to another workflow. Can be used to implement
2+
# the synchronous-proxy pattern (see Go samples)
3+
#
4+
class WaitForExternalSignalWorkflow < Temporal::Workflow
5+
def execute(expected_signal)
6+
signals_received = {}
7+
signal_counts = Hash.new { |h,k| h[k] = 0 }
8+
9+
workflow.on_signal do |signal, input|
10+
workflow.logger.info("Received signal name #{signal}, with input #{input.inspect}")
11+
signals_received[signal] = input
12+
signal_counts[signal] += 1
13+
end
14+
15+
workflow.wait_for do
16+
workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}")
17+
signals_received.key?(expected_signal)
18+
end
19+
20+
{ received: signals_received, counts: signal_counts }
21+
end
22+
end

lib/temporal/connection/serializer.rb

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
require 'temporal/connection/serializer/complete_workflow'
99
require 'temporal/connection/serializer/continue_as_new'
1010
require 'temporal/connection/serializer/fail_workflow'
11+
require 'temporal/connection/serializer/signal_external_workflow'
1112

1213
module Temporal
1314
module Connection
@@ -21,7 +22,8 @@ module Serializer
2122
Workflow::Command::CancelTimer => Serializer::CancelTimer,
2223
Workflow::Command::CompleteWorkflow => Serializer::CompleteWorkflow,
2324
Workflow::Command::ContinueAsNew => Serializer::ContinueAsNew,
24-
Workflow::Command::FailWorkflow => Serializer::FailWorkflow
25+
Workflow::Command::FailWorkflow => Serializer::FailWorkflow,
26+
Workflow::Command::SignalExternalWorkflow => Serializer::SignalExternalWorkflow
2527
}.freeze
2628

2729
def self.serialize(object)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
require 'temporal/connection/serializer/base'
2+
require 'temporal/concerns/payloads'
3+
4+
module Temporal
5+
module Connection
6+
module Serializer
7+
class SignalExternalWorkflow < Base
8+
include Concerns::Payloads
9+
10+
def to_proto
11+
Temporal::Api::Command::V1::Command.new(
12+
command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
13+
signal_external_workflow_execution_command_attributes:
14+
Temporal::Api::Command::V1::SignalExternalWorkflowExecutionCommandAttributes.new(
15+
namespace: object.namespace,
16+
execution: serialize_execution(object.execution),
17+
signal_name: object.signal_name,
18+
input: to_signal_payloads(object.input),
19+
control: "", # deprecated
20+
child_workflow_only: object.child_workflow_only
21+
)
22+
)
23+
end
24+
25+
private
26+
27+
def serialize_execution(execution)
28+
Temporal::Api::Common::V1::WorkflowExecution.new(workflow_id: execution[:workflow_id], run_id: execution[:run_id])
29+
end
30+
end
31+
end
32+
end
33+
end

lib/temporal/workflow/command.rb

+4-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ module Command
1111
CancelTimer = Struct.new(:timer_id, keyword_init: true)
1212
CompleteWorkflow = Struct.new(:result, keyword_init: true)
1313
FailWorkflow = Struct.new(:exception, keyword_init: true)
14+
SignalExternalWorkflow = Struct.new(:namespace, :execution, :signal_name, :input, :child_workflow_only, keyword_init: true)
1415

1516
# only these commands are supported right now
1617
SCHEDULE_ACTIVITY_TYPE = :schedule_activity
@@ -21,6 +22,7 @@ module Command
2122
CANCEL_TIMER_TYPE = :cancel_timer
2223
COMPLETE_WORKFLOW_TYPE = :complete_workflow
2324
FAIL_WORKFLOW_TYPE = :fail_workflow
25+
SIGNAL_EXTERNAL_WORKFLOW_TYPE = :signal_external_workflow
2426

2527
COMMAND_CLASS_MAP = {
2628
SCHEDULE_ACTIVITY_TYPE => ScheduleActivity,
@@ -30,7 +32,8 @@ module Command
3032
START_TIMER_TYPE => StartTimer,
3133
CANCEL_TIMER_TYPE => CancelTimer,
3234
COMPLETE_WORKFLOW_TYPE => CompleteWorkflow,
33-
FAIL_WORKFLOW_TYPE => FailWorkflow
35+
FAIL_WORKFLOW_TYPE => FailWorkflow,
36+
SIGNAL_EXTERNAL_WORKFLOW_TYPE => SignalExternalWorkflow
3437
}.freeze
3538

3639
def self.generate(type, **args)

lib/temporal/workflow/context.rb

+46
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,52 @@ def cancel(target, cancelation_id)
304304
end
305305
end
306306

307+
# Send a signal from inside a workflow to another workflow. Not to be confused with
308+
# Client#signal_workflow which sends a signal from outside a workflow to a workflow.
309+
#
310+
# @param workflow [Temporal::Workflow, nil] workflow class or nil
311+
# @param signal [String] name of the signal to send
312+
# @param workflow_id [String]
313+
# @param run_id [String]
314+
# @param input [String, Array, nil] optional arguments for the signal
315+
# @param namespace [String, nil] if nil, choose the one declared on the workflow class or the
316+
# global default
317+
# @param child_workflow_only [Boolean] indicates whether the signal should only be delivered to a
318+
# child workflow; defaults to false
319+
#
320+
# @return [Future] future
321+
def signal_external_workflow(workflow, signal, workflow_id, run_id = nil, input = nil, namespace: nil, child_workflow_only: false)
322+
options ||= {}
323+
324+
execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)
325+
326+
command = Command::SignalExternalWorkflow.new(
327+
namespace: namespace || execution_options.namespace,
328+
execution: {
329+
workflow_id: workflow_id,
330+
run_id: run_id
331+
},
332+
signal_name: signal,
333+
input: input,
334+
child_workflow_only: child_workflow_only
335+
)
336+
337+
target, cancelation_id = schedule_command(command)
338+
future = Future.new(target, self, cancelation_id: cancelation_id)
339+
340+
dispatcher.register_handler(target, 'completed') do |result|
341+
future.set(result)
342+
future.success_callbacks.each { |callback| call_in_fiber(callback, result) }
343+
end
344+
345+
dispatcher.register_handler(target, 'failed') do |exception|
346+
future.fail(exception)
347+
future.failure_callbacks.each { |callback| call_in_fiber(callback, exception) }
348+
end
349+
350+
future
351+
end
352+
307353
private
308354

309355
attr_reader :state_manager, :dispatcher, :workflow_class

lib/temporal/workflow/history/event.rb

+1-3
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ class Event
1010
ACTIVITY_TASK_CANCELED
1111
TIMER_FIRED
1212
REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED
13-
SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED
1413
EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED
15-
EXTERNAL_WORKFLOW_EXECUTION_SIGNALED
1614
UPSERT_WORKFLOW_SEARCH_ATTRIBUTES
1715
].freeze
1816

@@ -48,7 +46,7 @@ def originating_event_id
4846
1 # fixed id for everything related to current workflow
4947
when *EVENT_TYPES
5048
attributes.scheduled_event_id
51-
when *CHILD_WORKFLOW_EVENTS
49+
when *CHILD_WORKFLOW_EVENTS, 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED', 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED'
5250
attributes.initiated_event_id
5351
else
5452
id

lib/temporal/workflow/state_manager.rb

+16-3
Original file line numberDiff line numberDiff line change
@@ -241,13 +241,24 @@ def apply_event(event)
241241
# todo
242242

243243
when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED'
244-
# todo
244+
# Temporal Server will try to Signal the targeted Workflow
245+
# Contains the Signal name, as well as a Signal payload
246+
# The workflow that sends the signal creates this event in its log; the
247+
# receiving workflow records WORKFLOW_EXECUTION_SIGNALED on reception
248+
state_machine.start
249+
discard_command(target)
245250

246251
when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED'
247-
# todo
252+
# Temporal Server cannot Signal the targeted Workflow
253+
# Usually because the Workflow could not be found
254+
state_machine.fail
255+
dispatch(target, 'failed', 'StandardError', event.attributes.cause)
248256

249257
when 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED'
250-
# todo
258+
# Temporal Server has successfully Signaled the targeted Workflow
259+
# Return the result to the Future waiting on this
260+
state_machine.complete
261+
dispatch(target, 'completed')
251262

252263
when 'UPSERT_WORKFLOW_SEARCH_ATTRIBUTES'
253264
# todo
@@ -274,6 +285,8 @@ def event_target_from(command_id, command)
274285
History::EventTarget::WORKFLOW_TYPE
275286
when Command::StartChildWorkflow
276287
History::EventTarget::CHILD_WORKFLOW_TYPE
288+
when Command::SignalExternalWorkflow
289+
History::EventTarget::EXTERNAL_WORKFLOW_TYPE
277290
end
278291

279292
History::EventTarget.new(command_id, target_type)

0 commit comments

Comments
 (0)