diff --git a/kyo-core/shared/src/main/scala/kyo/Queue.scala b/kyo-core/shared/src/main/scala/kyo/Queue.scala index d0d208354..34e6b3b37 100644 --- a/kyo-core/shared/src/main/scala/kyo/Queue.scala +++ b/kyo-core/shared/src/main/scala/kyo/Queue.scala @@ -681,8 +681,13 @@ object Queue: offerOp( q.offer(v), q.poll() match - case Maybe.Present(polled) => !(polled.asInstanceOf[AnyRef] eq v.asInstanceOf[AnyRef]) - case _ => true + case Maybe.Present(polled) => + val isOurs = polled.asInstanceOf[AnyRef] eq v.asInstanceOf[AnyRef] + if !isOurs then + // Polled someone else's element — put it back + discard(q.offer(polled)) + !isOurs + case _ => true ) def poll()(using AllowUnsafe) = pollOp(q.poll()) def peek()(using AllowUnsafe) = op(q.peek()) diff --git a/kyo-core/shared/src/test/scala/kyo/ChannelTest.scala b/kyo-core/shared/src/test/scala/kyo/ChannelTest.scala index 5230292c2..8c3cea32e 100644 --- a/kyo-core/shared/src/test/scala/kyo/ChannelTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/ChannelTest.scala @@ -637,7 +637,7 @@ class ChannelTest extends Test: .andThen(succeed) } - "offer and poll" in runNotNative { + "offer and poll" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) channel <- Channel.init[Int](size) @@ -657,7 +657,7 @@ class ChannelTest extends Test: .andThen(succeed) } - "put and take" in runNotNative { + "put and take" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) channel <- Channel.init[Int](size) diff --git a/kyo-core/shared/src/test/scala/kyo/QueueTest.scala b/kyo-core/shared/src/test/scala/kyo/QueueTest.scala index eaca24eba..60024cb57 100644 --- a/kyo-core/shared/src/test/scala/kyo/QueueTest.scala +++ b/kyo-core/shared/src/test/scala/kyo/QueueTest.scala @@ -9,7 +9,7 @@ class QueueTest extends Test: "bounded" - { access.foreach { access => access.toString() - { - "initWith" in runNotNative { + "initWith" in run { val effect = Queue.initWith[Int](2, access) { q => for b <- q.offer(1) @@ -24,7 +24,7 @@ class QueueTest extends Test: q.closed.map: isClosed => assert(isClosed && b && v == Maybe(1)) } - "use" in runNotNative { + "use" in run { val effect = Queue.use[Int](2, access) { q => for b <- q.offer(1) @@ -39,27 +39,27 @@ class QueueTest extends Test: q.closed.map: isClosed => assert(isClosed && b && v == Maybe(1)) } - "isEmpty" in runNotNative { + "isEmpty" in run { for q <- Queue.init[Int](2, access) b <- q.empty yield assert(b && q.capacity == 2) } - "offer and poll" in runNotNative { + "offer and poll" in run { for q <- Queue.init[Int](2, access) b <- q.offer(1) v <- q.poll yield assert(b && v == Maybe(1)) } - "peek" in runNotNative { + "peek" in run { for q <- Queue.init[Int](2, access) _ <- q.offer(1) v <- q.peek yield assert(v == Maybe(1)) } - "full" in runNotNative { + "full" in run { for q <- Queue.init[Int](2, access) _ <- q.offer(1) @@ -67,7 +67,7 @@ class QueueTest extends Test: b <- q.offer(3) yield assert(!b) } - "full 4" in runNotNative { + "full 4" in run { for q <- Queue.init[Int](4, access) _ <- q.offer(1) @@ -77,7 +77,7 @@ class QueueTest extends Test: b <- q.offer(5) yield assert(!b) } - "zero capacity" in runNotNative { + "zero capacity" in run { for q <- Queue.init[Int](0, access) b <- q.offer(1) @@ -88,7 +88,7 @@ class QueueTest extends Test: } } - "close" in runNotNative { + "close" in run { for q <- Queue.init[Int](2) b <- q.offer(1) @@ -114,7 +114,7 @@ class QueueTest extends Test: ) } - "drain" in runNotNative { + "drain" in run { for q <- Queue.init[Int](2) _ <- q.offer(1) @@ -123,7 +123,7 @@ class QueueTest extends Test: yield assert(v == Seq(1, 2)) } - "drainUpTo" in runNotNative { + "drainUpTo" in run { for q <- Queue.init[Int](4) _ <- Kyo.foreach(1 to 4)(q.offer) @@ -134,7 +134,7 @@ class QueueTest extends Test: "unbounded" - { access.foreach { access => access.toString() - { - "initWith" in runNotNative { + "initWith" in run { val effect = Queue.Unbounded.initWith[Int](access) { q => for b <- q.offer(1) @@ -149,7 +149,7 @@ class QueueTest extends Test: q.closed.map: isClosed => assert(isClosed && b && v == Maybe(1)) } - "use" in runNotNative { + "use" in run { val effect = Queue.Unbounded.use[Int](access) { q => for b <- q.offer(1) @@ -164,27 +164,27 @@ class QueueTest extends Test: q.closed.map: isClosed => assert(isClosed && b && v == Maybe(1)) } - "isEmpty" in runNotNative { + "isEmpty" in run { for q <- Queue.Unbounded.init[Int](access) b <- q.empty yield assert(b) } - "offer and poll" in runNotNative { + "offer and poll" in run { for q <- Queue.Unbounded.init[Int](access) b <- q.offer(1) v <- q.poll yield assert(b && v == Maybe(1)) } - "peek" in runNotNative { + "peek" in run { for q <- Queue.Unbounded.init[Int](access) _ <- q.offer(1) v <- q.peek yield assert(v == Maybe(1)) } - "add and poll" in runNotNative { + "add and poll" in run { for q <- Queue.Unbounded.init[Int](access) _ <- q.add(1) @@ -198,7 +198,7 @@ class QueueTest extends Test: "dropping" - { access.foreach { access => access.toString() - { - "initWith" in runNotNative { + "initWith" in run { val effect = Queue.Unbounded.initDropping[Int](2, access).map { q => for b <- q.offer(1) @@ -213,7 +213,7 @@ class QueueTest extends Test: q.closed.map: isClosed => assert(isClosed && b && v == Maybe(1)) } - "use" in runNotNative { + "use" in run { val effect = Queue.Unbounded.useDropping[Int](2, access) { q => for b <- q.offer(1) @@ -228,7 +228,7 @@ class QueueTest extends Test: q.closed.map: isClosed => assert(isClosed && b && v == Maybe(1)) } - "add/poll" in runNotNative { + "add/poll" in run { for q <- Queue.Unbounded.initDropping[Int](2) _ <- q.add(1) @@ -247,7 +247,7 @@ class QueueTest extends Test: "sliding" - { access.foreach { access => access.toString() - { - "initWith" in runNotNative { + "initWith" in run { val effect = Queue.Unbounded.initSliding[Int](2, access).map { q => for b <- q.offer(1) @@ -262,7 +262,7 @@ class QueueTest extends Test: q.closed.map: isClosed => assert(isClosed && b && v == Maybe(1)) } - "use" in runNotNative { + "use" in run { val effect = Queue.Unbounded.useSliding[Int](2, access) { q => for b <- q.offer(1) @@ -277,7 +277,7 @@ class QueueTest extends Test: q.closed.map: isClosed => assert(isClosed && b && v == Maybe(1)) } - "add/poll" in runNotNative { + "add/poll" in run { for q <- Queue.Unbounded.initSliding[Int](2, access) _ <- q.add(1) @@ -349,7 +349,7 @@ class QueueTest extends Test: val repeats = 100 - "offer and close" in runNotNative { + "offer and close" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) queue <- Queue.init[Int](size) @@ -375,7 +375,7 @@ class QueueTest extends Test: .andThen(succeed) } - "offer and poll" in runNotNative { + "offer and poll" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) queue <- Queue.init[Int](size) @@ -395,7 +395,7 @@ class QueueTest extends Test: .andThen(succeed) } - "offer to full queue during close" in runNotNative { + "offer to full queue during close" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) queue <- Queue.init[Int](size) @@ -418,7 +418,7 @@ class QueueTest extends Test: .andThen(succeed) } - "concurrent close attempts" in runNotNative { + "concurrent close attempts" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) queue <- Queue.init[Int](size) @@ -442,7 +442,7 @@ class QueueTest extends Test: .andThen(succeed) } - "offer, poll and close" in runNotNative { + "offer, poll and close" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) queue <- Queue.init[Int](size) @@ -499,14 +499,14 @@ class QueueTest extends Test: end if "Kyo computations" - { - "Sync" in runNotNative { + "Sync" in run { for queue <- Queue.init[Int < Sync](2) _ <- queue.offer(Sync.defer(42)) result <- queue.poll.map(_.get) yield assert(result == 42) } - "AtomicBoolean" in runNotNative { + "AtomicBoolean" in run { for flag <- AtomicBoolean.init(false) queue <- Queue.init[Int < Sync](2) @@ -516,7 +516,7 @@ class QueueTest extends Test: after <- flag.get yield assert(!before && result == 42 && after) } - "Env" in runNotNative { + "Env" in run { for queue <- Queue.init[Int < Env[Int]](2) _ <- queue.offer(Env.use[Int](_ + 22)) @@ -526,7 +526,7 @@ class QueueTest extends Test: } "closeAwaitEmpty" - { - "allowed following ops when empty" in runNotNative { + "allowed following ops when empty" in run { for q <- Queue.init[Int](2) c1 <- q.closeAwaitEmpty @@ -551,7 +551,7 @@ class QueueTest extends Test: ) } - "allowed following ops when not empty" in runNotNative { + "allowed following ops when not empty" in run { for promise <- Promise.init[Unit, Any] q <- Queue.init[Int](2) @@ -581,14 +581,14 @@ class QueueTest extends Test: ) } - "returns true when queue is already empty" in runNotNative { + "returns true when queue is already empty" in run { for queue <- Queue.init[Int](10) result <- queue.closeAwaitEmpty yield assert(result) } - "returns true when queue becomes empty after closing" in runNotNative { + "returns true when queue becomes empty after closing" in run { for queue <- Queue.init[Int](10) _ <- queue.offer(1) @@ -600,7 +600,7 @@ class QueueTest extends Test: yield assert(result) } - "returns false if queue is already closed" in runNotNative { + "returns false if queue is already closed" in run { for queue <- Queue.init[Int](10) _ <- queue.close @@ -609,14 +609,14 @@ class QueueTest extends Test: } "unbounded queue" - { - "returns true when queue is already empty" in runNotNative { + "returns true when queue is already empty" in run { for queue <- Queue.Unbounded.init[Int]() result <- queue.closeAwaitEmpty yield assert(result) } - "returns true when queue becomes empty after closing" in runNotNative { + "returns true when queue becomes empty after closing" in run { for queue <- Queue.Unbounded.init[Int]() _ <- queue.add(1) @@ -629,7 +629,7 @@ class QueueTest extends Test: } } - "concurrent polling and waiting" in runNotNative { + "concurrent polling and waiting" in run { for queue <- Queue.init[Int](10) _ <- Kyo.foreach(1 to 5)(i => queue.offer(i)) @@ -639,7 +639,7 @@ class QueueTest extends Test: yield assert(result) } - "sliding queue" in runNotNative { + "sliding queue" in run { for queue <- Queue.Unbounded.initSliding[Int](2) _ <- queue.add(1) @@ -651,7 +651,7 @@ class QueueTest extends Test: yield assert(result) } - "dropping queue" in runNotNative { + "dropping queue" in run { for queue <- Queue.Unbounded.initDropping[Int](2) _ <- queue.add(1) @@ -663,14 +663,14 @@ class QueueTest extends Test: yield assert(result) } - "zero capacity queue" in runNotNative { + "zero capacity queue" in run { for queue <- Queue.init[Int](0) result <- queue.closeAwaitEmpty yield assert(result) } - "race between closeAwaitEmpty and close" in runNotNative { + "race between closeAwaitEmpty and close" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) queue <- Queue.init[Int](size) @@ -695,7 +695,7 @@ class QueueTest extends Test: .andThen(succeed) } - "two producers calling closeAwaitEmpty" in runNotNative { + "two producers calling closeAwaitEmpty" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) queue <- Queue.init[Int](size) @@ -734,7 +734,7 @@ class QueueTest extends Test: .andThen(succeed) } - "producer calling closeAwaitEmpty and another calling close" in runNotNative { + "producer calling closeAwaitEmpty and another calling close" in run { (for size <- Choice.eval(0, 1, 2, 10, 100) queue <- Queue.init[Int](size)