Skip to content

Commit d14da63

Browse files
committed
Handle error propagation / cancelation in Topic
1 parent 09f86e3 commit d14da63

File tree

3 files changed

+38
-3
lines changed

3 files changed

+38
-3
lines changed

core/shared/src/main/scala/fs2/concurrent/Channel.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,18 @@ sealed trait Channel[F[_], A] {
117117
*/
118118
def closeWithElement(a: A): F[Either[Channel.Closed, Unit]]
119119

120+
/** Raises an error, closing the channel with an error state.
121+
*
122+
* No-op if the channel is closed, see [[close]] for further info.
123+
*/
124+
def raiseError(e: Throwable): F[Either[Channel.Closed, Unit]]
125+
126+
/** Cancels the channel, closing it with a canceled state.
127+
*
128+
* No-op if the channel is closed, see [[close]] for further info.
129+
*/
130+
def cancel: F[Either[Channel.Closed, Unit]]
131+
120132
/** Returns true if this channel is closed */
121133
def isClosed: F[Boolean]
122134

@@ -216,6 +228,12 @@ object Channel {
216228
)
217229
}
218230

231+
def raiseError(e: Throwable): F[Either[Closed, Unit]] =
232+
closeWithExitCase(ExitCase.Errored(e))
233+
234+
def cancel: F[Either[Closed, Unit]] =
235+
closeWithExitCase(ExitCase.Canceled)
236+
219237
def isClosed = closedGate.tryGet.map(_.isDefined)
220238

221239
def closed = closedGate.get

core/shared/src/main/scala/fs2/concurrent/Topic.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package fs2
2323
package concurrent
2424

2525
import cats.effect._
26+
import cats.effect.Resource.ExitCase
2627
import cats.effect.implicits._
2728
import cats.syntax.all._
2829
import scala.collection.immutable.LongMap
@@ -208,7 +209,8 @@ object Topic {
208209
}
209210

210211
def publish: Pipe[F, A, Nothing] = { in =>
211-
in.onFinalize(close.void)
212+
in
213+
.onFinalizeCase(closeWithExitCase(_).void)
212214
.evalMap(publish1)
213215
.takeWhile(_.isRight)
214216
.drain
@@ -223,13 +225,24 @@ object Topic {
223225
def subscribers: Stream[F, Int] = subscriberCount.discrete
224226

225227
def close: F[Either[Topic.Closed, Unit]] =
228+
closeWithExitCase(ExitCase.Succeeded)
229+
230+
def closeWithExitCase(exitCase: ExitCase): F[Either[Closed, Unit]] =
226231
signalClosure
227232
.complete(())
228233
.flatMap { completedNow =>
229234
val result = if (completedNow) Topic.rightUnit else Topic.closed
230235

231236
state.get
232-
.flatMap { case (subs, _) => foreach(subs)(_.close.void) }
237+
.flatMap { case (subs, _) =>
238+
foreach(subs)(channel =>
239+
exitCase match {
240+
case ExitCase.Succeeded => channel.close.void
241+
case ExitCase.Errored(e) => channel.raiseError(e).void
242+
case ExitCase.Canceled => channel.cancel.void
243+
}
244+
)
245+
}
233246
.as(result)
234247
}
235248
.uncancelable

core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ package concurrent
2525
import cats.syntax.all._
2626
import cats.effect.IO
2727
import scala.concurrent.duration._
28+
import scala.concurrent.CancellationException
29+
2830
import cats.effect.testkit.TestControl
2931

3032
class TopicSuite extends Fs2Suite {
@@ -204,6 +206,8 @@ class TopicSuite extends Fs2Suite {
204206
.drain
205207
}
206208

207-
TestControl.executeEmbed(program) // will fail if program is deadlocked
209+
TestControl
210+
.executeEmbed(program)
211+
.intercept[CancellationException]
208212
}
209213
}

0 commit comments

Comments
 (0)