diff --git a/lib/perfectqueue/backend/rdb.rb b/lib/perfectqueue/backend/rdb.rb index 0a16ed8..b256df6 100644 --- a/lib/perfectqueue/backend/rdb.rb +++ b/lib/perfectqueue/backend/rdb.rb @@ -6,6 +6,7 @@ module PerfectQueue::Backend class RDBBackend MAX_RETRY = ::PerfectQueue::Backend::RDBCompatBackend::MAX_RETRY DELETE_OFFSET = ::PerfectQueue::Backend::RDBCompatBackend::DELETE_OFFSET + EVENT_HORIZON = ::PerfectQueue::Backend::RDBCompatBackend::EVENT_HORIZON class Token < Struct.new(:key) end @@ -48,7 +49,7 @@ def submit(id, data, time=Process.clock_gettime(Process::CLOCK_REALTIME, :second def cancel(id, delete_timeout=3600, now=Process.clock_gettime(Process::CLOCK_REALTIME, :second)) connect { - n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL;", now+delete_timeout-DELETE_OFFSET, id].update + n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout;", now+delete_timeout-DELETE_OFFSET, id].update return n > 0 } end diff --git a/lib/perfectqueue/backend/rdb_compat.rb b/lib/perfectqueue/backend/rdb_compat.rb index 9b46f94..9b02d25 100644 --- a/lib/perfectqueue/backend/rdb_compat.rb +++ b/lib/perfectqueue/backend/rdb_compat.rb @@ -284,7 +284,7 @@ def finish(task_token, retention_time, options) key = task_token.key connect { - n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND created_at IS NOT NULL", delete_timeout, key].update + n = @db["UPDATE `#{@table}` SET timeout=?, created_at=NULL, resource=NULL WHERE id=? AND #{EVENT_HORIZON} < timeout", delete_timeout, key].update if n <= 0 raise IdempotentAlreadyFinishedError, "task key=#{key} does not exist or already finished." end @@ -292,7 +292,7 @@ def finish(task_token, retention_time, options) nil end - # => nil + # => next_timeout def heartbeat(task_token, alive_time, options) now = (options[:now] || Time.now).to_i next_timeout = now + alive_time @@ -305,8 +305,14 @@ def heartbeat(task_token, alive_time, options) sql << ", data=?" params << compress_data(data.to_json, options[:compression]) end - sql << " WHERE id=? AND created_at IS NOT NULL" - params << key + if last_heartbeat = options[:last_heartbeat] + sql << " WHERE id=? AND timeout=?" + params << key + params << last_heartbeat + else + sql << " WHERE id=? AND #{EVENT_HORIZON} < timeout" + params << key + end connect { n = @db[*params].update @@ -315,13 +321,15 @@ def heartbeat(task_token, alive_time, options) if row == nil raise PreemptedError, "task key=#{key} does not exist or preempted." elsif row[:created_at] == nil - raise PreemptedError, "task key=#{key} preempted." + raise PreemptedError, "task key=#{key} is finished or canceled" + elsif options[:last_heartbeat] && row[:timeout] != options[:last_heartbeat] + raise PreemptedError, "task key=#{key} is preempted by another worker." else # row[:timeout] == next_timeout # ok end end } - nil + next_timeout end def release(task_token, alive_time, options) diff --git a/lib/perfectqueue/multiprocess/thread_processor.rb b/lib/perfectqueue/multiprocess/thread_processor.rb index eb2cde7..718f55b 100644 --- a/lib/perfectqueue/multiprocess/thread_processor.rb +++ b/lib/perfectqueue/multiprocess/thread_processor.rb @@ -96,8 +96,9 @@ def run_loop @finish_flag.wait(@poll_interval) else begin + last_heartbeat = Time.now.to_i while task = tasks.shift - process(task) + process(task, last_heartbeat) end ensure # TODO do not call release! because rdb_compat backend @@ -120,11 +121,11 @@ def run_loop @tm.stop end - def process(task) + def process(task, last_heartbeat=Time.now.to_i) @log.info "acquired task task=#{task.key} id=#{@processor_id}: #{task.inspect}" begin r = @runner.new(task) - @tm.set_task(task, r) + @tm.set_task(task, r, last_heartbeat) begin r.run ensure diff --git a/lib/perfectqueue/task.rb b/lib/perfectqueue/task.rb index 1723fae..84e28f7 100644 --- a/lib/perfectqueue/task.rb +++ b/lib/perfectqueue/task.rb @@ -72,7 +72,7 @@ def initialize(client, key, attributes, task_token) end def heartbeat!(options={}) - @client.heartbeat(@task_token, options) + @attributes[:timeout] = @client.heartbeat(@task_token, options.merge(last_heartbeat: timeout)) end def finish!(options={}) diff --git a/lib/perfectqueue/task_metadata.rb b/lib/perfectqueue/task_metadata.rb index 2caad60..fad3032 100644 --- a/lib/perfectqueue/task_metadata.rb +++ b/lib/perfectqueue/task_metadata.rb @@ -50,11 +50,7 @@ def created_at end def timeout - if t = @attributes[:timeout] - return Time.at(t) - else - return nil - end + @attributes[:timeout] end def finished? diff --git a/lib/perfectqueue/task_monitor.rb b/lib/perfectqueue/task_monitor.rb index dced0ef..d34f682 100644 --- a/lib/perfectqueue/task_monitor.rb +++ b/lib/perfectqueue/task_monitor.rb @@ -52,15 +52,20 @@ def join @thread.join end - def set_task(task, runner) + def set_task(task, runner, last_heartbeat=Time.now.to_i) task.extend(TaskMonitorHook) task.log = @log task.task_monitor = self task.runner = runner @mutex.synchronize { @task = task - @last_task_heartbeat = Time.now.to_i + @last_task_heartbeat = last_heartbeat + @cond.broadcast } + now = Time.now.to_i + while @task && @last_task_heartbeat + @task_heartbeat_interval < now + sleep 1 + end end def stop_task(immediate) @@ -101,7 +106,6 @@ def external_task_heartbeat(task, &block) @mutex.synchronize { if task == @task ret = block.call if block - @last_task_heartbeat = Time.now.to_i end ret } @@ -118,7 +122,6 @@ def run next_task_heartbeat = @last_task_heartbeat + @task_heartbeat_interval next_time = [next_child_heartbeat, next_task_heartbeat].min else - next_task_heartbeat = nil next_time = next_child_heartbeat end @@ -126,7 +129,7 @@ def run @cond.wait(next_wait) if next_wait > 0 now = Time.now.to_i - if @task && next_task_heartbeat && next_task_heartbeat <= now + if @task && @last_task_heartbeat + @task_heartbeat_interval <= now task_heartbeat @last_task_heartbeat = now end diff --git a/spec/multiprocess/child_process_spec.rb b/spec/multiprocess/child_process_spec.rb index f87499b..6537f2e 100644 --- a/spec/multiprocess/child_process_spec.rb +++ b/spec/multiprocess/child_process_spec.rb @@ -73,7 +73,7 @@ end describe '#process' do - let (:task){ double('task', key: double) } + let (:task){ double('task', key: double, timeout: Time.now.to_i) } before do expect(runner_insntace).to receive(:run) end diff --git a/spec/rdb_compat_backend_spec.rb b/spec/rdb_compat_backend_spec.rb index 214496a..27f5ebb 100644 --- a/spec/rdb_compat_backend_spec.rb +++ b/spec/rdb_compat_backend_spec.rb @@ -242,9 +242,13 @@ expect(ary[3]).to be_an_instance_of(AcquiredTask) expect(ary[4]).to be_an_instance_of(AcquiredTask) - now1 = Time.at(now + alive_time) - expect(now1).to receive(:to_time).exactly(5).times.and_call_original - db.list({}){|task| expect(task.timeout).to eq now1.to_time } + now1 = now + alive_time + i = 0 + db.list({}) do |task| + expect(task.timeout).to eq now1 + i += 1 + end + expect(i).to eq(5) end end end @@ -294,15 +298,18 @@ let (:delete_timeout){ now + retention_time } let (:options){ {now: now} } before{ allow(STDERR).to receive(:puts) } - context 'have a queueuled task' do + context 'have a queued task' do before do db.submit(key, 'test', nil, {}) end it 'returns nil if next_run_time is not updated' do - expect(db.heartbeat(task_token, 0, {now: now})).to be_nil + expect(db.heartbeat(task_token, 0, {now: now})).to be_a(Integer) end it 'returns nil even if next_run_time is updated' do - expect(db.heartbeat(task_token, 1, {})).to be_nil + expect(db.heartbeat(task_token, 1, {})).to be_a(Integer) + end + it 'raises PreemptedError if last_heartbeat is not matched' do + expect{db.heartbeat(task_token, 1, {last_heartbeat: now-100})}.to raise_error(PreemptedError) end end context 'no tasks' do @@ -319,6 +326,11 @@ expect{db.heartbeat(task_token, 0, {})}.to raise_error(PreemptedError) end end + context 'stolen task' do + it 'raises PreemptedError if the task has unpexpected last_heartbeat' do + expect{db.heartbeat(task_token, 0, last_heartbeat: 0)}.to raise_error(PreemptedError) + end + end end context '#connect' do diff --git a/spec/task_metadata_spec.rb b/spec/task_metadata_spec.rb index 1619a3d..3d5a6db 100644 --- a/spec/task_metadata_spec.rb +++ b/spec/task_metadata_spec.rb @@ -60,10 +60,10 @@ end describe 'timeout' do - it 'returns a time of given timeout' do + it 'returns given timeout' do epoch = 72 tm = TaskMetadata.new(double, double, timeout: epoch) - expect(tm.timeout).to eq(Time.at(epoch)) + expect(tm.timeout).to eq(epoch) end end end diff --git a/spec/task_monitor_spec.rb b/spec/task_monitor_spec.rb index 975cec0..7423afa 100644 --- a/spec/task_monitor_spec.rb +++ b/spec/task_monitor_spec.rb @@ -23,7 +23,6 @@ ret = double('ret') tm.instance_variable_set(:@task, task) expect(tm.external_task_heartbeat(task){ret}).to eq(ret) - expect(tm.instance_variable_get(:@last_task_heartbeat)).to eq(epoch) end end @@ -39,23 +38,99 @@ end describe '#task_heartbeat' do - let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object) } + let (:config){ {type: 'rdb_compat', url: 'mysql2://root:@localhost/perfectqueue_test', table: 'test_queues', alive_time: 11} } + let (:client){ Client.new(config) } + let (:tm){ PerfectQueue::TaskMonitor.new(logger: double('logger').as_null_object, task_heartbeat_interval: 1) } let (:err){ StandardError.new('heartbeat preempted') } - before do - task = double('task') - allow(task).to receive(:heartbeat!){ raise err } - tm.set_task(task, double('runner')) + let (:now){ Time.now.to_i } + let (:task){ double('task', timeout: now) } + let (:runner){ double('runner') } + context 'timeout' do + before do + tm.set_task(task, double('runner')) + end + it 'calls kill_task($!) on heartbeat error' do + allow(task).to receive(:heartbeat!){ raise err } + expect(tm).to receive(:kill_task).with(err).exactly(:once) + tm.__send__(:task_heartbeat) + end + end + context 'normal' do + before do + client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } } + client.init_database + client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'}) + tm.start + end + after do + tm.stop + tm.join + end + it 'update timeout' do + tasks = client.acquire(now: now-80) + task = tasks[0] + expect(task.timeout).to eq(now-80+config[:alive_time]) + allow(Time).to receive(:now).and_return(now-50) + tm.set_task(task, runner, now-80) + expect(task.timeout).to eq(now-50+config[:alive_time]) + end end - it 'calls kill_task($!) on heartbeat error' do - expect(tm).to receive(:kill_task).with(err).exactly(:once) - tm.__send__(:task_heartbeat) + context 'stolen' do + before do + client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } } + client.init_database + client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'}) + tm.start + end + after do + tm.stop + tm.join + end + it 'raise error' do + now1 = now - 80 + tasks = client.acquire(now: now1) + task1 = tasks[0] + expect(task1.timeout).to eq(now1+config[:alive_time]) + + now2 = now - 50 + tasks = client.acquire(now: now2) + task2 = tasks[0] + expect(task2.timeout).to eq(now2+config[:alive_time]) + + now3 = now - 20 + allow(Time).to receive(:now).and_return(now3) + expect(runner).to receive(:kill) + tm.set_task(task1, runner, now1) + end + end + context 'timeout but can acquire' do + before do + client.backend.db.tap{|s| s.tables.each{|t| s.drop_table(t) } } + client.init_database + client.submit('key', 'test1', {'foo' => 1}, {now: now-90,compression: 'gzip'}) + tm.start + end + after do + tm.stop + tm.join + end + it 'raise error' do + tasks = client.acquire(now: now-80) + task1 = tasks[0] + expect(task1.timeout).to eq(now-80+config[:alive_time]) + + allow(Time).to receive(:now).and_return(now-50) + tm.set_task(task1, runner, now-50) + + expect(task1.runner).to eq(runner) + end end end end describe PerfectQueue::TaskMonitorHook do let (:task) do - obj = AcquiredTask.new(double(:client).as_null_object, 'key', {}, double) + obj = AcquiredTask.new(double(:client).as_null_object, 'key', {timeout: Time.now.to_i}, double) tm = TaskMonitor.new(logger: double('logger').as_null_object) tm.set_task(obj, double('runner')) obj