Skip to content

Configurable job parallelism #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 23, 2016
Merged
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ io(new EventEmitter): the channel of internal communication with the job work
processJob(core.process):function to run a job. (task, config, ondone)
pluginDirs: the directories in which to look for plugins
dataDir($HOME/.strider): the directory in which to clone/test/etc
concurrentJobs(1): maximum number of jobs to execute at once
```

### Events
Expand Down
48 changes: 27 additions & 21 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ var fs = require('fs-extra')
, cachier = require('./cachier')
, keeper = require('dirkeeper')
, JobData = require('./jobdata')
, JobQueue = require('./jobqueue')
, branchFromJob = require('./utils').branchFromJob

// timeout for callbacks. Helps to kill misbehaving plugins, etc
function t(time, done) {
Expand Down Expand Up @@ -56,28 +58,28 @@ function Runner(emitter, config) {
logger: console,
processJob: core.process,
pluginDir: path.join(__dirname, '../node_modules'),
dataDir: process.env.STRIDER_CLONE_DEST || dotStrider
dataDir: process.env.STRIDER_CLONE_DEST || dotStrider,
concurrentJobs: parseInt(process.env.CONCURRENT_JOBS || '1', 10) || 1,
}, config)
this.emitter = emitter
this.log = this.config.logger.log
this.queue = async.queue(this.processJob.bind(this), 1)
this.queue = new JobQueue(this.processJob.bind(this), this.config.concurrentJobs)
this.io = this.config.io
this.callbackMap = {}
this.hooks = []
this.jobdata = new JobData(this.io)
this.attach()
}

// base: the base directory where all jobs data is stored
// base: the base directory where all jobs data for this project and branch is stored
// the job object.
// done(err, {base:, data:, cache:})
function initJobDirs(base, job, cache, done) {
var name = job.project.name
, dirs = {
base: base,
data: path.join(base, "data", name.replace('/','-') + "-" + job._id.toString()),
cache: cache
}
function initJobDirs(branchBase, job, cache, done) {
var dirs = {
base: branchBase,
data: path.join(branchBase, 'job-' + job._id.toString()),
cache: cache
}

async.series([
function checkData(next) {
Expand Down Expand Up @@ -179,13 +181,13 @@ Runner.prototype = {
this.jobdata.add(job)
this.log('[runner:' + this.id + '] Queued new job. Project: ' + job.project.name + ' Job ID: ' + job._id)
this.emitter.emit('browser.update', job.project.name, 'job.status.queued', [job._id, now])
this.queue.push({job: job, config: config})
this.queue.push(job, config)
},

cancelJob: function (id) {
var jobdata
for (var i=0; i<this.queue.tasks.length; i++) {
if (this.queue.tasks[i].data.job._id.toString() === id.toString()) {
if (this.queue.tasks[i].job._id.toString() === id.toString()) {
this.queue.tasks.splice(i, 1)
this.log('[runner:' + this.id + '] Cancelled job. Job ID: ' + id)
jobdata = this.jobdata.pop(id)
Expand Down Expand Up @@ -280,10 +282,8 @@ Runner.prototype = {
})
},

processJob: function (data, next) {
var job = data.job
, config = data.config
, cache = this.getCache(job.project)
processJob: function (job, config, next) {
var cache = this.getCache(job.project)
, now = new Date()
, self = this
, oldnext = next
Expand All @@ -292,11 +292,18 @@ Runner.prototype = {
oldnext()
}
this.callbackMap[job._id] = next
// Keep around N most recent build directories.
// Default is 0, ie wipe at start of each run.

var projectName = job.project.name.replace('/', '-')
, branchName = branchFromJob(job).replace('/', '-')
, branchBase = path.join(self.config.dataDir, 'data', projectName + '-' + branchName)

// Keep around N most recent build directories for this branch.
// The default is 0, ie wipe at start of each run.
// Later, this can be configurable in the UI.
keeper({baseDir: path.join(this.config.dataDir, "data"), count: 0}, function(err) {
initJobDirs(self.config.dataDir, job, cache.base, jobDirsReady)
keeper({baseDir: branchBase, count: 0}, function(err) {
if (err) throw err;

initJobDirs(branchBase, job, cache.base, jobDirsReady)
})

var jobDirsReady = function(err, dirs) {
Expand Down Expand Up @@ -373,4 +380,3 @@ Runner.prototype = {
};
}
};

114 changes: 114 additions & 0 deletions lib/jobqueue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
var branchFromJob = require('./utils').branchFromJob

module.exports = JobQueue

// A FIFO job queue that runs up to `concurrency` jobs at once through `handler`, while never
// running two jobs that share a key (project name + branch) at the same time.
//
// handler: function (job, config, done) invoked to run one job; it must call done(err) when the
//          job has finished.
// concurrency: maximum number of jobs that may be active simultaneously.
function JobQueue(handler, concurrency) {
  this.concurrency = concurrency
  this.handler = handler

  // Tasks waiting to be launched, in arrival order.
  this.tasks = []

  // Currently running tasks, indexed by task.key.
  this.active = {}

  // One-shot callback registered through onNextDrain(), or null.
  this.drainCallback = null
}

JobQueue.prototype = {
  // public api

  // Add a job to the end of the queue. If the queue is not currently saturated, immediately
  // schedule a task to handle the new job. If a callback is provided, call it when this job's task
  // completes.
  push: function (job, config, callback) {
    var task = {
      job: job,
      config: config,
      callback: callback || function () {}
    }

    task.id = job._id

    // Tasks with identical keys will be prevented from being scheduled concurrently.
    // job.project is normally an object, so the key must be built from its name: concatenating
    // the object itself would stringify to "[object Object]" and make every project collide.
    // A non-object project value is used as-is.
    var project = job.project
    var projectName = (project !== null && typeof project === 'object') ? project.name : project
    task.key = projectName + ':' + branchFromJob(job)

    this.tasks.push(task)

    // Defer task execution to the next event loop tick to ensure that the push() function's
    // callback is *always* invoked asynchronously.
    // http://blog.izs.me/post/59142742143/designing-apis-for-asynchrony
    process.nextTick(this.drain.bind(this))
  },

  // Launch the asynchronous handler function for each eligible waiting task until the queue is
  // saturated.
  drain: function () {
    var self = this
    var hasOwn = Object.prototype.hasOwnProperty

    // See how much capacity we have left to fill.
    var launchCount = this.concurrency - Object.keys(this.active).length

    // Identify up to launchCount eligible tasks, giving priority to those earlier in the queue.
    var offset = 0
    var launchTasks = []
    while (launchTasks.length < launchCount && offset < this.tasks.length) {
      var task = this.tasks[offset]

      if (hasOwn.call(this.active, task.key)) {
        // This task cannot run right now, so skip it.
        offset += 1
      } else {
        // This task is eligible to run. Remove it from the queue and claim its key right away, so
        // that a later task with the same key is not also selected during this same pass.
        this.tasks.splice(offset, 1)
        this.active[task.key] = task
        launchTasks.push(task)
      }
    }

    // Create a task completion callback. Remove the task from the active set, invoke the task's
    // push() callback, then drain() again to see if another task is ready to run.
    var makeTaskHandler = function (task) {
      return function (err) {
        // Only release the slot if this task still owns it; a handler that reports completion
        // twice must not evict a newer task that has since taken the same key.
        if (self.active[task.key] === task) {
          delete self.active[task.key]
        }

        task.callback(err)

        // Defer the next drain() call again in case the task's callback was synchronous.
        process.nextTick(self.drain.bind(self))
      }
    }

    // Launch the queue handler for each chosen task.
    for (var i = 0; i < launchTasks.length; i++) {
      var each = launchTasks[i]

      this.handler(each.job, each.config, makeTaskHandler(each))
    }

    // Fire and unset the drain callback if one has been registered.
    if (this.drainCallback) {
      var lastCallback = this.drainCallback
      this.drainCallback = null
      lastCallback()
    }
  },

  // Count the number of tasks waiting on the queue.
  length: function () {
    return this.tasks.length
  },

  // Return true if "id" corresponds to the job ID of an active job. IDs are compared by their
  // string form so that an ID object and its string representation are treated as equal.
  isActive: function (id) {
    var wanted = String(id)
    for (var key in this.active) {
      if (this.active.hasOwnProperty(key) && String(this.active[key].id) === wanted) {
        return true
      }
    }
    return false
  },

  // Fire a callback the next time that a drain() is executed. Registering a new callback replaces
  // any callback that has not fired yet.
  onNextDrain: function (callback) {
    this.drainCallback = callback
  }
}
25 changes: 23 additions & 2 deletions lib/utils.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@

var _ = require('lodash')

, consts = require('./consts')
, stringify = require('json-stable-stringify')

module.exports = {
ensureCommand: ensureCommand
ensureCommand: ensureCommand,
branchFromJob: branchFromJob
}

function ensureCommand(phase) {
Expand All @@ -16,3 +17,23 @@ function ensureCommand(phase) {
return command
}

// Extract a branch name from the contents of the job's ref field, for use as a per-branch key or
// directory name. Prefer common ref structures when available (branch, fetch) but fall back to
// something that's ugly but unique and stable for arbitrary ref payloads.
//
// job: the job object; only job.ref is read.
// Returns:
//   - '' when the job has no ref (undefined or null)
//   - ref.branch or ref.fetch, verbatim, when the ref object has one of those properties. These
//     values are NOT sanitized here and may contain characters such as '/', so callers that build
//     filesystem paths must escape them.
//   - otherwise the URI-encoded stable JSON serialization of the ref
function branchFromJob(job) {
  var ref = job.ref

  // A missing ref means there is no branch information at all.
  if (ref === undefined || ref === null) {
    return ''
  }

  // The "in" operator throws a TypeError on primitives, so only probe real objects.
  if (typeof ref === 'object') {
    if ('branch' in ref) {
      return ref.branch
    }
    if ('fetch' in ref) {
      return ref.fetch
    }
  }

  // This is going to be incredibly ugly, but it will be (a) consistent for consistent refs and
  // (b) include only filesystem-safe characters.
  return encodeURIComponent(stringify(ref))
}
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
"strider-runner-core": "~2.0.0",
"strider-extension-loader": "~0.4.3",
"fs-extra": "~0.8.1",
"dirkeeper": "~0.2.0"
"dirkeeper": "~0.2.0",
"json-stable-stringify": "~1.0.1"
},
"devDependencies": {
"mocha": "^1.21.1",
Expand Down
Loading