Skip to content

Commit d3ca693

Browse files
committed
respect the state update counter if present
1 parent 4dbb53c commit d3ca693

File tree

5 files changed

+157
-1
lines changed

5 files changed

+157
-1
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
module Travis
2+
module Hub
3+
module Service
4+
class StateUpdate < Struct.new(:data)
5+
class Counter < Struct.new(:job_id, :redis)
6+
TTL = 3600 * 12
7+
8+
def count
9+
@count ||= redis.get(key).to_i
10+
end
11+
12+
def increment
13+
count = redis.incr(key)
14+
redis.expire(key, TTL)
15+
count
16+
end
17+
18+
private
19+
20+
def key
21+
"job:state_update_count:#{job_id}"
22+
end
23+
end
24+
25+
include Helper::Context
26+
27+
MSGS = {
28+
missing: 'Received state update with no count for job id=%p, last known count: %p.',
29+
ordered: 'Received state update %p for job id=%p, last known count: %p',
30+
unordered: 'Received state update %p for job id=%p, last known count: %p. %s',
31+
skip: 'Skipping the message.'
32+
}
33+
34+
def apply?
35+
return missing unless given?
36+
apply = ordered? ? ordered : unordered
37+
return true unless ENV['UPDATE_COUNT']
38+
apply
39+
end
40+
41+
private
42+
43+
def given?
44+
!count.nil?
45+
end
46+
47+
def missing
48+
warn :missing, job_id, counter.count
49+
true
50+
end
51+
52+
def ordered
53+
info :ordered, count, job_id, counter.count
54+
true
55+
end
56+
57+
def unordered
58+
warn :unordered, count, job_id, counter.count, ENV['UPDATE_COUNT'] ? MSGS[:skip] : ''
59+
false
60+
end
61+
62+
def ordered?
63+
count >= counter.count
64+
end
65+
66+
def counter
67+
@counter ||= Counter.new(job_id, redis)
68+
end
69+
70+
def job_id
71+
data[:id]
72+
end
73+
74+
def count
75+
meta[:state_update_count]
76+
end
77+
78+
def meta
79+
data[:meta] || {}
80+
end
81+
end
82+
end
83+
end
84+
end

lib/travis/hub/service/update_job.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'travis/hub/helper/locking'
44
require 'travis/hub/model/job'
55
require 'travis/hub/service/error_job'
6+
require 'travis/hub/service/state_update'
67
require 'travis/hub/service/notify_workers'
78
require 'travis/hub/helper/limit'
89

@@ -16,7 +17,7 @@ class UpdateJob < Struct.new(:event, :data)
1617
EVENTS = [:receive, :reset, :start, :finish, :cancel, :restart]
1718

1819
MSGS = {
19-
skipped: 'Skipped event job:%s for <Job id=%s> trying to update state from %s to %s data=%s',
20+
skipped: 'Skipped event job:%s for <Job id=%s> trying to update state from %p to %p data=%s',
2021
}
2122

2223
def run
@@ -37,6 +38,7 @@ def job
3738
private
3839

3940
def update_job
41+
return skipped unless apply_state_update?
4042
return error_job if event == :reset && resets.limited? && !job.finished?
4143
return recancel if recancel?
4244
return skipped if skip_canceled?
@@ -72,6 +74,10 @@ def skipped
7274
warn :skipped, event, job.id, job.state, data[:state], data
7375
end
7476

77+
def apply_state_update?
78+
StateUpdate.new(context, data).apply?
79+
end
80+
7581
def resets
7682
@resets ||= Limit.new(redis, :resets, job.id, config.limit.resets)
7783
end

spec/spec_helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
Travis::Event.instance_variable_set(:@subscriptions, nil)
7676
# Travis::Addons.setup({ host: 'host.com', encryption: { key: 'secret' * 10 } }, logger)
7777
Time.stubs(:now).returns(NOW)
78+
context.redis.flushall
7879
end
7980
end
8081

spec/support/context.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module Context
66

77
included do
88
let(:stdout) { StringIO.new }
9+
let(:log) { stdout.string }
910
let(:logger) { Travis::Logger.new(stdout) }
1011
let(:context) { Travis::Hub::Context.new(logger: logger) }
1112
before { Travis::Hub.context = context }

spec/travis/hub/service/update_job_spec.rb

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,4 +210,68 @@ def recieve(msg)
210210
expect(job.reload.state).to eql :passed
211211
end
212212
end
213+
214+
describe 'state update count' do
215+
let(:job) { FactoryGirl.create(:job, state: :received) }
216+
let(:event) { :start }
217+
let(:data) { { id: job.id, state: :started, meta: meta } }
218+
219+
before { ENV['UPDATE_COUNT'] = 'true' }
220+
after { ENV['UPDATE_COUNT'] = nil }
221+
222+
describe 'with no count stored' do
223+
describe 'given no meta' do
224+
let(:meta) { nil }
225+
before { subject.run }
226+
227+
it { expect(job.reload.state).to eq :started }
228+
it { expect(log).to include "W Received state update with no count for job id=#{job.id}, last known count: 0" }
229+
end
230+
231+
describe 'given no count' do
232+
let(:meta) { {} }
233+
before { subject.run }
234+
235+
it { expect(job.reload.state).to eq :started }
236+
it { expect(log).to include "W Received state update with no count for job id=#{job.id}, last known count: 0" }
237+
end
238+
239+
describe 'given a count' do
240+
let(:meta) { { state_update_count: 2 } }
241+
before { subject.run }
242+
243+
it { expect(job.reload.state).to eq :started }
244+
it { expect(log).to include "I Received state update 2 for job id=#{job.id}, last known count: 0" }
245+
end
246+
end
247+
248+
describe 'with a count stored' do
249+
before { context.redis.set("job:state_update_count:#{job.id}", 3) }
250+
251+
describe 'given no meta it skips the message' do
252+
let(:meta) { nil }
253+
before { subject.run }
254+
255+
it { expect(job.reload.state).to eq :started }
256+
it { expect(log).to include "W Received state update with no count for job id=#{job.id}, last known count: 3" }
257+
end
258+
259+
describe 'given no count it skips the message' do
260+
let(:meta) { {} }
261+
before { subject.run }
262+
263+
it { expect(job.reload.state).to eq :started }
264+
it { expect(log).to include "W Received state update with no count for job id=#{job.id}, last known count: 3" }
265+
end
266+
267+
describe 'given a count it skips the message' do
268+
let(:meta) { { state_update_count: 2 } }
269+
before { subject.run }
270+
271+
it { expect(job.reload.state).to eq :received }
272+
it { expect(log).to include "W Received state update 2 for job id=#{job.id}, last known count: 3. Skipping the message." }
273+
it { expect(log).to include "W Skipped event job:start for <Job id=#{job.id}> trying to update state from :received to :started" }
274+
end
275+
end
276+
end
213277
end

0 commit comments

Comments
 (0)