-
Notifications
You must be signed in to change notification settings - Fork 165
/
Copy pathsupervisor_test.rb
191 lines (139 loc) · 5.88 KB
/
supervisor_test.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
require "test_helper"
class SupervisorTest < ActiveSupport::TestCase
self.use_transactional_tests = false
setup do
@previous_pidfile = SolidQueue.supervisor_pidfile
@pidfile = Rails.application.root.join("tmp/pids/pidfile_#{SecureRandom.hex}.pid")
SolidQueue.supervisor_pidfile = @pidfile
end
teardown do
SolidQueue.supervisor_pidfile = @previous_pidfile
File.delete(@pidfile) if File.exist?(@pidfile)
end
test "start" do
pid = run_supervisor_as_fork
wait_for_registered_processes(4)
assert_registered_supervisor(pid)
assert_registered_workers(count: 2, supervisor_pid: pid)
assert_registered_dispatcher(supervisor_pid: pid)
terminate_process(pid)
assert_no_registered_processes
end
test "start with provided configuration" do
pid = run_supervisor_as_fork(dispatchers: [ { batch_size: 100 } ])
wait_for_registered_processes(2, timeout: 2) # supervisor + dispatcher
assert_registered_supervisor(pid)
assert_registered_workers(count: 0)
assert_registered_dispatcher(supervisor_pid: pid)
terminate_process(pid)
assert_no_registered_processes
end
test "start with empty configuration" do
pid, _out, error = run_supervisor_as_fork_with_captured_io(workers: [], dispatchers: [])
sleep(0.5)
assert_no_registered_processes
assert_not process_exists?(pid)
assert_match %r{No processes configured}, error
end
test "start with invalid recurring tasks" do
pid, _out, error = run_supervisor_as_fork_with_captured_io(recurring_schedule_file: config_file_path(:recurring_with_invalid), skip_recurring: false)
sleep(0.5)
assert_no_registered_processes
assert_not process_exists?(pid)
assert_match %r{Invalid recurring tasks}, error
end
test "create and delete pidfile" do
assert_not File.exist?(@pidfile)
pid = run_supervisor_as_fork
wait_for_registered_processes(4)
assert File.exist?(@pidfile)
assert_equal pid, File.read(@pidfile).strip.to_i
terminate_process(pid)
assert_not File.exist?(@pidfile)
end
test "abort if there's already a pidfile for a supervisor" do
FileUtils.mkdir_p(File.dirname(@pidfile))
File.write(@pidfile, ::Process.pid.to_s)
pid, _out, err = run_supervisor_as_fork_with_captured_io
wait_for_registered_processes(4)
assert File.exist?(@pidfile)
assert_not_equal pid, File.read(@pidfile).strip.to_i
assert_match %r{A Solid Queue supervisor is already running}, err
wait_for_process_termination_with_timeout(pid, exitstatus: 1)
end
test "delete previous pidfile if the owner is dead" do
pid = run_supervisor_as_fork
wait_for_registered_processes(4)
terminate_process(pid, signal: :KILL)
assert File.exist?(@pidfile)
assert_equal pid, File.read(@pidfile).strip.to_i
wait_for_registered_processes(0)
pid = run_supervisor_as_fork
wait_for_registered_processes(4)
assert File.exist?(@pidfile)
assert_equal pid, File.read(@pidfile).strip.to_i
terminate_process(pid)
end
test "fail orphaned executions" do
3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) }
process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123")
SolidQueue::ReadyExecution.claim("*", 5, process.id)
assert_equal 3, SolidQueue::ClaimedExecution.count
assert_equal 0, SolidQueue::ReadyExecution.count
assert_equal [ process.id ], SolidQueue::ClaimedExecution.last(3).pluck(:process_id).uniq
# Simnulate orphaned executions by just wiping the claiming process
process.delete
pid = run_supervisor_as_fork(workers: [ { queues: "background", polling_interval: 10, processes: 2 } ])
wait_for_registered_processes(3)
assert_registered_supervisor(pid)
terminate_process(pid)
skip_active_record_query_cache do
assert_equal 0, SolidQueue::ClaimedExecution.count
assert_equal 3, SolidQueue::FailedExecution.count
end
end
test "prune processes with expired heartbeats" do
pruned = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-42")
# Simulate expired heartbeats
SolidQueue::Process.update_all(last_heartbeat_at: 10.minutes.ago)
not_pruned = SolidQueue::Process.register(kind: "Worker", pid: 44, name: "worker-44")
assert_equal 2, SolidQueue::Process.count
pid = run_supervisor_as_fork(load_configuration_from: { workers: [ { queues: :background } ] })
wait_for_registered_processes(4)
terminate_process(pid)
skip_active_record_query_cache do
assert_equal 1, SolidQueue::Process.count
assert_nil SolidQueue::Process.find_by(id: pruned.id)
assert SolidQueue::Process.find_by(id: not_pruned.id).present?
end
end
test "attempt to restart supervisor if it fails unexpectedly" do
SolidQueue.stubs(:max_restart_attempts).returns(2)
SolidQueue::Supervisor.any_instance.expects(:start).raises(StandardError).times(SolidQueue.max_restart_attempts + 1)
assert_raises StandardError do
SolidQueue::Supervisor.start
end
end
test "skip restart attempt if configured not to" do
SolidQueue.stubs(:max_restart_attempts).returns(0)
SolidQueue::Supervisor.any_instance.expects(:start).raises(StandardError).times(1)
assert_raises StandardError do
SolidQueue::Supervisor.start
end
end
private
def assert_registered_workers(supervisor_pid: nil, count: 1)
assert_registered_processes(kind: "Worker", count: count, supervisor_pid: supervisor_pid)
end
def assert_registered_dispatcher(supervisor_pid: nil)
assert_registered_processes(kind: "Dispatcher", count: 1, supervisor_pid: supervisor_pid)
end
def assert_registered_supervisor(pid)
skip_active_record_query_cache do
processes = find_processes_registered_as("Supervisor")
assert_equal 1, processes.count
assert_nil processes.first.supervisor
assert_equal pid, processes.first.pid
end
end
end