Skip to content

Commit

Permalink
optimize Stream.toReadableStream (#4352)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Jan 28, 2025
1 parent 430c846 commit 9172efb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .changeset/thirty-clocks-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

optimize Stream.toReadableStream
61 changes: 24 additions & 37 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import * as RcRef from "../RcRef.js"
import * as Ref from "../Ref.js"
import * as Runtime from "../Runtime.js"
import * as Schedule from "../Schedule.js"
import * as Scope from "../Scope.js"
import type * as Scope from "../Scope.js"
import type * as Sink from "../Sink.js"
import type * as Stream from "../Stream.js"
import type * as Emit from "../StreamEmit.js"
Expand Down Expand Up @@ -7085,52 +7085,39 @@ export const toReadableStreamRuntime = dual<
runtime: Runtime.Runtime<XR>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): ReadableStream<A> => {
const runSync = Runtime.runSync(runtime)
const runFork = Runtime.runFork(runtime)
let currentResolve: (() => void) | undefined = undefined
let fiber: Fiber.RuntimeFiber<void, E> | undefined = undefined
const latch = Effect.unsafeMakeLatch(false)

let pull: Effect.Effect<void, never, R>
let scope: Scope.CloseableScope
return new ReadableStream<A>({
start(controller) {
scope = runSync(Scope.make())
const pullChunk: Effect.Effect<Chunk.Chunk<A>, Option.Option<E>, R> = pipe(
toPull(self),
Scope.extend(scope),
runSync,
Effect.flatMap((chunk) => Chunk.isEmpty(chunk) ? pullChunk : Effect.succeed(chunk))
)
pull = pipe(
pullChunk,
Effect.tap((chunk) =>
Effect.sync(() => {
Chunk.map(chunk, (a) => {
controller.enqueue(a)
})
})
),
Effect.tapErrorCause(() => Scope.close(scope, Exit.void)),
Effect.catchTags({
"None": () =>
Effect.sync(() => {
controller.close()
}),
"Some": (error) =>
Effect.sync(() => {
controller.error(error.value)
})
}),
Effect.asVoid
)
fiber = runFork(runForEachChunk(self, (chunk) =>
latch.whenOpen(Effect.sync(() => {
latch.unsafeClose()
for (const item of chunk) {
controller.enqueue(item)
}
currentResolve!()
currentResolve = undefined
}))))
fiber.addObserver((exit) => {
if (exit._tag === "Failure") {
controller.error(Cause.squash(exit.cause))
} else {
controller.close()
}
})
},
pull() {
return new Promise<void>((resolve) => {
runFork(pull, { scope }).addObserver((_) => resolve())
currentResolve = resolve
Effect.runSync(latch.open)
})
},
cancel() {
return new Promise<void>((resolve) => {
runFork(Scope.close(scope, Exit.void)).addObserver((_) => resolve())
})
if (!fiber) return
return Effect.runPromise(Effect.asVoid(Fiber.interrupt(fiber)))
}
}, options?.strategy)
}
Expand Down

0 comments on commit 9172efb

Please sign in to comment.