From 693a80377fa143f7e077fc84bf31431afe5b775e Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Tue, 26 Nov 2024 16:36:28 -0500 Subject: [PATCH] Inherit child fibers created by merged streams (#3997) --- .changeset/smooth-flies-lie.md | 5 ++ packages/effect/src/internal/channel.ts | 57 ++++++++++++++----- packages/effect/src/internal/fiberRuntime.ts | 21 +++++++ .../effect/test/Stream/interrupting.test.ts | 23 ++++++++ 4 files changed, 91 insertions(+), 15 deletions(-) create mode 100644 .changeset/smooth-flies-lie.md diff --git a/.changeset/smooth-flies-lie.md b/.changeset/smooth-flies-lie.md new file mode 100644 index 00000000000..f076619ab6d --- /dev/null +++ b/.changeset/smooth-flies-lie.md @@ -0,0 +1,5 @@ +--- +"effect": patch +--- + +inherit child fibers created by merged streams diff --git a/packages/effect/src/internal/channel.ts b/packages/effect/src/internal/channel.ts index b23bf6aed64..61db4641896 100644 --- a/packages/effect/src/internal/channel.ts +++ b/packages/effect/src/internal/channel.ts @@ -1768,21 +1768,48 @@ export const mergeWith = dual< } return core.fromEffect( - Effect.zipWith( - Effect.forkIn(Effect.interruptible(pullL), scope), - Effect.forkIn(Effect.interruptible(pullR), scope), - (left, right): State => - mergeState.BothRunning< - Env | Env1, - OutErr, - OutErr1, - OutErr2 | OutErr3, - OutElem | OutElem1, - OutDone, - OutDone1, - OutDone2 | OutDone3 - >(left, right) - ) + Effect.withFiberRuntime< + MergeState.MergeState< + Env | Env1, + OutErr, + OutErr1, + OutErr2 | OutErr3, + OutElem | OutElem1, + OutDone, + OutDone1, + OutDone2 | OutDone3 + >, + never, + Env | Env1 + >((parent) => { + const inherit = Effect.withFiberRuntime((state) => { + ;(state as any).transferChildren((parent as any).scope()) + return Effect.void + }) + const leftFiber = Effect.interruptible(pullL).pipe( + Effect.ensuring(inherit), + Effect.forkIn(scope) + ) + const rightFiber = Effect.interruptible(pullR).pipe( + Effect.ensuring(inherit), + Effect.forkIn(scope) + ) + return Effect.zipWith( + leftFiber, + rightFiber, + (left, right): State => + mergeState.BothRunning< + Env | Env1, + OutErr, + OutErr1, + OutErr2 | OutErr3, + OutElem | OutElem1, + OutDone, + OutDone1, + OutDone2 | OutDone3 + >(left, right) + ) + }) ).pipe( core.flatMap(go), core.embedInput(input) diff --git a/packages/effect/src/internal/fiberRuntime.ts b/packages/effect/src/internal/fiberRuntime.ts index 8b884b125b3..9e9f586e87a 100644 --- a/packages/effect/src/internal/fiberRuntime.ts +++ b/packages/effect/src/internal/fiberRuntime.ts @@ -620,6 +620,27 @@ export class FiberRuntime extends Effectable.Class 0) { + for (const child of children) { + // If the child is still running, add it to the scope + if (child._exitValue === null) { + scope.add(this.currentRuntimeFlags, child) + } + } + } + } + /** * On the current thread, executes all messages in the fiber's inbox. This * method may return before all work is done, in the event the fiber executes diff --git a/packages/effect/test/Stream/interrupting.test.ts b/packages/effect/test/Stream/interrupting.test.ts index 3d84f285dd7..bf27d15ec09 100644 --- a/packages/effect/test/Stream/interrupting.test.ts +++ b/packages/effect/test/Stream/interrupting.test.ts @@ -188,4 +188,27 @@ describe("Stream", () => { assert.strictEqual(interrupted, true) })) + + it.effect("forked children are not interrupted early by interruptWhen", () => + Effect.gen(function*() { + const queue = yield* Queue.unbounded() + const ref = yield* Ref.make(0) + yield* Stream.fromQueue(queue).pipe( + Stream.runForEach(() => Ref.update(ref, (n) => n + 1)), + Effect.fork, + Effect.as(Stream.concat(Stream.succeed(""), Stream.never)), + Stream.unwrapScoped, + Stream.interruptWhen(Effect.never), + Stream.runDrain, + Effect.fork + ) + yield* Queue.offer(queue, "message").pipe( + Effect.forever, + Effect.fork + ) + const result = yield* Ref.get(ref).pipe( + Effect.repeat({ until: (n) => n >= 10 }) + ) + assert.strictEqual(result, 10) + })) })