Skip to content

Commit

Permalink
Preserve scopes in stream operations (#3989)
Browse files Browse the repository at this point in the history
  • Loading branch information
IMax153 authored and tim-smart committed Nov 27, 2024
1 parent 8722b9f commit 44e41b2
Show file tree
Hide file tree
Showing 15 changed files with 1,143 additions and 974 deletions.
7 changes: 7 additions & 0 deletions .changeset/dull-ants-sip.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"effect": minor
---

Ensure scopes are preserved by stream / sink / channel operations

**NOTE**: This change does modify the public signature of several `Stream` / `Sink` / `Channel` methods. Namely, certain run methods that previously removed a `Scope` from the environment will no longer do so. This was a bug with the previous implementation of how scopes were propagated, and is why this change is being made in a minor release.
21 changes: 21 additions & 0 deletions .changeset/funny-zebras-happen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
"effect": minor
---

Add `Effect.scopedWith` to run an effect that depends on a `Scope`, and then closes the `Scope` after the effect has completed

```ts
import { Effect, Scope } from "effect"

const program: Effect.Effect<void> = Effect.scopedWith((scope) =>
Effect.acquireRelease(
Effect.log("Acquiring..."),
() => Effect.log("Releasing...")
).pipe(Scope.extend(scope))
)

Effect.runPromise(program)
// Output:
// timestamp=2024-11-26T16:44:54.158Z level=INFO fiber=#0 message=Acquiring...
// timestamp=2024-11-26T16:44:54.165Z level=INFO fiber=#0 message=Releasing...
```
70 changes: 65 additions & 5 deletions packages/effect/src/Channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1930,7 +1930,7 @@ export const repeated: <OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>(
*/
export const run: <OutErr, InErr, OutDone, InDone, Env>(
self: Channel<never, unknown, OutErr, InErr, OutDone, InDone, Env>
) => Effect.Effect<OutDone, OutErr, Exclude<Env, Scope.Scope>> = channel.run
) => Effect.Effect<OutDone, OutErr, Env> = channel.run

/**
* Run the channel until it finishes with a done value or fails with an error
Expand All @@ -1943,7 +1943,7 @@ export const run: <OutErr, InErr, OutDone, InDone, Env>(
*/
export const runCollect: <OutElem, OutErr, InErr, OutDone, InDone, Env>(
self: Channel<OutElem, unknown, OutErr, InErr, OutDone, InDone, Env>
) => Effect.Effect<[Chunk.Chunk<OutElem>, OutDone], OutErr, Exclude<Env, Scope.Scope>> = channel.runCollect
) => Effect.Effect<[Chunk.Chunk<OutElem>, OutDone], OutErr, Env> = channel.runCollect

/**
* Runs a channel until the end is received.
Expand All @@ -1953,7 +1953,21 @@ export const runCollect: <OutElem, OutErr, InErr, OutDone, InDone, Env>(
*/
export const runDrain: <OutElem, OutErr, InErr, OutDone, InDone, Env>(
self: Channel<OutElem, unknown, OutErr, InErr, OutDone, InDone, Env>
) => Effect.Effect<OutDone, OutErr, Exclude<Env, Scope.Scope>> = channel.runDrain
) => Effect.Effect<OutDone, OutErr, Env> = channel.runDrain

/**
* Run the channel until it finishes with a done value or fails with an error.
* The channel must not read any input or write any output.
*
* Closing the channel, which includes execution of all the finalizers
* attached to the channel will be added to the current scope as a finalizer.
*
* @since 3.11.0
* @category destructors
*/
export const runScoped: <OutErr, InErr, OutDone, InDone, Env>(
self: Channel<never, unknown, OutErr, InErr, OutDone, InDone, Env>
) => Effect.Effect<OutDone, OutErr, Env | Scope.Scope> = channel.runScoped

/**
* Use a scoped effect to emit an output element.
Expand All @@ -1965,6 +1979,18 @@ export const scoped: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Channel<A, unknown, E, unknown, unknown, unknown, Exclude<R, Scope.Scope>> = channel.scoped

/**
* Use a function that receives a scope and returns an effect to emit an output
* element. The output element will be the result of the returned effect, if
* successful.
*
* @since 3.11.0
* @category constructors
*/
export const scopedWith: <A, E, R>(
f: (scope: Scope.Scope) => Effect.Effect<A, E, R>
) => Channel<A, unknown, E, unknown, unknown, unknown, R> = channel.scopedWith

