Skip to content

Commit 01d2d15

Browse files
author
Ilya Radchenko
committed
Merge pull request #29 from smashwilson/parallel-jobs
Configurable job parallelism
2 parents cc5a8bd + cc26e79 commit 01d2d15

File tree

6 files changed

+345
-24
lines changed

6 files changed

+345
-24
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ io(new EventEmitter): the channel of internal communication with the job work
2020
processJob(core.process):function to run a job. (task, config, ondone)
2121
pluginDirs: the directories in which to look for plugins
2222
dataDir($HOME/.strider): the directory in which to clone/test/etc
23+
concurrentJobs(1): maximum number of jobs to execute at once
2324
```
2425

2526
### Events

lib/index.js

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ var fs = require('fs-extra')
1212
, cachier = require('./cachier')
1313
, keeper = require('dirkeeper')
1414
, JobData = require('./jobdata')
15+
, JobQueue = require('./jobqueue')
16+
, branchFromJob = require('./utils').branchFromJob
1517

1618
// timeout for callbacks. Helps to kill misbehaving plugins, etc
1719
function t(time, done) {
@@ -56,28 +58,28 @@ function Runner(emitter, config) {
5658
logger: console,
5759
processJob: core.process,
5860
pluginDir: path.join(__dirname, '../node_modules'),
59-
dataDir: process.env.STRIDER_CLONE_DEST || dotStrider
61+
dataDir: process.env.STRIDER_CLONE_DEST || dotStrider,
62+
concurrentJobs: parseInt(process.env.CONCURRENT_JOBS || '1', 10) || 1,
6063
}, config)
6164
this.emitter = emitter
6265
this.log = this.config.logger.log
63-
this.queue = async.queue(this.processJob.bind(this), 1)
66+
this.queue = new JobQueue(this.processJob.bind(this), this.config.concurrentJobs)
6467
this.io = this.config.io
6568
this.callbackMap = {}
6669
this.hooks = []
6770
this.jobdata = new JobData(this.io)
6871
this.attach()
6972
}
7073

71-
// base: the base directory where all jobs data is stored
74+
// base: the base directory where all jobs data for this project and branch is stored
7275
// the job object.
7376
// done(err, {base:, data:, cache:})
74-
function initJobDirs(base, job, cache, done) {
75-
var name = job.project.name
76-
, dirs = {
77-
base: base,
78-
data: path.join(base, "data", name.replace('/','-') + "-" + job._id.toString()),
79-
cache: cache
80-
}
77+
function initJobDirs(branchBase, job, cache, done) {
78+
var dirs = {
79+
base: branchBase,
80+
data: path.join(branchBase, 'job-' + job._id.toString()),
81+
cache: cache
82+
}
8183

8284
async.series([
8385
function checkData(next) {
@@ -179,13 +181,13 @@ Runner.prototype = {
179181
this.jobdata.add(job)
180182
this.log('[runner:' + this.id + '] Queued new job. Project: ' + job.project.name + ' Job ID: ' + job._id)
181183
this.emitter.emit('browser.update', job.project.name, 'job.status.queued', [job._id, now])
182-
this.queue.push({job: job, config: config})
184+
this.queue.push(job, config)
183185
},
184186

185187
cancelJob: function (id) {
186188
var jobdata
187189
for (var i=0; i<this.queue.tasks.length; i++) {
188-
if (this.queue.tasks[i].data.job._id.toString() === id.toString()) {
190+
if (this.queue.tasks[i].job._id.toString() === id.toString()) {
189191
this.queue.tasks.splice(i, 1)
190192
this.log('[runner:' + this.id + '] Cancelled job. Job ID: ' + id)
191193
jobdata = this.jobdata.pop(id)
@@ -280,10 +282,8 @@ Runner.prototype = {
280282
})
281283
},
282284

283-
processJob: function (data, next) {
284-
var job = data.job
285-
, config = data.config
286-
, cache = this.getCache(job.project)
285+
processJob: function (job, config, next) {
286+
var cache = this.getCache(job.project)
287287
, now = new Date()
288288
, self = this
289289
, oldnext = next
@@ -292,11 +292,18 @@ Runner.prototype = {
292292
oldnext()
293293
}
294294
this.callbackMap[job._id] = next
295-
// Keep around N most recent build directories.
296-
// Default is 0, ie wipe at start of each run.
295+
296+
var projectName = job.project.name.replace('/', '-')
297+
, branchName = branchFromJob(job).replace('/', '-')
298+
, branchBase = path.join(self.config.dataDir, 'data', projectName + '-' + branchName)
299+
300+
// Keep around N most recent build directories for this branch.
301+
// The default is 0, ie wipe at start of each run.
297302
// Later, this can be configurable in the UI.
298-
keeper({baseDir: path.join(this.config.dataDir, "data"), count: 0}, function(err) {
299-
initJobDirs(self.config.dataDir, job, cache.base, jobDirsReady)
303+
keeper({baseDir: branchBase, count: 0}, function(err) {
304+
if (err) throw err;
305+
306+
initJobDirs(branchBase, job, cache.base, jobDirsReady)
300307
})
301308

302309
var jobDirsReady = function(err, dirs) {
@@ -373,4 +380,3 @@ Runner.prototype = {
373380
};
374381
}
375382
};
376-

lib/jobqueue.js

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
var branchFromJob = require('./utils').branchFromJob
2+
3+
module.exports = JobQueue
4+
5+
function JobQueue(handler, concurrency) {
6+
this.concurrency = concurrency
7+
this.handler = handler
8+
this.tasks = []
9+
10+
this.active = {}
11+
12+
this.drainCallback = null
13+
}
14+
15+
JobQueue.prototype = {
16+
// public api
17+
18+
// Add a job to the end of the queue. If the queue is not currently saturated, immediately
19+
// schedule a task to handle the new job. If a callback is provided, call it when this job's task
20+
// completes.
21+
push: function (job, config, callback) {
22+
var task = {
23+
job: job,
24+
config: config,
25+
callback: callback || function () {}
26+
}
27+
28+
task.id = task.job._id
29+
30+
// Tasks with identical keys will be prevented from being scheduled concurrently.
31+
task.key = task.job.project + branchFromJob(task.job)
32+
33+
this.tasks.push(task)
34+
35+
// Defer task execution to the next event loop tick to ensure that the push() function's
36+
// callback is *always* invoked asynchronously.
37+
// http://blog.izs.me/post/59142742143/designing-apis-for-asynchrony
38+
process.nextTick(this.drain.bind(this))
39+
},
40+
41+
// Launch the asynchronous handler function for each eligible waiting task until the queue is
42+
// saturated.
43+
drain: function () {
44+
var self = this
45+
46+
// See how much capacity we have left to fill.
47+
var launchCount = this.concurrency - Object.keys(this.active).length
48+
49+
// Identify up to launchCount eligible tasks, giving priority to those earlier in the queue.
50+
var offset = 0
51+
var launchTasks = []
52+
while (launchTasks.length < launchCount && this.tasks.length > offset) {
53+
var task = this.tasks[offset]
54+
55+
if (task.key in this.active) {
56+
// This task cannot run right now, so skip it.
57+
offset += 1
58+
} else {
59+
// This task is eligible to run. Remove it from the queue and prepare it to launch.
60+
this.tasks.splice(offset, 1)
61+
launchTasks.push(task)
62+
}
63+
}
64+
65+
// Create a task completion callback. Remove the task from the active set, invoke the tasks'
66+
// push() callback, then drain() again to see if another task is ready to run.
67+
var makeTaskHandler = function (task) {
68+
return function (err) {
69+
delete self.active[task.key]
70+
71+
task.callback(err)
72+
73+
// Defer the next drain() call again in case the task's callback was synchronous.
74+
process.nextTick(self.drain.bind(self))
75+
}
76+
}
77+
78+
// Launch the queue handler for each chosen task.
79+
for (var i = 0; i < launchTasks.length; i++) {
80+
var each = launchTasks[i]
81+
82+
this.active[each.key] = each
83+
84+
this.handler(each.job, each.config, makeTaskHandler(each))
85+
}
86+
87+
// Fire and unset the drain callback if one has been registered.
88+
if (this.drainCallback) {
89+
var lastCallback = this.drainCallback
90+
this.drainCallback = null
91+
lastCallback()
92+
}
93+
},
94+
95+
// Count the number of tasks waiting on the queue.
96+
length: function () {
97+
return this.tasks.length
98+
},
99+
100+
// Return true if "id" corresponds to the job ID of an active job.
101+
isActive: function (id) {
102+
for (var key in this.active) {
103+
if (this.active.hasOwnProperty(key) && this.active[key].id === id) {
104+
return true
105+
}
106+
}
107+
return false
108+
},
109+
110+
// Fire a callback the next time that a drain() is executed.
111+
onNextDrain: function (callback) {
112+
this.drainCallback = callback
113+
}
114+
}

lib/utils.js

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11

22
var _ = require('lodash')
3-
43
, consts = require('./consts')
4+
, stringify = require('json-stable-stringify')
55

66
module.exports = {
7-
ensureCommand: ensureCommand
7+
ensureCommand: ensureCommand,
8+
branchFromJob: branchFromJob
89
}
910

1011
function ensureCommand(phase) {
@@ -16,3 +17,23 @@ function ensureCommand(phase) {
1617
return command
1718
}
1819

20+
// Extract a branch name, suitable for use as a filesystem path, from the contents of the job's
21+
// ref field. Prefer common ref structures when available (branch, fetch) but fall back to something
22+
// that's ugly but unique and stable for arbitrary ref payloads.
23+
function branchFromJob(job) {
24+
var ref = job.ref
25+
26+
if (typeof ref === 'undefined') {
27+
return ''
28+
}
29+
30+
if ('branch' in ref) {
31+
return ref.branch
32+
} else if ('fetch' in ref) {
33+
return ref.fetch
34+
} else {
35+
// This is going to be incredibly ugly, but it will be (a) consistent for consistent refs and
36+
// (b) include only filesystem-safe characters.
37+
return encodeURIComponent(stringify(ref))
38+
}
39+
}

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
"strider-runner-core": "~2.0.0",
1616
"strider-extension-loader": "~0.4.3",
1717
"fs-extra": "~0.8.1",
18-
"dirkeeper": "~0.2.0"
18+
"dirkeeper": "~0.2.0",
19+
"json-stable-stringify": "~1.0.1"
1920
},
2021
"devDependencies": {
2122
"mocha": "^1.21.1",

0 commit comments

Comments
 (0)