Skip to content

Commit a46f858

Browse files
committed
Handle error propagation / cancelation in Channel
1 parent ff8399b commit a46f858

File tree

2 files changed

+34
-22
lines changed

2 files changed

+34
-22
lines changed

core/shared/src/main/scala/fs2/concurrent/Channel.scala

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package concurrent
2424

2525
import cats.effect._
2626
import cats.effect.implicits._
27+
import cats.effect.Resource.ExitCase
2728
import cats.syntax.all._
2829

2930
/** Stream aware, multiple producer, single consumer closeable channel.
@@ -138,76 +139,79 @@ object Channel {
138139
size: Int,
139140
waiting: Option[Deferred[F, Unit]],
140141
producers: List[(A, Deferred[F, Unit])],
141-
closed: Boolean
142+
closed: Option[ExitCase]
142143
)
143144

144-
val open = State(List.empty, 0, None, List.empty, closed = false)
145+
val open = State(List.empty, 0, None, List.empty, closed = None)
145146

146-
def empty(isClosed: Boolean): State =
147-
if (isClosed) State(List.empty, 0, None, List.empty, closed = true)
147+
def empty(close: Option[ExitCase]): State =
148+
if (close.nonEmpty) State(List.empty, 0, None, List.empty, closed = close)
148149
else open
149150

150151
(F.ref(open), F.deferred[Unit]).mapN { (state, closedGate) =>
151152
new Channel[F, A] {
152153

153154
def sendAll: Pipe[F, A, Nothing] = { in =>
154-
in.onFinalize(close.void)
155+
in.onFinalizeCase(closeWithExitCase(_).void)
155156
.evalMap(send)
156157
.takeWhile(_.isRight)
157158
.drain
158159
}
159160

160-
def sendImpl(a: A, close: Boolean) =
161+
def sendImpl(a: A, close: Option[ExitCase]) =
161162
F.deferred[Unit].flatMap { producer =>
162163
state.flatModifyFull { case (poll, state) =>
163164
state match {
164-
case s @ State(_, _, _, _, closed @ true) =>
165+
case s @ State(_, _, _, _, Some(_)) =>
165166
(s, Channel.closed[Unit].pure[F])
166167

167-
case State(values, size, waiting, producers, closed @ false) =>
168+
case State(values, size, waiting, producers, None) =>
168169
if (size < capacity)
169170
(
170171
State(a :: values, size + 1, None, producers, close),
171-
signalClosure.whenA(close) *> notifyStream(waiting).as(rightUnit)
172+
signalClosure.whenA(close.nonEmpty) *> notifyStream(waiting).as(rightUnit)
172173
)
173174
else
174175
(
175176
State(values, size, None, (a, producer) :: producers, close),
176-
signalClosure.whenA(close) *>
177+
signalClosure.whenA(close.nonEmpty) *>
177178
notifyStream(waiting).as(rightUnit) <*
178-
waitOnBound(producer, poll).unlessA(close)
179+
waitOnBound(producer, poll).unlessA(close.nonEmpty)
179180
)
180181
}
181182
}
182183
}
183184

184-
def send(a: A) = sendImpl(a, false)
185+
def send(a: A) = sendImpl(a, None)
185186

186-
def closeWithElement(a: A) = sendImpl(a, true)
187+
def closeWithElement(a: A) = sendImpl(a, Some(ExitCase.Succeeded))
187188

188189
def trySend(a: A) =
189190
state.flatModify {
190-
case s @ State(_, _, _, _, closed @ true) =>
191+
case s @ State(_, _, _, _, Some(_)) =>
191192
(s, Channel.closed[Boolean].pure[F])
192193

193-
case s @ State(values, size, waiting, producers, closed @ false) =>
194+
case s @ State(values, size, waiting, producers, None) =>
194195
if (size < capacity)
195196
(
196-
State(a :: values, size + 1, None, producers, false),
197+
State(a :: values, size + 1, None, producers, None),
197198
notifyStream(waiting).as(rightTrue)
198199
)
199200
else
200201
(s, rightFalse.pure[F])
201202
}
202203

203204
def close =
205+
closeWithExitCase(ExitCase.Succeeded)
206+
207+
def closeWithExitCase(exitCase: ExitCase): F[Either[Closed, Unit]] =
204208
state.flatModify {
205-
case s @ State(_, _, _, _, closed @ true) =>
209+
case s @ State(_, _, _, _, Some(_)) =>
206210
(s, Channel.closed[Unit].pure[F])
207211

208-
case State(values, size, waiting, producers, closed @ false) =>
212+
case State(values, size, waiting, producers, None) =>
209213
(
210-
State(values, size, None, producers, true),
214+
State(values, size, None, producers, Some(exitCase)),
211215
notifyStream(waiting).as(rightUnit) <* signalClosure
212216
)
213217
}
@@ -250,8 +254,12 @@ object Channel {
250254
unblock.as(Pull.output(toEmit) >> consumeLoop)
251255
} else {
252256
F.pure(
253-
if (closed) Pull.done
254-
else Pull.eval(waiting.get) >> consumeLoop
257+
closed match {
258+
case Some(ExitCase.Succeeded) => Pull.done
259+
case Some(ExitCase.Errored(e)) => Pull.raiseError(e)
260+
case Some(ExitCase.Canceled) => Pull.eval(F.canceled)
261+
case None => Pull.eval(waiting.get) >> consumeLoop
262+
}
255263
)
256264
}
257265
}

core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import scala.concurrent.duration._
2929

3030
import org.scalacheck.effect.PropF.forAllF
3131

32+
import scala.concurrent.CancellationException
33+
3234
class ChannelSuite extends Fs2Suite {
3335

3436
test("receives some simple elements above capacity and closes") {
@@ -336,6 +338,8 @@ class ChannelSuite extends Fs2Suite {
336338
ch.stream.concurrently(producer).compile.drain
337339
}
338340

339-
TestControl.executeEmbed(program) // will fail if program is deadlocked
341+
TestControl
342+
.executeEmbed(program)
343+
.intercept[CancellationException]
340344
}
341345
}

0 commit comments

Comments
 (0)