Skip to content
This repository was archived by the owner on Jul 18, 2024. It is now read-only.

Commit f569b13

Browse files
Renar NarubinRenars W. Narubin
authored andcommitted
Ensure CompletedStage waits for other dependents
Previously a CompletedStage which held an exception would immediately complete the returned stage in methods like thenCombine with this exception. Instead it should wait for the other dependent stage to complete before completing the returned stage. SimpleCompletionStage was found not to have this incorrect behavior. Fixes #49
1 parent 8c9d704 commit f569b13

File tree

2 files changed

+139
-50
lines changed

2 files changed

+139
-50
lines changed

asyncutil/src/main/java/com/ibm/asyncutil/util/CompletedStage.java

Lines changed: 60 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -178,87 +178,111 @@ public CompletionStage<Void> thenRunAsync(final Runnable action, final Executor
178178
public <U, V> CompletionStage<V> thenCombine(final CompletionStage<? extends U> other,
179179
final BiFunction<? super T, ? super U, ? extends V> fn) {
180180
Objects.requireNonNull(fn);
181-
if (this.exception == null) {
182-
return other.thenApply(u -> fn.apply(this.result, u));
183-
}
184-
return typedException();
181+
return other.thenApply(
182+
this.exception == null
183+
? u -> fn.apply(this.result, u)
184+
: u -> {
185+
throw CompletedStage.wrapIfNecessary(this.exception);
186+
});
185187
}
186188

187189
@Override
188190
public <U, V> CompletionStage<V> thenCombineAsync(final CompletionStage<? extends U> other,
189191
final BiFunction<? super T, ? super U, ? extends V> fn) {
190192
Objects.requireNonNull(fn);
191-
if (this.exception == null) {
192-
return other.thenApplyAsync(u -> fn.apply(this.result, u));
193-
}
194-
return typedException();
193+
return other.thenApplyAsync(
194+
this.exception == null
195+
? u -> fn.apply(this.result, u)
196+
: u -> {
197+
throw CompletedStage.wrapIfNecessary(this.exception);
198+
});
195199
}
196200

197201
@Override
198202
public <U, V> CompletionStage<V> thenCombineAsync(final CompletionStage<? extends U> other,
199203
final BiFunction<? super T, ? super U, ? extends V> fn, final Executor executor) {
200204
Objects.requireNonNull(fn);
201-
if (this.exception == null) {
202-
return other.thenApplyAsync(u -> fn.apply(this.result, u), executor);
203-
}
204-
return typedException();
205+
return other.thenApplyAsync(
206+
this.exception == null
207+
? u -> fn.apply(this.result, u)
208+
: u -> {
209+
throw CompletedStage.wrapIfNecessary(this.exception);
210+
},
211+
executor);
205212
}
206213

207214
@Override
208215
public <U> CompletionStage<Void> thenAcceptBoth(final CompletionStage<? extends U> other,
209216
final BiConsumer<? super T, ? super U> action) {
210217
Objects.requireNonNull(action);
211-
if (this.exception == null) {
212-
return other.thenAccept(u -> action.accept(this.result, u));
213-
}
214-
return typedException();
218+
return other.thenAccept(
219+
this.exception == null
220+
? u -> action.accept(this.result, u)
221+
: u -> {
222+
throw CompletedStage.wrapIfNecessary(this.exception);
223+
});
215224
}
216225

217226
@Override
218227
public <U> CompletionStage<Void> thenAcceptBothAsync(final CompletionStage<? extends U> other,
219228
final BiConsumer<? super T, ? super U> action) {
220229
Objects.requireNonNull(action);
221-
if (this.exception == null) {
222-
return other.thenAcceptAsync(u -> action.accept(this.result, u));
223-
}
224-
return typedException();
230+
return other.thenAcceptAsync(
231+
this.exception == null
232+
? u -> action.accept(this.result, u)
233+
: u -> {
234+
throw CompletedStage.wrapIfNecessary(this.exception);
235+
});
225236
}
226237

227238
@Override
228239
public <U> CompletionStage<Void> thenAcceptBothAsync(final CompletionStage<? extends U> other,
229240
final BiConsumer<? super T, ? super U> action, final Executor executor) {
230241
Objects.requireNonNull(action);
231-
if (this.exception == null) {
232-
return other.thenAcceptAsync(u -> action.accept(this.result, u), executor);
233-
}
234-
return typedException();
242+
return other.thenAcceptAsync(
243+
this.exception == null
244+
? u -> action.accept(this.result, u)
245+
: u -> {
246+
throw CompletedStage.wrapIfNecessary(this.exception);
247+
},
248+
executor);
235249
}
236250

237251
@Override
238252
public CompletionStage<Void> runAfterBoth(final CompletionStage<?> other, final Runnable action) {
239-
if (this.exception == null) {
240-
return other.thenRun(action);
241-
}
242-
return typedException();
253+
Objects.requireNonNull(action);
254+
return other.thenRun(
255+
this.exception == null
256+
? action
257+
: () -> {
258+
throw CompletedStage.wrapIfNecessary(this.exception);
259+
});
243260
}
244261

245262
@Override
246263
public CompletionStage<Void> runAfterBothAsync(final CompletionStage<?> other,
247264
final Runnable action) {
248-
if (this.exception == null) {
249-
return other.thenRunAsync(action);
250-
}
251-
return typedException();
265+
Objects.requireNonNull(action);
266+
return other.thenRunAsync(
267+
this.exception == null
268+
? action
269+
: () -> {
270+
throw CompletedStage.wrapIfNecessary(this.exception);
271+
});
252272
}
253273

254274
@Override
255275
public CompletionStage<Void> runAfterBothAsync(final CompletionStage<?> other,
256276
final Runnable action,
257277
final Executor executor) {
258-
if (this.exception == null) {
259-
return other.thenRunAsync(action, executor);
260-
}
261-
return typedException();
278+
Objects.requireNonNull(action);
279+
return other.thenRunAsync(
280+
this.exception == null
281+
? action
282+
: () -> {
283+
throw CompletedStage.wrapIfNecessary(this.exception);
284+
},
285+
executor);
262286
}
263287

264288
@Override

asyncutil/src/test/java/com/ibm/asyncutil/util/CombinatorsTest.java

Lines changed: 79 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,19 @@
1111
import java.util.Collections;
1212
import java.util.List;
1313
import java.util.Map;
14+
import java.util.concurrent.CompletableFuture;
1415
import java.util.concurrent.CompletionException;
1516
import java.util.concurrent.CompletionStage;
17+
import java.util.concurrent.Executor;
1618
import java.util.concurrent.TimeUnit;
1719
import java.util.concurrent.TimeoutException;
20+
import java.util.function.BiConsumer;
21+
import java.util.function.BiFunction;
1822
import java.util.function.Function;
1923
import java.util.function.Supplier;
2024
import java.util.stream.Collectors;
2125
import java.util.stream.IntStream;
26+
import java.util.stream.Stream;
2227

2328
import org.junit.Assert;
2429
import org.junit.Test;
@@ -31,6 +36,8 @@
3136

3237
@RunWith(Parameterized.class)
3338
public class CombinatorsTest {
39+
private static final Executor ASYNC = CompletableFuture::runAsync;
40+
3441
private static class TestException extends RuntimeException {
3542
private static final long serialVersionUID = 1L;
3643
}
@@ -155,9 +162,9 @@ public void testAllOfError() {
155162
Arrays.asList(
156163
getCompletedStage(1),
157164
getExceptionalStage(new TestException()));
158-
assertError(Combinators.allOf(futures));
159-
assertError(Combinators.collect(futures));
160-
assertError(Combinators.collect(futures, Collectors.toList()));
165+
CombinatorsTest.assertError(Combinators.allOf(futures));
166+
CombinatorsTest.assertError(Combinators.collect(futures));
167+
CombinatorsTest.assertError(Combinators.collect(futures, Collectors.toList()));
161168
}
162169

163170
@Test
@@ -173,15 +180,15 @@ public void testAllOfErrorNoShortCircuit() {
173180
final CompletionStage<List<Integer>> collCollect =
174181
Combinators.collect(futures, Collectors.toList());
175182

176-
assertIncomplete(voidAll);
177-
assertIncomplete(collAll);
178-
assertIncomplete(collCollect);
183+
CombinatorsTest.assertIncomplete(voidAll);
184+
CombinatorsTest.assertIncomplete(collAll);
185+
CombinatorsTest.assertIncomplete(collCollect);
179186

180187
delayed.complete(1);
181188

182-
assertError(voidAll);
183-
assertError(collAll);
184-
assertError(collCollect);
189+
CombinatorsTest.assertError(voidAll);
190+
CombinatorsTest.assertError(collAll);
191+
CombinatorsTest.assertError(collCollect);
185192
}
186193

187194
@Test
@@ -208,7 +215,7 @@ public void testKeyedAllError() {
208215
}
209216
return getCompletedStage(i);
210217
}));
211-
assertError(Combinators.keyedAll(stageMap));
218+
CombinatorsTest.assertError(Combinators.keyedAll(stageMap));
212219
}
213220