/**
* Splits strings on newlines. Handles both Windows newlines (`\r\n`) and UNIX
* newlines (`\n`).
Expand Down Expand Up @@ -2034,6 +2060,27 @@ export const toPull: <OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>(
) => Effect.Effect<Effect.Effect<Either.Either<OutElem, OutDone>, OutErr, Env>, never, Scope.Scope | Env> =
channel.toPull

/**
* Returns an `Effect` that can be used to repeatedly pull elements from the
* constructed `Channel` within the provided `Scope`. The pull effect fails
* with the channel's failure in case the channel fails, or returns either the
* channel's done value or an emitted element.
*
* @since 3.11.0
* @category destructors
*/
export const toPullIn: {
(
scope: Scope.Scope
): <OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>(
self: Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>
) => Effect.Effect<Effect.Effect<Either.Either<OutElem, OutDone>, OutErr, Env>, never, Env>
<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>(
self: Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>,
scope: Scope.Scope
): Effect.Effect<Effect.Effect<Either.Either<OutElem, OutDone>, OutErr, Env>, never, Env>
} = channel.toPullIn

/**
* Converts a `Channel` to a `Queue`.
*
Expand Down Expand Up @@ -2073,7 +2120,8 @@ export {
}

/**
* Makes a channel from an effect that returns a channel in case of success.
* Constructs a `Channel` from an effect that will result in a `Channel` if
* successful.
*
* @since 2.0.0
* @category constructors
Expand All @@ -2083,7 +2131,8 @@ export const unwrap: <OutElem, InElem, OutErr, InErr, OutDone, InDone, R2, E, R>
) => Channel<OutElem, InElem, E | OutErr, InErr, OutDone, InDone, R | R2> = channel.unwrap

/**
* Makes a channel from a managed that returns a channel in case of success.
* Constructs a `Channel` from a scoped effect that will result in a
* `Channel` if successful.
*
* @since 2.0.0
* @category constructors
Expand All @@ -2092,6 +2141,17 @@ export const unwrapScoped: <OutElem, InElem, OutErr, InErr, OutDone, InDone, Env
self: Effect.Effect<Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>, E, R>
) => Channel<OutElem, InElem, E | OutErr, InErr, OutDone, InDone, Env | Exclude<R, Scope.Scope>> = channel.unwrapScoped

/**
* Constructs a `Channel` from a function which receives a `Scope` and returns
* an effect that will result in a `Channel` if successful.
*
* @since 3.11.0
* @category constructors
*/
export const unwrapScopedWith: <OutElem, InElem, OutErr, InErr, OutDone, InDone, Env, E, R>(
f: (scope: Scope.Scope) => Effect.Effect<Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>, E, R>
) => Channel<OutElem, InElem, E | OutErr, InErr, OutDone, InDone, R | Env> = channel.unwrapScopedWith

/**
* Updates a service in the context of this channel.
*
Expand Down
11 changes: 11 additions & 0 deletions packages/effect/src/Effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4203,6 +4203,17 @@ export const scope: Effect<Scope.Scope, never, Scope.Scope> = fiberRuntime.scope
export const scopeWith: <A, E, R>(f: (scope: Scope.Scope) => Effect<A, E, R>) => Effect<A, E, R | Scope.Scope> =
fiberRuntime.scopeWith

/**
* Creates a `Scope`, passes it to the specified effectful function, and then
* closes the scope as soon as the effect is complete (whether through success,
* failure, or interruption).
*
* @since 3.11.0
* @category scoping, resources & finalization
*/
export const scopedWith: <A, E, R>(f: (scope: Scope.Scope) => Effect<A, E, R>) => Effect<A, E, R> =
fiberRuntime.scopedWith

/**
* Scopes all resources used in this workflow to the lifetime of the workflow,
* ensuring that their finalizers are run as soon as this workflow completes
Expand Down
11 changes: 11 additions & 0 deletions packages/effect/src/Sink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,17 @@ export const unwrapScoped: <A, In, L, E, R>(
effect: Effect.Effect<Sink<A, In, L, E, R>, E, R>
) => Sink<A, In, L, E, Exclude<R, Scope.Scope>> = internal.unwrapScoped

/**
* Constructs a `Sink` from a function which receives a `Scope` and returns
* an effect that will result in a `Sink` if successful.
*
* @since 3.11.0
* @category constructors
*/
export const unwrapScopedWith: <A, In, L, E, R>(
f: (scope: Scope.Scope) => Effect.Effect<Sink<A, In, L, E, R>, E, R>
) => Sink<A, In, L, E, R> = internal.unwrapScopedWith

