From daf8539a482d4a1ca9b8dbca75aaac35623c0db3 Mon Sep 17 00:00:00 2001 From: George Banasios Date: Tue, 17 Jun 2025 19:48:14 +0300 Subject: [PATCH] fix flux.zip does not emit errors from concurrent sources Signed-off-by: George Banasios --- .../java/reactor/core/publisher/FluxZip.java | 87 ++++++++++++++----- .../reactor/core/publisher/FluxZipTest.java | 27 +++++- 2 files changed, 91 insertions(+), 23 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxZip.java b/reactor-core/src/main/java/reactor/core/publisher/FluxZip.java index 89f1e1dc38..b2635ddc05 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxZip.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxZip.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -780,6 +780,12 @@ public Object scanUnsafe(Attr key) { void error(Throwable e, int index) { if (Exceptions.addThrowable(ERROR, this, e)) { + // Mark the inner subscriber as 'done' only AFTER this coordinator has + // successfully registered the error above. + // This specific ordering guarantees that a concurrent drain loop cannot see + // `done == true` on the inner subscriber without the coordinator's `error` + // field also being visible. + subscribers[index].done = true; drain(null, null); } else { @@ -862,15 +868,31 @@ else if (dataSignal != null && cancelled) { return; } + // This block is the central point for terminating the operator. + // It is checked at the start of every work cycle inside the drain loop. + // We also enter this block on the loop AFTER a terminal condition has been + // atomically registered deeper inside the drain logic. 
if (error != null) { - cancelAll(); - discardAll(missed); + if (error == Exceptions.TERMINATED) { + // The onComplete() path is specifically triggered when a previous pass through + // the drain loop detected that a source had finished cleanly (d && sourceEmpty) + // and successfully won a potential race condition to set the `error` field to the TERMINATED + // sentinel. The recursive `drain()` call made at that time ensures we re-enter + // this loop, hit this check, and can now safely terminate. + cancelAll(); + discardAll(missed); + a.onComplete(); + return; + } + else { + cancelAll(); + discardAll(missed); - Throwable ex = Exceptions.terminate(ERROR, this); + Throwable ex = Exceptions.terminate(ERROR, this); - a.onError(ex); - - return; + a.onError(ex); + return; + } } boolean empty = false; @@ -886,11 +908,26 @@ else if (dataSignal != null && cancelled) { boolean sourceEmpty = v == null; if (d && sourceEmpty) { - cancelAll(); - discardAll(missed); + // Attempt to claim the error state. In a concurrent scenario, + // an onError() from one source could be racing with this onComplete path + // from another. `compareAndSet` ensures that only the first signal + // to arrive can set the error state using a sentinel value. + if (ERROR.compareAndSet(this, null, Exceptions.TERMINATED)) { + // In case of a race condition, we have just created a new unit of work: the + // drain loop must now see the TERMINATED state and send onComplete(). + // We call drain() here to safely increment the WIP, which + // guarantees the main drain loop will run at least one more time + // instead of exiting prematurely. + drain(null, null); + } + + // We signal to the rest of the drain loop that we cannot produce a + // zipped value in this iteration. If we didn't, an early 'break' could leave + // the `values` array partially filled, causing an NPE when the zipper + // function is called. 
+ empty = true; - a.onComplete(); - return; + break; } if (!sourceEmpty) { values[j] = v; @@ -955,14 +992,21 @@ else if (dataSignal != null && cancelled) { } if (error != null) { - cancelAll(); - discardAll(missed); + if (error == Exceptions.TERMINATED) { + cancelAll(); + discardAll(missed); + a.onComplete(); + return; + } + else { + cancelAll(); + discardAll(missed); - Throwable ex = Exceptions.terminate(ERROR, this); + Throwable ex = Exceptions.terminate(ERROR, this); - a.onError(ex); - - return; + a.onError(ex); + return; + } } for (int j = 0; j < n; j++) { @@ -975,11 +1019,11 @@ else if (dataSignal != null && cancelled) { boolean empty = v == null; if (d && empty) { - cancelAll(); - discardAll(missed); + if (ERROR.compareAndSet(this, null, Exceptions.TERMINATED)) { + drain(null, null); + } - a.onComplete(); - return; + break; } if (!empty) { values[j] = v; @@ -1134,7 +1178,6 @@ public void onError(Throwable t) { Operators.onErrorDropped(t, currentContext()); return; } - done = true; parent.error(t, index); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxZipTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxZipTest.java index df9d6f98e5..426e6ed444 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxZipTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxZipTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -31,6 +32,7 @@ import reactor.core.CoreSubscriber; import reactor.core.Scannable; import reactor.core.TestLoggerExtension; +import reactor.core.scheduler.Schedulers; import reactor.test.ParameterizedTestWithName; import reactor.test.StepVerifier; import reactor.test.publisher.FluxOperatorTest; @@ -1428,4 +1430,27 @@ public void scanSingleSubscriber() { assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue(); assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); } + + @Test + public void testZipCorrectlyPropagatesTheErrorEmittedByAConcurrentSource() { + Flux fluxToTest = Flux.range(1, 10) + .flatMap(ignored -> { + Mono> mono1 = Mono.empty() + .publishOn(Schedulers.parallel()) + .map(Optional::of) + .defaultIfEmpty(Optional.empty()); + + Mono> mono2 = Mono.error(new NullPointerException()) + .map(Optional::of) + .defaultIfEmpty(Optional.empty()); + + return Flux.zip(mono1, mono2) + .collectList() + .onErrorResume(e -> Mono.empty()); + }) + .flatMap(evt -> + Mono.error(new RuntimeException(String.format("Unexpected empty list return by collectList of size %s", evt.size()))) + ); + StepVerifier.create(fluxToTest).expectNextCount(0).verifyComplete(); + } }