Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1909,23 +1909,23 @@ val b: Fiber[Unit, Any] < Sync =
Clock.repeatWithDelay(
startAfter = 1.minute,
delay = 1.minute
)(a)
)(a).map(_.reduced) // Note: Fiber#reduced simplifies Fiber's second type parameter

// Without an initial delay
val c: Fiber[Unit, Any] < Sync =
Clock.repeatWithDelay(1.minute)(a)
Clock.repeatWithDelay(1.minute)(a).map(_.reduced)

// Schedule at a specific interval, regardless
// of the duration of each execution
val d: Fiber[Unit, Any] < Sync =
Clock.repeatAtInterval(
startAfter = 1.minute,
interval = 1.minute
)(a)
)(a).map(_.reduced)

// Without an initial delay
val e: Fiber[Unit, Any] < Sync =
Clock.repeatAtInterval(1.minute)(a)
Clock.repeatAtInterval(1.minute)(a).map(_.reduced)
```

Use the returned `Fiber` to control scheduled tasks.
Expand All @@ -1935,7 +1935,7 @@ import kyo.*

// Example task
val a: Fiber[Unit, Any] < Sync =
Clock.repeatAtInterval(1.second)(())
Clock.repeatAtInterval(1.second)(()).map(_.reduced)

// Try to cancel a task
def b(task: Fiber[Unit, Any]): Boolean < Sync =
Expand Down Expand Up @@ -2249,7 +2249,7 @@ import kyo.*
// taken by reference and automatically
// suspended with 'Sync'
val a: Fiber[Int, Any] < Sync =
Fiber.initUnscoped(Math.cos(42).toInt)
Fiber.initUnscoped(Math.cos(42).toInt).map(_.reduced)

// It's possible to "extract" the value of a
// 'Fiber' via the 'get' method. This is also
Expand Down Expand Up @@ -2376,7 +2376,7 @@ val h: Future[Int] < Sync =
// 'Fiber' provides a monadic API with both
// 'map' and 'flatMap'
val i: Fiber[Int, Any] < Sync =
a.flatMap(v => Fiber.succeed(v.eval + 1))
a.flatMap(v => Fiber.fromResult(Abort.run(v).eval.map(_ + 1))).map(_.reduced)
```

Similarly to `Sync`, users should avoid handling the `Async` effect directly and rely on `KyoApp` instead. If strictly necessary, there are two methods to handle the `Async` effect:
Expand All @@ -2393,7 +2393,7 @@ val a: Int < Async =

// Avoid handling 'Async' directly
val b: Fiber[Int, Any] < Sync =
Fiber.initUnscoped(a)
Fiber.initUnscoped(a).map(_.reduced)

