Skip to content

Commit 398fd24

Browse files
Dave Willettantstorm
Dave Willett
andauthored
Support for invoking and processing queries (coinbase#141)
* Support for invoking and processing queries, WIP * Catch-all query handler support, feedback changes Made a handful of changes on approach from the initial spike. This is operating under an assumption that the added EventTarget type for query is a valid approach * Fixes for on_query interface, clean up workflow and spec * Fix method signature on testing context * Move catch-all handler back to block Also adding a second targeted query handler to spec * Use nil workflow class in test case * Updates to remove catch all handling, add query reject handling * More concise when no status returned from server * More consistent raise message style * Add test for reject condition not met * Simplify legacy handling and use serializers for query protos * Add specs for the new changes * Test query result & freeze them * Implement QueryRegistry * Swap Context#query_handlers with QueryRegistry * Add a spec for Workflow::Context * Rename QueryFailedFailure error to QueryFailed * Small cleanup items * Update readme Co-authored-by: antstorm <[email protected]>
1 parent 93c7102 commit 398fd24

32 files changed

+848
-79
lines changed

Diff for: README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ Besides calling activities workflows can:
178178
- Use timers
179179
- Receive signals
180180
- Execute other (child) workflows
181-
- Respond to queries [not yet implemented]
181+
- Respond to queries
182182

183183

184184
## Activities

Diff for: examples/bin/query

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
#!/usr/bin/env ruby
2+
require_relative '../init'
3+
4+
Dir[File.expand_path('../workflows/*.rb', __dir__)].each { |f| require f }
5+
6+
workflow_class_name, workflow_id, run_id, query, args = ARGV
7+
workflow_class = Object.const_get(workflow_class_name)
8+
9+
if ![workflow_class, workflow_id, run_id, query].all?
10+
fail 'Wrong arguments, use `bin/query WORKFLOW WORKFLOW_ID RUN_ID QUERY [ARGS]`'
11+
end
12+
13+
result = Temporal.query_workflow(workflow_class, query, workflow_id, run_id, args)
14+
puts result.inspect

Diff for: examples/bin/worker

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ worker.register_workflow(MetadataWorkflow)
3636
worker.register_workflow(ParentCloseWorkflow)
3737
worker.register_workflow(ParentWorkflow)
3838
worker.register_workflow(ProcessFileWorkflow)
39+
worker.register_workflow(QueryWorkflow)
3940
worker.register_workflow(QuickTimeoutWorkflow)
4041
worker.register_workflow(RandomlyFailingWorkflow)
4142
worker.register_workflow(ReleaseWorkflow)

Diff for: examples/spec/integration/query_workflow_spec.rb

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
require 'workflows/query_workflow'
2+
require 'temporal/errors'
3+
4+
describe QueryWorkflow, :integration do
5+
subject { described_class }
6+
7+
it 'returns the correct result for the queries' do
8+
workflow_id, run_id = run_workflow(described_class)
9+
10+
# Query with nil workflow class
11+
expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id))
12+
.to eq 'started'
13+
14+
# Query with arbitrary args
15+
expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id,
16+
'upcase', 'ignored', 'reverse'))
17+
.to eq 'DETRATS'
18+
19+
# Query with no args
20+
expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id))
21+
.to eq 0
22+
23+
# Query with unregistered handler
24+
expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
25+
.to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query')
26+
27+
Temporal.signal_workflow(described_class, 'make_progress', workflow_id, run_id)
28+
29+
# Query for updated signal_count with an unsatisfied reject condition
30+
expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id, query_reject_condition: :not_open))
31+
.to eq 1
32+
33+
Temporal.signal_workflow(described_class, 'finish', workflow_id, run_id)
34+
wait_for_workflow_completion(workflow_id, run_id)
35+
36+
# Repeating original query scenarios above, expecting updated state and signal results
37+
expect(Temporal.query_workflow(nil, 'state', workflow_id, run_id))
38+
.to eq 'finished'
39+
40+
expect(Temporal.query_workflow(described_class, 'state', workflow_id, run_id,
41+
'upcase', 'ignored', 'reverse'))
42+
.to eq 'DEHSINIF'
43+
44+
expect(Temporal.query_workflow(described_class, 'signal_count', workflow_id, run_id))
45+
.to eq 2
46+
47+
expect { Temporal.query_workflow(described_class, 'unknown_query', workflow_id, run_id) }
48+
.to raise_error(Temporal::QueryFailed, 'Workflow did not register a handler for unknown_query')
49+
50+
# Now that the workflow is completed, test a query with a reject condition satisfied
51+
expect { Temporal.query_workflow(described_class, 'state', workflow_id, run_id, query_reject_condition: :not_open) }
52+
.to raise_error(Temporal::QueryFailed, 'Query rejected: status WORKFLOW_EXECUTION_STATUS_COMPLETED')
53+
end
54+
end

