Skip to content

Commit e11300d

Browse files
author
Maksym Lozbin
committed
Flux/Mono.usingWhen late resource memory leak (reactor#3695)
UsingWhen method may lead to a memory or resource leak of allocation was finished after the main pipeline was cancelled. This change assures that asyncCancel is always called even if deferred. Fixes reactor#3695.
1 parent d10724d commit e11300d

File tree

4 files changed

+178
-133
lines changed

4 files changed

+178
-133
lines changed

reactor-core/src/main/java/reactor/core/publisher/FluxUsingWhen.java

-10
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.reactivestreams.Publisher;
2626
import org.reactivestreams.Subscriber;
2727
import org.reactivestreams.Subscription;
28-
2928
import reactor.core.CoreSubscriber;
3029
import reactor.core.Exceptions;
3130
import reactor.core.Fuseable.ConditionalSubscriber;
@@ -242,15 +241,6 @@ public void onSubscribe(Subscription s) {
242241
}
243242
}
244243

245-
@Override
246-
public void cancel() {
247-
if (!resourceProvided) {
248-
resourceSubscription.cancel();
249-
}
250-
251-
super.cancel();
252-
}
253-
254244
@Override
255245
public Object scanUnsafe(Attr key) {
256246
if (key == Attr.PARENT) return resourceSubscription;

reactor-core/src/main/java/reactor/core/publisher/MonoUsingWhen.java

-10
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.reactivestreams.Publisher;
2525
import org.reactivestreams.Subscriber;
2626
import org.reactivestreams.Subscription;
27-
2827
import reactor.core.CoreSubscriber;
2928
import reactor.core.Exceptions;
3029
import reactor.core.publisher.FluxUsingWhen.UsingWhenSubscriber;
@@ -222,15 +221,6 @@ public void onSubscribe(Subscription s) {
222221
}
223222
}
224223

225-
@Override
226-
public void cancel() {
227-
if (!resourceProvided) {
228-
resourceSubscription.cancel();
229-
}
230-
231-
super.cancel();
232-
}
233-
234224
@Override
235225
public Object scanUnsafe(Attr key) {
236226
if (key == Attr.PARENT) return resourceSubscription;

reactor-core/src/test/java/reactor/core/publisher/FluxUsingWhenTest.java

+39-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323
import java.util.concurrent.atomic.AtomicReference;
2424
import java.util.concurrent.atomic.LongAdder;
25+
import java.util.concurrent.locks.LockSupport;
2526
import java.util.function.Function;
2627
import java.util.logging.Level;
2728
import java.util.stream.Stream;
@@ -33,20 +34,19 @@
3334
import org.mockito.Mockito;
3435
import org.reactivestreams.Publisher;
3536
import org.reactivestreams.Subscription;
36-
3737
import reactor.core.CoreSubscriber;
3838
import reactor.core.Disposable;
3939
import reactor.core.Fuseable;
4040
import reactor.core.Scannable.Attr;
4141
import reactor.core.TestLoggerExtension;
4242
import reactor.core.publisher.FluxUsingWhen.ResourceSubscriber;
4343
import reactor.core.publisher.FluxUsingWhen.UsingWhenSubscriber;
44+
import reactor.core.scheduler.Schedulers;
4445
import reactor.test.ParameterizedTestWithName;
4546
import reactor.test.StepVerifier;
4647
import reactor.test.publisher.PublisherProbe;
4748
import reactor.test.publisher.TestPublisher;
4849
import reactor.test.util.TestLogger;
49-
import reactor.util.Loggers;
5050
import reactor.util.annotation.Nullable;
5151
import reactor.util.context.Context;
5252
import reactor.util.function.Tuple2;
@@ -705,6 +705,43 @@ public void apiCancel(Flux<String> transactionToCancel) {
705705
.matches(tr -> tr.cancelProbe.wasSubscribed(), "cancel method used");
706706
}
707707

708+
@ParameterizedTestWithName
709+
@MethodSource("sourcesFullTransaction")
710+
public void lateResourcePublisherCleanupIsDeferredOnCancel(Flux<String> transactionToCancel)
711+
throws InterruptedException {
712+
AtomicReference<TestResource> ref = new AtomicReference<>();
713+
CountDownLatch resourceSubscribeLatch = new CountDownLatch(1);
714+
CountDownLatch resourceCancelLatch = new CountDownLatch(1);
715+
Flux<String> flux = Flux.usingWhen(Mono.fromCallable(() -> {
716+
LockSupport.parkNanos(Duration.ofMillis(100).toNanos());
717+
TestResource testResource = new TestResource();
718+
ref.set(testResource);
719+
resourceSubscribeLatch.countDown();
720+
return testResource;
721+
}).subscribeOn(Schedulers.single()),
722+
d -> transactionToCancel,
723+
TestResource::commit,
724+
TestResource::rollback,
725+
testResource -> testResource.cancel()
726+
.doOnSubscribe(unused -> resourceCancelLatch.countDown()));
727+
728+
StepVerifier.create(flux.take(Duration.ofMillis(10)), 1)
729+
.verifyComplete();
730+
731+
assertThat(resourceSubscribeLatch.await(1, TimeUnit.SECONDS))
732+
.as("Resource create subscribed")
733+
.isTrue();
734+
assertThat(resourceCancelLatch.await(1, TimeUnit.SECONDS))
735+
.as("Resource cancel subscribed")
736+
.isTrue();
737+
738+
assertThat(ref.get())
739+
.isNotNull()
740+
.matches(tr -> !tr.commitProbe.wasSubscribed(), "no commit")
741+
.matches(tr -> !tr.rollbackProbe.wasSubscribed(), "no rollback")
742+
.matches(tr -> tr.cancelProbe.wasSubscribed(), "cancel method used");
743+
}
744+
708745
@ParameterizedTestWithName
709746
@MethodSource("sourcesFullTransaction")
710747
@TestLoggerExtension.Redirect

0 commit comments

Comments
 (0)