-
Notifications
You must be signed in to change notification settings - Fork 38.9k
Description
Not sure how to put this in enough words to fit into title, so feel free to update accordingly.
While reproducing an SSE shutdown issue, I have stumbled upon WebClient handling (ungracful) server shutdown ungracefully.
Consider the reproducer: https://github.com/ZIRAKrezovic/spring-boot-issues/tree/graceful-shutdown
Remove onErrorComplete() from test https://github.com/ZIRAKrezovic/spring-boot-issues/blob/graceful-shutdown/src/test/java/com/example/demo/DemoApplicationTests.java#L52C1-L53C1
Observe the error
2026-02-19T18:42:34.573+01:00 ERROR 22880 --- [demo] [undedElastic-19] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.web.reactive.function.client.WebClientResponseException: 200 OK from GET http://localhost:8080/test/sse, but response failed with cause: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
Caused by: org.springframework.web.reactive.function.client.WebClientResponseException: 200 OK from GET http://localhost:8080/test/sse, but response failed with cause: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:324) ~[spring-webflux-7.0.5.jar:7.0.5]
at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:214) ~[spring-webflux-7.0.5.jar:7.0.5]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onError(FluxOnErrorReturn.java:196) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onError(FluxDefaultIfEmpty.java:156) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:545) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:143) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:123) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:341) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:383) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.MonoCollect$CollectSubscriber.onError(MonoCollect.java:134) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:225) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:178) ~[reactor-netty-core-1.3.3.jar:1.3.3]
at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:148) ~[reactor-netty-core-1.3.3.jar:1.3.3]
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:67) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340) ~[reactor-netty-core-1.3.3.jar:1.3.3]
at reactor.core.publisher.Mono.subscribe(Mono.java:4569) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:104) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:545) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.maybeOnError(FluxConcatMapNoPrefetch.java:326) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onError(FluxConcatMapNoPrefetch.java:220) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onError(FluxBufferPredicate.java:345) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onError(FluxPeekFuseable.java:554) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:341) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:123) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onError(FluxDoFinally.java:119) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onError(FluxConcatArray.java:190) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:352) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:725) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onError(FluxFlattenIterable.java:264) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:225) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134) ~[reactor-core-3.8.3.jar:3.8.3]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:484) ~[reactor-netty-core-1.3.3.jar:1.3.3]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:277) ~[reactor-netty-core-1.3.3.jar:1.3.3]
at reactor.netty.channel.FluxReceive.onInboundError(FluxReceive.java:472) ~[reactor-netty-core-1.3.3.jar:1.3.3]
at reactor.netty.channel.ChannelOperations.onInboundError(ChannelOperations.java:536) ~[reactor-netty-core-1.3.3.jar:1.3.3]
at reactor.netty.http.client.HttpClientOperations.onInboundClose(HttpClientOperations.java:408) ~[reactor-netty-http-1.3.3.jar:1.3.3]
at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:73) ~[reactor-netty-core-1.3.3.jar:1.3.3]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:251) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.handler.codec.http.HttpContentDecoder.channelInactive(HttpContentDecoder.java:241) ~[netty-codec-http-4.2.10.Final.jar:4.2.10.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:251) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelInactive(CombinedChannelDuplexHandler.java:416) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:427) ~[netty-codec-base-4.2.10.Final.jar:4.2.10.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:392) ~[netty-codec-base-4.2.10.Final.jar:4.2.10.Final]
at io.netty.handler.codec.http.HttpClientCodec$Decoder.channelInactive(HttpClientCodec.java:410) ~[netty-codec-http-4.2.10.Final.jar:4.2.10.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelInactive(CombinedChannelDuplexHandler.java:219) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:251) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1424) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:876) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe$6.run(AbstractChannel.java:676) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:148) ~[netty-common-4.2.10.Final.jar:4.2.10.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:141) ~[netty-common-4.2.10.Final.jar:4.2.10.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.2.10.Final.jar:4.2.10.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:535) ~[netty-common-4.2.10.Final.jar:4.2.10.Final]
at io.netty.channel.SingleThreadIoEventLoop.run(SingleThreadIoEventLoop.java:201) ~[netty-transport-4.2.10.Final.jar:4.2.10.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:1195) ~[netty-common-4.2.10.Final.jar:4.2.10.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.2.10.Final.jar:4.2.10.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.2.10.Final.jar:4.2.10.Final]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response
Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Error has been observed at the following site(s):
*__checkpoint ⇢ Body from GET http://localhost:8080/test/sse [DefaultClientResponse]
*__checkpoint ⇢ Body from GET http://localhost:8080/test/sse [DefaultClientResponse]
I was hoping this would be handled by framework gracefully, especially since Flux creation is handled by framework. At the moment I either have to use .onErrorComplete() as shown in the test case, or write more specific variations such as
.onErrorComplete(t -> t.getCause() != null && t.getCause().getClass().getName().contains("PrematureCloseException"))
This is a bit cumbersome to get right for multiple underlying clients (Jetty, JDK, Netty, Apache Http Client, ...)
I wonder if you'd support (un)graceful completion of a streaming response when a server closes the response (due to shut down, for example).
If you don't think this should be handled by the framework, feel free to close the issue.
Thanks.
PS: I'm not really sure if "ungraceful" is the right word for "opposite of graceful" in this scenario, but that's the context I've used it with.