Diff for: examples/workflows/query_workflow.rb

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
class QueryWorkflow < Temporal::Workflow
2+
attr_reader :state, :signal_count, :last_signal_received
3+
4+
def execute
5+
@state = "started"
6+
@signal_count = 0
7+
@last_signal_received = nil
8+
9+
workflow.on_query("state") { |*args| apply_transforms(state, args) }
10+
workflow.on_query("signal_count") { signal_count }
11+
12+
workflow.on_signal do |signal|
13+
@signal_count += 1
14+
@last_signal_received = signal
15+
end
16+
17+
workflow.wait_for { last_signal_received == "finish" }
18+
@state = "finished"
19+
20+
{
21+
signal_count: signal_count,
22+
last_signal_received: last_signal_received,
23+
final_state: state
24+
}
25+
end
26+
27+
private
28+
29+
def apply_transforms(value, transforms)
30+
return value if value.nil? || transforms.empty?
31+
transforms.inject(value) do |memo, input|
32+
next memo unless memo.respond_to?(input)
33+
memo.public_send(input)
34+
end
35+
end
36+
end

Diff for: lib/temporal.rb

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ module Temporal
2020
:describe_namespace,
2121
:list_namespaces,
2222
:signal_workflow,
23+
:query_workflow,
2324
:await_workflow_result,
2425
:reset_workflow,
2526
:terminate_workflow,
@@ -48,11 +49,11 @@ def metrics
4849
end
4950

5051
private
51-
52+
5253
def default_client
5354
@default_client ||= Client.new(config)
5455
end
55-
56+
5657
def config
5758
@config ||= Configuration.new
5859
end

Diff for: lib/temporal/client.rb

+24-1
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,29 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac
176176
)
177177
end
178178

179+
# Issue a query against a running workflow
180+
#
181+
# @param workflow [Temporal::Workflow, nil] workflow class or nil
182+
# @param query [String] name of the query to issue
183+
# @param workflow_id [String]
184+
# @param run_id [String]
185+
# @param args [String, Array, nil] optional arguments for the query
186+
# @param namespace [String, nil] if nil, choose the one declared on the workflow class or the
187+
# global default
188+
# @param query_reject_condition [Symbol] check Temporal::Connection::GRPC::QUERY_REJECT_CONDITION
189+
def query_workflow(workflow, query, workflow_id, run_id, *args, namespace: nil, query_reject_condition: nil)
190+
execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)
191+
192+
connection.query_workflow(
193+
namespace: namespace || execution_options.namespace,
194+
workflow_id: workflow_id,
195+
run_id: run_id,
196+
query: query,
197+
args: args,
198+
query_reject_condition: query_reject_condition
199+
)
200+
end
201+
179202
# Long polls for a workflow to be completed and returns workflow's return value.
180203
#
181204
# @note This function times out after 30 seconds and throws Temporal::TimeoutError,
@@ -207,7 +230,7 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
207230
timeout: timeout || max_timeout,
208231
)
209232
rescue GRPC::DeadlineExceeded => e
210-
message = if timeout
233+
message = if timeout
211234
"Timed out after your specified limit of timeout: #{timeout} seconds"
212235
else
213236
"Timed out after #{max_timeout} seconds, which is the maximum supported amount."

