Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,7 +70,7 @@ public Object scanUnsafe(Attr key) {
static final class ConcatIterableSubscriber<T>
extends Operators.MultiSubscriptionSubscriber<T, T> {

final Iterator<? extends Publisher<? extends T>> it;
Iterator<? extends Publisher<? extends T>> it;

volatile int wip;
@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -99,6 +99,12 @@ public void onComplete() {
Iterator<? extends Publisher<? extends T>> a = this.it;
do {
if (isCancelled()) {
this.it = null;
return;
}

if (a == null) {
actual.onComplete();
return;
}

Expand All @@ -108,16 +114,19 @@ public void onComplete() {
b = a.hasNext();
}
catch (Throwable e) {
this.it = null;
onError(Operators.onOperatorError(this, e,
actual.currentContext()));
return;
}

if (isCancelled()) {
this.it = null;
return;
}

if (!b) {
this.it = null;
actual.onComplete();
return;
}
Expand All @@ -129,12 +138,14 @@ public void onComplete() {
"The Publisher returned by the iterator is null");
}
catch (Throwable e) {
this.it = null;
actual.onError(Operators.onOperatorError(this, e,
actual.currentContext()));
return;
}

if (isCancelled()) {
this.it = null;
return;
}

Expand All @@ -148,6 +159,7 @@ public void onComplete() {
p.subscribe(this);

if (isCancelled()) {
this.it = null;
return;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2025 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -207,7 +207,7 @@ static final class IterableSubscription<T>

final CoreSubscriber<? super T> actual;

final Spliterator<? extends T> spliterator;
Spliterator<? extends T> spliterator;
final boolean knownToBeFinite;
final Runnable onClose;

Expand Down Expand Up @@ -323,6 +323,7 @@ void slowPath(long n) {
"The iterator returned a null value");
}
catch (Throwable ex) {
this.spliterator = null;
s.onError(ex);
onCloseWithDropError();
return;
Expand All @@ -344,6 +345,7 @@ void slowPath(long n) {
b = hasNext();
}
catch (Throwable ex) {
this.spliterator = null;
s.onError(ex);
onCloseWithDropError();
return;
Expand All @@ -354,6 +356,7 @@ void slowPath(long n) {
}

if (!b) {
this.spliterator = null;
s.onComplete();
onCloseWithDropError();
return;
Expand Down Expand Up @@ -390,6 +393,7 @@ void fastPath() {
"The iterator returned a null value");
}
catch (Exception ex) {
this.spliterator = null;
s.onError(ex);
onCloseWithDropError();
return;
Expand All @@ -411,6 +415,7 @@ void fastPath() {
b = hasNext();
}
catch (Exception ex) {
this.spliterator = null;
s.onError(ex);
onCloseWithDropError();
return;
Expand All @@ -421,6 +426,7 @@ void fastPath() {
}

if (!b) {
this.spliterator = null;
s.onComplete();
onCloseWithDropError();
return;
Expand All @@ -433,7 +439,11 @@ public void cancel() {
onCloseWithDropError();
cancelled = true;
Operators.onDiscard(nextElement, actual.currentContext());
Operators.onDiscardMultiple(this.spliterator, this.knownToBeFinite, actual.currentContext());
Spliterator<? extends T> sp = this.spliterator;
if (sp != null) {
this.spliterator = null;
Operators.onDiscardMultiple(sp, this.knownToBeFinite, actual.currentContext());
}
}

@Override
Expand Down Expand Up @@ -536,7 +546,7 @@ static final class IterableSubscriptionConditional<T>

final ConditionalSubscriber<? super T> actual;

final Spliterator<? extends T> spliterator;
Spliterator<? extends T> spliterator;
final boolean knownToBeFinite;
final Runnable onClose;

Expand Down Expand Up @@ -652,6 +662,7 @@ void slowPath(long n) {
"The iterator returned a null value");
}
catch (Throwable ex) {
this.spliterator = null;
s.onError(ex);
onCloseWithDropError();
return;
Expand All @@ -673,6 +684,7 @@ void slowPath(long n) {
b = hasNext();
}
catch (Throwable ex) {
this.spliterator = null;
s.onError(ex);
onCloseWithDropError();
return;
Expand All @@ -683,6 +695,7 @@ void slowPath(long n) {
}

if (!b) {
this.spliterator = null;
s.onComplete();
onCloseWithDropError();
return;
Expand Down Expand Up @@ -721,6 +734,7 @@ void fastPath() {
"The iterator returned a null value");
}
catch (Exception ex) {
this.spliterator = null;
s.onError(ex);
onCloseWithDropError();
return;
Expand All @@ -742,6 +756,7 @@ void fastPath() {
b = hasNext();
}
catch (Exception ex) {
this.spliterator = null;
s.onError(ex);
onCloseWithDropError();
return;
Expand All @@ -752,6 +767,7 @@ void fastPath() {
}

if (!b) {
this.spliterator = null;
s.onComplete();
onCloseWithDropError();
return;
Expand All @@ -764,7 +780,11 @@ public void cancel() {
onCloseWithDropError();
cancelled = true;
Operators.onDiscard(this.nextElement, actual.currentContext());
Operators.onDiscardMultiple(this.spliterator, this.knownToBeFinite, actual.currentContext());
Spliterator<? extends T> sp = this.spliterator;
if (sp != null) {
this.spliterator = null;
Operators.onDiscardMultiple(sp, this.knownToBeFinite, actual.currentContext());
}
}

@Override
Expand Down
Loading