Skip to content

Readable streams 309 #312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ const { inherits, format } = require('node:util')

const fp = require('fastify-plugin')
const encodingNegotiator = require('@fastify/accept-negotiator')
const pump = require('pump')
const mimedb = require('mime-db')
const peek = require('peek-stream')
const { Minipass } = require('minipass')
const pumpify = require('pumpify')
const { Readable } = require('readable-stream')
const { pipeline } = require('node:stream')

const { isStream, isGzip, isDeflate, intoAsyncIterator } = require('./lib/utils')

Expand Down Expand Up @@ -267,12 +267,12 @@ function buildRouteCompress (fastify, params, routeOptions, decorateOnly) {
encoding === undefined
? reply.removeHeader('Content-Encoding')
: reply.header('Content-Encoding', 'identity')
pump(stream, payload = unzipStream(params.uncompressStream), onEnd.bind(reply))
pipeline(stream, payload = unzipStream(params.uncompressStream), onEnd.bind(reply))
}
return next(null, payload)
}

if (typeof payload.pipe !== 'function') {
if (typeof payload.pipe !== 'function' && !(payload instanceof ReadableStream)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately using instanceof here would break compatibility everywhere.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I suspected as such.
I think solving the linked issue (#309) probably requires more significant re-architecture of this plugin to more modern standards (as you discussed also here #297 (comment)) which I don't have the time/ability to work on right now, so I will close this PR, though i'll leave my branch here as the test case may be useful to anyone else who might want to pick this up

if (Buffer.byteLength(payload) < params.threshold) {
return next()
}
Expand All @@ -286,7 +286,7 @@ function buildRouteCompress (fastify, params, routeOptions, decorateOnly) {
}

stream = zipStream(params.compressStream, encoding)
pump(payload, stream, onEnd.bind(reply))
pipeline(payload, stream, onEnd.bind(reply))
next(null, stream)
}
}
Expand Down Expand Up @@ -348,7 +348,7 @@ function buildRouteDecompress (fastify, params, routeOptions) {
raw.on('data', trackEncodedLength.bind(decompresser))
raw.on('end', removeEncodedLengthTracking)

next(null, pump(raw, decompresser))
next(null, pipeline(raw, decompresser))
}
}

Expand Down Expand Up @@ -385,18 +385,17 @@ function compress (params) {
encoding === undefined
? this.removeHeader('Content-Encoding')
: this.header('Content-Encoding', 'identity')
pump(stream, payload = unzipStream(params.uncompressStream), onEnd.bind(this))
pipeline(stream, payload = unzipStream(params.uncompressStream), onEnd.bind(this))
}
return this.send(payload)
}

if (typeof payload.pipe !== 'function') {
if (typeof payload.pipe !== 'function' && !(payload instanceof ReadableStream)) {

if (!Buffer.isBuffer(payload) && typeof payload !== 'string') {
payload = this.serialize(payload)
}
}

if (typeof payload.pipe !== 'function') {
if (Buffer.byteLength(payload) < params.threshold) {
return this.send(payload)
}
Expand All @@ -410,7 +409,7 @@ function compress (params) {
}

stream = zipStream(params.compressStream, encoding)
pump(payload, stream, onEnd.bind(this))
pipeline(payload, stream, onEnd.bind(this))
this.send(stream)
}
}
Expand Down
41 changes: 41 additions & 0 deletions test/regression/issue-309.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict'

const { test } = require('node:test')
const Fastify = require('fastify')
const fastifyCompress = require('../..')
const zlib = require('node:zlib')

test('it should handle ReadableStream', async (t) => {
const expectedData = {
data: 'to compress'
}

const fastify = new Fastify()
t.after(() => fastify.close())
fastify.register(async (instance, opts) => {
await fastify.register(fastifyCompress, {
threshold: 8
})
instance.get('/broken', async (req, reply) => {
const stream = ReadableStream.from(JSON.stringify(expectedData))
reply
.type('application/json')
.send(stream)
return reply
})
})
try {
const response = await fastify.inject({
method: 'GET',
url: '/broken',
headers: {
'accept-encoding': 'deflate'
}
})
const payload = zlib.inflateSync(response.rawPayload)

t.assert.deepStrictEqual(JSON.parse(payload.toString('utf-8')), expectedData)
} catch (e) {
t.assert.fail(e)
}
})