214221
@Test
@@ -220,26 +227,84 @@ public void testKeyedAllErrorNoShortCircuit() {
220227
final CompletionStage<Map<Integer, Integer>> fut = Combinators.keyedAll(stageMap);
221228
int i = 0;
222229
for (final CompletableStage<Integer> future : stageMap.values()) {
223-
assertIncomplete(fut);
230+
CombinatorsTest.assertIncomplete(fut);
224231
if (i == 3) {
225232
future.completeExceptionally(new TestException());
226233
} else {
227234
future.complete(i);
228235
}
229236
i++;
230237
}
231-
assertError(fut);
238+
CombinatorsTest.assertError(fut);
239+
}
240+
241+
/**
242+
* Test that CompletionStage methods which depend on both of two stages always wait for both
243+
* stages to complete.
244+
*/
245+
@Test
246+
public void testCombineEtAl() {
247+
for (final BiFunction<CompletionStage<?>, CompletionStage<?>, CompletionStage<Void>> combineMethod : Stream
248+
.<BiFunction<CompletionStage<?>, CompletionStage<?>, CompletionStage<Void>>>of(
249+
(a, b) -> a.thenCombine(b, CombinatorsTest.voidFunction()),
250+
(a, b) -> a.thenCombineAsync(b, CombinatorsTest.voidFunction()),
251+
(a, b) -> a.thenCombineAsync(b, CombinatorsTest.voidFunction(), ASYNC),
252+
(a, b) -> a.thenAcceptBoth(b, CombinatorsTest.voidConsumer()),
253+
(a, b) -> a.thenAcceptBothAsync(b, CombinatorsTest.voidConsumer()),
254+
(a, b) -> a.thenAcceptBothAsync(b, CombinatorsTest.voidConsumer(), ASYNC),
255+
(a, b) -> a.runAfterBoth(b, CombinatorsTest.voidRunnable()),
256+
(a, b) -> a.runAfterBothAsync(b, CombinatorsTest.voidRunnable()),
257+
(a, b) -> a.runAfterBothAsync(b, CombinatorsTest.voidRunnable(), ASYNC))
258+
// include all the functions in reverse as well, switch a and b in argument order
259+
.flatMap(function -> Stream.of(function, (a, b) -> function.apply(b, a)))
260+
.collect(Collectors.toList())) {
261+
{
262+
final CompletionStage<String> doneNormal = getCompletedStage("a");
263+
final CompletableStage<String> incompleteNormal = getCompletableStage();
264+
265+
final CompletionStage<Void> combine = combineMethod.apply(doneNormal, incompleteNormal);
266+
267+
CombinatorsTest.assertIncomplete(combine);
268+
incompleteNormal.complete("b");
269+
TestUtil.join(combine);
270+
}
271+
{
272+
final CompletionStage<String> doneExceptional = getExceptionalStage(new TestException());
273+
final CompletableStage<String> incompleteNormal = getCompletableStage();
274+
275+
final CompletionStage<Void> combine =
276+
combineMethod.apply(doneExceptional, incompleteNormal);
277+
278+
CombinatorsTest.assertIncomplete(combine);
279+
incompleteNormal.complete("b");
280+
CombinatorsTest.assertError(combine);
281+
}
282+
}
283+
}
284+
285+
private static <T, V> BiFunction<T, V, Void> voidFunction() {
286+
return (ig1, ig2) -> null;
287+
}
288+
289+
private static <T, V> BiConsumer<T, V> voidConsumer() {
290+
return (ig1, ig2) -> {
291+
};
292+
}
293+
294+
private static Runnable voidRunnable() {
295+
return () -> {
296+
};
232297
}
233298

234-
private <T> void assertError(final CompletionStage<T> stage) {
299+
private static <T> void assertError(final CompletionStage<T> stage) {
235300
try {
236301
TestUtil.join(stage);
237302
} catch (final CompletionException e) {
238303
Assert.assertTrue(e.getCause() instanceof TestException);
239304
}
240305
}
241306

242-
private <T> void assertIncomplete(final CompletionStage<T> stage) {
307+
private static <T> void assertIncomplete(final CompletionStage<T> stage) {
243308
try {
244309
TestUtil.join(stage, 20, TimeUnit.MILLISECONDS);
245310
Assert.fail("not all futures complete, get should timeout");

0 commit comments

Comments
 (0)