Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 101 additions & 8 deletions kyo-core/shared/src/test/scala/kyo/ChannelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -758,10 +758,9 @@ class ChannelTest extends Test:
takeFiber <- Fiber.initUnscoped(
latch.await.andThen(Async.fill(60, 60)(channel.take))
)
_ <- latch.release
putRes <- putFiber.get
takeRes <- takeFiber.get
finalSize <- channel.size
_ <- latch.release
putRes <- putFiber.get
takeRes <- takeFiber.get
yield assert(putRes.flatten.toSet == takeRes.toSet))
.handle(Choice.run, _.unit, Loop.repeat(repeats))
.andThen(succeed)
Expand All @@ -783,15 +782,109 @@ class ChannelTest extends Test:
channel.takeExactly(10)
))
)
_ <- latch.release
putRes <- putFiber.get
takeRes <- takeFiber.get
finalSize <- channel.size
_ <- latch.release
putRes <- putFiber.get
takeRes <- takeFiber.get
yield assert(putRes.flatten.toSet == takeRes.flatten.toSet))
.handle(Choice.run, _.unit, Loop.repeat(repeats))
.andThen(succeed)
}

"putBatch retains groups" in run {
(for
size <- Choice.eval(0, 1, 2, 10)
channel <- Channel.init[Int](size)
latch <- Latch.init(1)
// Almost fill the channel, leaving space for one more element to increase the chances of groups being
// split when the channel is full.
seedCount = math.max(0, size - 1)
_ <- if size == 0 then Kyo.unit else channel.putBatch(1 to seedCount)
putFiber <- Fiber.init(
latch.await.andThen(Async.foreach(((seedCount + 1) to 60).grouped(3).toSeq, 60)(batch =>
channel.putBatch(batch).andThen(batch)
))
)
takeFiber <- Fiber.init(
latch.await.andThen(Async.fill(20, 1)(
channel.takeExactly(3)
))
)
_ <- latch.release
_ <- putFiber.get
takeRes <- takeFiber.get
yield assert(takeRes.forall { chunk =>
// Every group of 3 elements should be consecutive.
assert(chunk.size == 3)
val first = chunk.head
val expected = first to (first + 2)
assert(chunk == expected)
chunk == expected
}))
.handle(Choice.run, _.unit, Loop.repeat(repeats))
.andThen(succeed)
}

"putBatch and drainUpTo retains groups" in run {
(for
size <- Choice.eval(0, 1, 2, 10)
channel <- Channel.init[Int](size)
latch <- Latch.init(1)
putFiber <- Fiber.init(
latch.await.andThen(Async.foreach((1 to 60).grouped(6).toSeq, 60)(batch =>
channel.putBatch(batch)
))
)
drainFiber <- Fiber.init(
latch.await.andThen(
Loop(Chunk.empty[Int]): chunks =>
if chunks.size == 60 then Loop.done(chunks)
else channel.drainUpTo(3).map(next => Loop.continue(chunks ++ next))
)
)
_ <- latch.release
_ <- putFiber.get
drainRes <- drainFiber.get
yield assert(drainRes.grouped(6).forall { chunk =>
// Every group of 6 elements should be consecutive.
val first = chunk.head
val expected = first to (first + 5)
assert(chunk == expected)
chunk == expected
}))
.handle(Choice.run, _.unit, Loop.repeat(repeats))
.andThen(succeed)
}

"putBatch and poll retains groups" in run {
(for
size <- Choice.eval(0, 1, 2, 10)
channel <- Channel.init[Int](size)
latch <- Latch.init(1)
putFiber <- Fiber.init(
latch.await.andThen(Async.foreach((1 to 60).grouped(3).toSeq, 60)(batch =>
channel.putBatch(batch)
))
)
pollFiber <- Fiber.init(
latch.await.andThen(
Loop(Chunk.empty[Int]): chunks =>
if chunks.size == 60 then Loop.done(chunks)
else channel.poll.map(next => Loop.continue(chunks ++ next))
)
)
_ <- latch.release
_ <- putFiber.get
pollRes <- pollFiber.get
yield assert(pollRes.grouped(3).forall { chunk =>
// Every group of 3 elements should be consecutive.
val first = chunk.head
val expected = first to (first + 2)
assert(chunk == expected)
chunk == expected
}))
.handle(Choice.run, _.unit, Loop.repeat(repeats))
.andThen(succeed)
}
}

"stream" - {
Expand Down
Loading