Diff for: lib/temporal/concerns/payloads.rb

+8
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ def from_signal_payloads(payloads)
2121
from_payloads(payloads)&.first
2222
end
2323

24+
def from_query_payloads(payloads)
25+
from_payloads(payloads)&.first
26+
end
27+
2428
def from_payload_map(payload_map)
2529
payload_map.map { |key, value| [key, from_payload(value)] }.to_h
2630
end
@@ -45,6 +49,10 @@ def to_signal_payloads(data)
4549
to_payloads([data])
4650
end
4751

52+
def to_query_payloads(data)
53+
to_payloads([data])
54+
end
55+
4856
def to_payload_map(data)
4957
data.transform_values(&method(:to_payload))
5058
end

Diff for: lib/temporal/connection/grpc.rb

+58-10
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ class GRPC
2626
close: Temporal::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
2727
}.freeze
2828

29+
QUERY_REJECT_CONDITION = {
30+
none: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NONE,
31+
not_open: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_OPEN,
32+
not_completed_cleanly: Temporal::Api::Enums::V1::QueryRejectCondition::QUERY_REJECT_CONDITION_NOT_COMPLETED_CLEANLY
33+
}.freeze
34+
2935
DEFAULT_OPTIONS = {
3036
max_page_size: 100
3137
}.freeze
@@ -142,7 +148,7 @@ def get_workflow_execution_history(
142148
event_type: :all,
143149
timeout: nil
144150
)
145-
if wait_for_new_event
151+
if wait_for_new_event
146152
if timeout.nil?
147153
# This is an internal error. Wrappers should enforce this.
148154
raise "You must specify a timeout when wait_for_new_event = true."
@@ -183,13 +189,28 @@ def poll_workflow_task_queue(namespace:, task_queue:)
183189
poll_request.execute
184190
end
185191

186-
def respond_workflow_task_completed(namespace:, task_token:, commands:)
192+
def respond_query_task_completed(namespace:, task_token:, query_result:)
193+
query_result_proto = Serializer.serialize(query_result)
194+
request = Temporal::Api::WorkflowService::V1::RespondQueryTaskCompletedRequest.new(
195+
task_token: task_token,
196+
namespace: namespace,
197+
completed_type: query_result_proto.result_type,
198+
query_result: query_result_proto.answer,
199+
error_message: query_result_proto.error_message,
200+
)
201+
202+
client.respond_query_task_completed(request)
203+
end
204+
205+
def respond_workflow_task_completed(namespace:, task_token:, commands:, query_results: {})
187206
request = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new(
188207
namespace: namespace,
189208
identity: identity,
190209
task_token: task_token,
191-
commands: Array(commands).map { |(_, command)| Serializer.serialize(command) }
210+
commands: Array(commands).map { |(_, command)| Serializer.serialize(command) },
211+
query_results: query_results.transform_values { |value| Serializer.serialize(value) }
192212
)
213+
193214
client.respond_workflow_task_completed(request)
194215
end
195216

@@ -452,16 +473,43 @@ def get_search_attributes
452473
raise NotImplementedError
453474
end
454475

455-
def respond_query_task_completed
456-
raise NotImplementedError
457-
end
458-
459476
def reset_sticky_task_queue
460477
raise NotImplementedError
461478
end
462479

463-
def query_workflow
464-
raise NotImplementedError
480+
def query_workflow(namespace:, workflow_id:, run_id:, query:, args: nil, query_reject_condition: nil)
481+
request = Temporal::Api::WorkflowService::V1::QueryWorkflowRequest.new(
482+
namespace: namespace,
483+
execution: Temporal::Api::Common::V1::WorkflowExecution.new(
484+
workflow_id: workflow_id,
485+
run_id: run_id
486+
),
487+
query: Temporal::Api::Query::V1::WorkflowQuery.new(
488+
query_type: query,
489+
query_args: to_query_payloads(args)
490+
)
491+
)
492+
if query_reject_condition
493+
condition = QUERY_REJECT_CONDITION[query_reject_condition]
494+
raise Client::ArgumentError, 'Unknown query_reject_condition specified' unless condition
495+
496+
request.query_reject_condition = condition
497+
end
498+
499+
begin
500+
response = client.query_workflow(request)
501+
rescue ::GRPC::InvalidArgument => e
502+
raise Temporal::QueryFailed, e.details
503+
end
504+
505+
if response.query_rejected
506+
rejection_status = response.query_rejected.status || 'not specified by server'
507+
raise Temporal::QueryFailed, "Query rejected: status #{rejection_status}"
508+
elsif !response.query_result
509+
raise Temporal::QueryFailed, 'Invalid response from server'
510+
else
511+
from_query_payloads(response.query_result)
512+
end
465513
end
466514

