diff --git a/lib/temporal/activity/poller.rb b/lib/temporal/activity/poller.rb index b9d7276e..2db90fd8 100644 --- a/lib/temporal/activity/poller.rb +++ b/lib/temporal/activity/poller.rb @@ -66,7 +66,7 @@ def poll_loop end def poll_for_task - client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue) + client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue, max_tasks_per_second: options[:max_tasks_per_second]) rescue StandardError => error Temporal.logger.error("Unable to poll activity task queue: #{error.inspect}") nil diff --git a/lib/temporal/client/grpc_client.rb b/lib/temporal/client/grpc_client.rb index eab4f1a3..90ffca89 100644 --- a/lib/temporal/client/grpc_client.rb +++ b/lib/temporal/client/grpc_client.rb @@ -160,12 +160,15 @@ def respond_workflow_task_failed(task_token:, cause:, exception: nil) client.respond_workflow_task_failed(request) end - def poll_activity_task_queue(namespace:, task_queue:) + def poll_activity_task_queue(namespace:, task_queue:, max_tasks_per_second: nil) request = Temporal::Api::WorkflowService::V1::PollActivityTaskQueueRequest.new( identity: identity, namespace: namespace, task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new( name: task_queue + ), + task_queue_meta: Temporal::Api::TaskQueue::V1::TaskQueueMetadata.new( + max_tasks_per_second: max_tasks_per_second ) ) diff --git a/lib/temporal/worker.rb b/lib/temporal/worker.rb index ca2bf870..ae846ce9 100644 --- a/lib/temporal/worker.rb +++ b/lib/temporal/worker.rb @@ -9,7 +9,7 @@ module Temporal class Worker # activity_thread_pool_size: number of threads that the poller can use to run activities. # can be set to 1 if you want no paralellism in your activities, at the cost of throughput. - def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size]) + def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OPTIONS[:thread_pool_size], activity_max_tasks_per_second: nil) @workflows = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @activities = Hash.new { |hash, key| hash[key] = ExecutableLookup.new } @pollers = [] @@ -18,6 +18,7 @@ def initialize(activity_thread_pool_size: Temporal::Activity::Poller::DEFAULT_OP @shutting_down = false @activity_poller_options = { thread_pool_size: activity_thread_pool_size, + max_tasks_per_second: activity_max_tasks_per_second } end diff --git a/spec/unit/lib/temporal/activity/poller_spec.rb b/spec/unit/lib/temporal/activity/poller_spec.rb index 06ebc90c..5663a8e5 100644 --- a/spec/unit/lib/temporal/activity/poller_spec.rb +++ b/spec/unit/lib/temporal/activity/poller_spec.rb @@ -32,7 +32,7 @@ expect(client) .to have_received(:poll_activity_task_queue) - .with(namespace: namespace, task_queue: task_queue) + .with(namespace: namespace, task_queue: task_queue, max_tasks_per_second: nil) .twice end diff --git a/spec/unit/lib/temporal/worker_spec.rb b/spec/unit/lib/temporal/worker_spec.rb index 4823cac5..6d4fee14 100644 --- a/spec/unit/lib/temporal/worker_spec.rb +++ b/spec/unit/lib/temporal/worker_spec.rb @@ -144,12 +144,12 @@ class TestWorkerActivity < Temporal::Activity allow(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 20}) + .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: nil, thread_pool_size: 20}) .and_return(activity_poller_1) allow(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'other-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 20}) + .with('default-namespace', 'other-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: nil, thread_pool_size: 20}) .and_return(activity_poller_2) subject.register_workflow(TestWorkerWorkflow) @@ -169,7 +169,7 @@ class TestWorkerActivity < Temporal::Activity activity_poller = instance_double(Temporal::Activity::Poller, start: nil) expect(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {thread_pool_size: 10}) + .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: nil, thread_pool_size: 10}) .and_return(activity_poller) worker = Temporal::Worker.new(activity_thread_pool_size: 10) @@ -180,7 +180,23 @@ class TestWorkerActivity < Temporal::Activity worker.start expect(activity_poller).to have_received(:start) + end + it 'can have an activity poller which throttles tasks per second' do + activity_poller = instance_double(Temporal::Activity::Poller, start: nil) + expect(Temporal::Activity::Poller) + .to receive(:new) + .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [], {max_tasks_per_second: 15, thread_pool_size: 20}) + .and_return(activity_poller) + + worker = Temporal::Worker.new(activity_max_tasks_per_second: 15) + allow(worker).to receive(:shutting_down?).and_return(true) + worker.register_workflow(TestWorkerWorkflow) + worker.register_activity(TestWorkerActivity) + + worker.start + + expect(activity_poller).to have_received(:start) end context 'when middleware is configured' do @@ -212,7 +228,7 @@ class TestWorkerActivity < Temporal::Activity allow(Temporal::Activity::Poller) .to receive(:new) - .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [entry_2], thread_pool_size: 20) + .with('default-namespace', 'default-task-queue', an_instance_of(Temporal::ExecutableLookup), [entry_2], max_tasks_per_second: nil, thread_pool_size: 20) .and_return(activity_poller_1) subject.register_workflow(TestWorkerWorkflow)