From fad1e94bc2ef51e2a074f3f585b448561ce2a265 Mon Sep 17 00:00:00 2001 From: Adam Hearn <22334119+hearnadam@users.noreply.github.com> Date: Sat, 3 Jan 2026 11:23:43 -0800 Subject: [PATCH] Abort: remove usage of erased Tags --- .../shared/src/main/scala/kyo/Actor.scala | 4 +- .../src/main/scala/kyo/AbortCombinators.scala | 147 ++++++++++++------ .../src/main/scala/kyo/AsyncCombinators.scala | 8 +- .../main/scala/kyo/ChoiceCombinators.scala | 2 +- .../src/main/scala/kyo/Constructors.scala | 11 +- .../src/main/scala/kyo/MaybeCombinators.scala | 2 +- .../shared/src/main/scala/kyo/Async.scala | 67 +++++--- .../shared/src/main/scala/kyo/Fiber.scala | 4 +- .../shared/src/main/scala/kyo/KyoApp.scala | 4 +- .../shared/src/main/scala/kyo/Retry.scala | 4 +- .../main/scala/kyo/StreamCoreExtensions.scala | 105 +++++++++---- .../shared/src/main/scala/kyo/System.scala | 21 ++- .../scala/kyo/internal/BaseKyoCoreTest.scala | 2 +- .../src/test/scala/kyo/KyoAppTest.scala | 21 +++ .../src/test/scala/kyo/SystemTest.scala | 4 +- .../shared/src/main/scala/kyo/Abort.scala | 80 +++++++--- .../shared/src/test/scala/kyo/AbortTest.scala | 2 +- kyo-stm/shared/src/main/scala/kyo/STM.scala | 29 +++- .../shared/src/main/scala/kyo/Requests.scala | 7 +- .../shared/src/main/scala/kyo/Routes.scala | 4 +- kyo-zio/shared/src/main/scala/kyo/ZIOs.scala | 2 +- .../shared/src/main/scala/kyo/ZLayers.scala | 2 +- .../shared/src/main/scala/kyo/ZStreams.scala | 3 +- 23 files changed, 372 insertions(+), 163 deletions(-) diff --git a/kyo-actor/shared/src/main/scala/kyo/Actor.scala b/kyo-actor/shared/src/main/scala/kyo/Actor.scala index 8eb3f50f8..0dca94952 100644 --- a/kyo-actor/shared/src/main/scala/kyo/Actor.scala +++ b/kyo-actor/shared/src/main/scala/kyo/Actor.scala @@ -44,7 +44,7 @@ import scala.annotation.* * @tparam B * The type of result this actor produces upon completion */ -sealed abstract class Actor[+E, A, B](_subject: Subject[A], _fiber: Fiber[B, Abort[Closed | E]]): +sealed abstract class Actor[E, A, B](_subject: Subject[A], _fiber: Fiber[B, Abort[Closed | E]]): /** Returns the message subject interface for sending messages to this actor. * @@ -75,7 +75,7 @@ sealed abstract class Actor[+E, A, B](_subject: Subject[A], _fiber: Fiber[B, Abo * @return * The actor's final result of type B */ - def await(using Frame): B < (Async & Abort[Closed | E]) = fiber.get + def await(using Frame, Tag[Abort[Closed | E]]): B < (Async & Abort[Closed | E]) = fiber.get /** Closes the actor's mailbox, preventing it from receiving any new messages. * diff --git a/kyo-combinators/shared/src/main/scala/kyo/AbortCombinators.scala b/kyo-combinators/shared/src/main/scala/kyo/AbortCombinators.scala index 4e5f7717c..ec163bbf3 100644 --- a/kyo-combinators/shared/src/main/scala/kyo/AbortCombinators.scala +++ b/kyo-combinators/shared/src/main/scala/kyo/AbortCombinators.scala @@ -16,6 +16,7 @@ extension [A, S, E](effect: A < (Abort[E] & S)) def result( using ct: ConcreteTag[E], + t: Tag[Abort[E]], fr: Frame ): Result[E, A] < S = Abort.run[E](effect) @@ -28,7 +29,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) def resultPartial( using ct: ConcreteTag[E], - fr: Frame + fr: Frame, + tag: Tag[Abort[E]] ): Result.Partial[E, A] < (Abort[Nothing] & S) = Abort.runPartial(effect) @@ -40,7 +42,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) def resultPartialOrThrow( using ct: ConcreteTag[E], - fr: Frame + fr: Frame, + tag: Tag[Abort[E]] ): Result.Partial[E, A] < S = Abort.runPartialOrThrow(effect) @@ -55,7 +58,9 @@ extension [A, S, E](effect: A < (Abort[E] & S)) using ct: ConcreteTag[E], ct1: ConcreteTag[E1], - fr: Frame + fr: Frame, + tag1: Tag[Abort[E1]], + tag2: Tag[Abort[E]] ): A < (Abort[E1] & S & S1) = effect.recover(e => fn(e).map(Kyo.fail)) @@ -94,7 +99,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) def abortToChoiceDrop( using ct: ConcreteTag[E], - fr: Frame + fr: Frame, + tag: Tag[Abort[E]] ): A < (S & Choice) = effect.result.map(result => result.foldError(value => Choice.eval(value), _ => Choice.drop)) @@ -106,7 +112,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) def abortToAbsent( using ct: ConcreteTag[E], - fr: Frame + fr: Frame, + tag: Tag[Abort[E]] ): A < (S & Abort[Absent]) = effect.result.map { case Result.Failure(_) => Abort.fail(Absent) @@ -124,13 +131,14 @@ extension [A, S, E](effect: A < (Abort[E] & S)) using ng: NotGiven[E <:< Throwable], ct: ConcreteTag[E], - fr: Frame + fr: Frame, + tag: Tag[Abort[E]] ): A < (S & Abort[Throwable]) = effect.result.map { case Result.Success(a) => a - case Result.Failure(thr: Throwable) => Abort.fail(thr) - case Result.Failure(err) => Abort.fail(PanicException(err)) - case p: Result.Panic => Abort.get(p) + case Result.Failure(thr: Throwable) => Abort.fail[Throwable](thr) + case Result.Failure(err) => Abort.fail[Throwable](PanicException(err)) + case p: Result.Panic => Abort.get[Throwable](p) } /** Handles the Abort effect and applies a recovery function to the error. @@ -141,6 +149,7 @@ extension [A, S, E](effect: A < (Abort[E] & S)) def recover[A1 >: A, S1](fn: E => A1 < S1)( using ct: ConcreteTag[E], + tag: Tag[Abort[E]], fr: Frame ): A1 < (S & S1 & Abort[Nothing]) = effect.result.map { @@ -169,7 +178,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) )( using ct: ConcreteTag[E], - fr: Frame + fr: Frame, + tag: Tag[Abort[E]] ): B < (Abort[Nothing] & S & S1) = Abort.fold[E](onSuccess, onFail)(effect) @@ -196,7 +206,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) )( using ct: ConcreteTag[E], - fr: Frame + fr: Frame, + tag: Tag[Abort[E]] ): B < (S & S1) = Abort.fold[E](onSuccess, onFail, onPanic)(effect) @@ -220,7 +231,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) )( using ct: ConcreteTag[E], - fr: Frame + fr: Frame, + tag: Tag[Abort[E]] ): B < (S & S1) = Abort.foldOrThrow(onSuccess, onFail)(effect) @@ -232,7 +244,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) def recoverSome[A1 >: A, S1](fn: PartialFunction[E, A1 < S1])( using ct: ConcreteTag[E], - frame: Frame + frame: Frame, + tag: Tag[Abort[E]] ): A1 < (S & S1 & Abort[E]) = effect.result.map { case Result.Failure(e) => @@ -251,10 +264,12 @@ extension [A, S, E](effect: A < (Abort[E] & S)) using cta: ConcreteTag[A], cte: ConcreteTag[E], - frame: Frame + frame: Frame, + tag1: Tag[Abort[A]], + tag2: Tag[Abort[E]] ): E < (S & Abort[A]) = val handled: Result[E, A] < S = effect.result - handled.map((v: Result[E, A]) => Abort.get(v.swap)) + handled.map((v: Result[E, A]) => Abort.get[A](v.swap)) end swapAbort /** Converts all Aborts to Panic, wrapping non-Throwable Failures in PanicException @@ -265,7 +280,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) def orPanic( using ct: ConcreteTag[E], - frame: Frame + frame: Frame, + tag: Tag[Abort[E]] ): A < (Abort[Nothing] & S) = Abort.run[E](effect).map: case Result.Success(v) => v @@ -285,7 +301,8 @@ extension [A, S, E](effect: A < (Abort[E] & S)) def orThrow( using ct: ConcreteTag[E], - frame: Frame + frame: Frame, + tag: Tag[Abort[E]] ): A < S = Abort.run[E](effect).map: case Result.Success(v) => v @@ -301,7 +318,7 @@ extension [A, S, E](effect: A < (Abort[E] & S)) * @return * A computation that produces the result of this computation with Async and Abort[E] effects */ - def retry(schedule: Schedule)(using ConcreteTag[E], Frame): A < (S & Async & Abort[E]) = + def retry(schedule: Schedule)(using ConcreteTag[E], Frame, Tag[Abort[E]]): A < (S & Async & Abort[E]) = Retry[E](schedule)(effect) /** Performs this computation repeatedly with a limit in case of failures. @@ -311,7 +328,7 @@ extension [A, S, E](effect: A < (Abort[E] & S)) * @return * A computation that produces the result of this computation with Async and Abort[E] effects */ - def retry(n: Int)(using ConcreteTag[E], Frame): A < (S & Abort[E]) = + def retry(n: Int)(using ConcreteTag[E], Frame, Tag[Abort[E]]): A < (S & Abort[E]) = Loop(n): i => Abort.fold[E]( (result: A) => Loop.done[Int, A](result), @@ -330,7 +347,7 @@ extension [A, S, E](effect: A < (Abort[E] & S)) * @return * A computation that produces the result of this computation with Async and no Abort[E] */ - def retryForever(using ConcreteTag[E], Frame): A < S = + def retryForever(using ConcreteTag[E], Frame, Tag[Abort[E]]): A < S = Loop.foreach: Abort.fold[E]( (result: A) => Loop.done[Unit, A](result), @@ -374,7 +391,8 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: ev: E => E1 | ER, ct: ConcreteTag[E1], reduce: Reducible[Abort[ER]], - frame: Frame + frame: Frame, + tag: Tag[Abort[E1]] ): Result[E1, A] < (S & reduce.SReduced) = Abort.run[E1](effect.asInstanceOf[A < (Abort[E1 | ER] & S)]) @@ -387,7 +405,8 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: using ev: E => E1 | ER, ct: ConcreteTag[E1], - frame: Frame + frame: Frame, + tag: Tag[Abort[E1]] ): Result.Partial[E1, A] < (S & Abort[ER]) = Abort.runPartial[E1](effect.asInstanceOf[A < (Abort[E1 | ER] & S)]) @@ -404,9 +423,19 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: ct: ConcreteTag[E1], ct1: ConcreteTag[E2], reduce: Reducible[Abort[ER]], - fr: Frame + fr: Frame, + tag: Tag[Abort[E2]], + tag1: Tag[Abort[E1]], + tagE: Tag[Abort[E]], + tagER: Tag[Abort[ER]] ): A < (Abort[E2] & reduce.SReduced & S & S1) = - recover(e => fn(e).map(Kyo.fail)) + Abort.run[Any](effect.asInstanceOf[A < (Abort[Any] & S)]).map { + case Result.Success(v) => v + case Result.Failure(e) if ct.accepts(e) => fn(e.asInstanceOf[E1]).map(e2 => Abort.fail[E2](e2)) + case Result.Failure(e) => Abort.fail[ER](e.asInstanceOf[ER]) + case panic: Result.Panic => Abort.error[Nothing](panic) + }.asInstanceOf[A < (Abort[E2] & reduce.SReduced & S & S1)] + end mapAbort /** Handles the partial Abort[E1] effect and applies a recovery function to the error. * @@ -418,14 +447,17 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: ev: E => E1 | ER, reduce: Reducible[Abort[ER]], ct: ConcreteTag[E1], - frame: Frame + frame: Frame, + tag: Tag[Abort[E1]], + tagE: Tag[Abort[E]], + tagER: Tag[Abort[ER]] ): [A1 >: A, S1] => (E1 => A1 < S1) => A1 < (S & S1 & reduce.SReduced) = [A1 >: A, S1] => (fn: E1 => A1 < S1) => reduce(Abort.run[E1](effect.asInstanceOf[A < (Abort[E1 | ER] & S)]).map { - case Result.Failure(e1) => fn(e1) - case Result.Success(v) => v - case ab @ Result.Panic(_) => Abort.get(ab.asInstanceOf[Result[Nothing, Nothing]]) + case Result.Failure(e1) => fn(e1) + case Result.Success(v) => v + case ab: Result.Panic => Abort.get[Nothing](ab) }) /** Recovers from an Abort failure by applying the provided function. @@ -447,11 +479,21 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: onFail: E1 => B < S1 )( using - ct: ConcreteTag[E1], + ct1: ConcreteTag[E1], + ct2: ConcreteTag[E], ev: E => E1 | ER, + t1: Tag[Abort[E]], + t2: Tag[Abort[ER]], + tagE1: Tag[Abort[E1]], fr: Frame ): B < (S & S1 & Abort[ER]) = - Abort.fold[E1](onSuccess, onFail)(effect.asInstanceOf[A < (Abort[E1 | ER] & S)]) + Abort.run[Any](effect.asInstanceOf[A < (Abort[Any] & S)]).map { + case Result.Success(v) => onSuccess(v) + case Result.Failure(e) if ct1.accepts(e) => onFail(e.asInstanceOf[E1]) + case Result.Failure(e) => Abort.fail[ER](e.asInstanceOf[ER]) + case panic: Result.Panic => Abort.error[Nothing](panic) + }.asInstanceOf[B < (S & S1 & Abort[ER])] + end fold /** Recovers from an Abort failure by applying the provided function. * @@ -478,7 +520,9 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: ct: ConcreteTag[E1], ev: E => E1 | ER, reduce: Reducible[Abort[ER]], - fr: Frame + fr: Frame, + tag1: Tag[Abort[E1]], + tag2: Tag[Abort[ER]] ): B < (S & S1 & reduce.SReduced) = Abort.fold[E1](onSuccess, onFail, onPanic)(effect.asInstanceOf[A < (Abort[E1 | ER] & S)]) @@ -491,7 +535,8 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: using ev: E => E1 | ER, ct: ConcreteTag[E1], - frame: Frame + frame: Frame, + tag: Tag[Abort[E1]] ): [A1 >: A, S1] => PartialFunction[E1, A1 < S1] => A1 < (S & S1 & Abort[E]) = [A1 >: A, S1] => (fn: PartialFunction[E1, A1 < S1]) => @@ -511,7 +556,8 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: ev: E => E1 | ER, ct: ConcreteTag[E1], reduce: Reducible[Abort[ER]], - frame: Frame + frame: Frame, + tag: Tag[Abort[E1]] ): A < (S & reduce.SReduced & Choice) = Abort.run[E1](effect.asInstanceOf[A < (Abort[E1 | ER] & S)]).map(result => result.foldError(value => Choice.eval(value), _ => Choice.drop) @@ -527,12 +573,13 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: ev: E => E1 | ER, ct: ConcreteTag[E1], reduce: Reducible[Abort[ER]], - frame: Frame + frame: Frame, + tag: Tag[Abort[E1]] ): A < (S & reduce.SReduced & Abort[Absent]) = Abort.run[E1](effect.asInstanceOf[A < (Abort[E1 | ER] & S)]).map { - case Result.Failure(_) => Abort.get(Result.Failure(Absent)) - case p @ Result.Panic(_) => Abort.get(p.asInstanceOf[Result[Nothing, Nothing]]) - case s @ Result.Success(_) => Abort.get(s.asInstanceOf[Result[Nothing, A]]) + case _: Result.Failure[?] => Abort.fail(Absent) + case p: Result.Panic => Abort.get[Absent](p) + case Result.Success(value) => value } /** Translates the partial Abort[E1] effect to an Abort[Throwable] effect in case of failure, by converting non-throwable errors to @@ -547,13 +594,14 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: ev: E => E1 | ER, ct: ConcreteTag[E1], reduce: Reducible[Abort[ER]], - fr: Frame + fr: Frame, + tag: Tag[Abort[E1]] ): A < (S & Abort[Throwable] & reduce.SReduced) = Abort.run[E1](effect.asInstanceOf[A < (Abort[E1 | ER] & S)]).map { case Result.Success(a) => a - case Result.Failure(thr: Throwable) => Abort.fail(thr) - case Result.Failure(err) => Abort.fail(PanicException(err)) - case p: Result.Panic => Abort.get(p) + case Result.Failure(thr: Throwable) => Abort.fail[Throwable](thr) + case Result.Failure(err) => Abort.fail[Throwable](PanicException(err)) + case p: Result.Panic => Abort.get[Throwable](p) } /** Translates the partial Abort[E1] effect by swapping the error and success types. @@ -566,10 +614,12 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: ev: E => E1 | ER, reduce: Reducible[Abort[ER]], ct: ConcreteTag[E1], - frame: Frame + frame: Frame, + tag: Tag[Abort[E1]], + tag2: Tag[Abort[A]] ): E1 < (S & reduce.SReduced & Abort[A]) = val handled = Abort.run[E1](effect.asInstanceOf[A < (Abort[E1 | ER] & S)]) - handled.map((v: Result[E1, A]) => Abort.get(v.swap)) + handled.map((v: Result[E1, A]) => Abort.get[A](v.swap)) end swap /** Catches partial Abort and panics instead @@ -581,7 +631,8 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: using ev: E => E1 | ER, ct: ConcreteTag[E1], - frame: Frame + frame: Frame, + tag: Tag[Abort[E1]] ): A < (S & Abort[ER]) = Abort.runPartial[E1](effect.asInstanceOf[A < (Abort[E1 | ER] & S)]).map: case Result.Success(v) => v @@ -600,6 +651,8 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: using E => E1 | ER, ConcreteTag[E1], + Tag[Abort[E1]], + Tag[Abort[ER]], Frame ): A < (S & Async & Abort[E1 | ER]) = Retry[E1](schedule)(effect.asInstanceOf[A < (S & Abort[E1 | ER])]) @@ -616,6 +669,8 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: using E => E1 | ER, ConcreteTag[E1], + Tag[Abort[E1]], + Tag[Abort[ER]], Frame ): A < (S & Abort[E1 | ER]) = val retypedEffect = effect.asInstanceOf[A < (S & Abort[E1 | ER])] @@ -641,7 +696,9 @@ class ForAbortOps[A, S, E, E1 <: E](effect: A < (Abort[E] & S)) extends AnyVal: E => E1 | ER, ConcreteTag[E1], ConcreteTag[E1 | ER], - Frame + Frame, + Tag[Abort[E1]], + Tag[Abort[ER]] ): A < (S & Abort[ER]) = val retypedEffect = effect.asInstanceOf[A < (S & Abort[E1 | ER])] Loop.foreach: diff --git a/kyo-combinators/shared/src/main/scala/kyo/AsyncCombinators.scala b/kyo-combinators/shared/src/main/scala/kyo/AsyncCombinators.scala index d7b8de6e6..6e360ef1e 100644 --- a/kyo-combinators/shared/src/main/scala/kyo/AsyncCombinators.scala +++ b/kyo-combinators/shared/src/main/scala/kyo/AsyncCombinators.scala @@ -61,6 +61,8 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S)) def &>[A1, E1, S2, S3](next: A1 < (Abort[E1] & Async & S2))( using fr: Frame, + tag1: Tag[Abort[E]], + tag2: Tag[Abort[E1]], i1: Isolate[S, Sync, S3], i2: Isolate[S2, Sync, S3] ): A1 < (Abort[E | E1] & Async & S & S2 & S3) = @@ -82,6 +84,8 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S)) def <&[A1, E1, S2, S3](next: A1 < (Abort[E1] & Async & S2))( using f: Frame, + tag1: Tag[Abort[E]], + tag2: Tag[Abort[E1]], i1: Isolate[S, Sync, S3], i2: Isolate[S2, Sync, S3] ): A < (Abort[E | E1] & Async & S & S2 & S3) = @@ -103,6 +107,8 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S)) def <&>[A1, E1, S2, S3](next: A1 < (Abort[E1] & Async & S2))( using fr: Frame, + tag1: Tag[Abort[E]], + tag2: Tag[Abort[E1]], i1: Isolate[S, Sync, S3], i2: Isolate[S2, Sync, S3], zippable: Zippable[A, A1] @@ -123,7 +129,7 @@ extension [A, E, S, S2](fiber: Fiber[A, Abort[E] & S2] < S) * @return * A computation that produces the result of this computation with Async effect */ - def join(using Frame): A < (S & S2 & Abort[E] & Async) = + def join(using Frame, Tag[Abort[E]]): A < (S & S2 & Abort[E] & Async) = fiber.map(_.get) /** Awaits the completion of the fiber and returns its result as a `Unit`. diff --git a/kyo-combinators/shared/src/main/scala/kyo/ChoiceCombinators.scala b/kyo-combinators/shared/src/main/scala/kyo/ChoiceCombinators.scala index da8a61a6f..3a71319fa 100644 --- a/kyo-combinators/shared/src/main/scala/kyo/ChoiceCombinators.scala +++ b/kyo-combinators/shared/src/main/scala/kyo/ChoiceCombinators.scala @@ -50,7 +50,7 @@ extension [A, S](effect: A < (S & Choice)) * @return * A computation that produces the result of this computation with dropped Choice translated to Abort[E] */ - def choiceDropToFailure[E](error: => E)(using Frame): A < (Choice & Abort[E] & S) = + def choiceDropToFailure[E](error: => E)(using Frame, Tag[Abort[E]]): A < (Choice & Abort[E] & S) = Choice.run(effect).map: case seq if seq.isEmpty => Abort.fail(error) case other => Choice.evalSeq(other) diff --git a/kyo-combinators/shared/src/main/scala/kyo/Constructors.scala b/kyo-combinators/shared/src/main/scala/kyo/Constructors.scala index 4eb7b4771..274244141 100644 --- a/kyo-combinators/shared/src/main/scala/kyo/Constructors.scala +++ b/kyo-combinators/shared/src/main/scala/kyo/Constructors.scala @@ -37,7 +37,10 @@ extension (kyoObject: Kyo.type) * @return * An effect that can be completed by the given register function */ - def async[A, E](register: (A < (Abort[E] & Async) => Unit) => Any < (Abort[E] & Async))(using Frame): A < (Abort[E] & Async) = + def async[A, E](register: (A < (Abort[E] & Async) => Unit) => Any < (Abort[E] & Async))(using + Frame, + Tag[Abort[E]] + ): A < (Abort[E] & Async) = for promise <- Promise.init[A, Abort[E]] registerFn = (eff: A < (Abort[E] & Async)) => @@ -88,7 +91,7 @@ extension (kyoObject: Kyo.type) * @return * An effect that fails with the given error */ - def fail[E](error: => E)(using Frame): Nothing < Abort[E] = + def fail[E](error: => E)(using Frame, Tag[Abort[E]]): Nothing < Abort[E] = Abort.fail(error) /** Applies a function to each element in parallel and returns a new sequence with the results. @@ -143,7 +146,7 @@ extension (kyoObject: Kyo.type) * @return * An effect that attempts to run the given effect and handles Left[E] to Abort[E]. */ - def fromEither[E, A](either: Either[E, A])(using Frame): A < Abort[E] = + def fromEither[E, A](either: Either[E, A])(using Frame, Tag[Abort[E]]): A < Abort[E] = Abort.get(either) /** Creates an effect from an Option[A] and handles None to Abort[Absent]. @@ -173,7 +176,7 @@ extension (kyoObject: Kyo.type) * @return * An effect that attempts to run the given effect and handles Result.Failure[E] to Abort[E]. */ - def fromResult[E, A](result: Result[E, A])(using Frame): A < Abort[E] = + def fromResult[E, A](result: Result[E, A])(using Frame, Tag[Abort[E]]): A < Abort[E] = Abort.get(result) /** Creates an effect from a Future[A] and handles the Future to Async. diff --git a/kyo-combinators/shared/src/main/scala/kyo/MaybeCombinators.scala b/kyo-combinators/shared/src/main/scala/kyo/MaybeCombinators.scala index 7bb4399c9..534e7cfd6 100644 --- a/kyo-combinators/shared/src/main/scala/kyo/MaybeCombinators.scala +++ b/kyo-combinators/shared/src/main/scala/kyo/MaybeCombinators.scala @@ -46,7 +46,7 @@ extension [A, S, E](effect: A < (Abort[Absent] & S)) * @return * A computation that produces the result of this computation with the Abort[Absent] effect translated to Abort[E] */ - def absentToFailure(failure: => E)(using Frame): A < (S & Abort[E]) = + def absentToFailure(failure: => E)(using Frame, Tag[Abort[E]]): A < (S & Abort[E]) = for res <- effect.forAbort[Absent].result yield res match diff --git a/kyo-core/shared/src/main/scala/kyo/Async.scala b/kyo-core/shared/src/main/scala/kyo/Async.scala index 3ba925026..a4f3ea976 100644 --- a/kyo-core/shared/src/main/scala/kyo/Async.scala +++ b/kyo-core/shared/src/main/scala/kyo/Async.scala @@ -117,7 +117,9 @@ object Async extends AsyncPlatformSpecific: def mask[E, A, S]( using isolate: Isolate[S, Abort[E] & Async, S] )(v: => A < (Abort[E] & Async & S))( - using frame: Frame + using + Frame, + Tag[Abort[E]] ): A < (Abort[E] & Async & S) = isolate.capture { state => Fiber.initUnscoped(isolate.isolate(state, v)).map(_.mask.map(fiber => isolate.restore(fiber.get))) @@ -162,7 +164,10 @@ object Async extends AsyncPlatformSpecific: */ def timeout[E, A, S]( using isolate: Isolate[S, Abort[E] & Async, S] - )(after: Duration)(v: => A < (Abort[E] & Async & S))(using frame: Frame): A < (Abort[E | Timeout] & Async & S) = + )(after: Duration)(v: => A < (Abort[E] & Async & S))(using + frame: Frame, + t: Tag[Abort[E]] + ): A < (Abort[E | Timeout] & Async & S) = if !after.isFinite then v else isolate.capture { state => @@ -197,7 +202,9 @@ object Async extends AsyncPlatformSpecific: def race[E, A, S]( using isolate: Isolate[S, Abort[E] & Async, S] )(iterable: Iterable[A < (Abort[E] & Async & S)])( - using frame: Frame + using + frame: Frame, + t: Tag[Abort[E]] ): A < (Abort[E] & Async & S) = require(iterable.nonEmpty, "Can't race an empty collection.") isolate.capture { state => @@ -223,7 +230,9 @@ object Async extends AsyncPlatformSpecific: first: A < (Abort[E] & Async & S), rest: A < (Abort[E] & Async & S)* )( - using frame: Frame + using + Frame, + Tag[Abort[E]] ): A < (Abort[E] & Async & S) = race[E, A, S](first +: rest) @@ -245,7 +254,9 @@ object Async extends AsyncPlatformSpecific: def raceFirst[E, A, S]( using isolate: Isolate[S, Abort[E] & Async, S] )(iterable: Iterable[A < (Abort[E] & Async & S)])( - using frame: Frame + using + Frame, + Tag[Abort[E]] ): A < (Abort[E] & Async & S) = require(iterable.nonEmpty, "Can't race an empty collection.") isolate.capture { state => @@ -272,7 +283,9 @@ object Async extends AsyncPlatformSpecific: first: A < (Abort[E] & Async & S), rest: A < (Abort[E] & Async & S)* )( - using frame: Frame + using + frame: Frame, + t: Tag[Abort[E]] ): A < (Abort[E] & Async & S) = raceFirst[E, A, S](first +: rest) @@ -292,7 +305,7 @@ object Async extends AsyncPlatformSpecific: )( first: A < (Abort[E] & Async & S), rest: A < (Abort[E] & Async & S)* - )(using frame: Frame): Chunk[A] < (Abort[E] & Async & S) = + )(using Frame, Tag[Abort[E]]): Chunk[A] < (Abort[E] & Async & S) = gather(first +: rest) /** Concurrently executes two or more and collects up to `max` successful results. @@ -314,7 +327,9 @@ object Async extends AsyncPlatformSpecific: first: A < (Abort[E] & Async & S), rest: A < (Abort[E] & Async & S)* )( - using frame: Frame + using + Frame, + Tag[Abort[E]] ): Chunk[A] < (Abort[E] & Async & S) = gather(max)(first +: rest) @@ -335,7 +350,9 @@ object Async extends AsyncPlatformSpecific: def gather[E, A, S]( using Isolate[S, Abort[E] & Async, S] )(iterable: Iterable[A < (Abort[E] & Async & S)])( - using frame: Frame + using + Frame, + Tag[Abort[E]] ): Chunk[A] < (Abort[E] & Async & S) = gather(iterable.size)(iterable) @@ -356,7 +373,9 @@ object Async extends AsyncPlatformSpecific: def gather[E, A, S]( using isolate: Isolate[S, Abort[E] & Async, S] )(max: Int)(iterable: Iterable[A < (Abort[E] & Async & S)])( - using frame: Frame + using + Frame, + Tag[Abort[E]] ): Chunk[A] < (Abort[E] & Async & S) = isolate.capture { state => Fiber.internal.gather(max)(iterable.map(isolate.isolate(state, _))) @@ -377,7 +396,8 @@ object Async extends AsyncPlatformSpecific: def foreachIndexed[E, A, B, S]( using isolate: Isolate[S, Abort[E] & Async, S] )(iterable: Iterable[A], concurrency: Int = defaultConcurrency)(f: (Int, A) => B < (Abort[E] & Async & S))(using - Frame + Frame, + Tag[Abort[E]] ): Chunk[B] < (Abort[E] & Async & S) = if concurrency <= 1 then Kyo.foreachIndexed(Chunk.from(iterable))(f) @@ -413,7 +433,7 @@ object Async extends AsyncPlatformSpecific: using isolate: Isolate[S, Abort[E] & Async, S] )(iterable: Iterable[A], concurrency: Int = defaultConcurrency)( f: A => B < (Abort[E] & Async & S) - )(using Frame): Chunk[B] < (Abort[E] & Async & S) = + )(using Frame, Tag[Abort[E]]): Chunk[B] < (Abort[E] & Async & S) = foreachIndexed(iterable, concurrency)((_, v) => f(v)) /** Executes a sequence of computations in parallel, discarding the results. @@ -429,7 +449,7 @@ object Async extends AsyncPlatformSpecific: using isolate: Isolate[S, Abort[E] & Async, S] )(iterable: Iterable[A], concurrency: Int = defaultConcurrency)( f: A => B < (Abort[E] & Async & S) - )(using Frame): Unit < (Abort[E] & Async & S) = + )(using Frame, Tag[Abort[E]]): Unit < (Abort[E] & Async & S) = foreach(iterable, concurrency)(f).unit /** Filters elements from a sequence using bounded concurrency. @@ -447,7 +467,7 @@ object Async extends AsyncPlatformSpecific: using isolate: Isolate[S, Abort[E] & Async, S] )(iterable: Iterable[A], concurrency: Int = defaultConcurrency)( f: A => Boolean < (Abort[E] & Async & S) - )(using Frame): Chunk[A] < (Abort[E] & Async & S) = + )(using Frame, Tag[Abort[E]]): Chunk[A] < (Abort[E] & Async & S) = collect(iterable, concurrency)(v => f(v).map(Maybe.when(_)(v))) /** Transforms and filters elements from a sequence using bounded concurrency. @@ -465,7 +485,7 @@ object Async extends AsyncPlatformSpecific: using isolate: Isolate[S, Abort[E] & Async, S] )(iterable: Iterable[A], concurrency: Int = defaultConcurrency)( f: A => Maybe[B] < (Abort[E] & Async & S) - )(using Frame): Chunk[B] < (Abort[E] & Async & S) = + )(using Frame, Tag[Abort[E]]): Chunk[B] < (Abort[E] & Async & S) = foreach(iterable, concurrency)(f).map(_.flatten) /** Executes a sequence of computations using bounded concurrency. @@ -480,7 +500,8 @@ object Async extends AsyncPlatformSpecific: def collectAll[E, A, S]( using isolate: Isolate[S, Abort[E] & Async, S] )(iterable: Iterable[A < (Abort[E] & Async & S)], concurrency: Int = defaultConcurrency)(using - Frame + Frame, + Tag[Abort[E]] ): Chunk[A] < (Abort[E] & Async & S) = foreach(iterable, concurrency)(identity) @@ -493,7 +514,10 @@ object Async extends AsyncPlatformSpecific: */ def collectAllDiscard[E, A, S]( using isolate: Isolate[S, Abort[E] & Async, S] - )(iterable: Iterable[A < (Abort[E] & Async & S)], concurrency: Int = defaultConcurrency)(using Frame): Unit < (Abort[E] & Async & S) = + )(iterable: Iterable[A < (Abort[E] & Async & S)], concurrency: Int = defaultConcurrency)(using + Frame, + Tag[Abort[E]] + ): Unit < (Abort[E] & Async & S) = foreachDiscard(iterable, concurrency)(identity) /** Repeats a computation n times in parallel. @@ -511,7 +535,7 @@ object Async extends AsyncPlatformSpecific: using isolate: Isolate[S, Abort[E] & Async, S] )(n: Int, concurrency: Int = defaultConcurrency)( f: => A < (Abort[E] & Async & S) - )(using Frame): Chunk[A] < (Abort[E] & Async & S) = + )(using Frame, Tag[Abort[E]]): Chunk[A] < (Abort[E] & Async & S) = fillIndexed(n, concurrency)(_ => f) /** Repeats a computation n times in parallel with index access. @@ -532,7 +556,7 @@ object Async extends AsyncPlatformSpecific: using isolate: Isolate[S, Abort[E] & Async, S] )(n: Int, concurrency: Int = defaultConcurrency)( f: Int => A < (Abort[E] & Async & S) - )(using Frame): Chunk[A] < (Abort[E] & Async & S) = + )(using Frame, Tag[Abort[E]]): Chunk[A] < (Abort[E] & Async & S) = foreach(0 until n, concurrency)(f) /** Executes two computations in parallel and returns their results as a tuple. @@ -786,7 +810,10 @@ object Async extends AsyncPlatformSpecific: private[kyo] inline def get[E, A](v: IOPromise[? <: E, ? <: A])(using Frame): A < (Abort[E] & Async) = use(v)(identity) - private[kyo] inline def use[E, A, B, S](v: IOPromise[? <: E, ? <: A])(f: A => B < S)(using Frame): B < (Abort[E] & Async & S) = + private[kyo] inline def use[E, A, B, S](v: IOPromise[? <: E, ? <: A])(f: A => B < S)(using + Frame, + Tag[Abort[E]] + ): B < (Abort[E] & Async & S) = useResult(v)(_.fold(f, Abort.fail, Abort.panic)) sealed trait Join extends ArrowEffect[IOPromise[?, *], Result[Nothing, *]] diff --git a/kyo-core/shared/src/main/scala/kyo/Fiber.scala b/kyo-core/shared/src/main/scala/kyo/Fiber.scala index f8540ced1..7a7507946 100644 --- a/kyo-core/shared/src/main/scala/kyo/Fiber.scala +++ b/kyo-core/shared/src/main/scala/kyo/Fiber.scala @@ -239,7 +239,7 @@ object Fiber: * @return * The result of the Fiber */ - def get(using Frame): A < (Abort[E] & Async & S) = + def get(using Frame, Tag[Abort[E]]): A < (Abort[E] & Async & S) = Async.use(self.lower)(identity) /** Uses the result of the Fiber to compute a new value. @@ -249,7 +249,7 @@ object Fiber: * @return * The result of applying the function to the Fiber's result */ - def use[B, S2](f: A => B < S2)(using Frame): B < (Abort[E] & Async & S & S2) = + def use[B, S2](f: A => B < S2)(using Frame, Tag[Abort[E]]): B < (Abort[E] & Async & S & S2) = Async.use(self.lower)(_.map(f)) /** Gets the result of the Fiber as a Result. diff --git a/kyo-core/shared/src/main/scala/kyo/KyoApp.scala b/kyo-core/shared/src/main/scala/kyo/KyoApp.scala index bca156702..5295eb925 100644 --- a/kyo-core/shared/src/main/scala/kyo/KyoApp.scala +++ b/kyo-core/shared/src/main/scala/kyo/KyoApp.scala @@ -49,7 +49,9 @@ object KyoApp: def runAndBlock[E, A, S]( using isolate: Isolate[S, Sync, Any] )(timeout: Duration)(v: => A < (Abort[E] & Async & S))( - using frame: Frame + using + frame: Frame, + t: Tag[Abort[E | Timeout]] ): A < (Abort[E | Timeout] & Sync & S) = Fiber.initUnscoped(v).map { fiber => fiber.block(timeout).map(Abort.get(_)) diff --git a/kyo-core/shared/src/main/scala/kyo/Retry.scala b/kyo-core/shared/src/main/scala/kyo/Retry.scala index df4bed924..dfb65eb31 100644 --- a/kyo-core/shared/src/main/scala/kyo/Retry.scala +++ b/kyo-core/shared/src/main/scala/kyo/Retry.scala @@ -37,7 +37,7 @@ object Retry: * @return * The result of the operation, or an abort if all retries fail */ - def apply[E: ConcreteTag](using Frame)[A, S](v: => A < (Abort[E] & S)): A < (Async & Abort[E] & S) = + def apply[E: ConcreteTag](using Frame, Tag[Abort[E]])[A, S](v: => A < (Abort[E] & S)): A < (Async & Abort[E] & S) = apply(defaultSchedule)(v) /** Retries an operation using a custom policy builder. @@ -49,7 +49,7 @@ object Retry: * @return * The result of the operation, or an abort if all retries fail. */ - def apply[E: ConcreteTag](using Frame)[A, S](schedule: Schedule)(v: => A < (Abort[E] & S)): A < (Async & Abort[E] & S) = + def apply[E: ConcreteTag](using Frame, Tag[Abort[E]])[A, S](schedule: Schedule)(v: => A < (Abort[E] & S)): A < (Async & Abort[E] & S) = Abort.run[E](v).map { case Result.Success(value) => value case result: Result.Failure[E] @unchecked => diff --git a/kyo-core/shared/src/main/scala/kyo/StreamCoreExtensions.scala b/kyo-core/shared/src/main/scala/kyo/StreamCoreExtensions.scala index 2f23a4d17..dec4372df 100644 --- a/kyo-core/shared/src/main/scala/kyo/StreamCoreExtensions.scala +++ b/kyo-core/shared/src/main/scala/kyo/StreamCoreExtensions.scala @@ -40,7 +40,8 @@ object StreamCoreExtensions: )( using Tag[Emit[Chunk[A]]], - Tag[Emit[Chunk[Chunk[A]]]] + Tag[Emit[Chunk[Chunk[A]]]], + Tag[Abort[E]] ) extends StreamHub[A, E]: private def emit(listener: Hub.Listener[Result.Partial[E, Maybe[Chunk[A]]]])(using Frame) = listener @@ -102,6 +103,7 @@ object StreamCoreExtensions: Tag[A], Tag[Emit[Chunk[A]]], Tag[Emit[Chunk[Chunk[A]]]], + Tag[Abort[E]], Frame ): StreamHubImpl[A, E] < (Async & Scope) = Sync.Unsafe: @@ -127,7 +129,8 @@ object StreamCoreExtensions: using Isolate[S, Sync, S], Tag[Emit[Chunk[V]]], - Frame + Frame, + Tag[Abort[E]] ): Stream[V, S & Async] = Stream: Channel.use[Maybe[Chunk[V]]](bufferSize, Access.MultiProducerMultiConsumer): channel => @@ -243,6 +246,7 @@ object StreamCoreExtensions: using Isolate[S, Sync, S], Tag[Emit[Chunk[V]]], + Tag[Abort[E]], Frame ): Stream[V, S & Async] = Stream: @@ -278,7 +282,8 @@ object StreamCoreExtensions: Isolate[S, Sync, S], ConcreteTag[E], Tag[Emit[Chunk[V]]], - Frame + Frame, + Tag[Abort[E]] ): Stream[V, Abort[E] & S & Async] = val streams: Seq[Stream[V, Abort[E] & S & Async]] = Seq(stream, other) Stream.collectAll[V, E, S](streams) @@ -299,7 +304,8 @@ object StreamCoreExtensions: Isolate[S, Sync, S], ConcreteTag[E], Tag[Emit[Chunk[V]]], - Frame + Frame, + Tag[Abort[E]] ): Stream[V, Abort[E] & S & S2 & Async] = Stream.collectAllHalting[V, E, S](Seq(stream, other)) @@ -318,8 +324,9 @@ object StreamCoreExtensions: )( using Isolate[S, Sync, S], - ConcreteTag[E], + Tag[Abort[E | Closed]], Tag[Emit[Chunk[V]]], + ConcreteTag[E], Frame ): Stream[V, Abort[E] & S & Async] = Stream: @@ -350,11 +357,12 @@ object StreamCoreExtensions: )( using i: Isolate[S, Sync, S], - sct: ConcreteTag[E], - t: Tag[Emit[Chunk[V]]], + t: Tag[Abort[E | Closed]], + t2: Tag[Emit[Chunk[V]]], + t3: ConcreteTag[E], f: Frame ): Stream[V, Abort[E] & S & Async] = - other.mergeHaltingLeft(stream)(using i, sct, t, f) + other.mergeHaltingLeft(stream)(using i, t, t2, t3, f) /** Applies effectful transformation of stream elements asynchronously, mapping them in parallel. Preserves chunk boundaries. * @@ -370,6 +378,8 @@ object StreamCoreExtensions: t1: Tag[Emit[Chunk[V]]], t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], + t4: Tag[Abort[E | Closed]], + t5: Tag[Abort[E]], i: Isolate[S & S2, Sync, S & S2], ev: ConcreteTag[E | Closed], frame: Frame @@ -406,8 +416,9 @@ object StreamCoreExtensions: // When finished, set output channel to close once it's drained onSuccess = _ => channelOut.closeAwaitEmpty.unit, onFail = { - case _: Closed => bug("buffer closed unexpectedly") - case e: E @unchecked => cleanup.andThen(Abort.fail(e)) + case _: Closed => bug("buffer closed unexpectedly") + case e: E @unchecked => + cleanup.andThen(Abort.fail[E](e)) }, onPanic = e => cleanup.andThen(Abort.panic(e)) )(handleEmit) @@ -418,7 +429,10 @@ object StreamCoreExtensions: channelOut.take.map: chunkFiber => chunkFiber.get.map: chunk => if chunk.nonEmpty then Emit.value(chunk) else Kyo.unit - Abort.run[Closed](emit).unit + Abort.run[E | Closed](emit).map: + case Result.Failure(_: Closed) => () + case Result.Failure(e: E @unchecked) => Abort.fail[E](e) + case _ => () end emitResults // Stream from output channel, running handlers in background @@ -438,11 +452,13 @@ object StreamCoreExtensions: t1: Tag[Emit[Chunk[V]]], t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], + t4: Tag[Abort[E | Closed]], + t5: Tag[Abort[E]], i: Isolate[S & S2, Sync, S & S2], ev: ConcreteTag[E | Closed], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = - mapPar(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, i, ev, frame) + mapPar(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, t4, t5, i, ev, frame) /** Applies effectful transformation of stream elements asynchronously, mapping them in parallel. Does not preserve chunk * boundaries. @@ -459,6 +475,8 @@ object StreamCoreExtensions: t1: Tag[Emit[Chunk[V]]], t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], + t4: Tag[Abort[E | Closed]], + t5: Tag[Abort[E]], i: Isolate[S & S2, Sync, S & S2], ev: ConcreteTag[E | Closed], frame: Frame @@ -508,7 +526,7 @@ object StreamCoreExtensions: onSuccess = _ => channelOut.closeAwaitEmpty.unit, onFail = { case _: Closed => bug("buffer closed unexpectedly") - case e: E @unchecked => cleanup.andThen(Abort.fail(e)) + case e: E @unchecked => cleanup.andThen(Abort.fail[E](e)) }, onPanic = e => cleanup.andThen(Abort.panic(e)) )(Async.foreachDiscard(Seq(handleEmit, handlePar))(identity).unit) @@ -531,11 +549,13 @@ object StreamCoreExtensions: t1: Tag[Emit[Chunk[V]]], t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], + t4: Tag[Abort[E | Closed]], + t5: Tag[Abort[E]], i: Isolate[S & S2, Sync, S & S2], ev: ConcreteTag[E | Closed], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = - mapParUnordered(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, i, ev, frame) + mapParUnordered(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, t4, t5, i, ev, frame) /** Applies effectful transformation of stream chunks asynchronously, mapping chunks in parallel. Preserves chunk boundaries. * @@ -554,6 +574,8 @@ object StreamCoreExtensions: t1: Tag[Emit[Chunk[V]]], t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], + t4: Tag[Abort[E | Closed]], + t5: Tag[Abort[E]], i: Isolate[S & S2, Sync, S & S2], ev: ConcreteTag[E | Closed], frame: Frame @@ -588,7 +610,7 @@ object StreamCoreExtensions: onSuccess = _ => channelOut.closeAwaitEmpty.unit, onFail = { case _: Closed => bug("buffer closed unexpectedly") - case e: E @unchecked => cleanup.andThen(Abort.fail(e)) + case e: E @unchecked => cleanup.andThen(Abort.fail[E](e)) }, onPanic = e => cleanup.andThen(Abort.panic(e)) )(handleEmit) @@ -599,7 +621,9 @@ object StreamCoreExtensions: channelOut.take.map: chunkFiber => chunkFiber.use: chunk => if chunk.nonEmpty then Emit.value(chunk) else Kyo.unit - Abort.run[Closed](emit).unit + Abort.run[E | Closed](emit).map: + case Result.Failure(e: E @unchecked) => Abort.fail[E](e) + case _ => () end emitResults // Stream from output channel, running handlers in background @@ -619,11 +643,13 @@ object StreamCoreExtensions: t1: Tag[Emit[Chunk[V]]], t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], + t4: Tag[Abort[E | Closed]], + t5: Tag[Abort[E]], i: Isolate[S & S2, Sync, S & S2], ev: ConcreteTag[E | Closed], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = - mapChunkPar(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, i, ev, frame) + mapChunkPar(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, t4, t5, i, ev, frame) /** Applies effectful transformation of stream chunks asynchronously, mapping chunks in parallel. Does not preserve chunk * boundaries. @@ -646,6 +672,8 @@ object StreamCoreExtensions: t1: Tag[Emit[Chunk[V]]], t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], + t4: Tag[Abort[E | Closed]], + t5: Tag[Abort[E]], i: Isolate[S & S2, Sync, S & S2], ev: ConcreteTag[E | Closed], frame: Frame @@ -695,7 +723,7 @@ object StreamCoreExtensions: onSuccess = _ => channelOut.closeAwaitEmpty.unit, onFail = { case _: Closed => bug("buffer closed unexpectedly") - case e: E @unchecked => cleanup.andThen(Abort.fail(e)) + case e: E @unchecked => cleanup.andThen(Abort.fail[E](e)) }, onPanic = e => cleanup.andThen(Abort.panic(e)) )(Async.foreachDiscard(Seq(handleEmit, handlePar))(identity)) @@ -727,11 +755,13 @@ object StreamCoreExtensions: t1: Tag[Emit[Chunk[V]]], t2: Tag[Emit[Chunk[V2]]], t3: Tag[V2], + t4: Tag[Abort[E | Closed]], + t5: Tag[Abort[E]], i: Isolate[S & S2, Sync, S & S2], ev: ConcreteTag[E | Closed], frame: Frame ): Stream[V2, Abort[E] & Async & S & S2] = - mapChunkParUnordered(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, i, ev, frame) + mapChunkParUnordered(Async.defaultConcurrency, defaultAsyncStreamBufferSize)(f)(using t1, t2, t3, t4, t5, i, ev, frame) /** Broadcast to two streams that can be evaluated in parallel. Original stream begins to run as soon as either of the original * streams does. @@ -747,7 +777,8 @@ object StreamCoreExtensions: t1: Tag[V], t2: Tag[Emit[Chunk[V]]], t3: Tag[Emit[Chunk[Chunk[V]]]], - t4: ConcreteTag[E], + t4: Tag[Abort[E]], + t5: ConcreteTag[E], fr: Frame ): (Stream[V, Abort[E] & Async], Stream[V, Abort[E] & Scope & Async]) < (Scope & Async & S) = broadcastDynamicWith(bufferSize) { streamHub => @@ -755,7 +786,7 @@ object StreamCoreExtensions: s1 <- streamHub.subscribe s2 <- streamHub.subscribe yield (s1, s2) - }(using i, t1, t2, t3, t4, fr) + }(using i, t1, t2, t3, t4, t5, fr) /** Broadcast to three streams that can be evaluated in parallel. */ @@ -765,7 +796,8 @@ object StreamCoreExtensions: t1: Tag[V], t2: Tag[Emit[Chunk[V]]], t3: Tag[Emit[Chunk[Chunk[V]]]], - t4: ConcreteTag[E], + t4: Tag[Abort[E]], + t5: ConcreteTag[E], fr: Frame ): ( Stream[V, Abort[E] & Async], @@ -778,7 +810,7 @@ object StreamCoreExtensions: s2 <- streamHub.subscribe s3 <- streamHub.subscribe yield (s1, s2, s3) - }(using i, t1, t2, t3, t4, fr) + }(using i, t1, t2, t3, t4, t5, fr) /** Broadcast to four streams that can be evaluated in parallel. */ @@ -788,7 +820,8 @@ object StreamCoreExtensions: t1: Tag[V], t2: Tag[Emit[Chunk[V]]], t3: Tag[Emit[Chunk[Chunk[V]]]], - t4: ConcreteTag[E], + t4: Tag[Abort[E]], + t5: ConcreteTag[E], fr: Frame ): ( Stream[V, Abort[E] & Async], @@ -803,7 +836,7 @@ object StreamCoreExtensions: s3 <- streamHub.subscribe s4 <- streamHub.subscribe yield (s1, s2, s3, s4) - }(using i, t1, t2, t3, t4, fr) + }(using i, t1, t2, t3, t4, t5, fr) /** Broadcast to five streams that can be evaluated in parallel. */ @@ -813,7 +846,8 @@ object StreamCoreExtensions: t1: Tag[V], t2: Tag[Emit[Chunk[V]]], t3: Tag[Emit[Chunk[Chunk[V]]]], - t4: ConcreteTag[E], + t4: Tag[Abort[E]], + t5: ConcreteTag[E], fr: Frame ): ( Stream[V, Abort[E] & Async], @@ -830,7 +864,7 @@ object StreamCoreExtensions: s4 <- streamHub.subscribe s5 <- streamHub.subscribe yield (s1, s2, s3, s4, s5) - }(using i, t1, t2, t3, t4, fr) + }(using i, t1, t2, t3, t4, t5, fr) /** Broadcast to a specified number of streams that can be evaluated in parallel. * @@ -847,7 +881,8 @@ object StreamCoreExtensions: t1: Tag[V], t2: Tag[Emit[Chunk[V]]], t3: Tag[Emit[Chunk[Chunk[V]]]], - t4: ConcreteTag[E], + t4: Tag[Abort[E]], + t5: ConcreteTag[E], fr: Frame ): Chunk[Stream[V, Abort[E] & Scope & Async]] < (Scope & Async & S) = broadcastDynamicWith(bufferSize) { streamHub => @@ -858,7 +893,7 @@ object StreamCoreExtensions: else streamHub.subscribe.map: stream => Sync.defer(builder.addOne(stream)).andThen(Loop.continue(remaining - 1)) - }(using i, t1, t2, t3, t4, fr) + }(using i, t1, t2, t3, t4, t5, fr) /** Convert to a reusable stream that can be run multiple times in parallel to consume the same original elements. Original stream * begins to run as soon as the broadcasted stream is run for the first time. @@ -879,7 +914,8 @@ object StreamCoreExtensions: t1: Tag[V], t2: Tag[Emit[Chunk[V]]], t3: Tag[Emit[Chunk[Chunk[V]]]], - t4: ConcreteTag[E], + t4: Tag[Abort[E]], + t5: ConcreteTag[E], fr: Frame ): Stream[V, Abort[E] & Async & Scope] < (Scope & Async & S) = broadcastDynamic(bufferSize).map: streamHub => @@ -905,7 +941,8 @@ object StreamCoreExtensions: t1: Tag[V], t2: Tag[Emit[Chunk[V]]], t3: Tag[Emit[Chunk[Chunk[V]]]], - t4: ConcreteTag[E], + t4: Tag[Abort[E]], + t5: ConcreteTag[E], fr: Frame ): StreamHub[V, E] < (Scope & Async & S) = Latch.initWith(1): latch => @@ -931,7 +968,8 @@ object StreamCoreExtensions: t1: Tag[V], t2: Tag[Emit[Chunk[V]]], t3: Tag[Emit[Chunk[Chunk[V]]]], - t4: ConcreteTag[E], + t4: Tag[Abort[E]], + t5: ConcreteTag[E], fr: Frame ): A < (Scope & Async & S & S1) = StreamHubImpl.init[V, E](bufferSize).map: streamHub => @@ -956,6 +994,7 @@ object StreamCoreExtensions: t2: Tag[Emit[Chunk[V]]], t3: Tag[Emit[Chunk[Chunk[V]]]], t4: ConcreteTag[E], + t: Tag[Abort[E]], fr: Frame ): A < (Scope & Async & S & S1) = StreamHubImpl.init[V, E](defaultAsyncStreamBufferSize).map: streamHub => @@ -978,6 +1017,8 @@ object StreamCoreExtensions: def groupedWithin(maxSize: Int, maxTime: Duration, bufferSize: Int = defaultAsyncStreamBufferSize)(using t1: Tag[Emit[Chunk[V]]], t2: Tag[Emit[Chunk[Chunk[V]]]], + t3: Tag[Abort[E]], + t4: Tag[Abort[E | Closed]], i: Isolate[S, Sync, S], ct: ConcreteTag[Closed | E], fr: Frame diff --git a/kyo-core/shared/src/main/scala/kyo/System.scala b/kyo-core/shared/src/main/scala/kyo/System.scala index 6b6174f9d..ecb325ac6 100644 --- a/kyo-core/shared/src/main/scala/kyo/System.scala +++ b/kyo-core/shared/src/main/scala/kyo/System.scala @@ -37,8 +37,8 @@ import java.time.format.DateTimeParseException */ abstract class System extends Serializable: def unsafe: System.Unsafe - def env[E, A](name: String)(using Parser[E, A], Frame): Maybe[A] < (Abort[E] & Sync) - def property[E, A](name: String)(using Parser[E, A], Frame): Maybe[A] < (Abort[E] & Sync) + def env[E, A](name: String)(using Parser[E, A], Frame, Tag[Abort[E]]): Maybe[A] < (Abort[E] & Sync) + def property[E, A](name: String)(using Parser[E, A], Frame, Tag[Abort[E]]): Maybe[A] < (Abort[E] & Sync) def lineSeparator(using Frame): String < Sync def userName(using Frame): String < Sync def operatingSystem(using Frame): System.OS < Sync @@ -63,13 +63,13 @@ object System: def apply(u: Unsafe): System = new System: - def env[E, A](name: String)(using p: Parser[E, A], frame: Frame): Maybe[A] < (Abort[E] & Sync) = + def env[E, A](name: String)(using p: Parser[E, A], frame: Frame, t: Tag[Abort[E]]): Maybe[A] < (Abort[E] & Sync) = Sync.Unsafe { u.env(name) match case Absent => Absent case Present(v) => Abort.get(p(v).map(Maybe(_))) } - def property[E, A](name: String)(using p: Parser[E, A], frame: Frame): Maybe[A] < (Abort[E] & Sync) = + def property[E, A](name: String)(using p: Parser[E, A], frame: Frame, t: Tag[Abort[E]]): Maybe[A] < (Abort[E] & Sync) = Sync.Unsafe { u.property(name) match case Absent => Absent @@ -132,7 +132,9 @@ object System: * @return * A `Maybe` containing the parsed value if it exists, or `Maybe.empty` otherwise. */ - def env[A](using Frame)[E](name: String)(using parser: Parser[E, A], reduce: Reducible[Abort[E]]): Maybe[A] < (reduce.SReduced & Sync) = + def env[A](using + Frame + )[E](name: String)(using parser: Parser[E, A], reduce: Reducible[Abort[E]], t: Tag[Abort[E]]): Maybe[A] < (reduce.SReduced & Sync) = reduce(local.use(_.env[E, A](name))) /** Retrieves an environment variable with a default value. @@ -151,7 +153,8 @@ object System: )[E](name: String, default: => A)( using parser: Parser[E, A], - reduce: Reducible[Abort[E]] + reduce: Reducible[Abort[E]], + t: Tag[Abort[E]] ): A < (reduce.SReduced & Sync) = reduce(local.use(_.env[E, A](name).map(_.getOrElse(default)))) @@ -169,7 +172,8 @@ object System: )[E](name: String)( using parser: Parser[E, A], - reduce: Reducible[Abort[E]] + reduce: Reducible[Abort[E]], + t: Tag[Abort[E]] ): Maybe[A] < (reduce.SReduced & Sync) = reduce(local.use(_.property[E, A](name))) @@ -189,7 +193,8 @@ object System: )[E](name: String, default: => A)( using parser: Parser[E, A], - reduce: Reducible[Abort[E]] + reduce: Reducible[Abort[E]], + t: Tag[Abort[E]] ): A < (reduce.SReduced & Sync) = reduce(local.use(_.property[E, A](name).map(_.getOrElse(default)))) diff --git a/kyo-core/shared/src/main/scala/kyo/internal/BaseKyoCoreTest.scala b/kyo-core/shared/src/main/scala/kyo/internal/BaseKyoCoreTest.scala index 22fff7a1e..eae49e984 100644 --- a/kyo-core/shared/src/main/scala/kyo/internal/BaseKyoCoreTest.scala +++ b/kyo-core/shared/src/main/scala/kyo/internal/BaseKyoCoreTest.scala @@ -10,7 +10,7 @@ private[kyo] trait BaseKyoCoreTest extends BaseKyoKernelTest[Abort[Any] & Async Scope.run, Abort.recover[Any] { case ex: Throwable => throw ex - case e => throw new IllegalStateException(s"Test aborted with $e") + case e => throw new IllegalStateException(s"Test aborted with [$e]") }, Async.timeout(timeout), Fiber.initUnscoped, diff --git a/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala b/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala index 3eebd18a0..4bdf1be15 100644 --- a/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/KyoAppTest.scala @@ -103,6 +103,27 @@ class KyoAppTest extends Test: case _ => fail("Unexpected Success...") } + "env" in runNotJS { + type F1[S] = (Int, String) => String < S + type F2[S] = String => Int < S + + def f1Impl(num: Int, str: String): String < Async = num.toString + str + def f2Impl(str: String): Int < Abort[String] = str.toIntOption match + case Some(num) => num + case None => Abort.fail(str) + + def fn[S](num: Int, str: String) = + for + str <- Env.use[F1[S]](_(num, str)) + res <- Env.use[F2[S]](_(str)) + yield res + + object Main extends KyoApp: + run: + Env.run(f1Impl)(Env.run(f2Impl)(Abort.run[String](fn(4, "something")))) + Main.main(Array.empty) + succeed + } "effect mismatch" in { typeCheckFailure(""" new KyoApp: diff --git a/kyo-core/shared/src/test/scala/kyo/SystemTest.scala b/kyo-core/shared/src/test/scala/kyo/SystemTest.scala index bf1b32a36..0a231fdce 100644 --- a/kyo-core/shared/src/test/scala/kyo/SystemTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/SystemTest.scala @@ -85,9 +85,9 @@ class SystemTest extends Test: "custom System implementation" in run { val customSystem = new System: def unsafe: Unsafe = ??? - def env[E, A](name: String)(using Parser[E, A], Frame): Maybe[A] < (Abort[E] & Sync) = + def env[E, A](name: String)(using Parser[E, A], Frame, Tag[Abort[E]]): Maybe[A] < (Abort[E] & Sync) = Sync.defer(Maybe("custom_env").asInstanceOf[Maybe[A]]) - def property[E, A](name: String)(using Parser[E, A], Frame): Maybe[A] < (Abort[E] & Sync) = + def property[E, A](name: String)(using Parser[E, A], Frame, Tag[Abort[E]]): Maybe[A] < (Abort[E] & Sync) = Sync.defer(Maybe("custom_property").asInstanceOf[Maybe[A]]) def lineSeparator(using Frame): String < Sync = Sync.defer("custom_separator") def userName(using Frame): String < Sync = Sync.defer("custom_user") diff --git a/kyo-prelude/shared/src/main/scala/kyo/Abort.scala b/kyo-prelude/shared/src/main/scala/kyo/Abort.scala index 752e1fd5e..04a1d9890 100644 --- a/kyo-prelude/shared/src/main/scala/kyo/Abort.scala +++ b/kyo-prelude/shared/src/main/scala/kyo/Abort.scala @@ -44,7 +44,6 @@ object Abort: import internal.* given eliminateAbort: Reducible.Eliminable[Abort[Nothing]] with {} - private inline def erasedTag[E]: Tag[Abort[E]] = Tag[Abort[Any]].asInstanceOf[Tag[Abort[E]]] /** Fails the computation with the given value. * @@ -53,7 +52,7 @@ object Abort: * @return * A computation that immediately fails with the given value */ - inline def fail[E](inline value: E)(using inline frame: Frame): Nothing < Abort[E] = + inline def fail[E](inline value: E)(using Frame, Tag[Abort[E]]): Nothing < Abort[E] = error(Failure(value)) /** Fails the computation with a panic value (unchecked exception). @@ -66,8 +65,11 @@ object Abort: inline def panic[E](inline ex: Throwable)(using inline frame: Frame): Nothing < Abort[E] = error(Panic(ex)) - inline def error[E](inline error: Error[E])(using inline frame: Frame): Nothing < Abort[E] = - ArrowEffect.suspendWith[Any](erasedTag[E], error)(_ => ???) + inline def error[E](inline panic: Panic)(using inline f: Frame): Nothing < Abort[E] = + ArrowEffect.suspendWith[Any](Tag[Abort[Nothing]], panic)(_ => ???) + + inline def error[E](inline error: Error[E])(using inline f: Frame, inline t: Tag[Abort[E]]): Nothing < Abort[E] = + ArrowEffect.suspendWith[Any](t, error)(_ => ???) /** Fails the computation if the condition is true. * @@ -78,7 +80,10 @@ object Abort: * @return * A unit computation that may fail if the condition is true */ - inline def when[E, S](b: Boolean < S)(inline value: => E < S)(using inline frame: Frame): Unit < (Abort[E] & S) = + inline def when[E, S](b: Boolean < S)(inline value: => E < S)(using + inline frame: Frame, + inline tag: Tag[Abort[E]] + ): Unit < (Abort[E] & S) = ensuring(b.map(!_), ())(value) /** Fails the computation if the condition is false. @@ -90,7 +95,10 @@ object Abort: * @return * A unit computation that may fail if the condition is false */ - inline def unless[E, S](b: Boolean < S)(inline value: => E < S)(using inline frame: Frame): Unit < (Abort[E] & S) = + inline def unless[E, S](b: Boolean < S)(inline value: => E < S)(using + inline frame: Frame, + inline tag: Tag[Abort[E]] + ): Unit < (Abort[E] & S) = ensuring(b, ())(value) /** Ensures a condition is met before returning the provided result. @@ -109,7 +117,8 @@ object Abort: * A computation that succeeds with the result if the condition is true, or fails with the given value if it's false */ inline def ensuring[E, A, S](cond: Boolean < S, result: => A < S)(inline value: => E < S)(using - inline frame: Frame + inline frame: Frame, + inline tag: Tag[Abort[E]] ): A < (Abort[E] & S) = cond.map { case true => result @@ -123,7 +132,7 @@ object Abort: * @return * A computation that succeeds with the Right value or fails with the Left value */ - inline def get[E](using inline frame: Frame)[A](either: Either[E, A]): A < Abort[E] = + inline def get[E](using inline frame: Frame, inline tag: Tag[Abort[E]])[A](either: Either[E, A]): A < Abort[E] = either match case Right(value) => value case Left(value) => fail(value) @@ -135,7 +144,7 @@ object Abort: * @return * A computation that succeeds with the Some value or fails with Absent */ - inline def get[A](opt: Option[A])(using inline frame: Frame): A < Abort[Absent] = + inline def get[A](opt: Option[A])(using inline frame: Frame, inline tag: Tag[Abort[Absent]]): A < Abort[Absent] = opt match case None => fail(Absent) case Some(v) => v @@ -147,7 +156,7 @@ object Abort: * @return * A computation that succeeds with the Success value or fails with the Failure exception */ - inline def get[A](e: scala.util.Try[A])(using inline frame: Frame): A < Abort[Throwable] = + inline def get[A](e: scala.util.Try[A])(using inline frame: Frame, inline tag: Tag[Abort[Throwable]]): A < Abort[Throwable] = e match case scala.util.Success(t) => t case scala.util.Failure(v) => fail(v) @@ -159,7 +168,7 @@ object Abort: * @return * A computation that succeeds with the Success value or fails with the Failure value */ - inline def get[E](using inline frame: Frame)[A](r: Result[E, A]): A < Abort[E] = + inline def get[E](using Frame, Tag[Abort[E]])[A](r: Result[E, A]): A < Abort[E] = r.foldError(identity, Abort.error) /** Lifts a Maybe into the Abort effect. @@ -195,6 +204,7 @@ object Abort: )( using ct: ConcreteTag[E], + t: Tag[Abort[E]], reduce: Reducible[Abort[ER]] ): B < (S & reduce.SReduced & S2) = reduce { @@ -208,7 +218,7 @@ object Abort: Abort[ER] & S, S2 ]( - erasedTag[E], + t, v.map(Result.succeed[E, A](_)) )( accept = [C] => @@ -243,6 +253,7 @@ object Abort: )[A, S, ER](v: => A < (Abort[E | ER] & S))( using ct: ConcreteTag[E], + t: Tag[Abort[E]], reduce: Reducible[Abort[ER]] ): Result[E, A] < (S & reduce.SReduced) = runWith[E](v)(identity) @@ -263,10 +274,12 @@ object Abort: def runPartial[E]( using Frame )[A, S, ER](v: => A < (Abort[E | ER] & S))( - using ct: ConcreteTag[E] + using + ct: ConcreteTag[E], + t: Tag[Abort[E]] ): Result.Partial[E, A] < (S & Abort[ER]) = Abort.runWith[E](v): - case panic: Panic => Abort.error(panic) + case panic: Panic => Abort.error[Nothing](panic) case other: Partial[E, A] @unchecked => other /** Completely handles an Abort effect, converting it to a partial Result and throwing any Panic exceptions. @@ -285,6 +298,7 @@ object Abort: def runPartialOrThrow[E, A, S](v: => A < (Abort[E] & S))( using ct: ConcreteTag[E], + t: Tag[Abort[E]], frame: Frame ): Result.Partial[E, A] < S = Abort.runWith[E](v): @@ -308,12 +322,13 @@ object Abort: )[A, B, S, ER](onFail: E => B < S)(v: => A < (Abort[E | ER] & S))( using ct: ConcreteTag[E], + t: Tag[Abort[E]], reduce: Reducible[Abort[ER]] ): (A | B) < (S & reduce.SReduced & Abort[Nothing]) = runWith[E](v): case Success(a) => a case Failure(e) => onFail(e) - case panic: Panic => Abort.error(panic) + case panic: Panic => Abort.error[Nothing](panic) /** Recovers from an Abort failure or panic by applying the provided functions. * @@ -334,6 +349,7 @@ object Abort: )[A, B, S, ER](onFail: E => B < S, onPanic: Throwable => B < S)(v: => A < (Abort[E | ER] & S))( using ct: ConcreteTag[E], + t: Tag[Abort[E]], reduce: Reducible[Abort[ER]] ): (A | B) < (S & reduce.SReduced) = runWith[E](v): @@ -354,7 +370,8 @@ object Abort: def recoverOrThrow[A, E, B, S](onFail: E => B < S)(v: => A < (Abort[E] & S))( using frame: Frame, - ct: ConcreteTag[E] + ct: ConcreteTag[E], + t: Tag[Abort[E]] ): (A | B) < S = runWith[E](v): case Success(a) => a @@ -378,6 +395,7 @@ object Abort: )[A, B, S, ER](onError: Error[E] => B < S)(v: => A < (Abort[E | ER] & S))( using ct: ConcreteTag[E], + t: Tag[Abort[E]], reduce: Reducible[Abort[ER]] ): (A | B) < (S & reduce.SReduced & Abort[Nothing]) = runWith[E](v)(_.foldError(identity, onError)) @@ -401,11 +419,11 @@ object Abort: )[A, B, S, ER]( onSuccess: A => B < S, onFail: E => B < S - )(v: => A < (Abort[E | ER] & S))(using ct: ConcreteTag[E]): B < (S & Abort[ER]) = + )(v: => A < (Abort[E | ER] & S))(using ct: ConcreteTag[E], t1: Tag[Abort[E]], t2: Tag[Abort[ER]]): B < (S & Abort[ER]) = runWith[E](v): case Success(a) => onSuccess(a) case Failure(e) => onFail(e) - case panic: Panic => Abort.error(panic) + case panic: Panic => Abort.error[Nothing](panic) /** Recovers from an Abort failure by applying the provided function. * @@ -431,6 +449,7 @@ object Abort: )(v: => A < (Abort[E | ER] & S))( using ct: ConcreteTag[E], + t: Tag[Abort[E]], reduce: Reducible[Abort[ER]] ): B < (S & reduce.SReduced) = runWith[E](v): @@ -455,7 +474,8 @@ object Abort: def foldOrThrow[A, B, E, S](onSuccess: A => B < S, onFail: E => B < S)(v: => A < (Abort[E] & S))( using frame: Frame, - ct: ConcreteTag[E] + ct: ConcreteTag[E], + t: Tag[Abort[E]] ): B < S = runWith[E](v): case Success(a) => onSuccess(a) @@ -484,6 +504,7 @@ object Abort: )(v: => A < (Abort[E | ER] & S))( using ct: ConcreteTag[E], + t: Tag[Abort[E]], reduce: Reducible[Abort[ER]] ): B < (S & reduce.SReduced) = runWith[E](v)(_.foldError(onSuccess, onError)) @@ -501,7 +522,7 @@ object Abort: */ def catching[E]( using Frame - )[A, S](v: => A < S)(using ct: ConcreteTag[E]): A < (Abort[E] & S) = + )[A, S](v: => A < S)(using ct: ConcreteTag[E], t: Tag[Abort[E]]): A < (Abort[E] & S) = Effect.catching(v) { case ct(ex) => Abort.fail(ex) case ex => Abort.panic(ex) @@ -521,7 +542,9 @@ object Abort: def catching[E]( using Frame )[A, S, E1](f: E => E1)(v: => A < S)( - using ct: ConcreteTag[E] + using + ct: ConcreteTag[E], + t: Tag[Abort[E1]] ): A < (Abort[E1] & S) = Effect.catching(v) { case ct(ex) => Abort.fail(f(ex)) @@ -549,7 +572,7 @@ object Abort: * @return * A computation that immediately fails with the given literal value, preserving its exact type */ - inline def fail[V <: Singleton](inline value: V)(using inline frame: Frame): Nothing < Abort[V] = + inline def fail[V <: Singleton](inline value: V)(using inline frame: Frame, inline tag: Tag[Abort[V]]): Nothing < Abort[V] = Abort.fail(value) /** Fails the computation if the condition is true, using a literal failure value. @@ -561,7 +584,10 @@ object Abort: * @return * A unit computation that may fail with the literal value if the condition is true */ - inline def when[V <: Singleton, S](b: Boolean < S)(inline value: V)(using inline frame: Frame): Unit < (Abort[V] & S) = + inline def when[V <: Singleton, S](b: Boolean < S)(inline value: V)(using + inline frame: Frame, + inline tag: Tag[Abort[V]] + ): Unit < (Abort[V] & S) = Abort.when(b)(value) /** Fails the computation if the condition is false, using a literal failure value. @@ -573,7 +599,10 @@ object Abort: * @return * A unit computation that may fail with the literal value if the condition is false */ - inline def unless[V <: Singleton, S](b: Boolean < S)(inline value: V)(using inline frame: Frame): Unit < (Abort[V] & S) = + inline def unless[V <: Singleton, S](b: Boolean < S)(inline value: V)(using + inline frame: Frame, + inline tag: Tag[Abort[V]] + ): Unit < (Abort[V] & S) = Abort.unless(b)(value) /** Ensures a condition is met before returning the provided result, using a literal failure value. @@ -590,7 +619,8 @@ object Abort: * A computation that succeeds with the result if the condition is true, or fails with the literal value if it's false */ inline def ensuring[V <: Singleton, A, S](cond: Boolean < S, result: => A < S)(inline value: V)(using - inline frame: Frame + inline frame: Frame, + inline tag: Tag[Abort[V]] ): A < (Abort[V] & S) = Abort.ensuring(cond, result)(value) diff --git a/kyo-prelude/shared/src/test/scala/kyo/AbortTest.scala b/kyo-prelude/shared/src/test/scala/kyo/AbortTest.scala index b7a00e468..e8e12ac15 100644 --- a/kyo-prelude/shared/src/test/scala/kyo/AbortTest.scala +++ b/kyo-prelude/shared/src/test/scala/kyo/AbortTest.scala @@ -1282,7 +1282,7 @@ class AbortTest extends Test: } "generic" in { - def test[A](a: A): Nothing < Abort[A] = + def test[A](a: A)(using Tag[Abort[a.type]]): Nothing < Abort[A] = Abort.literal.fail(a) val result = test(1) diff --git a/kyo-stm/shared/src/main/scala/kyo/STM.scala b/kyo-stm/shared/src/main/scala/kyo/STM.scala index b90b42322..f7b047e4a 100644 --- a/kyo-stm/shared/src/main/scala/kyo/STM.scala +++ b/kyo-stm/shared/src/main/scala/kyo/STM.scala @@ -67,9 +67,14 @@ object STM: * @return * The result of the computation if successful */ - def run[E: ConcreteTag, A, S]( + def run[E, A, S]( using Isolate[S, Async & Abort[E | FailedTransaction], S] - )(v: A < (STM & Abort[E] & Async & S))(using frame: Frame): A < (S & Async & Abort[E | FailedTransaction]) = + )(v: A < (STM & Abort[E] & Async & S))(using + Frame, + ConcreteTag[E | FailedTransaction], + Tag[Abort[E]], + Tag[Abort[E | FailedTransaction]] + ): A < (S & Async & Abort[E | FailedTransaction]) = run(defaultRetrySchedule)(v) /** Executes a transactional computation with state isolation and the a custom retry schedule. @@ -81,23 +86,31 @@ object STM: * @return * The result of the computation if successful */ - def run[E: ConcreteTag, A, S]( + def run[E, A, S]( using isolate: Isolate[S, Async & Abort[E | FailedTransaction], S] )(retrySchedule: Schedule)(v: A < (STM & Abort[E] & Async & S))( - using frame: Frame + using + Frame, + ConcreteTag[E | FailedTransaction], + Tag[Abort[E]], + Tag[Abort[E | FailedTransaction]] ): A < (S & Async & Abort[E | FailedTransaction]) = isolate.capture { st => isolate.restore(run(retrySchedule)(isolate.isolate(st, v))) } - private def run[E: ConcreteTag, A](retrySchedule: Schedule)(v: A < (STM & Abort[E] & Async))( - using Frame + private def run[E, A](retrySchedule: Schedule)(v: A < (STM & Abort[E] & Async))( + using + Frame, + ConcreteTag[E | FailedTransaction], + Tag[Abort[E]], + Tag[Abort[E | FailedTransaction]] ): A < (Async & Abort[E | FailedTransaction]) = TID.useIO { case -1L => TID.useNew { tid => v.handle( - Abort.recoverError[E] { error => + Abort.recoverError[E | FailedTransaction] { error => // Retry arbitrary E failures in case the transaction is inconsistent Var.use[TRefLog] { log => Sync.Unsafe { @@ -120,7 +133,7 @@ object STM: } } }.handle( - Retry[FailedTransaction](retrySchedule) + Retry[FailedTransaction | E](retrySchedule) ) case parent => // Nested transaction inherits parent's transaction context but isolates RefLog. diff --git a/kyo-sttp/shared/src/main/scala/kyo/Requests.scala b/kyo-sttp/shared/src/main/scala/kyo/Requests.scala index cda92353e..300338fec 100644 --- a/kyo-sttp/shared/src/main/scala/kyo/Requests.scala +++ b/kyo-sttp/shared/src/main/scala/kyo/Requests.scala @@ -78,7 +78,10 @@ object Requests: * @return * The response body wrapped in an effect */ - def apply[E, A](f: BasicRequest => Request[Either[E, A], Any])(using Frame): A < (Async & Abort[FailedRequest | E]) = + def apply[E, A](f: BasicRequest => Request[Either[E, A], Any])(using + Frame, + Tag[Abort[FailedRequest | E]] + ): A < (Async & Abort[FailedRequest | E]) = request(f(basicRequest)) /** Sends an HTTP request @@ -92,7 +95,7 @@ object Requests: * @return * The response body wrapped in an effect */ - def request[E, A](req: Request[Either[E, A], Any])(using Frame): A < (Async & Abort[FailedRequest | E]) = + def request[E, A](req: Request[Either[E, A], Any])(using Frame, Tag[Abort[FailedRequest | E]]): A < (Async & Abort[FailedRequest | E]) = local.use(_.send(req)).map { r => Abort.get(r.body) } diff --git a/kyo-tapir/shared/src/main/scala/kyo/Routes.scala b/kyo-tapir/shared/src/main/scala/kyo/Routes.scala index 35c2f0be8..a8a0fb1c7 100644 --- a/kyo-tapir/shared/src/main/scala/kyo/Routes.scala +++ b/kyo-tapir/shared/src/main/scala/kyo/Routes.scala @@ -51,7 +51,7 @@ object Routes: */ def add[A: Tag, I, E: ConcreteTag, O](e: Endpoint[A, I, E, O, Any])( f: I => O < (Async & Env[A] & Abort[E]) - )(using Frame): Unit < Routes = + )(using Frame, Tag[Abort[E]]): Unit < Routes = Emit.value( Route( e.serverSecurityLogic[A, KyoSttpMonad.M](a => Right(a)).serverLogic((a: A) => @@ -78,7 +78,7 @@ object Routes: e: PublicEndpoint[Unit, Unit, Unit, Any] => Endpoint[A, I, E, O, Any] )( f: I => O < (Async & Env[A] & Abort[E]) - )(using Frame): Unit < Routes = + )(using Frame, Tag[Abort[E]]): Unit < Routes = add(e(endpoint))(f) /** Collects multiple route initializations into a single Routes effect. diff --git a/kyo-zio/shared/src/main/scala/kyo/ZIOs.scala b/kyo-zio/shared/src/main/scala/kyo/ZIOs.scala index d826c7dd9..6fe99fc04 100644 --- a/kyo-zio/shared/src/main/scala/kyo/ZIOs.scala +++ b/kyo-zio/shared/src/main/scala/kyo/ZIOs.scala @@ -21,7 +21,7 @@ object ZIOs: * @return * A Kyo effect that, when run, will execute the zio.ZIO */ - def get[E, A](v: => ZIO[Any, E, A])(using f: Frame, t: Trace): A < (Abort[E] & Async) = + def get[E, A](v: => ZIO[Any, E, A])(using f: Frame, t: Trace, tag: Tag[Abort[E]]): A < (Abort[E] & Async) = Sync.Unsafe { Unsafe.unsafely { given ce: CanEqual[E, E] = CanEqual.derived diff --git a/kyo-zio/shared/src/main/scala/kyo/ZLayers.scala b/kyo-zio/shared/src/main/scala/kyo/ZLayers.scala index 0d564fa73..b6160cf9b 100644 --- a/kyo-zio/shared/src/main/scala/kyo/ZLayers.scala +++ b/kyo-zio/shared/src/main/scala/kyo/ZLayers.scala @@ -22,7 +22,7 @@ object ZLayers: * @return * A Kyo Layer that, when run, will instantiate the resource from the ZLayer */ - def get[E, A: ZTag: Tag](layer: => ZLayer[Any, E, A])(using Frame, Trace): Layer[A, Abort[E] & Async & Scope] = + def get[E, A: ZTag: Tag](layer: => ZLayer[Any, E, A])(using Frame, Trace, Tag[Abort[E]]): Layer[A, Abort[E] & Async & Scope] = Layer { Sync.Unsafe { val scope = Unsafe.unsafely(ZScope.unsafe.make) diff --git a/kyo-zio/shared/src/main/scala/kyo/ZStreams.scala b/kyo-zio/shared/src/main/scala/kyo/ZStreams.scala index b82d53841..31f507d1e 100644 --- a/kyo-zio/shared/src/main/scala/kyo/ZStreams.scala +++ b/kyo-zio/shared/src/main/scala/kyo/ZStreams.scala @@ -25,7 +25,8 @@ object ZStreams: def get[E, A](stream: => ZStream[Any, E, A])(using Frame, Trace, - Tag[Emit[Chunk[A]]] + Tag[Emit[Chunk[A]]], + Tag[Abort[E]] ): Stream[A, Abort[E] & Async] = Stream: Sync.defer {