Skip to content

Commit b54ff8c

Browse files
authored
[Docs] Update docs for the Activity Worker (temporalio#100)
* Add YARD docs to all Activity Worker related classes/methods * Update README with Activity Worker info * Add signal handling example to README
1 parent 5def9df commit b54ff8c

25 files changed

+408
-5
lines changed

Diff for: .yardopts

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
--exclude lib/gen
2+
--exclude lib/thermite_patch.rb
3+
lib/**/*.rb

Diff for: README.md

+159-2
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,17 @@ At this point the SDK only supports the **Temporal Client** capabilities:
2323
- gRPC access to Temporal Server
2424
- Temporal Cloud is not yet supported due to the lack of TLS support, but it's coming soon
2525

26+
As well as **Activity Worker** capabilities:
27+
28+
- Definiting activities
29+
- Activity heartbeats/cancellations
30+
- Running activity workers
2631

2732
## Quick Start
2833

2934
### Installation
3035

31-
Add the [`temporalio` gem](https://rubygems.org/gems/temporalio) to your Gemfile:
36+
Add the [temporalio gem](https://rubygems.org/gems/temporalio) to your Gemfile:
3237

3338
```ruby
3439
gem 'temporalio'
@@ -82,12 +87,164 @@ The default data converter supports converting multiple types including:
8287

8388
- `nil`
8489
- bytes (`String` with `Encoding::ASCII_8BIT`)
85-
- Anything that [`JSON.generate`](https://ruby-doc.org/stdlib-3.0.0/libdoc/json/rdoc/JSON.html#method-i-generate)
90+
- Anything that [JSON.generate](https://ruby-doc.org/stdlib-3.0.0/libdoc/json/rdoc/JSON.html#method-i-generate)
8691
supports
8792

8893
This notably doesn't include any `Date`, `Time`, or `DateTime` objects as they may not work across
8994
different SDKs. A custom payload converter can be implemented to support these.
9095

96+
### Workers
97+
98+
Workers host workflows (coming soon) and/or activities. Here's how to run a worker:
99+
100+
```ruby
101+
require 'temporal'
102+
103+
# Establish a gRPC connection to the server
104+
connection = Temporal::Connection.new('localhost:7233')
105+
106+
# Initialize a new worker with your activities
107+
worker = Temporal::Worker.new(connection, 'my-namespace', 'my-task-queue', activities: [MyActivity])
108+
109+
# Occupy the thread by running the worker
110+
worker.run
111+
```
112+
113+
Some things to note about the above code:
114+
115+
- This creates/uses the same connection that is used for initializing a client
116+
- Workers can have many more options not shown here (e.g. data converters and interceptors)
117+
118+
In order to have more control over running of a worker you can provide a block of code by the end of
119+
which the worker will shut itself down:
120+
121+
```ruby
122+
# Initialize worker_1, worker_2 and worker_3 as in the example above
123+
124+
# Run the worker for 5 seconds, then shut down
125+
worker_1.run { sleep 5 }
126+
127+
# Or shut the worker down when a workflow completes (very useful for running specs):
128+
client = Temporal::Client.new(connection, 'my-namespace')
129+
handle = client.start_workflow('MyWorkflow', 'some input', id: 'my-id', task_queue: 'my-task-queue')
130+
worker_2.run { handle.result }
131+
132+
# Or wait for some external signal to stop the worker
133+
stop_queue = Queue.new
134+
Signal.trap('USR1') { stop_queue.close }
135+
worker_3.run { stop_queue.pop }
136+
```
137+
138+
You can also shut down a running worker by calling `Temporal::Worker#shutdown` from a separate
139+
thread at any time.
140+
141+
#### Running multiple workers
142+
143+
In order to run multiple workers in the same thread you can use the `Temporal::Worker.run` method:
144+
145+
```ruby
146+
# Initialize workers
147+
worker_1 = Temporal::Worker.new(connection, 'my-namespace-1', 'my-task-queue-1', activities: [MyActivity1, MyActivity2])
148+
worker_2 = Temporal::Worker.new(connection, 'my-namespace-2', 'my-task-queue-1', activities: [MyActivity3])
149+
worker_3 = Temporal::Worker.new(connection, 'my-namespace-1', 'my-task-queue-2', activities: [MyActivity4])
150+
151+
Temporal::Worker.run(worker_1, worker_2, worker_3)
152+
```
153+
154+
Please note that similar to `Temporal::Worker#run`, `Temporal::Worker.run` accepts a block that
155+
behaves the same way — the workers will be shut down when the block finishes.
156+
157+
#### Worker Shutdown
158+
159+
The `Temporal::Worker#run` (as well as `Temporal::Worker#shutdown`) invocation will wait on all
160+
activities to complete, so if a long-running activity does not at least respect cancellation, the
161+
shutdown may never complete.
162+
163+
### Activities
164+
165+
#### Definition
166+
167+
Activities are defined by subclassing `Temporal::Activity` class:
168+
169+
```ruby
170+
class SayHelloActivity < Temporal::Activity
171+
# Optionally specify a custom activity name:
172+
# (The class name `SayHelloActivity` will be used by default)
173+
activity_name 'say-hello'
174+
175+
def execute(name)
176+
return "Hello, #{name}!"
177+
end
178+
end
179+
```
180+
181+
Some things to note about activity definitions:
182+
183+
- Long running activities should regularly heartbeat and handle cancellation
184+
- Activities can only have positional arguments. Best practice is to only take a single argument
185+
that is an object/dataclass of fields that can be added to as needed.
186+
187+
#### Activity Context
188+
189+
Activity classes have access to `Temporal::Activity::Context` via the `activity` method. Which
190+
itself provides access to useful methods, specifically:
191+
192+
- `info` - Returns the immutable info of the currently running activity
193+
- `heartbeat(*details)` - Record a heartbeat
194+
- `cancelled?` - Whether a cancellation has been requested on this activity
195+
- `shield` - Prevent cancel exception from being thrown during the provided block of code
196+
197+
##### Heartbeating and Cancellation
198+
199+
In order for a non-local activity to be notified of cancellation requests, it must call
200+
`activity.heartbeat`. It is strongly recommended that all but the fastest executing activities call
201+
this method regularly.
202+
203+
In addition to obtaining cancellation information, heartbeats also support detail data that is
204+
persisted on the server for retrieval during activity retry. If an activity calls
205+
`activity.heartbeat(123, 456)` and then fails and is retried, `activity.info.heartbeat_details` will
206+
return an array containing `123` and `456` on the next run.
207+
208+
A cancellation is implemented using the `Thread#raise` method, which will raise a
209+
`Temporal::Error::CancelledError` during the execution of an activity. This means that your code
210+
might get interrupted at any point and never complete. In order to protect critical parts of your
211+
activities wrap them in `activity.shield`:
212+
213+
```ruby
214+
class ActivityWithCriticalLogic < Temporal::Activity
215+
def execute
216+
activity.shield do
217+
run_business_critical_logic_1
218+
end
219+
220+
run_non_critical_logic
221+
222+
activity.shield do
223+
run_business_critical_logic_2
224+
end
225+
end
226+
end
227+
```
228+
229+
This will ensure that a cancellation request received while inside the `activity.shield` block will
230+
not raise an exception until that block finishes.
231+
232+
In case the entire activity is considered critical, you can mark it as `shielded!` and ignore
233+
cancellation requests altogether:
234+
235+
```ruby
236+
class CriticalActivity < Temporal::Activity
237+
shielded!
238+
239+
def execute
240+
...
241+
end
242+
end
243+
```
244+
245+
For any long-running activity using this approach it is recommended to periodically check
246+
`activity.cancelled?` flag and respond accordingly.
247+
91248

92249
## Dev Setup
93250

Diff for: lib/temporalio/activity.rb

+52
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,77 @@
11
module Temporalio
2+
# This is an abstract superclass for implementing activities.
3+
#
4+
# Temporal SDK users are expected to subclass it and implement the {#execute} method by adding
5+
# their desired business logic.
6+
#
7+
# @abstract
8+
#
9+
# @example "Hello World" Activity
10+
# class HelloWorld < Temporalio::Activity
11+
# def execute(name)
12+
# "Hello, #{name}!"
13+
# end
14+
# end
215
class Activity
16+
# Specify a custom name to be used for this activity.
17+
#
18+
# By default a full class (with any namespace modules/classes) name will be used.
19+
#
20+
# @param new_name [String] Name to be used for this activity
21+
#
22+
# @example
23+
# class Test < Temporalio::Activity
24+
# activity_name 'custom-activity-name'
25+
#
26+
# def execute
27+
# ...
28+
# end
29+
# end
330
def self.activity_name(new_name)
431
@activity_name = new_name
532
end
633

34+
# Mark the activity as shielded from cancellations.
35+
#
36+
# Activity cancellations are implemented using the `Thread#raise`, which can unsafely terminate
37+
# your implementation. To disable this behaviour make sure to mark critical activities as
38+
# `shielded!`. For shielding a part of your activity consider using
39+
# {Temporalio::Activity::Context#shield}.
40+
#
41+
# @example
42+
# class Test < Temporalio::Activity
43+
# shielded!
44+
#
45+
# def execute
46+
# ...
47+
# end
48+
# end
749
def self.shielded!
850
@shielded = true
951
end
1052

53+
# @api private
1154
def self._name
1255
@activity_name || name || ''
1356
end
1457

58+
# @api private
1559
def self._shielded
1660
@shielded || false
1761
end
1862

63+
# @api private
1964
def initialize(context)
2065
@context = context
2166
end
2267

68+
# This is the main body of your activity's implementation.
69+
#
70+
# When implementing this method, you can use as many positional arguments as needed, which will
71+
# get converted based on the activity invocation in your workflow.
72+
#
73+
# Inside of this method you have access to activity's context using the `activity` method. Check
74+
# out {Temporalio::Activity::Context} for more information on how to use it.
2375
def execute(*_args)
2476
raise NoMethodError, 'must implement #execute'
2577
end

Diff for: lib/temporalio/activity/context.rb

+26
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@
22

33
module Temporalio
44
class Activity
5+
# This class provides methods that can be called from activity classes.
56
class Context
7+
# Information about the running activity.
8+
#
9+
# @return [Temporalio::Activity::Info]
610
attr_reader :info
711

12+
# @api private
813
def initialize(info, heartbeat_proc, shielded: false)
914
@thread = Thread.current
1015
@info = info
@@ -14,10 +19,25 @@ def initialize(info, heartbeat_proc, shielded: false)
1419
@mutex = Mutex.new
1520
end
1621

22+
# Send a heartbeat for the current activity.
23+
#
24+
# @param details [Array<any>] Data to store with the heartbeat.
1725
def heartbeat(*details)
1826
heartbeat_proc.call(*details)
1927
end
2028

29+
# Protect a part of activity's implementation from cancellations.
30+
#
31+
# Activity cancellations are implemented using the `Thread#raise`, which can unsafely
32+
# terminate your implementation. To disable this behaviour make sure to wrap critical parts of
33+
# your business logic in this method.
34+
#
35+
# For shielding a whole activity consider using {Temporalio::Activity.shielded!}.
36+
#
37+
# A cancellation that got requested while in a shielded block will not interrupt the execution
38+
# and will raise a {Temporalio::Error::CancelledError} right after the block has finished.
39+
#
40+
# @yield Block to be protected from cancellations.
2141
def shield(&block)
2242
# The whole activity is shielded, called from a nested shield
2343
# or while handling a CancelledError (in a rescue or ensure blocks)
@@ -40,10 +60,16 @@ def shield(&block)
4060
end
4161
end
4262

63+
# Whether a cancellation was ever requested on this activity.
64+
#
65+
# @return [Boolean] true if the activity has had a cancellation request, false otherwise.
4366
def cancelled?
4467
@cancelled
4568
end
4669

70+
# Cancel the running activity.
71+
#
72+
# @api private
4773
def cancel
4874
@cancelled = true
4975

Diff for: lib/temporalio/activity/info.rb

+39
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
module Temporalio
22
class Activity
3+
# Class containing information about an activity.
34
class Info < Struct.new(
45
:activity_id,
56
:activity_type,
@@ -20,6 +21,44 @@ class Info < Struct.new(
2021
:workflow_type,
2122
keyword_init: true,
2223
)
24+
# @!attribute [r] activity_id
25+
# @return [String] Activity ID.
26+
# @!attribute [r] activity_type
27+
# @return [String] Name of the activity.
28+
# @!attribute [r] attempt
29+
# @return [Integer] Activity's execution attempt.
30+
# @!attribute [r] current_attempt_scheduled_time
31+
# @return [Time] Scheduled time of the current attempt.
32+
# @!attribute [r] heartbeat_details
33+
# @return [Array<any>] Details submitted with the last heartbeat.
34+
# @!attribute [r] heartbeat_timeout
35+
# @return [Float] Max time between heartbeats (in seconds).
36+
# @!attribute [r] local
37+
# @return [Boolean] Whether activity is local or not.
38+
# @!attribute [r] schedule_to_close_timeout
39+
# @return [Float] Max overall activity execution time (in seconds).
40+
# @!attribute [r] scheduled_time
41+
# @return [Time] Time when activity was first scheduled.
42+
# @!attribute [r] start_to_close_timeout
43+
# @return [Floaat] Max time of a single invocation (in seconds).
44+
# @!attribute [r] started_time
45+
# @return [Time] Time when activity was started.
46+
# @!attribute [r] task_queue
47+
# @return [String] Task queue on which the activity got scheduled.
48+
# @!attribute [r] task_token
49+
# @return [String] A token for completing the activity.
50+
# @!attribute [r] workflow_id
51+
# @return [String] Workflow ID.
52+
# @!attribute [r] workflow_namespace
53+
# @return [String] Workflow namespace.
54+
# @!attribute [r] workflow_run_id
55+
# @return [String] Workflow run ID.
56+
# @!attribute [r] workflow_type
57+
# @return [String] Name of the workflow.
58+
59+
# Whether activity is local or not
60+
#
61+
# @return [Boolean] True for local activities, falst otherwise.
2362
def local?
2463
local
2564
end

Diff for: lib/temporalio/bridge.rb

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
BRIDGE_DIR = File.expand_path('..', __dir__ || '.')
66

77
module Temporalio
8+
# @api private
89
module Bridge
910
Rutie
1011
.new(:bridge, lib_path: '', lib_suffix: 'so', lib_prefix: '')

Diff for: lib/temporalio/bridge/error.rb

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
module Temporalio
22
module Bridge
3+
# @api private
34
class Error < StandardError
45
class WorkerShutdown < Error; end
56
end

Diff for: lib/temporalio/client/implementation.rb

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
module Temporalio
1414
class Client
15+
# @api private
1516
class Implementation
1617
def initialize(connection, namespace, converter, interceptors)
1718
@connection = connection

0 commit comments

Comments
 (0)