From e7888745e98af61312c2a5e1ea1f16ea83b6e3ef Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Wed, 2 Mar 2016 17:28:00 -0800 Subject: [PATCH 1/7] doing stuff to emulate https://github.com/lantins/resque-retry --- examples/retry.js | 99 +++++++++++++++++ lib/connection.js | 4 +- lib/plugins/retry.js | 125 ++++++++++++++++++++++ lib/plugins/simpleRetry.js | 91 ---------------- lib/worker.js | 4 +- test/plugins/{simpleRetry.js => retry.js} | 0 6 files changed, 228 insertions(+), 95 deletions(-) create mode 100644 examples/retry.js create mode 100644 lib/plugins/retry.js delete mode 100644 lib/plugins/simpleRetry.js rename test/plugins/{simpleRetry.js => retry.js} (100%) diff --git a/examples/retry.js b/examples/retry.js new file mode 100644 index 00000000..89857d11 --- /dev/null +++ b/examples/retry.js @@ -0,0 +1,99 @@ +///////////////////////// +// REQUIRE THE PACKAGE // +///////////////////////// + +var NR = require(__dirname + "/../index.js"); +// In your projects: var NR = require("node-resque"); + +/////////////////////////// +// SET UP THE CONNECTION // +/////////////////////////// + +var connectionDetails = { + pkg: 'ioredis', + host: '127.0.0.1', + password: null, + port: 6379, + database: 1, + // namespace: 'resque', + // looping: true, + // options: {password: 'abc'}, +}; + +////////////////////////////// +// DEFINE YOUR WORKER TASKS // +////////////////////////////// + +var jobs = { + "add": { + plugins: [ 'retry' ], + pluginOptions: { + retry: { + retryLimit: 3, + // retryDelay: 1000, + backoffStrategy: [1000, 5000, 10000], + }, + }, + perform: function(a,b,callback){ + var broken = true; + + if(broken){ + return callback(new Error('BUSTED')); + }else{ + return callback(null, (a + b)); + } + }, + } +}; + +//////////////////// +// START A WORKER // +//////////////////// + +var worker = new NR.worker({connection: connectionDetails, queues: ['math']}, jobs); +worker.connect(function(){ + worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers on this host + worker.start(); +}); + +/////////////////////// +// START A SCHEDULER // +/////////////////////// + +var scheduler = new NR.scheduler({connection: connectionDetails}); +scheduler.connect(function(){ + scheduler.start(); +}); + +///////////////////////// +// REGESTER FOR EVENTS // +///////////////////////// + +worker.on('start', function(){ console.log("worker started"); }); +worker.on('end', function(){ console.log("worker ended"); }); +worker.on('cleaning_worker', function(worker, pid){ console.log("cleaning old worker " + worker); }); +worker.on('poll', function(queue){ console.log("worker polling " + queue); }); +worker.on('job', function(queue, job){ console.log("working job " + queue + " " + JSON.stringify(job)); }); +worker.on('reEnqueue', function(queue, job, plugin){ console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); }); +worker.on('success', function(queue, job, result){ console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result); }); +worker.on('failure', function(queue, job, failure){ console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure); }); +worker.on('error', function(queue, job, error){ console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error); }); +worker.on('pause', function(){ console.log("worker paused"); }); + +scheduler.on('start', function(){ console.log("scheduler started"); }); +scheduler.on('end', function(){ console.log("scheduler ended"); }); +scheduler.on('poll', function(){ console.log("scheduler polling"); }); +scheduler.on('master', function(state){ console.log("scheduler became master"); }); +scheduler.on('error', function(error){ console.log("scheduler error >> " + error); }); +scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); }); +scheduler.on('transferred_job', function(timestamp, job){ console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job)); }); + +//////////////////////////////////// +// CONNECT TO A QUEUE AND WORK IT // +//////////////////////////////////// + +var queue = new NR.queue({connection: connectionDetails}, jobs); +queue.on('error', function(error){ console.log(error); }); +queue.connect(function(){ + queue.enqueue('math', "add", [1,2]); +}); diff --git a/lib/connection.js b/lib/connection.js index 95400c38..776e7a02 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -47,9 +47,9 @@ connection.prototype.connect = function(callback){ }else{ - if(self.options['package'] && !self.options.pkg) { + if(self.options.package && !self.options.pkg) { self.emit('Depreciation warning: You need to use \'pkg\' instead of \'package\'! Please update your configuration.'); - self.options.pkg = self.options['package']; + self.options.pkg = self.options.package; } var pkg = require(self.options.pkg); self.redis = pkg.createClient(self.options.port, self.options.host, self.options.options); diff --git a/lib/plugins/retry.js b/lib/plugins/retry.js new file mode 100644 index 00000000..7cc4071b --- /dev/null +++ b/lib/plugins/retry.js @@ -0,0 +1,125 @@ +// If the job fails, sleep, and re-enqueue it. +// a port of some of the features in https://github.com/lantins/resque-retry + +var crypto = require('crypto'); + +var retry = function(worker, func, queue, job, args, options){ + var self = this; + + if(!options.retryLimit){ options.retryLimit = 1; } + if(!options.retryDelay){ options.retryDelay = 1000; } + if(!options.backoffStrategy){ options.backoffStrategy = null; } + + self.name = 'retry'; + self.worker = worker; + self.queue = queue; + self.func = func; + self.job = job; + self.args = args; + self.options = options; + + if(self.worker.queueObject){ + self.queueObject = self.worker.queueObject; + }else{ + self.queueObject = self.worker; + } +}; + +//////////////////// +// PLUGIN METHODS // +//////////////////// + +retry.prototype.retryKey = function(){ + var self = this; + // TODO: in ruby, the hash is from `args.join('-')`. Does it matter? + var hash = crypto.createHash('sha1').update(JSON.stringify(self.args)).digest('hex'); + return self.queueObject.connection.key('resque-retry', self.func, hash); +}; + +retry.prototype.failureKey = function(){ + var self = this; + var hash = crypto.createHash('sha1').update(self.args.join('-')).digest('hex'); + return self.queueObject.connection.key('failure-', self.func, hash); +}; + +retry.prototype.redis = function(){ + var self = this; + return self.queueObject.connection.redis; +}; + +retry.prototype.attemptUp = function(callback){ + var self = this; + var key = self.retryKey(); + self.redis().setnx(key, -1, function(error){ + if(error){ return callback(error); } + self.redis().incr(key, function(error, retryCount){ + // TODO: expire the key [[ Resque.redis.expire(retry_key, retry_delay + expire_retry_key_after) ]] + if(error){ return callback(error); } + var remaning = self.options.retryLimit - retryCount - 1; + return callback(null, remaning); + }); + }); +}; + +retry.prototype.saveLastError = function(callback){ + var self = this; + self.redis().set(self.failureKey(), self.worker.error, callback); +}; + +retry.prototype.cleanup = function(callback){ + var self = this; + var key = self.retryKey(); + var failureKey = self.failureKey(); + self.redis().del(key, function(error){ + if(error){ return callback(error); } + self.redis().del(failureKey, function(error){ + if(error){ return callback(error); } + return callback(); + }); + }); +}; + +retry.prototype.after_perform = function(callback){ + var self = this; + + if(!self.worker.error){ + self.clanup(callback); + } + + self.attemptUp(function(error, remaning){ + if(error){ return callback(error); } + self.saveLastError(function(error){ + if(error){ return callback(error); } + if(remaning <= 0){ + self.clanup(function(error){ + if(error){ return callback(error); } + return callback(self.worker.error, true); + }); + }else{ + var nextTryDelay = self.options.retryDelay; + if(Array.isArray(self.options.backoffStrategy)){ + var index = (self.options.retryLimit - remaning - 1); + if(index > (self.options.backoffStrategy.length - 1)){ + index = (self.options.backoffStrategy.length - 1); + } + nextTryDelay = self.options.backoffStrategy[index]; + } + + self.queueObject.enqueueIn(nextTryDelay, self.queue, self.func, self.args, function(error){ + if(error){ return callback(error); } + + self.worker.emit('reEnqueue', self.queue, self.job, { + delay: nextTryDelay, + remaningAttempts: remaning, + err: self.worker.error + }); + + delete self.worker.error; + return callback(null, true); + }); + } + }); + }); +}; + +exports.retry = retry; diff --git a/lib/plugins/simpleRetry.js b/lib/plugins/simpleRetry.js deleted file mode 100644 index a0865a4c..00000000 --- a/lib/plugins/simpleRetry.js +++ /dev/null @@ -1,91 +0,0 @@ -// If the job fails, sleep, and re-enqueue it. -// You probably never want to use this in production -var crypto = require('crypto'); - -var simpleRetry = function(worker, func, queue, job, args, options){ - var self = this; - self.name = 'simpleRetry'; - self.worker = worker; - self.queue = queue; - self.func = func; - self.job = job; - self.args = args; - self.options = options; - if (! self.options.retryInterval) { - self.options.retryInterval = [60, 300, 600, 1800, 3600] - } - - if(self.worker.queueObject){ - self.queueObject = self.worker.queueObject; - }else{ - self.queueObject = self.worker; - } - - if (self.args) { - jobHash = crypto.createHash('md5').update(JSON.stringify(self.args)).digest('hex'); - self.retryKey = self.queueObject.connection.key('retrytimes', jobHash); - } -}; - -//////////////////// -// PLUGIN METHODS // -//////////////////// - -simpleRetry.prototype.updateRetryTimes = function(callback) { - var self = this; - - self.queueObject.connection.redis.incr(self.retryKey, (function(err, result) { - if (err) { return callback(err); } - - self.queueObject.connection.redis.expire(self.retryKey, self.options.retryInterval[self.options.retryInterval.length - 1] * 2, function(err) { - if (err) { - callback(err); - } else if (result > self.options.retryInterval.length) { - err.message = '(Resque Retry Max Attempts Reached) -> ' + err.message; - callback(err); - } else { - callback(null, result); - } - }); - })); -}; - -simpleRetry.prototype.deleteRetryTimes = function(callback) { - this.queueObject.connection.redis.del(this.retryKey, callback); -}; - -simpleRetry.prototype.after_perform = function(callback){ - var self = this; - if (self.worker.error) { - self.updateRetryTimes(function(err, retryTimes) { - if (err) { - if (err.message.indexOf('(Resque Retry Max Attempts Reached) -> ') == 0) { - self.deleteRetryTimes(function(err) { - return callback(err, true); - }); - } else { - return callback(err); - } - } - - var delay = self.options.retryInterval[retryTimes - 1]; - self.queueObject.enqueueIn(delay * 1000, self.queue, self.func, self.args, function(err) { - if (err) { return callback(err); } - - self.worker.emit('reEnqueue', self.queue, self.job, { - times: retryTimes, - delay: delay, - err: self.worker.error - }); - self.worker.error = null; - return callback(err, true); - }); - }); - } else { - self.deleteRetryTimes(function(err) { - return callback(err, true); - }); - }; -}; - -exports.simpleRetry = simpleRetry; diff --git a/lib/worker.js b/lib/worker.js index c1b91721..703b8e08 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -10,7 +10,7 @@ var pluginRunner = require(__dirname + "/pluginRunner.js"); var worker = function(options, jobs){ var self = this; if(!jobs){ jobs = {}; } - + var defaults = self.defaults(); for(var i in defaults){ if(options[i] === undefined || options[i] === null){ @@ -300,7 +300,7 @@ worker.prototype.init = function(callback) { var self = this; var args, _ref; self.track(); - self.connection.redis.set(self.connection.key('worker', self.name, self.stringQueues(), 'started'), (new Date()).toString(), function(){ + self.connection.redis.set(self.connection.key('worker', self.name, self.stringQueues(), 'started'), Math.round((new Date()).getTime() / 1000), function(){ if(typeof callback === 'function'){ callback(); } }); }; diff --git a/test/plugins/simpleRetry.js b/test/plugins/retry.js similarity index 100% rename from test/plugins/simpleRetry.js rename to test/plugins/retry.js From 4d8ce4ceb12a175b59a11305dafe9225354e7827 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Fri, 4 Mar 2016 11:44:48 -0800 Subject: [PATCH 2/7] update metadata keys --- examples/retry.js | 9 ++++++--- lib/plugins/retry.js | 45 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/examples/retry.js b/examples/retry.js index 89857d11..0e2d43fc 100644 --- a/examples/retry.js +++ b/examples/retry.js @@ -31,7 +31,7 @@ var jobs = { retry: { retryLimit: 3, // retryDelay: 1000, - backoffStrategy: [1000, 5000, 10000], + backoffStrategy: [1000 * 10, 1000 * 20, 1000 * 30], }, }, perform: function(a,b,callback){ @@ -76,9 +76,12 @@ worker.on('poll', function(queue){ console.log("worker polling " + qu worker.on('job', function(queue, job){ console.log("working job " + queue + " " + JSON.stringify(job)); }); worker.on('reEnqueue', function(queue, job, plugin){ console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); }); worker.on('success', function(queue, job, result){ console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result); }); -worker.on('failure', function(queue, job, failure){ console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure); }); -worker.on('error', function(queue, job, error){ console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error); }); worker.on('pause', function(){ console.log("worker paused"); }); +worker.on('error', function(queue, job, error){ console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error); }); +worker.on('failure', function(queue, job, failure){ + console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure); + setTimeout(process.exit, 2000); +}); scheduler.on('start', function(){ console.log("scheduler started"); }); scheduler.on('end', function(){ console.log("scheduler ended"); }); diff --git a/lib/plugins/retry.js b/lib/plugins/retry.js index 7cc4071b..c23908f0 100644 --- a/lib/plugins/retry.js +++ b/lib/plugins/retry.js @@ -2,6 +2,7 @@ // a port of some of the features in https://github.com/lantins/resque-retry var crypto = require('crypto'); +var os = require('os'); var retry = function(worker, func, queue, job, args, options){ var self = this; @@ -29,17 +30,21 @@ var retry = function(worker, func, queue, job, args, options){ // PLUGIN METHODS // //////////////////// +retry.prototype.argsKey = function(){ + var self = this; + // return crypto.createHash('sha1').update(self.args.join('-')).digest('hex'); + if(!self.args || self.args.length === 0){ return ''; } + return self.args.join('-'); +}; + retry.prototype.retryKey = function(){ var self = this; - // TODO: in ruby, the hash is from `args.join('-')`. Does it matter? - var hash = crypto.createHash('sha1').update(JSON.stringify(self.args)).digest('hex'); - return self.queueObject.connection.key('resque-retry', self.func, hash); + return self.queueObject.connection.key('resque-retry', self.func, self.argsKey()).replace(/\s/, ''); }; retry.prototype.failureKey = function(){ var self = this; - var hash = crypto.createHash('sha1').update(self.args.join('-')).digest('hex'); - return self.queueObject.connection.key('failure-', self.func, hash); + return self.queueObject.connection.key('failure-resque-retry:' + self.func + ':' + self.argsKey()).replace(/\s/, ''); }; retry.prototype.redis = function(){ @@ -54,6 +59,7 @@ retry.prototype.attemptUp = function(callback){ if(error){ return callback(error); } self.redis().incr(key, function(error, retryCount){ // TODO: expire the key [[ Resque.redis.expire(retry_key, retry_delay + expire_retry_key_after) ]] + // TODO: expire the failure key too if(error){ return callback(error); } var remaning = self.options.retryLimit - retryCount - 1; return callback(null, remaning); @@ -63,7 +69,28 @@ retry.prototype.attemptUp = function(callback){ retry.prototype.saveLastError = function(callback){ var self = this; - self.redis().set(self.failureKey(), self.worker.error, callback); + var now = new Date(); + var failedAt = '' + + now.getFullYear() + '/' + + (('0' + (now.getMonth() + 1)).slice(-2)) + '/' + + (('0' + now.getDate()).slice(-2)) + ' ' + + (('0' + now.getHours()).slice(-2)) + ':' + + (('0' + now.getMinutes()).slice(-2)) + ':' + + (('0' + now.getSeconds()).slice(-2)) + ; + + var data = { + failed_at : failedAt, + payload : self.args, + exception : String(self.worker.error), + error : String(self.worker.error), + backtrace : self.worker.error.stack.split(os.EOL) || [], + worker : self.func, + queue : self.queue + }; + + var expireDelay = 99999; + self.redis().setex(self.failureKey(), expireDelay, JSON.stringify(data), callback); }; retry.prototype.cleanup = function(callback){ @@ -83,7 +110,7 @@ retry.prototype.after_perform = function(callback){ var self = this; if(!self.worker.error){ - self.clanup(callback); + self.cleanup(callback); } self.attemptUp(function(error, remaning){ @@ -91,7 +118,7 @@ retry.prototype.after_perform = function(callback){ self.saveLastError(function(error){ if(error){ return callback(error); } if(remaning <= 0){ - self.clanup(function(error){ + self.cleanup(function(error){ if(error){ return callback(error); } return callback(self.worker.error, true); }); @@ -114,6 +141,8 @@ retry.prototype.after_perform = function(callback){ err: self.worker.error }); + //TODO: add a failed counter, and remove the success counter (total and for worker) + delete self.worker.error; return callback(null, true); }); From d0adc312e30f777addd7cb673b635b82ecb32ad7 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Fri, 4 Mar 2016 13:19:00 -0800 Subject: [PATCH 3/7] update ruby stuff --- examples/retry.js | 2 +- resque-web/Gemfile | 3 ++- resque-web/Gemfile.lock | 21 ++++++++++++++------- resque-web/config.ru | 6 ++++-- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/examples/retry.js b/examples/retry.js index 0e2d43fc..6b6b3ebc 100644 --- a/examples/retry.js +++ b/examples/retry.js @@ -14,7 +14,7 @@ var connectionDetails = { host: '127.0.0.1', password: null, port: 6379, - database: 1, + database: 0, // namespace: 'resque', // looping: true, // options: {password: 'abc'}, diff --git a/resque-web/Gemfile b/resque-web/Gemfile index 9eee3e0b..b18543e5 100644 --- a/resque-web/Gemfile +++ b/resque-web/Gemfile @@ -3,4 +3,5 @@ source 'http://rubygems.org' gem 'rake' gem 'sinatra' gem 'resque' -gem 'resque-scheduler' \ No newline at end of file +gem 'resque-scheduler' +gem 'resque-retry' diff --git a/resque-web/Gemfile.lock b/resque-web/Gemfile.lock index 4f9f0420..b54ba561 100644 --- a/resque-web/Gemfile.lock +++ b/resque-web/Gemfile.lock @@ -6,8 +6,8 @@ GEM rack (1.6.4) rack-protection (1.5.3) rack - rake (10.4.2) - redis (3.2.1) + rake (10.5.0) + redis (3.2.2) redis-namespace (1.5.2) redis (~> 3.0, >= 3.0.4) resque (1.25.2) @@ -16,17 +16,20 @@ GEM redis-namespace (~> 1.3) sinatra (>= 0.9.2) vegas (~> 0.1.2) - resque-scheduler (4.0.0) + resque-retry (1.5.0) + resque (~> 1.25) + resque-scheduler (~> 4.0) + resque-scheduler (4.1.0) mono_logger (~> 1.0) redis (~> 3.0) resque (~> 1.25) rufus-scheduler (~> 3.0) - rufus-scheduler (3.1.4) - sinatra (1.4.6) - rack (~> 1.4) + rufus-scheduler (3.2.0) + sinatra (1.4.7) + rack (~> 1.5) rack-protection (~> 1.4) tilt (>= 1.3, < 3) - tilt (2.0.1) + tilt (2.0.2) vegas (0.1.11) rack (>= 1.0.0) @@ -36,5 +39,9 @@ PLATFORMS DEPENDENCIES rake resque + resque-retry resque-scheduler sinatra + +BUNDLED WITH + 1.11.2 diff --git a/resque-web/config.ru b/resque-web/config.ru index 61844ecc..ee560f2d 100644 --- a/resque-web/config.ru +++ b/resque-web/config.ru @@ -4,14 +4,16 @@ require 'sinatra' require 'resque/server' require 'resque/scheduler' require 'resque/scheduler/server' +require 'resque-retry' +require 'resque-retry/server' require 'yaml' Resque.redis = Redis.new # Or, with custom options # Resque.redis = Redis.new({ -# :host => "127.0.0.1", -# :port => 6379, +# :host => "127.0.0.1", +# :port => 6379, # :db => 1, # }) # Resque.redis.namespace = 'resque_test' From 3954db0b934d5006ca7cb76c876a625d4c67c179 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Fri, 4 Mar 2016 13:30:43 -0800 Subject: [PATCH 4/7] key incr/decr --- lib/plugins/retry.js | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/lib/plugins/retry.js b/lib/plugins/retry.js index c23908f0..7126c7f2 100644 --- a/lib/plugins/retry.js +++ b/lib/plugins/retry.js @@ -47,6 +47,17 @@ retry.prototype.failureKey = function(){ return self.queueObject.connection.key('failure-resque-retry:' + self.func + ':' + self.argsKey()).replace(/\s/, ''); }; +retry.prototype.maxDelay = function(){ + var self = this; + var maxDelay = self.options.retryDelay || 1; + if(Array.isArray(self.options.backoffStrategy)){ + self.options.backoffStrategy.forEach(function(d){ + if(d > maxDelay){ maxDelay = d; } + }); + } + return maxDelay; +}; + retry.prototype.redis = function(){ var self = this; return self.queueObject.connection.redis; @@ -58,11 +69,12 @@ retry.prototype.attemptUp = function(callback){ self.redis().setnx(key, -1, function(error){ if(error){ return callback(error); } self.redis().incr(key, function(error, retryCount){ - // TODO: expire the key [[ Resque.redis.expire(retry_key, retry_delay + expire_retry_key_after) ]] - // TODO: expire the failure key too if(error){ return callback(error); } - var remaning = self.options.retryLimit - retryCount - 1; - return callback(null, remaning); + self.redis().expire(key, self.maxDelay(), function(error){ + if(error){ return callback(error); } + var remaning = self.options.retryLimit - retryCount - 1; + return callback(null, remaning); + }); }); }); }; @@ -89,8 +101,7 @@ retry.prototype.saveLastError = function(callback){ queue : self.queue }; - var expireDelay = 99999; - self.redis().setex(self.failureKey(), expireDelay, JSON.stringify(data), callback); + self.redis().setex(self.failureKey(), self.maxDelay(), JSON.stringify(data), callback); }; retry.prototype.cleanup = function(callback){ @@ -141,7 +152,11 @@ retry.prototype.after_perform = function(callback){ err: self.worker.error }); - //TODO: add a failed counter, and remove the success counter (total and for worker) + self.redis().decr(self.queueObject.connection.key('stat', 'processed')); + self.redis().decr(self.queueObject.connection.key('stat', 'processed', self.worker.name)); + + self.redis().incr(self.queueObject.connection.key('stat', 'failed')); + self.redis().incr(self.queueObject.connection.key('stat', 'failed', self.worker.name)); delete self.worker.error; return callback(null, true); From 9da0c57ceceab9567f07753cebfe43dee766e88f Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Fri, 4 Mar 2016 15:07:30 -0800 Subject: [PATCH 5/7] retry test --- lib/plugins/retry.js | 2 +- lib/worker.js | 5 +- test/plugins/retry.js | 279 +++++++++++++++++++++++++++++++++++++----- 3 files changed, 251 insertions(+), 35 deletions(-) diff --git a/lib/plugins/retry.js b/lib/plugins/retry.js index 7126c7f2..d738af5d 100644 --- a/lib/plugins/retry.js +++ b/lib/plugins/retry.js @@ -8,7 +8,7 @@ var retry = function(worker, func, queue, job, args, options){ var self = this; if(!options.retryLimit){ options.retryLimit = 1; } - if(!options.retryDelay){ options.retryDelay = 1000; } + if(!options.retryDelay){ options.retryDelay = (1000 * 5); } if(!options.backoffStrategy){ options.backoffStrategy = null; } self.name = 'retry'; diff --git a/lib/worker.js b/lib/worker.js index 703b8e08..18a82e76 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -239,8 +239,9 @@ worker.prototype.fail = function(err, job) { var self = this; self.connection.redis.incr(self.connection.key('stat', 'failed')); self.connection.redis.incr(self.connection.key('stat', 'failed', self.name)); - self.connection.redis.rpush(self.connection.key('failed'), JSON.stringify(self.failurePayload(err, job))); - self.emit('failure', self.queue, job, err); + self.connection.redis.rpush(self.connection.key('failed'), JSON.stringify(self.failurePayload(err, job)), function(){ + self.emit('failure', self.queue, job, err); + }); }; worker.prototype.pause = function() { diff --git a/test/plugins/retry.js b/test/plugins/retry.js index eb5a8550..96c43344 100644 --- a/test/plugins/retry.js +++ b/test/plugins/retry.js @@ -2,15 +2,17 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; var should = require('should'); describe('plugins', function(){ + describe('retry',function(){ var jobs = { "brokenJob": { - plugins: [ 'simpleRetry' ], - pluginOptions: { simpleRetry: { - retryInterval: [100] + plugins: [ 'retry' ], + pluginOptions: { retry: { + retryLimit: 3, + retryDelay: 100, },}, perform: function(a,b,callback){ - callback(new Error("BROKEN"), null); + callback(new Error("BUSTED"), null); }, } }; @@ -19,46 +21,259 @@ describe('plugins', function(){ specHelper.connect(function(){ specHelper.cleanup(function(){ queue = new specHelper.NR.queue({connection: specHelper.cleanConnectionDetails(), queue: specHelper.queue}, jobs); - queue.connect(done); + scheduler = new specHelper.NR.scheduler({connection: specHelper.cleanConnectionDetails(), timeout: specHelper.timeout}); + scheduler.connect(function(){ + scheduler.start(); + queue.connect(done); + }); }); }); }); + after(function(done){ + scheduler.end(done); + }); + afterEach(function(done){ specHelper.cleanup(done); }); - describe('simpleRetry',function(){ - it('bad job should not crash with simpleRetry', function(done){ - queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){ - queue.length(specHelper.queue, function(err, len){ - len.should.equal(1); - - var worker = new specHelper.NR.worker({ - connection: specHelper.connectionDetails, - timeout: specHelper.timeout, - queues: specHelper.queue - }, jobs); - - worker.connect(function(){ - worker.on('success', function(q, job, result){ - specHelper.queue.should.equal(q); - queue.scheduledAt(specHelper.queue, "brokenJob", [1,2], function(err, timestamps){ - timestamps.length.should.be.equal(1); - worker.end(); - done(); - }); - }); - - worker.on('error', function(q, job, error){ - throw new Error('should not be here'); - }); - - worker.start(); + it('will retry the job n times before finally failing', function(done){ + queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){ + queue.length(specHelper.queue, function(err, length){ + length.should.equal(1); + + var failButRetryCount = 0; + var failureCount = 0; + + var worker = new specHelper.NR.worker({ + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + queues: specHelper.queue + }, jobs); + + var complete = function(){ + failButRetryCount.should.equal(2); + failureCount.should.equal(1); + (failButRetryCount + failureCount).should.equal(3); + + specHelper.redis.llen('resque_test:failed', function(error, length){ + length.should.equal(1); + worker.end(done); + }); + }; + + worker.connect(function(){ + + worker.on('success', function(){ + failButRetryCount++; + }); + + worker.on('failure', function(){ + failureCount++; + complete(); + }); + + worker.start(); + }); + }); + }); + }); + + it('can have a retry count set', function(done){ + var customJobs = { + "jobWithRetryCount": { + plugins: [ 'retry' ], + pluginOptions: { retry: { + retryLimit: 5, + retryDelay: 100, + },}, + perform: function(a,b,callback){ + callback(new Error("BUSTED"), null); + }, + } + }; + + queue.enqueue(specHelper.queue, "jobWithRetryCount", [1,2], function(){ + queue.length(specHelper.queue, function(err, length){ + length.should.equal(1); + + var failButRetryCount = 0; + var failureCount = 0; + + var worker = new specHelper.NR.worker({ + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + queues: specHelper.queue + }, customJobs); + + var complete = function(){ + failButRetryCount.should.equal(4); + failureCount.should.equal(1); + (failButRetryCount + failureCount).should.equal(5); + + specHelper.redis.llen('resque_test:failed', function(error, length){ + length.should.equal(1); + worker.end(done); + }); + }; + + worker.connect(function(){ + + worker.on('success', function(){ + failButRetryCount++; + }); + + worker.on('failure', function(){ + failureCount++; + complete(); + }); + + worker.start(); + }); + }); + }); + }); + + it('can have custom retry times set', function(done){ + var customJobs = { + "jobWithBackoffStrategy": { + plugins: [ 'retry' ], + pluginOptions: { retry: { + retryLimit: 5, + backoffStrategy: [1,2,3,4,5] + },}, + perform: function(a,b,callback){ + callback(new Error("BUSTED"), null); + }, + } + }; + + queue.enqueue(specHelper.queue, "jobWithBackoffStrategy", [1,2], function(){ + queue.length(specHelper.queue, function(err, length){ + length.should.equal(1); + + var failButRetryCount = 0; + var failureCount = 0; + + var worker = new specHelper.NR.worker({ + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + queues: specHelper.queue + }, customJobs); + + var complete = function(){ + failButRetryCount.should.equal(4); + failureCount.should.equal(1); + (failButRetryCount + failureCount).should.equal(5); + + specHelper.redis.llen('resque_test:failed', function(error, length){ + length.should.equal(1); + worker.end(done); + }); + }; + + worker.connect(function(){ + + worker.on('success', function(){ + failButRetryCount++; + }); + + worker.on('failure', function(){ + failureCount++; + complete(); + }); + + worker.start(); + }); + }); + }); + }); + + it('when a job fails it should be re-enqueued (and not go to the failure queue)', function(done){ + queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){ + var worker = new specHelper.NR.worker({ + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + queues: specHelper.queue + }, jobs); + + var complete = function(){ + queue.scheduledAt(specHelper.queue, "brokenJob", [1,2], function(err, timestamps){ + timestamps.length.should.be.equal(1); + specHelper.redis.llen('resque_test:failed', function(error, length){ + length.should.equal(0); + worker.end(done); }); }); + }; + + worker.connect(function(){ + worker.on('success', function(){ complete(); }); + worker.start(); }); }); + }); + + it('will handle the stats properly for failing jobs', function(done){ + queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){ + var worker = new specHelper.NR.worker({ + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + queues: specHelper.queue + }, jobs); + + var complete = function(){ + specHelper.redis.get('resque_test:stat:processed', function(error, global_processed){ + specHelper.redis.get('resque_test:stat:failed', function(error, global_failed){ + specHelper.redis.get('resque_test:stat:processed:' + worker.name, function(error, worker_processed){ + specHelper.redis.get('resque_test:stat:failed:' + worker.name, function(error, worker_failed){ + global_processed.should.equal('0'); + global_failed.should.equal('1'); + worker_processed.should.equal('0'); + worker_failed.should.equal('1'); + worker.end(done); + }); + }); + }); + }); + }; + + worker.connect(function(){ + worker.on('success', function(){ complete(); }); + worker.start(); + }); + }); + }); + + it('will set the retry counter & retry data', function(done){ + queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){ + var worker = new specHelper.NR.worker({ + connection: specHelper.cleanConnectionDetails(), + timeout: specHelper.timeout, + queues: specHelper.queue + }, jobs); + + var complete = function(){ + specHelper.redis.get('resque_test:resque-retry:brokenJob:1-2', function(error, retryAttempts){ + specHelper.redis.get('resque_test:failure-resque-retry:brokenJob:1-2', function(error, failureData){ + retryAttempts.should.equal('0'); + failureData = JSON.parse(failureData); + failureData.payload.should.deepEqual([1,2]); + failureData.exception.should.equal('Error: BUSTED'); + failureData.worker.should.equal('brokenJob'); + failureData.queue.should.equal('test_queue'); + worker.end(done); + }); + }); + }; + + worker.connect(function(){ + worker.on('success', function(){ complete(); }); + worker.start(); + }); + }); + }); }); }); From cc097b0860bc7b49d3549136347a33bda7518321 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Fri, 4 Mar 2016 15:13:46 -0800 Subject: [PATCH 6/7] readme for plugins --- README.md | 30 ++++++++++++++++++++++-------- lib/plugins/retry.js | 2 +- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index b0f5be05..72b10168 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # node-resque -Delayed Tasks in nodejs. A very opinionated but compatible API with [resque](https://github.com/resque/resque) and [resque scheduler](https://github.com/resque/resque-scheduler). Resque is a background job system based on redis. It includes priority queus, plugins, locking, delayed jobs, and more! +Delayed Tasks in nodejs. A very opinionated but compatible API with [resque](https://github.com/resque/resque) and [resque scheduler](https://github.com/resque/resque-scheduler). Resque is a background job system based on redis. It includes priority queus, plugins, locking, delayed jobs, and more! [![Nodei stats](https://nodei.co/npm/node-resque.png?downloads=true)](https://npmjs.org/package/node-resque) @@ -38,9 +38,13 @@ var connectionDetails = { var jobs = { "add": { - plugins: [ 'jobLock' ], + plugins: [ 'jobLock', 'retry' ], pluginOptions: { jobLock: {}, + retry: { + retryLimit: 3, + retryDelay: (1000 * 5), + } }, perform: function(a,b,callback){ var answer = a + b; @@ -158,7 +162,7 @@ You can also pass redis client directly. var redisClient = new Redis(); var connectionDetails = { redis: redisClient } -var worker = new NR.worker({connection: connectionDetails, queues: 'math'}, jobs, +var worker = new NR.worker({connection: connectionDetails, queues: 'math'}, jobs, worker.on('error', function(){ // handler errors @@ -282,11 +286,11 @@ scheduler.connect(function(){ var queue = new NR.queue({connection: connectionDetails}, jobs, function(){ schedule.scheduleJob('10,20,30,40,50 * * * * *', function(){ // do this job every 10 seconds, CRON style - // we want to ensure that only one instance of this job is scheduled in our environment at once, + // we want to ensure that only one instance of this job is scheduled in our environment at once, // no matter how many schedulers we have running - if(scheduler.master){ + if(scheduler.master){ console.log(">>> enquing a job"); - queue.enqueue('time', "ticktock", new Date().toString() ); + queue.enqueue('time', "ticktock", new Date().toString() ); } }); }); @@ -379,6 +383,16 @@ var jobs = { } ``` +The plugins which are included with this package are: +- `delayQueueLock` + - If a job with the same name, queue, and args is already in the delayed queue(s), do not enqueue it again +- `jobLock` + - If a job with the same name, queue, and args is already running, put this job back in the queue and try later +- `queueLock` + - If a job with the same name, queue, and args is already in the queue, do not enqueue it again +- `retry` + - If a job fails, retry it N times before finally placing it into the failed queue + ## Multi Worker node-resque provides a wrapper around the `worker` object which will auto-scale the number of resque workers. This will process more than one job at a time as long as there is idle CPU within the event loop. For example, if you have a slow job that sends email via SMTP (with low rendering overhead), we can process many jobs at a time, but if you have a math-heavy operation, we'll stick to 1. The `multiWorker` handles this by spawning more and more node-resque workers and managing the pool. @@ -393,7 +407,7 @@ var connectionDetails = { } var multiWorker = new NR.multiWorker({ - connection: connectionDetails, + connection: connectionDetails, queues: ['slowQueue'], minTaskProcessors: 1, maxTaskProcessors: 100, @@ -433,7 +447,7 @@ The Options available for the multiWorker are: - `toDisconnectProcessors`: If false, all multiWorker children will share a single redis connection. If false, each child will connect and disconnect seperatly. This will lead to more redis connections, but faster retrival of events. ## Presentation -This package was featured heavily in [this presentation I gave](http://blog.evantahler.com/blog/background-tasks-for-node.html) about background jobs + node.js. It contains more examples! +This package was featured heavily in [this presentation I gave](http://blog.evantahler.com/blog/background-tasks-for-node.html) about background jobs + node.js. It contains more examples! ## Acknowledgments Most of this code was inspired by / stolen from [coffee-resque](https://npmjs.org/package/coffee-resque) and [coffee-resque-scheduler](https://github.com/leeadkins/coffee-resque-scheduler). Thanks! diff --git a/lib/plugins/retry.js b/lib/plugins/retry.js index d738af5d..58d6f1ac 100644 --- a/lib/plugins/retry.js +++ b/lib/plugins/retry.js @@ -1,4 +1,4 @@ -// If the job fails, sleep, and re-enqueue it. +// If a job fails, retry it N times before finally placing it into the failed queue // a port of some of the features in https://github.com/lantins/resque-retry var crypto = require('crypto'); From a3e77fe371c9a2d1d5e5c1ab07203a385ab46f12 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Fri, 4 Mar 2016 15:29:40 -0800 Subject: [PATCH 7/7] make fakeredis work --- test/plugins/retry.js | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/test/plugins/retry.js b/test/plugins/retry.js index 96c43344..016690ec 100644 --- a/test/plugins/retry.js +++ b/test/plugins/retry.js @@ -81,6 +81,8 @@ describe('plugins', function(){ }); it('can have a retry count set', function(done){ + this.timeout(1000 * 10); + var customJobs = { "jobWithRetryCount": { plugins: [ 'retry' ], @@ -136,6 +138,8 @@ describe('plugins', function(){ }); it('can have custom retry times set', function(done){ + this.timeout(1000 * 10); + var customJobs = { "jobWithBackoffStrategy": { plugins: [ 'retry' ], @@ -191,6 +195,8 @@ describe('plugins', function(){ }); it('when a job fails it should be re-enqueued (and not go to the failure queue)', function(done){ + this.timeout(1000 * 10); + queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){ var worker = new specHelper.NR.worker({ connection: specHelper.cleanConnectionDetails(), @@ -216,6 +222,8 @@ describe('plugins', function(){ }); it('will handle the stats properly for failing jobs', function(done){ + this.timeout(1000 * 10); + queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){ var worker = new specHelper.NR.worker({ connection: specHelper.cleanConnectionDetails(), @@ -228,10 +236,10 @@ describe('plugins', function(){ specHelper.redis.get('resque_test:stat:failed', function(error, global_failed){ specHelper.redis.get('resque_test:stat:processed:' + worker.name, function(error, worker_processed){ specHelper.redis.get('resque_test:stat:failed:' + worker.name, function(error, worker_failed){ - global_processed.should.equal('0'); - global_failed.should.equal('1'); - worker_processed.should.equal('0'); - worker_failed.should.equal('1'); + String(global_processed).should.equal('0'); + String(global_failed).should.equal('1'); + String(worker_processed).should.equal('0'); + String(worker_failed).should.equal('1'); worker.end(done); }); }); @@ -247,6 +255,8 @@ describe('plugins', function(){ }); it('will set the retry counter & retry data', function(done){ + this.timeout(1000 * 10); + queue.enqueue(specHelper.queue, "brokenJob", [1,2], function(){ var worker = new specHelper.NR.worker({ connection: specHelper.cleanConnectionDetails(), @@ -257,7 +267,7 @@ describe('plugins', function(){ var complete = function(){ specHelper.redis.get('resque_test:resque-retry:brokenJob:1-2', function(error, retryAttempts){ specHelper.redis.get('resque_test:failure-resque-retry:brokenJob:1-2', function(error, failureData){ - retryAttempts.should.equal('0'); + String(retryAttempts).should.equal('0'); failureData = JSON.parse(failureData); failureData.payload.should.deepEqual([1,2]); failureData.exception.should.equal('Error: BUSTED');