Skip to content

Commit 9bd2a1b

Browse files
djmbrosa
authored andcommitted
Support Active Job Continuations
[Active Job Continuations](rails/rails#55127) will be added in Rails 8.1. Queue adapters need to implement the `stopping?` method to support them. Implement the method and set it to true when a worker is stopping. Also add Rails versions so we can test against Rails main. To prevent an explosion of combinations, I've switched to only testing against minor versions of Ruby.
1 parent 0ee21e4 commit 9bd2a1b

File tree

17 files changed

+307
-118
lines changed

17 files changed

+307
-118
lines changed

.github/workflows/main.yml

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,17 @@ jobs:
2424
fail-fast: false
2525
matrix:
2626
ruby-version:
27-
- 3.1.6
28-
- 3.2.0
29-
- 3.2.4
30-
- 3.3.0
31-
- 3.3.1
32-
- 3.3.2
33-
- 3.3.4
34-
- 3.3.5
35-
- 3.3.6
36-
- 3.4.0
37-
- 3.4.1
27+
- 3.1
28+
- 3.2
29+
- 3.3
30+
- 3.4
3831
database: [ mysql, postgres, sqlite ]
32+
gemfile: [ rails_7_1, rails_7_2, rails_8_0, rails_main ]
33+
exclude:
34+
- ruby-version: "3.1"
35+
gemfile: rails_8_0
36+
- ruby-version: "3.1"
37+
gemfile: rails_main
3938
services:
4039
mysql:
4140
image: mysql:8.0.31
@@ -52,6 +51,7 @@ jobs:
5251
- 55432:5432
5352
env:
5453
TARGET_DB: ${{ matrix.database }}
54+
BUNDLE_GEMFILE: ${{ github.workspace }}/gemfiles/${{ matrix.gemfile }}.gemfile
5555
steps:
5656
- name: Checkout code
5757
uses: actions/checkout@v4
@@ -60,6 +60,9 @@ jobs:
6060
with:
6161
ruby-version: ${{ matrix.ruby-version }}
6262
bundler-cache: true
63+
- name: Update to latest Rails
64+
run: |
65+
bundle update railties
6366
- name: Setup test database
6467
run: |
6568
bin/rails db:setup

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/.bundle/
22
/doc/
3+
/gemfiles/*.lock
34
/log/*.log
45
/pkg/
56
/tmp/

Appraisals

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
appraise "rails-7-1" do
4+
# rdoc 6.14 is not compatible with Ruby 3.1
5+
gem 'rdoc', '6.13'
6+
gem "railties", "~> 7.1.0"
7+
end
8+
9+
appraise "rails-7-2" do
10+
gem 'rdoc', '6.13'
11+
gem "railties", "~> 7.2.0"
12+
end
13+
14+
appraise "rails-8-0" do
15+
gem "railties", "~> 8.0.0"
16+
end
17+
18+
appraise "rails-main" do
19+
gem "railties", github: "rails/rails", branch: "main"
20+
end

Gemfile.lock

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ GEM
5050
mutex_m
5151
securerandom (>= 0.3)
5252
tzinfo (~> 2.0)
53+
appraisal (2.5.0)
54+
bundler
55+
rake
56+
thor (>= 0.14.0)
5357
ast (2.4.2)
5458
base64 (0.2.0)
5559
benchmark (0.4.0)
@@ -189,6 +193,7 @@ PLATFORMS
189193
x86_64-linux
190194

191195
DEPENDENCIES
196+
appraisal
192197
debug (~> 1.9)
193198
logger
194199
mocha

gemfiles/rails_7_1.gemfile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# This file was generated by Appraisal
2+
3+
source "https://rubygems.org"
4+
5+
gem "rdoc", "6.13"
6+
gem "railties", "~> 7.1.0"
7+
8+
gemspec path: "../"

gemfiles/rails_7_2.gemfile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# This file was generated by Appraisal
2+
3+
source "https://rubygems.org"
4+
5+
gem "rdoc", "6.13"
6+
gem "railties", "~> 7.2.0"
7+
8+
gemspec path: "../"

gemfiles/rails_8_0.gemfile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# This file was generated by Appraisal
2+
3+
source "https://rubygems.org"
4+
5+
gem "railties", "~> 8.0.0"
6+
7+
gemspec path: "../"

gemfiles/rails_main.gemfile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# This file was generated by Appraisal
2+
3+
source "https://rubygems.org"
4+
5+
gem "railties", branch: "main", git: "https://github.com/rails/rails.git"
6+
7+
gemspec path: "../"

lib/active_job/queue_adapters/solid_queue_adapter.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ module QueueAdapters
77
# To use it set the queue_adapter config to +:solid_queue+.
88
#
99
# Rails.application.config.active_job.queue_adapter = :solid_queue
10-
class SolidQueueAdapter
10+
class SolidQueueAdapter < (Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 1 ? Object : AbstractAdapter)
11+
class_attribute :stopping, default: false, instance_writer: false
12+
SolidQueue.on_worker_stop { self.stopping = true }
13+
1114
def enqueue_after_transaction_commit?
1215
true
1316
end

solid_queue.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ Gem::Specification.new do |spec|
3232
spec.add_dependency "fugit", "~> 1.11.0"
3333
spec.add_dependency "thor", "~> 1.3.1"
3434

35+
spec.add_development_dependency "appraisal"
3536
spec.add_development_dependency "debug", "~> 1.9"
3637
spec.add_development_dependency "mocha"
3738
spec.add_development_dependency "puma"
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
begin
2+
require "active_job/continuation"
3+
rescue LoadError
4+
# Zeitwerk requires that we define the constant
5+
class ContinuableJob; end
6+
return
7+
end
8+
9+
class ContinuableJob < ApplicationJob
10+
include ActiveJob::Continuable
11+
12+
def perform(result, pause: 0)
13+
step :step_one do
14+
sleep pause if pause > 0
15+
result.update!(queue_name: queue_name, status: "stepped", value: "step_one")
16+
end
17+
step :step_two do
18+
sleep pause if pause > 0
19+
result.update!(queue_name: queue_name, status: "stepped", value: "step_two")
20+
end
21+
end
22+
end

test/integration/concurrency_controls_test.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
180180
end
181181

182182
test "verify transactions remain valid after Job creation conflicts via limits_concurrency" do
183+
# Doesn't work with enqueue_after_transaction_commit? true on SolidQueueAdapter, but only Rails 7.2 uses this
184+
skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2
185+
183186
ActiveRecord::Base.transaction do
184187
SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds)
185188
SequentialUpdateResultJob.perform_later(@result, name: "B")

test/integration/continuation_test.rb

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
5+
begin
6+
require "active_job/continuation"
7+
rescue LoadError
8+
return
9+
end
10+
11+
class ContinuationTest < ActiveSupport::TestCase
12+
self.use_transactional_tests = false
13+
14+
def setup
15+
start_processes
16+
@result = JobResult.create!
17+
end
18+
19+
teardown do
20+
terminate_process(@pid) if process_exists?(@pid)
21+
end
22+
23+
test "continuable job completes" do
24+
ContinuableJob.perform_later(@result)
25+
26+
wait_for_jobs_to_finish_for(5.seconds)
27+
28+
assert_no_unfinished_jobs
29+
assert_last_step :step_two
30+
end
31+
32+
test "continuable job can be interrupted and resumed" do
33+
job = ContinuableJob.perform_later(@result, pause: 0.5.seconds)
34+
35+
sleep 0.2.seconds
36+
signal_process(@pid, :TERM)
37+
38+
wait_for_jobs_to_be_released_for(2.seconds)
39+
40+
assert_no_claimed_jobs
41+
assert_unfinished_jobs job
42+
assert_last_step :step_one
43+
44+
ActiveJob::QueueAdapters::SolidQueueAdapter.stopping = false
45+
start_processes
46+
wait_for_jobs_to_finish_for(5.seconds)
47+
48+
assert_no_unfinished_jobs
49+
assert_last_step :step_two
50+
end
51+
52+
private
53+
def assert_last_step(step)
54+
@result.reload
55+
assert_equal "stepped", @result.status
56+
assert_equal step.to_s, @result.value
57+
end
58+
59+
def start_processes
60+
default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 }
61+
dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 }
62+
@pid = run_supervisor_as_fork(workers: [ default_worker ], dispatchers: [ dispatcher ])
63+
wait_for_registered_processes(5, timeout: 5.second) # 3 workers working the default queue + dispatcher + supervisor
64+
end
65+
end

0 commit comments

Comments
 (0)