Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/perfectqueue/backend/rdb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module PerfectQueue::Backend
class RDBBackend
MAX_RETRY = ::PerfectQueue::Backend::RDBCompatBackend::MAX_RETRY
DELETE_OFFSET = ::PerfectQueue::Backend::RDBCompatBackend::DELETE_OFFSET
EVENT_HORIZON = ::PerfectQueue::Backend::RDBCompatBackend::EVENT_HORIZON
class Token < Struct.new(:key)
end

Expand Down Expand Up @@ -48,7 +49,7 @@ def submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second

def cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second))
connect {
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout-DELETE_OFFSET, id].update
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout;", now+delete_timeout-DELETE_OFFSET, id].update
return n > 0
}
end
Expand Down
20 changes: 14 additions & 6 deletions lib/perfectqueue/backend/rdb_compat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,15 @@ def finish(task_token, retention_time, options)
key = task_token.key

connect {
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL", delete_timeout, key].update
n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout", delete_timeout, key].update
if n <= 0
raise IdempotentAlreadyFinishedError, "task key=#{key} does not exist or already finished."
end
}
nil
end

# => nil
# => next_timeout
def heartbeat(task_token, alive_time, options)
now = (options[:now] || Time.now).to_i
next_timeout = now + alive_time
Expand All @@ -305,8 +305,14 @@ def heartbeat(task_token, alive_time, options)
sql << ", data=?"
params << compress_data(data.to_json, options[:compression])
end
sql << " WHERE id=? AND created_at IS NOT NULL"
params << key
if last_heartbeat = options[:last_heartbeat]
sql << " WHERE id=? AND timeout=?"
params << key
params << last_heartbeat
else
sql << " WHERE id=? AND #{EVENT_HORIZON} < timeout"
params << key
end

connect {
n = @db[*params].update
Expand All @@ -315,13 +321,15 @@ def heartbeat(task_token, alive_time, options)
if row == nil
raise PreemptedError, "task key=#{key} does not exist or preempted."
elsif row[:created_at] == nil
raise PreemptedError, "task key=#{key} preempted."
raise PreemptedError, "task key=#{key} is finished or canceled"
elsif options[:last_heartbeat] && row[:timeout] != options[:last_heartbeat]
raise PreemptedError, "task key=#{key} is preempted by another worker."
else # row[:timeout] == next_timeout
# ok
end
end
}
nil
next_timeout
end

def release(task_token, alive_time, options)
Expand Down
2 changes: 1 addition & 1 deletion lib/perfectqueue/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def initialize(client, key, attributes, task_token)
end

def heartbeat!(options={})
@client.heartbeat(@task_token, options)
self.timeout = @client.heartbeat(@task_token, options.merge(last_heartbeat: timeout))
end

def finish!(options={})
Expand Down
10 changes: 5 additions & 5 deletions lib/perfectqueue/task_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ def created_at
end

def timeout
if t = @attributes[:timeout]
return Time.at(t)
else
return nil
end
@attributes[:timeout]
end

def timeout=(v)
@attributes[:timeout] = v
end

def finished?
Expand Down
17 changes: 9 additions & 8 deletions lib/perfectqueue/task_monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def initialize(config, child_heartbeat=nil, force_stop=nil)
@child_heartbeat_interval = (@config[:child_heartbeat_interval] || 2).to_i
@task_heartbeat_interval = (@config[:task_heartbeat_interval] || 2).to_i
@last_child_heartbeat = Time.now.to_i
@last_task_heartbeat = Time.now.to_i

@task = nil

Expand Down Expand Up @@ -59,8 +58,11 @@ def set_task(task, runner)
task.runner = runner
@mutex.synchronize {
@task = task
@last_task_heartbeat = Time.now.to_i
}
now = Time.now.to_i
while @task && @task.timeout + @task_heartbeat_interval < now
sleep 1
end
end

def stop_task(immediate)
Expand Down Expand Up @@ -101,7 +103,6 @@ def external_task_heartbeat(task, &block)
@mutex.synchronize {
if task == @task
ret = block.call if block
@last_task_heartbeat = Time.now.to_i
end
ret
}
Expand All @@ -115,20 +116,18 @@ def run
next_child_heartbeat = @last_child_heartbeat + @child_heartbeat_interval

if @task
next_task_heartbeat = @last_task_heartbeat + @task_heartbeat_interval
next_task_heartbeat = @task.timeout + @task_heartbeat_interval
next_time = [next_child_heartbeat, next_task_heartbeat].min
else
next_task_heartbeat = nil
next_time = next_child_heartbeat
end

next_wait = next_time - now
@cond.wait(next_wait) if next_wait > 0

now = Time.now.to_i
if @task && next_task_heartbeat && next_task_heartbeat <= now
if @task && @task.timeout + @task_heartbeat_interval <= now
task_heartbeat
@last_task_heartbeat = now
end

if next_child_heartbeat <= now
Expand All @@ -145,10 +144,12 @@ def run

private
def task_heartbeat
@task.heartbeat!
task = @task
task.heartbeat!
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like that assigning task is unnecessary.

rescue
# finished, preempted, etc.
kill_task($!)
@task = nil
end
end

Expand Down
2 changes: 1 addition & 1 deletion spec/multiprocess/child_process_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
end

describe '#process' do
let (:task){ double('task', key: double) }
let (:task){ double('task', key: double, timeout: Time.now.to_i) }
before do
expect(runner_insntace).to receive(:run)
end
Expand Down
24 changes: 18 additions & 6 deletions spec/rdb_compat_backend_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,13 @@
expect(ary[3]).to be_an_instance_of(AcquiredTask)
expect(ary[4]).to be_an_instance_of(AcquiredTask)

