diff --git a/.changeset/rare-planets-obey.md b/.changeset/rare-planets-obey.md new file mode 100644 index 00000000000..8e23d24f3fc --- /dev/null +++ b/.changeset/rare-planets-obey.md @@ -0,0 +1,5 @@ +--- +"@effect/platform-node": patch +--- + +optimize streaming response for NodeHttpServer diff --git a/packages/platform-node/src/internal/httpServer.ts b/packages/platform-node/src/internal/httpServer.ts index c04d97256ff..7c7a3f0361f 100644 --- a/packages/platform-node/src/internal/httpServer.ts +++ b/packages/platform-node/src/internal/httpServer.ts @@ -15,6 +15,7 @@ import type * as Multipart from "@effect/platform/Multipart" import type * as Path from "@effect/platform/Path" import * as Socket from "@effect/platform/Socket" import type * as Cause from "effect/Cause" +import * as Chunk from "effect/Chunk" import * as Config from "effect/Config" import * as Effect from "effect/Effect" import * as FiberSet from "effect/FiberSet" @@ -33,7 +34,6 @@ import { pipeline } from "node:stream/promises" import * as WS from "ws" import * as NodeContext from "../NodeContext.js" import * as NodeHttpClient from "../NodeHttpClient.js" -import * as NodeSink from "../NodeSink.js" import { HttpIncomingMessageImpl } from "./httpIncomingMessage.js" import * as internalPlatform from "./httpPlatform.js" @@ -451,27 +451,21 @@ const handleResponse = (request: ServerRequest.HttpServerRequest, response: Serv } case "Stream": { nodeResponse.writeHead(response.status, headers) - return Stream.run( - Stream.mapError( - body.stream, - (cause) => - new Error.ResponseError({ - request, - response, - reason: "Decode", - cause - }) + return body.stream.pipe( + Stream.runForEachChunk((chunk) => + Effect.async((resume) => { + const array = Chunk.toReadonlyArray(chunk) + for (let i = 0; i < array.length - 1; i++) { + nodeResponse.write(array[i]) + } + nodeResponse.write(array[array.length - 1], () => resume(Effect.void)) + }) ), - NodeSink.fromWritable(() => nodeResponse, (cause) => - new Error.ResponseError({ - request, - response, - reason: "Decode", - cause - })) - ).pipe( Effect.interruptible, - Effect.tapErrorCause(handleCause(nodeResponse)) + Effect.matchCause({ + onSuccess: () => nodeResponse.end(), + onFailure: handleCause(nodeResponse) + }) ) } }