Skip to content

Commit

Permalink
Inherit child fibers created by merged streams (#3997)
Browse files Browse the repository at this point in the history
  • Loading branch information
IMax153 authored and tim-smart committed Nov 27, 2024
1 parent 917d965 commit 6910ccd
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .changeset/smooth-flies-lie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

inherit child fibers created by merged streams
57 changes: 42 additions & 15 deletions packages/effect/src/internal/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1768,21 +1768,48 @@ export const mergeWith = dual<
}

return core.fromEffect(
Effect.zipWith(
Effect.forkIn(Effect.interruptible(pullL), scope),
Effect.forkIn(Effect.interruptible(pullR), scope),
(left, right): State =>
mergeState.BothRunning<
Env | Env1,
OutErr,
OutErr1,
OutErr2 | OutErr3,
OutElem | OutElem1,
OutDone,
OutDone1,
OutDone2 | OutDone3
>(left, right)
)
Effect.withFiberRuntime<
MergeState.MergeState<
Env | Env1,
OutErr,
OutErr1,
OutErr2 | OutErr3,
OutElem | OutElem1,
OutDone,
OutDone1,
OutDone2 | OutDone3
>,
never,
Env | Env1
>((parent) => {
const inherit = Effect.withFiberRuntime<void, never, never>((state) => {
;(state as any).transferChildren((parent as any).scope())
return Effect.void
})
const leftFiber = Effect.interruptible(pullL).pipe(
Effect.ensuring(inherit),
Effect.forkIn(scope)
)
const rightFiber = Effect.interruptible(pullR).pipe(
Effect.ensuring(inherit),
Effect.forkIn(scope)
)
return Effect.zipWith(
leftFiber,
rightFiber,
(left, right): State =>
mergeState.BothRunning<
Env | Env1,
OutErr,
OutErr1,
OutErr2 | OutErr3,
OutElem | OutElem1,
OutDone,
OutDone1,
OutDone2 | OutDone3
>(left, right)
)
})
).pipe(
core.flatMap(go),
core.embedInput(input)
Expand Down
21 changes: 21 additions & 0 deletions packages/effect/src/internal/fiberRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,27 @@ export class FiberRuntime<in out A, in out E = never> extends Effectable.Class<A
this.getChildren().delete(child)
}

/**
* Transfers all children of this fiber that are currently running to the
* specified fiber scope.
*
* **NOTE**: This method must be invoked by the fiber itself after it has
* evaluated the effects but prior to exiting.
*/
transferChildren(scope: fiberScope.FiberScope) {
const children = this._children
// Clear the children of the current fiber
this._children = null
if (children !== null && children.size > 0) {
for (const child of children) {
// If the child is still running, add it to the scope
if (child._exitValue === null) {
scope.add(this.currentRuntimeFlags, child)
}
}
}
}

/**
* On the current thread, executes all messages in the fiber's inbox. This
* method may return before all work is done, in the event the fiber executes
Expand Down
23 changes: 23 additions & 0 deletions packages/effect/test/Stream/interrupting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,27 @@ describe("Stream", () => {

assert.strictEqual(interrupted, true)
}))

it.effect("forked children are not interrupted early by interruptWhen", () =>
Effect.gen(function*() {
const queue = yield* Queue.unbounded<string>()
const ref = yield* Ref.make(0)
yield* Stream.fromQueue(queue).pipe(
Stream.runForEach(() => Ref.update(ref, (n) => n + 1)),
Effect.fork,
Effect.as(Stream.concat(Stream.succeed(""), Stream.never)),
Stream.unwrapScoped,
Stream.interruptWhen(Effect.never),
Stream.runDrain,
Effect.fork
)
yield* Queue.offer(queue, "message").pipe(
Effect.forever,
Effect.fork
)
const result = yield* Ref.get(ref).pipe(
Effect.repeat({ until: (n) => n >= 10 })
)
assert.strictEqual(result, 10)
}))
})

0 comments on commit 6910ccd

Please sign in to comment.