467515
def describe_workflow_execution(namespace:, workflow_id:, run_id:)
@@ -534,7 +582,7 @@ def serialize_status_filter(value)
534582

535583
sym = Temporal::Workflow::Status::API_STATUS_MAP.invert[value]
536584
status = Temporal::Api::Enums::V1::WorkflowExecutionStatus.resolve(sym)
537-
585+
538586
Temporal::Api::Filter::V1::StatusFilter.new(status: status)
539587
end
540588
end

Diff for: lib/temporal/connection/serializer.rb

+5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'temporal/workflow/command'
2+
require 'temporal/workflow/query_result'
23
require 'temporal/connection/serializer/schedule_activity'
34
require 'temporal/connection/serializer/start_child_workflow'
45
require 'temporal/connection/serializer/request_activity_cancellation'
@@ -10,6 +11,8 @@
1011
require 'temporal/connection/serializer/fail_workflow'
1112
require 'temporal/connection/serializer/signal_external_workflow'
1213
require 'temporal/connection/serializer/upsert_search_attributes'
14+
require 'temporal/connection/serializer/query_answer'
15+
require 'temporal/connection/serializer/query_failure'
1316

1417
module Temporal
1518
module Connection
@@ -26,6 +29,8 @@ module Serializer
2629
Workflow::Command::FailWorkflow => Serializer::FailWorkflow,
2730
Workflow::Command::SignalExternalWorkflow => Serializer::SignalExternalWorkflow,
2831
Workflow::Command::UpsertSearchAttributes => Serializer::UpsertSearchAttributes,
32+
Workflow::QueryResult::Answer => Serializer::QueryAnswer,
33+
Workflow::QueryResult::Failure => Serializer::QueryFailure,
2934
}.freeze
3035

3136
def self.serialize(object)

Diff for: lib/temporal/connection/serializer/query_answer.rb

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
require 'temporal/connection/serializer/base'
2+
require 'temporal/concerns/payloads'
3+
4+
module Temporal
5+
module Connection
6+
module Serializer
7+
class QueryAnswer < Base
8+
include Concerns::Payloads
9+
10+
def to_proto
11+
Temporal::Api::Query::V1::WorkflowQueryResult.new(
12+
result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_ANSWERED,
13+
answer: to_query_payloads(object.result)
14+
)
15+
end
16+
end
17+
end
18+
end
19+
end

Diff for: lib/temporal/connection/serializer/query_failure.rb

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
require 'temporal/connection/serializer/base'
2+
3+
module Temporal
4+
module Connection
5+
module Serializer
6+
class QueryFailure < Base
7+
def to_proto
8+
Temporal::Api::Query::V1::WorkflowQueryResult.new(
9+
result_type: Temporal::Api::Enums::V1::QueryResultType::QUERY_RESULT_TYPE_FAILED,
10+
error_message: object.error.message
11+
)
12+
end
13+
end
14+
end
15+
end
16+
end

Diff for: lib/temporal/errors.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class ClientVersionNotSupportedFailure < ApiError; end
6767
class FeatureVersionNotSupportedFailure < ApiError; end
6868
class NamespaceAlreadyExistsFailure < ApiError; end
6969
class CancellationAlreadyRequestedFailure < ApiError; end
70-
class QueryFailedFailure < ApiError; end
70+
class QueryFailed < ApiError; end
7171
class UnexpectedResponse < ApiError; end
7272

7373
end

0 commit comments

Comments
 (0)