diff --git a/lib/tcp-server.js b/lib/tcp-server.js index 2dc61af..73370af 100644 --- a/lib/tcp-server.js +++ b/lib/tcp-server.js @@ -9,6 +9,9 @@ module.exports = function serverPlugin (upring, opts, next) { upring._server.listen(opts.port, opts.host, onListen) upring._server.on('error', upring.emit.bind(upring, 'error')) + const genReqId = opts.genReqId || reqIdGenFactory() + upring.genReqId = genReqId + function handler (stream) { if (upring.closed) { stream.on('error', noop) @@ -24,13 +27,27 @@ module.exports = function serverPlugin (upring, opts, next) { upring.log.debug({ address: stream.address() }, 'closed connection') upring._inbound.delete(instance) }) - instance.on('request', upring._dispatch) + instance.on('request', onRequest) + } + + function onRequest (req, reply) { + req.id = genReqId(req) + req.log = upring.log.child({ reqId: req.id }) + upring._dispatch(req, reply) } function onListen () { upring.log.debug({ address: upring._server.address() }, 'listening') next() } + + function reqIdGenFactory () { + var maxInt = 2147483647 + var nextReqId = 0 + return function _genReqId (req) { + return req.id || (nextReqId = (nextReqId + 1) & maxInt) + } + } } function noop () {} diff --git a/test/basic.test.js b/test/basic.test.js index 81e5bcf..e402fec 100644 --- a/test/basic.test.js +++ b/test/basic.test.js @@ -176,7 +176,7 @@ test('request to node 2', { timeout: 5000 }, (t) => { i2.on('request', (req, reply) => { t.pass('request to i2') - t.deepEqual(req, { hello: 'world' }, 'correct message') + t.strictEqual(req.hello, 'world', 'correct message') reply(null, { a: 'response' }) }) @@ -332,3 +332,99 @@ test('async await support', t => { } t.end() }) + +test('every request should have an id and a child logger', { timeout: 5000 }, (t) => { + t.plan(12) + + bootTwo(t, (i1, i2) => { + let i1Key = getKey(i1) + let i2Key = getKey(i2) + + i1.request({ + key: i2Key, + hello: 42 + }, (err, response) => { + t.error(err) + t.ok(response.log) + delete response.log + t.deepEqual(response, { + replying: 'i2', + id: 1 + }, 'response matches') + }) + + i2.request({ + key: i1Key, + hello: 42 + }, (err, response) => { + t.error(err) + t.ok(response.log) + delete response.log + t.deepEqual(response, { + replying: 'i1', + id: 1 + }, 'response matches') + }) + + i1.on('request', (req, reply) => { + t.ok(req.id === 1) + t.ok(req.log) + reply(null, { replying: 'i1' }) + }) + + i2.on('request', (req, reply) => { + t.ok(req.id === 1) + t.ok(req.log) + reply(null, { replying: 'i2' }) + }) + }) +}) + +test('request and response should keep the id', { timeout: 5000 }, (t) => { + t.plan(12) + + bootTwo(t, (i1, i2) => { + let i1Key = getKey(i1) + let i2Key = getKey(i2) + + i1.request({ + key: i2Key, + hello: 42, + id: 'abc' + }, (err, response) => { + t.error(err) + t.ok(response.log) + delete response.log + t.deepEqual(response, { + replying: 'i2', + id: 'abc' + }, 'response matches') + }) + + i2.request({ + key: i1Key, + hello: 42, + id: 123 + }, (err, response) => { + t.error(err) + t.ok(response.log) + delete response.log + t.deepEqual(response, { + replying: 'i1', + id: 123 + }, 'response matches') + }) + + i1.on('request', (req, reply) => { + t.ok(req.id === 123) + t.ok(req.log) + reply(null, { replying: 'i1' }) + }) + + i2.on('request', (req, reply) => { + t.ok(req.id === 'abc') + t.ok(req.log) + reply(null, { replying: 'i2' }) + }) + }) +}) diff --git a/upring.js b/upring.js index 275a3fe..a108089 100644 --- a/upring.js +++ b/upring.js @@ -208,6 +208,8 @@ UpRing.prototype.request = function (obj, callback, _count) { return } + obj.id = this.genReqId(obj) + if (this._hashring.allocatedToMe(obj.key)) { this.log.trace({ msg: obj }, 'local call') this._dispatch(obj, dezalgo(callback)) @@ -249,6 +251,10 @@ UpRing.prototype.request = function (obj, callback, _count) { return } } + if (obj.id != null) { + result.id = obj.id + result.log = this.log.child({ reqId: result.id }) + } callback(err, result) }) }