Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ Job producers can set an expiry value for the time their job can live in active
queue.create('email', {title: 'email job with TTL'}).ttl(milliseconds).save();
```


### Job inactive TTL

Job producers can set an expiry value for the time their job can live in inactive state, to prevent overload or unavailable workers

```js
queue.create('email', {title: 'email job with TTL'}).ttlInactive(milliseconds).save();
```


### Job Logs

Job-specific logs enable you to expose information to the UI at any point in the job's life-time. To do so simply invoke `job.log()`, which accepts a message string as well as variable-arguments for sprintf-like support:
Expand Down
69 changes: 69 additions & 0 deletions lib/kue.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ Queue.prototype.setupTimers = function() {
}
this.checkJobPromotion(this._options.promotion);
this.checkActiveJobTtl(this._options.promotion);
this.checkInactiveJobTtl(this._options.promotion);
};

/**
Expand Down Expand Up @@ -259,6 +260,74 @@ Queue.prototype.checkActiveJobTtl = function( ttlOptions ) {
}, timeout);
};


Queue.prototype.checkInactiveJobTtl = function( ttlOptions ) {
ttlOptions = ttlOptions || {};
var client = this.client
, self = this
, timeout = ttlOptions.interval || 1000
, lockTtl = 2000
, limit = ttlOptions.limit || 1000;
clearInterval(this.inactiveJobsTtlTimer);
this.inactiveJobsTtlTimer = setInterval(function() {
if(self.shuttingDown || !self.lockClient) return;
self.warlock.lock('inactiveJobsTTL', lockTtl, function( err, unlock ) {
if( err ) {
// Something went wrong and we weren't able to set a lock
self.emit('error', err);
return;
}
if( typeof unlock === 'function' ) {
// If the lock is set successfully by this process, an unlock function is passed to our callback.
// filter only jobs set with a ttl (timestamped) between a large number and current time
client.zrangebyscore(client.getKey('jobs:inactive'), 100000, Date.now(), 'LIMIT', 0, limit, function( err, ids ) {
if( err || !ids.length ) return unlock();

var idsRemaining = ids.slice();
var doUnlock = _.after(ids.length, function(){
self.removeAllListeners( 'job ttl exceeded ack' );
waitForAcks && clearTimeout( waitForAcks );
unlock && unlock();
});

self.on( 'job ttl exceeded ack', function( id ) {
idsRemaining.splice( idsRemaining.indexOf( id ), 1 );
doUnlock();
});

var waitForAcks = setTimeout( function(){
idsRemaining.forEach( function( id ){
id = client.stripFIFO(id);
Job.get(id, function( err, job ) {
if( err ) return doUnlock();
job.failedAttempt( { error: true, message: 'TTL exceeded' }, function( err, hasAttempts, attempt){
if( err ) {
self.emit('error', err, job);
}else if( hasAttempts ) {
events.emit(job.id, 'failed attempt', 'TTL exceeded', attempt);
} else {
events.emit(job.id, 'failed', 'TTL exceeded', attempt);
}
doUnlock();
});
});
});
}, 1000 );

ids.forEach(function( id ) {
id = client.stripFIFO(id);
events.emit(id, 'ttl exceeded');
});
});
} else {
// The lock was not established by us, be silent
}
});
}, timeout);
};



/**
* Runs a LUA script to diff inactive jobs ZSET cardinality
* and helper pop LIST length each `ms` milliseconds and syncs helper LIST.
Expand Down
24 changes: 19 additions & 5 deletions lib/queue/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ exports.get = function( id, jobType, fn ) {
// we can just merge these
job.type = hash.type;
job._ttl = hash.ttl;
job._ttlInactive = hash.ttlInactive;
job._delay = hash.delay;
job.priority(Number(hash.priority));
job._progress = hash.progress;
Expand Down Expand Up @@ -331,6 +332,7 @@ Job.prototype.toJSON = function() {
, delay: this._delay
, workerId: this.workerId
, ttl: this._ttl
, ttlInactive : this._ttlInactive
, attempts: {
made: Number(this._attempts) || 0
, remaining: this._attempts > 0 ? this._max_attempts - this._attempts : Number(this._max_attempts) || 1
Expand All @@ -341,11 +343,11 @@ Job.prototype.toJSON = function() {


Job.prototype.refreshTtl = function() {
('active' === this.state() && this._ttl > 0)
?
this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttl), this.zid, noop)
:
noop();
if ('active' === this.state() && this._ttl > 0) {
this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttl), this.zid, noop)
}else if ('inactive' === this.state() && this._ttlInactive > 0) {
this.client.zadd(this.client.getKey('jobs:' + this.state()), Date.now() + parseInt(this._ttlInactive), this.zid, noop)
}
};


Expand Down Expand Up @@ -493,6 +495,14 @@ Job.prototype.ttl = function( param ) {
return this;
};

Job.prototype.ttlInactive = function( param ) {
if(0 === arguments.length ) return this._ttlInactive;
if( param > 0 ) {
this._ttlInactive = param;
}
return this;
};

Job.prototype._getBackoffImpl = function() {
var self = this
var supported_backoffs = {
Expand Down Expand Up @@ -696,6 +706,7 @@ Job.prototype.state = function( state, fn ) {
('active' === state && this._ttl > 0) ? multi.zadd(client.getKey('jobs:' + state), Date.now() + parseInt(this._ttl), this.zid) : noop();
('active' === state && !this._ttl) ? multi.zadd(client.getKey('jobs:' + state), this._priority<0?this._priority:-this._priority, this.zid) : noop();
('inactive' === state) ? multi.lpush(client.getKey(this.type + ':jobs'), 1) : noop();
('inactive' === state && this._ttlInactive > 0) ? multi.zadd(client.getKey('jobs:' + state), Date.now() + parseInt(this._ttlInactive), this.zid) : noop();

this.set('updated_at', Date.now());
this._state = state;
Expand Down Expand Up @@ -848,6 +859,9 @@ Job.prototype.update = function( fn ) {
if( this._ttl ) {
this.set('ttl', this._ttl);
}
if (this._ttlInactive ){
this.set('ttlInactive', this._ttlInactive);
}
if( this._removeOnComplete ) this.set('removeOnComplete', this._removeOnComplete);
if( this._backoff ) {
if( _.isPlainObject(this._backoff) ) this.set('backoff', JSON.stringify(this._backoff));
Expand Down