/**
* Returns the sink that executes this one and times its execution.
*
Expand Down
49 changes: 30 additions & 19 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3972,26 +3972,23 @@ export const run: {
* @since 2.0.0
* @category destructors
*/
export const runCollect: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Chunk.Chunk<A>, E, Exclude<R, Scope.Scope>> =
internal.runCollect
export const runCollect: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<Chunk.Chunk<A>, E, R> = internal.runCollect

/**
* Runs the stream and emits the number of elements processed
*
* @since 2.0.0
* @category destructors
*/
export const runCount: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<number, E, Exclude<R, Scope.Scope>> =
internal.runCount
export const runCount: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<number, E, R> = internal.runCount

/**
* Runs the stream only for its effects. The emitted elements are discarded.
*
* @since 2.0.0
* @category destructors
*/
export const runDrain: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E, Exclude<R, Scope.Scope>> =
internal.runDrain
export const runDrain: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E, R> = internal.runDrain

/**
* Executes a pure fold over the stream of values - reduces all elements in
Expand All @@ -4001,8 +3998,8 @@ export const runDrain: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<void, E
* @category destructors
*/
export const runFold: {
<S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Exclude<R, Scope.Scope>>
<A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Effect.Effect<S, E, Exclude<R, Scope.Scope>>
<S, A>(s: S, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, R>
<A, E, R, S>(self: Stream<A, E, R>, s: S, f: (s: S, a: A) => S): Effect.Effect<S, E, R>
} = internal.runFold

/**
Expand Down Expand Up @@ -4062,17 +4059,8 @@ export const runFoldScopedEffect: {
* @category destructors
*/
export const runFoldWhile: {
<S, A>(
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => S
): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, Exclude<R, Scope.Scope>>
<A, E, R, S>(
self: Stream<A, E, R>,
s: S,
cont: Predicate<S>,
f: (s: S, a: A) => S
): Effect.Effect<S, E, Exclude<R, Scope.Scope>>
<S, A>(s: S, cont: Predicate<S>, f: (s: S, a: A) => S): <E, R>(self: Stream<A, E, R>) => Effect.Effect<S, E, R>
<A, E, R, S>(self: Stream<A, E, R>, s: S, cont: Predicate<S>, f: (s: S, a: A) => S): Effect.Effect<S, E, R>
} = internal.runFoldWhile

/**
Expand Down Expand Up @@ -4483,6 +4471,17 @@ export const scheduleWith: {
export const scoped: <A, E, R>(effect: Effect.Effect<A, E, R>) => Stream<A, E, Exclude<R, Scope.Scope>> =
internal.scoped

/**
* Use a function that receives a scope and returns an effect to emit an output
* element. The output element will be the result of the returned effect, if
* successful.
*
* @since 3.11.0
* @category constructors
*/
export const scopedWith: <A, E, R>(f: (scope: Scope.Scope) => Effect.Effect<A, E, R>) => Stream<A, E, R> =
internal.scopedWith

/**
* Emits a sliding window of `n` elements.
*
Expand Down Expand Up @@ -5304,6 +5303,18 @@ export const unwrapScoped: <A, E2, R2, E, R>(
effect: Effect.Effect<Stream<A, E2, R2>, E, R>
) => Stream<A, E | E2, R2 | Exclude<R, Scope.Scope>> = internal.unwrapScoped

/**
* Creates a stream produced from a function which receives a `Scope` and
* returns an `Effect`. The resulting stream will emit a single element, which
* will be the result of the returned effect, if successful.
*
* @since 3.11.0
* @category constructors
*/
export const unwrapScopedWith: <A, E2, R2, E, R>(
f: (scope: Scope.Scope) => Effect.Effect<Stream<A, E2, R2>, E, R>
) => Stream<A, E | E2, R | R2> = internal.unwrapScopedWith

/**
* Updates the specified service within the context of the `Stream`.
*
Expand Down
Loading

0 comments on commit 44e41b2

Please sign in to comment.