@@ -10,23 +10,23 @@ import scala.reflect.ClassTag
10
10
import scala .util .boundary .break
11
11
import scala .util .{Failure , Success , Try , boundary }
12
12
13
- /** An asynchronous cold data stream that emits values, inspired to Kotlin Flows. */
13
+ /** A cold stream of asynchronously computed values, inspired to Kotlin Flows. */
14
14
trait Flow [+ T ]:
15
15
16
- /** Start the flowing of data which can be collected reacting through the given [[collector ]] function. */
16
+ /** Start the flowing of data that can be collected reacting through the given [[collector ]] function. */
17
17
def collect (collector : Try [T ] => Unit )(using Async , AsyncOperations ): Unit
18
18
19
19
/** An interface modeling an entity capable of [[emit ]]ting [[Flow ]]able values. */
20
20
trait FlowCollector [- T ]:
21
21
22
- /** Emits a value to the flow. */
22
+ /** Emit a value to the flow. */
23
23
def emit (value : T )(using Async ): Unit
24
24
25
25
object Flow :
26
26
27
- /** Creates a new asynchronous cold [[Flow ]] from the given [[body ]].
27
+ /** Create a new asynchronous cold [[Flow ]] from the given [[body ]].
28
28
* Since it is cold, it starts emitting values only when the [[Flow.collect ]] method is called.
29
- * To emit a value use the [[FlowCollector ]] given instance.
29
+ * To emit a value use the [[FlowCollector ]] given instance with `it.emit(value)` .
30
30
*/
31
31
def apply [T ](body : (it : FlowCollector [T ]) ?=> Unit ): Flow [T ] =
32
32
val flow = FlowImpl [T ]()
@@ -52,7 +52,7 @@ object Flow:
52
52
// Ensure to leave the synchronized block after the task has been initialized
53
53
// with the correct channel instance.
54
54
sync.acquire()
55
- myChannel.foreach(t => collector(t) )
55
+ myChannel.foreach(collector)
56
56
57
57
object FlowOps :
58
58
@@ -64,14 +64,14 @@ object FlowOps:
64
64
catchFailure(collector):
65
65
flow.collect(item => collector(Success (f(item.get))))
66
66
67
- /** @return a new [[Flow ]] whose values are created by flattening the flows generated
67
+ /** @return a new [[Flow ]] whose values are emitted by flattening the flows generated
68
68
* by the given function [[f ]] applied to each emitted value of this.
69
69
*/
70
70
def flatMap [R ](f : T => Flow [R ]): Flow [R ] = new Flow [R ]:
71
71
override def collect (collector : Try [R ] => Unit )(using Async , AsyncOperations ): Unit =
72
72
catchFailure(collector):
73
73
flow.collect(item => f(item.get).collect(x => collector(Success (x.get))))
74
-
74
+
75
75
def toSeq (using Async , AsyncOperations ): Try [Seq [T ]] = boundary :
76
76
var result = Seq .empty[T ]
77
77
flow.collect:
0 commit comments