now1 = Time.at(now + alive_time)
expect(now1).to receive(:to_time).exactly(5).times.and_call_original
db.list({}){|task| expect(task.timeout).to eq now1.to_time }
now1 = now + alive_time
i = 0
db.list({}) do |task|
expect(task.timeout).to eq now1
i += 1
end
expect(i).to eq(5)
end
end
end
Expand Down Expand Up @@ -294,15 +298,18 @@
let (:delete_timeout){ now + retention_time }
let (:options){ {now: now} }
before{ allow(STDERR).to receive(:puts) }
context 'have a queueuled task' do
context 'have a queued task' do
before do
db.submit(key, 'test', nil, {})
end
it 'returns nil if next_run_time is not updated' do
expect(db.heartbeat(task_token, 0, {now: now})).to be_nil
expect(db.heartbeat(task_token, 0, {now: now})).to be_a(Integer)
end
it 'returns nil even if next_run_time is updated' do
expect(db.heartbeat(task_token, 1, {})).to be_nil
expect(db.heartbeat(task_token, 1, {})).to be_a(Integer)
end
it 'raises PreemptedError if last_heartbeat is not matched' do
expect{db.heartbeat(task_token, 1, {last_heartbeat: now-100})}.to raise_error(PreemptedError)
end
end
context 'no tasks' do
Expand All @@ -319,6 +326,11 @@
expect{db.heartbeat(task_token, 0, {})}.to raise_error(PreemptedError)
end
end
context 'stolen task' do
it 'raises PreemptedError if the task has unpexpected last_heartbeat' do
expect{db.heartbeat(task_token, 0, last_heartbeat: 0)}.to raise_error(PreemptedError)
end
end
end

context '#connect' do
Expand Down
13 changes: 11 additions & 2 deletions spec/task_metadata_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,19 @@
end

describe 'timeout' do
it 'returns a time of given timeout' do
it 'returns given timeout' do
epoch = 72
tm = TaskMetadata.new(double, double, timeout: epoch)
expect(tm.timeout).to eq(Time.at(epoch))
expect(tm.timeout).to eq(epoch)
end
end

describe 'timeout=' do
it 'sets timeout' do
epoch = 72
tm = TaskMetadata.new(double, double, timeout: 1)
tm.timeout = epoch
expect(tm.timeout).to eq(epoch)
end
end
end
89 changes: 79 additions & 10 deletions spec/task_monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
ret = double('ret')
tm.instance_variable_set(:@task, task)
expect(tm.external_task_heartbeat(task){ret}).to eq(ret)
expect(tm.instance_variable_get(:@last_task_heartbeat)).to eq(epoch)
end
end

Expand All @@ -39,23 +38,93 @@
end

describe '#task_heartbeat' do
let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object) }
let (:config){ {type: 'rdb_compat', url: 'mysql2://root:@localhost/perfectqueue_test', table: 'test_queues', alive_time: 11} }
let (:client){ Client.new(config) }
let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object, task_heartbeat_interval: 1) }
let (:err){ StandardError.new('heartbeat preempted') }
before do
task = double('task')
allow(task).to receive(:heartbeat!){ raise err }
tm.set_task(task, double('runner'))
let (:now){ Time.now.to_i }
let (:task){ double('task', timeout: now) }
let (:runner){ double('runner') }
context 'timeout' do
before do
tm.set_task(task, double('runner'))
end
it 'calls kill_task($!) on heartbeat error' do
allow(task).to receive(:heartbeat!){ raise err }
expect(tm).to receive(:kill_task).with(err).exactly(:once)
tm.__send__(:task_heartbeat)
end
end
context 'normal' do
before do
client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } }
client.init_database
client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'})
tm.start
end
after do
tm.stop
end
it 'update timeout' do
tasks = client.acquire(now: now-80)
task = tasks[0]
expect(task.timeout).to eq(now-80+config[:alive_time])
allow(Time).to receive(:now).and_return(now-50)
tm.set_task(task, runner)
expect(task.timeout).to eq(now-50+config[:alive_time])
end
end
it 'calls kill_task($!) on heartbeat error' do
expect(tm).to receive(:kill_task).with(err).exactly(:once)
tm.__send__(:task_heartbeat)
context 'stolen' do
before do
client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } }
client.init_database
client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'})
tm.start
end
after do
tm.stop
end
it 'raise error' do
tasks = client.acquire(now: now-80)
task1 = tasks[0]
expect(task1.timeout).to eq(now-80+config[:alive_time])

tasks = client.acquire(now: now-60)
task2 = tasks[0]
expect(task2.timeout).to eq(now-60+config[:alive_time])

allow(Time).to receive(:now).and_return(now-50)
expect(runner).to receive(:kill)
tm.set_task(task1, runner)
end
end
context 'timeout but can acquire' do
before do
client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } }
client.init_database
client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'})
tm.start
end
after do
tm.stop
end
it 'raise error' do
tasks = client.acquire(now: now-80)
task1 = tasks[0]
expect(task1.timeout).to eq(now-80+config[:alive_time])

allow(Time).to receive(:now).and_return(now-50)
tm.set_task(task1, runner)

expect(task1.runner).to eq(runner)
end
end
end
end

describe PerfectQueue::TaskMonitorHook do
let (:task) do
obj = AcquiredTask.new(double(:client).as_null_object, 'key', {}, double)
obj = AcquiredTask.new(double(:client).as_null_object, 'key', {timeout: Time.now.to_i}, double)
tm = TaskMonitor.new(logger: double('logger').as_null_object)
tm.set_task(obj, double('runner'))
obj
Expand Down