// The 'runAndBlock' method accepts
// arbitrary pending effects but relies
Expand All @@ -2420,7 +2420,7 @@ val b: Boolean < Sync =
// Fullfil the promise with
// another fiber
val c: Boolean < Sync =
a.map(fiber => Fiber.initUnscoped(1).map(fiber.become(_)))
a.map(fiber => Fiber.initUnscoped(1).map(v => fiber.become(v.reduced)))
```

> A `Promise` is basically a `Fiber` with all the regular functionality plus the `complete` and `become` methods to manually fulfill the promise.
Expand Down
4 changes: 2 additions & 2 deletions kyo-actor/shared/src/main/scala/kyo/Actor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ object Actor:
for
mailbox <-
// Create a bounded channel to serve as the actor's mailbox
Channel.init[A](capacity, Access.MultiProducerSingleConsumer)
Channel.initUnscoped[A](capacity, Access.MultiProducerSingleConsumer)
_subject =
// Create the actor's message interface (Subject)
// Messages sent through this subject are queued in the mailbox
Expand All @@ -452,7 +452,7 @@ object Actor:
}.handle(
Sync.ensure(mailbox.close), // Ensure mailbox cleanup by closing it when the actor completes or fails
Env.run(_subject), // Provide the actor's Subject to the environment so it can be accessed via Actor.self
Scope.run, // Close used resources
Scope.run, // Clean up resources
Fiber.init // Start the actor's processing loop in an async context
)
yield new Actor[E, A, B](_subject, _consumer):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S))
def fork[S2](
using
isolate: Isolate[S, Sync, S2],
reduce: Reducible[Abort[E]],
frame: Frame
): Fiber[A, reduce.SReduced & S2] < (Sync & S & Scope) =
): Fiber[A, Abort[E] & S2] < (Sync & S & Scope) =
Fiber.init(effect)

/** Forks this computation and uses the resulting fiber within a scoped function [[f]]. Guarantees fiber interruption after usage.
Expand All @@ -32,9 +31,8 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S))
def forkUsing[S2](
using
isolate: Isolate[S, Sync, S2],
reduce: Reducible[Abort[E]],
frame: Frame
)[B, S3](f: Fiber[A, reduce.SReduced & S2] => B < S3): B < (Sync & S & S3) =
)[B, S3](f: Fiber[A, Abort[E] & S2] => B < S3): B < (Sync & S & S3) =
Fiber.use(effect)(f)

/** Forks this computation, returning a fiber. Does not guarantee fiber interruption.
Expand All @@ -47,7 +45,7 @@ extension [A, E, S](effect: A < (Abort[E] & Async & S))
isolate: Isolate[S, Sync, S2],
reduce: Reducible[Abort[E]],
frame: Frame
): Fiber[A, reduce.SReduced & S2] < (Sync & S) =
): Fiber[A, Abort[E] & S2] < (Sync & S) =
Fiber.initUnscoped(effect)

/** Performs this computation and then the next one in parallel, discarding the result of this computation.
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/main/scala/kyo/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ object Channel:
end close

def closeAwaitEmpty()(using Frame, AllowUnsafe) =
Fiber.Unsafe.init(Result.succeed(close().isDefined))
Fiber.Unsafe.init(Result.succeed(close().isDefined)).reduced

def empty()(using AllowUnsafe, Frame) = succeedIfOpen(true)
def full()(using AllowUnsafe, Frame) = succeedIfOpen(true)
Expand Down
60 changes: 20 additions & 40 deletions kyo-core/shared/src/main/scala/kyo/Clock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -429,10 +429,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatWithDelay(Duration.Zero, delay)(f)

/** Repeatedly executes a task with a fixed delay between completions, starting after an initial delay.
Expand All @@ -454,10 +452,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatWithDelay(startAfter, delay, ())(_ => f.unit)

/** Repeatedly executes a task with a fixed delay between completions, maintaining state between executions.
Expand All @@ -484,10 +480,8 @@ object Clock:
)(
f: A => A < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[A, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[A, Abort[E]] < (Sync & S) =
repeatWithDelay(Schedule.delay(startAfter).andThen(Schedule.fixed(delay)), state)(f)

/** Repeatedly executes a task with delays determined by a custom schedule.
Expand All @@ -506,10 +500,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatWithDelay[E, Unit, S](delaySchedule, ())(_ => f.unit)

/** Repeatedly executes a task with delays determined by a custom schedule, maintaining state between executions.
Expand All @@ -533,10 +525,8 @@ object Clock:
)(
f: A => A < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[A, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[A, Abort[E]] < (Sync & S) =
Fiber.initUnscoped {
Clock.use { clock =>
Loop(state, delaySchedule) { (state, schedule) =>
Expand Down Expand Up @@ -569,10 +559,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatAtInterval(Duration.Zero, interval)(f)

/** Repeatedly executes a task at fixed time intervals, starting after an initial delay.
Expand All @@ -594,10 +582,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatAtInterval(startAfter, interval, ())(_ => f.unit)

/** Repeatedly executes a task at fixed time intervals, maintaining state between executions.
Expand All @@ -624,10 +610,8 @@ object Clock:
)(
f: A => A < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[A, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[A, Abort[E]] < (Sync & S) =
repeatAtInterval(Schedule.delay(startAfter).andThen(Schedule.fixed(interval)), state)(f)

/** Repeatedly executes a task with intervals determined by a custom schedule.
Expand All @@ -646,10 +630,8 @@ object Clock:
)(
f: => Any < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[Unit, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[Unit, Abort[E]] < (Sync & S) =
repeatAtInterval(intervalSchedule, ())(_ => f.unit)

/** Repeatedly executes a task with intervals determined by a custom schedule, maintaining state between executions.
Expand All @@ -673,10 +655,8 @@ object Clock:
)(
f: A => A < (Async & Abort[E] & S)
)(
using
frame: Frame,
reduce: Reducible[Abort[E]]
): Fiber[A, reduce.SReduced] < (Sync & S) =
using frame: Frame
): Fiber[A, Abort[E]] < (Sync & S) =
Fiber.initUnscoped {
Clock.use { clock =>
clock.now.map { now =>
Expand Down
32 changes: 17 additions & 15 deletions kyo-core/shared/src/main/scala/kyo/Fiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ opaque type Fiber[+A, -S] = IOPromiseBase[Any, A < (Async & S)]
export Fiber.Promise

object Fiber:

private val _unit = IOPromise(Result.succeed((): Unit < Any))

/** Creates a unit Fiber.
Expand Down Expand Up @@ -91,14 +90,14 @@ object Fiber:
/** Creates a Fiber from a Result.
*
* This method creates a Fiber that is immediately completed with the provided Result. The Fiber will have the same success and error
* types as the Result, with the error type reduced according to the Reducible instance.
* types as the Result.
*
* @param result
* The Result to create the Fiber from
* @return
* A Fiber that is immediately completed with the provided Result
*/
def fromResult[E, A, S](result: Result[E, A < S])(using reduce: Reducible[Abort[E]]): Fiber[A, reduce.SReduced & S] =
def fromResult[E, A, S](result: Result[E, A < S]): Fiber[A, Abort[E] & S] =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need a frame?

IOPromise(result)

/** Creates a Fiber from a Future.
Expand Down Expand Up @@ -128,10 +127,8 @@ object Fiber:
)(
v: => A < (Abort[E] & Async & S)
)(
using
reduce: Reducible[Abort[E]],
frame: Frame
): Fiber[A, reduce.SReduced & S2] < (Sync & S & Scope) =
using frame: Frame
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these don't need to be named anymore, since you are not using reduce anymore.

): Fiber[A, Abort[E] & S2] < (Sync & S & Scope) =
Scope.acquireRelease(initUnscoped[E, A, S, S2](v))(_.interrupt)

/** Use an asynchronous computation running in a new Fiber, interrupting the fiber after usage.
Expand All @@ -144,11 +141,10 @@ object Fiber:
def use[E, A, S, S2](
using
isolate: Isolate[S, Sync, S2],
reduce: Reducible[Abort[E]],
frame: Frame
)(
v: => A < (Abort[E] & Async & S)
)[B, S3](f: Fiber[A, reduce.SReduced & S2] => B < S3): B < (Sync & S & S3) =
)[B, S3](f: Fiber[A, Abort[E] & S2] => B < S3): B < (Sync & S & S3) =
initUnscoped[E, A, S, S2](v).map: fiber =>
Sync.ensure(fiber.interrupt)(f(fiber))

Expand All @@ -164,14 +160,12 @@ object Fiber:
)(
v: => A < (Abort[E] & Async & S)
)(
using
reduce: Reducible[Abort[E]],
frame: Frame
): Fiber[A, reduce.SReduced & S2] < (Sync & S) =
using frame: Frame
): Fiber[A, Abort[E] & S2] < (Sync & S) =
Isolate.internal.runDetached((trace, context) =>
isolate.capture { state =>
val io = isolate.isolate(state, v).map(r => Kyo.lift(isolate.restore(r)))
IOTask(io, trace, context).asInstanceOf[Fiber[A, reduce.SReduced & S2]]
IOTask(io, trace, context)
}
)

Expand Down Expand Up @@ -227,6 +221,11 @@ object Fiber:
def waiters(using Frame): Int < Sync =
Sync.Unsafe(Unsafe.waiters(self)())

/** Reduce the Fiber's effect type. Useful especially to convert `Fiber[A, Abort[Nothing]]` to `Fiber[A, Any]`
*/
def reduced(using reduce: Reducible[S]): Fiber[A, reduce.SReduced] =
self.asInstanceOf[Fiber[A, reduce.SReduced]]

def unsafe: Fiber.Unsafe[A, S] =
self
end extension
Expand Down Expand Up @@ -361,7 +360,7 @@ object Fiber:

/** WARNING: Low-level API meant for integrations, libraries, and performance-sensitive code. See AllowUnsafe for more details. */
object Unsafe:
def init[E, A, S](result: Result[E, A < S])(using allow: AllowUnsafe, reduce: Reducible[Abort[E]]): Unsafe[A, reduce.SReduced & S] =
def init[E, A, S](result: Result[E, A < S])(using allow: AllowUnsafe): Unsafe[A, Abort[E] & S] =
IOPromise(result)

def fromFuture[A](f: => Future[A])(using AllowUnsafe): Unsafe[A, Any] =
Expand Down Expand Up @@ -399,6 +398,9 @@ object Fiber:
def safe: Fiber[A, S] = self

def waiters()(using AllowUnsafe): Int = self.lower.waiters()

def reduced(using reduce: Reducible[S]): Unsafe[A, reduce.SReduced] =
self.asInstanceOf[Unsafe[A, reduce.SReduced]]
end extension

extension [E, A, S](self: Unsafe[A, Abort[E] & S])
Expand Down
4 changes: 3 additions & 1 deletion kyo-core/shared/src/main/scala/kyo/Queue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -557,13 +557,15 @@ object Queue:
end close

final def closeAwaitEmpty()(using frame: Frame, allow: AllowUnsafe): Fiber.Unsafe[Boolean, Any] =
import scala.language.implicitConversions

val fail = Result.Failure(Closed("Queue", initFrame))
val p = Promise.Unsafe.init[Boolean, Any]()
if state.compareAndSet(State.Open, State.HalfOpen(p, fail)) then
handleHalfOpen()
p
else
Fiber.Unsafe.init(Result.succeed(false))
Fiber.Unsafe.init(Result.succeed(false)).reduced
end if
end closeAwaitEmpty

Expand Down
3 changes: 2 additions & 1 deletion kyo-core/shared/src/main/scala/kyo/Scope.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ object Scope:
.map(_.foldError(_ => (), ex => Log.error("Scope finalizer failed", ex.exception)))
}
.handle(Fiber.initUnscoped[Nothing, Unit, Any, Any])
.map(promise.becomeDiscard)
.map(v => promise.becomeDiscard(v.reduced))
}
end close

def await(using Frame): Unit < Async = promise.get
end init
Expand Down
Loading