Skip to content

Commit 9d85331

Browse files
committed
support res.removeLister("drain"), res.once("drain")
Fixes #152 Fixes #135
1 parent edb43f3 commit 9d85331

File tree

2 files changed

+221
-0
lines changed

2 files changed

+221
-0
lines changed

index.js

+28
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ function compression (options) {
6464

6565
var _end = res.end
6666
var _on = res.on
67+
var _removeListener = res.removeListener
6768
var _write = res.write
6869

6970
// flush
@@ -131,6 +132,33 @@ function compression (options) {
131132
return this
132133
}
133134

135+
res.addListener = res.on
136+
137+
res.removeListener = function removeListener (type, listener) {
138+
if (!listeners || type !== 'drain') {
139+
return _removeListener.call(this, type, listener)
140+
}
141+
142+
if (stream) {
143+
return stream.removeListener(type, listener)
144+
}
145+
146+
// remove buffered listener
147+
for (var i = listeners.length - 1; i >= 0; i--) {
148+
if (listeners[i][0] === type && listeners[i][1] === listener) {
149+
listeners.splice(i, 1)
150+
}
151+
}
152+
153+
return this
154+
}
155+
156+
/* istanbul ignore next */
157+
if (res.off) {
158+
// emitter.off was added in Node.js v10+; don't add it to earlier versions
159+
res.off = res.removeListener
160+
}
161+
134162
function nocompress (msg) {
135163
debug('no compression: %s', msg)
136164
addListeners(res, _on, listeners)

test/compression.js

+193
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,192 @@ describe('compression()', function () {
305305
.expect(200, done)
306306
})
307307

308+
it('should support removeListener("drain") after on("drain"); stream present', function (done) {
309+
// compression doesn't proxy listenerCount() to the compression stream, so
310+
// instead watch for a MaxListenersExceededWarning
311+
var hasWarned = false
312+
var onWarning = function () {
313+
hasWarned = true
314+
}
315+
process.on('warning', onWarning)
316+
var server = createServer({threshold: 0}, function (req, res) {
317+
res.setHeader('Content-Type', 'text/plain')
318+
var len = bytes('40kb')
319+
var buf = Buffer.alloc(len, '.')
320+
res.write(buf)
321+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
322+
var listener = function () {}
323+
res.on('drain', listener)
324+
res.removeListener('drain', listener)
325+
}
326+
res.end()
327+
})
328+
329+
request(server)
330+
.get('/')
331+
.set('Accept-Encoding', 'gzip')
332+
.expect(function () {
333+
process.removeListener('warning', onWarning)
334+
assert.ok(!hasWarned)
335+
})
336+
.expect(200, done)
337+
})
338+
339+
it('should support removeListener("drain") after addListener("drain")', function (done) {
340+
var hasWarned = false
341+
var onWarning = function () {
342+
hasWarned = true
343+
}
344+
process.on('warning', onWarning)
345+
var server = createServer({threshold: 0}, function (req, res) {
346+
res.setHeader('Content-Type', 'text/plain')
347+
var len = bytes('40kb')
348+
var buf = Buffer.alloc(len, '.')
349+
res.write(buf)
350+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
351+
var listener = function () {}
352+
res.addListener('drain', listener)
353+
res.removeListener('drain', listener)
354+
}
355+
res.end()
356+
})
357+
358+
request(server)
359+
.get('/')
360+
.set('Accept-Encoding', 'gzip')
361+
.expect(function () {
362+
process.removeListener('warning', onWarning)
363+
assert.ok(!hasWarned)
364+
})
365+
.expect(200, done)
366+
})
367+
368+
it('should support off("drain") after addListener("drain")', function (done) {
369+
if (!require('events').EventEmitter.prototype.off) { // off was added in Node.js v10
370+
this.skip()
371+
}
372+
var hasWarned = false
373+
var onWarning = function () {
374+
hasWarned = true
375+
}
376+
process.on('warning', onWarning)
377+
var server = createServer({threshold: 0}, function (req, res) {
378+
res.setHeader('Content-Type', 'text/plain')
379+
var len = bytes('40kb')
380+
var buf = Buffer.alloc(len, '.')
381+
res.write(buf)
382+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
383+
var listener = function () {}
384+
res.addListener('drain', listener)
385+
res.off('drain', listener)
386+
}
387+
res.end()
388+
})
389+
390+
request(server)
391+
.get('/')
392+
.set('Accept-Encoding', 'gzip')
393+
.expect(function () {
394+
process.removeListener('warning', onWarning)
395+
assert.ok(!hasWarned)
396+
})
397+
.expect(200, done)
398+
})
399+
400+
it('should support removeListener("drain"); buffered', function (done) {
401+
// Variant of above tests for scenario when the listener is buffered (stream
402+
// is not yet present).
403+
var hasWarned = false
404+
var onWarning = function () {
405+
hasWarned = true
406+
}
407+
process.on('warning', onWarning)
408+
var server = createServer({threshold: 0}, function (req, res) {
409+
res.setHeader('Content-Type', 'text/plain')
410+
res.on('end', function () {})
411+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
412+
var listener = function () {}
413+
res.on('drain', listener)
414+
res.removeListener('drain', listener)
415+
}
416+
res.end()
417+
})
418+
419+
request(server)
420+
.get('/')
421+
.set('Accept-Encoding', 'gzip')
422+
.expect(function () {
423+
process.removeListener('warning', onWarning)
424+
assert.ok(!hasWarned)
425+
})
426+
.expect(200, done)
427+
})
428+
429+
it('should support removeListener("drain"); multiple bindings of same listener, buffered', function (done) {
430+
// Variant of above test for scenario when the listener is buffered (stream
431+
// is not yet present) and the same listener is added two or more times.
432+
var hasWarned = false
433+
var onWarning = function () {
434+
hasWarned = true
435+
}
436+
process.on('warning', onWarning)
437+
var server = createServer({threshold: 0}, function (req, res) {
438+
res.setHeader('Content-Type', 'text/plain')
439+
for (var times = 0; times < res.getMaxListeners() + 1; times++) {
440+
var listener = function () {}
441+
res.on('drain', listener)
442+
res.on('drain', listener)
443+
res.removeListener('drain', listener)
444+
}
445+
res.end()
446+
})
447+
448+
request(server)
449+
.get('/')
450+
.set('Accept-Encoding', 'gzip')
451+
.expect(function () {
452+
process.removeListener('warning', onWarning)
453+
assert.ok(!hasWarned)
454+
})
455+
.expect(200, done)
456+
})
457+
458+
it('should not leak event listeners when res.unpipe() is used (#135)', function (done) {
459+
// unpipe and stream.Readable were added in v0.9.4
460+
var stream = require('stream')
461+
if (!(stream.Readable && stream.Readable.prototype.unpipe)) {
462+
this.skip()
463+
}
464+
465+
var hasWarned = false
466+
var onWarning = function () {
467+
hasWarned = true
468+
}
469+
var server = createServer({threshold: 0}, function (req, res) {
470+
var times = 0
471+
var int = setInterval(function () {
472+
var rs = require('fs').createReadStream('does not exist')
473+
rs.on('error', function (e) {
474+
rs.unpipe(res)
475+
})
476+
rs.pipe(res)
477+
if (times++ > res.getMaxListeners()) {
478+
clearInterval(int)
479+
res.end('hello, world')
480+
}
481+
})
482+
})
483+
484+
request(server)
485+
.get('/')
486+
.set('Accept-Encoding', 'gzip')
487+
.expect(function () {
488+
process.removeListener('warning', onWarning)
489+
assert.ok(!hasWarned)
490+
})
491+
.expect(200, done)
492+
})
493+
308494
describe('threshold', function () {
309495
it('should not compress responses below the threshold size', function (done) {
310496
var server = createServer({ threshold: '1kb' }, function (req, res) {
@@ -669,6 +855,13 @@ function createServer (opts, fn) {
669855
return
670856
}
671857

858+
if (typeof res.getMaxListeners !== 'function') {
859+
// Added in v0.11.2
860+
res.getMaxListeners = function getMaxListeners () {
861+
return 10
862+
}
863+
}
864+
672865
fn(req, res)
673866
})
674867
})

0 commit comments

Comments
 (0)