Skip to content

Commit ff8399b

Browse files
committed
Prevent deadlock when publisher is canceled
1 parent abe9b03 commit ff8399b

File tree

4 files changed

+38
-2
lines changed

4 files changed

+38
-2
lines changed

core/shared/src/main/scala/fs2/concurrent/Channel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ object Channel {
151151
new Channel[F, A] {
152152

153153
def sendAll: Pipe[F, A, Nothing] = { in =>
154-
(in ++ Stream.exec(close.void))
154+
in.onFinalize(close.void)
155155
.evalMap(send)
156156
.takeWhile(_.isRight)
157157
.drain

core/shared/src/main/scala/fs2/concurrent/Topic.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ object Topic {
208208
}
209209

210210
def publish: Pipe[F, A, Nothing] = { in =>
211-
(in ++ Stream.exec(close.void))
211+
in.onFinalize(close.void)
212212
.evalMap(publish1)
213213
.takeWhile(_.isRight)
214214
.drain

core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,4 +323,19 @@ class ChannelSuite extends Fs2Suite {
323323
racingSendOperations(channel)
324324
}
325325

326+
test("stream should terminate when sendAll is interrupted") {
327+
val program =
328+
Channel
329+
.bounded[IO, Unit](1)
330+
.flatMap { ch =>
331+
val producer =
332+
Stream
333+
.eval(IO.canceled)
334+
.through(ch.sendAll)
335+
336+
ch.stream.concurrently(producer).compile.drain
337+
}
338+
339+
TestControl.executeEmbed(program) // will fail if program is deadlocked
340+
}
326341
}

core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,4 +185,25 @@ class TopicSuite extends Fs2Suite {
185185

186186
TestControl.executeEmbed(program) // will fail if program is deadlocked
187187
}
188+
189+
test("publisher cancellation does not deadlock") {
190+
val program =
191+
Topic[IO, String]
192+
.flatMap { topic =>
193+
val publisher =
194+
Stream
195+
.constant("1")
196+
.covary[IO]
197+
.evalTap(_ => IO.canceled)
198+
.through(topic.publish)
199+
200+
Stream
201+
.resource(topic.subscribeAwait(1))
202+
.flatMap(subscriber => subscriber.concurrently(publisher))
203+
.compile
204+
.drain
205+
}
206+
207+
TestControl.executeEmbed(program) // will fail if program is deadlocked
208+
}
188209
}

0 commit comments

Comments
 (0)