Skip to content

8316580: HttpClient with StructuredTaskScope does not close when a task fails #3706

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -279,23 +279,25 @@ public String toString() {
}
}

static void registerPending(PendingRequest pending) {
static <T> CompletableFuture<T> registerPending(PendingRequest pending, CompletableFuture<T> res) {
// shortcut if cf is already completed: no need to go through the trouble of
// registering it
if (pending.cf.isDone()) return;
if (pending.cf.isDone()) return res;

var client = pending.client;
var cf = pending.cf;
var id = pending.id;
boolean added = client.pendingRequests.add(pending);
// this may immediately remove `pending` from the set is the cf is already completed
pending.ref = cf.whenComplete((r,t) -> client.pendingRequests.remove(pending));
var ref = res.whenComplete((r,t) -> client.pendingRequests.remove(pending));
pending.ref = ref;
assert added : "request %d was already added".formatted(id);
// should not happen, unless the selector manager has already
// exited abnormally
if (client.selmgr.isClosed()) {
pending.abort(client.selmgr.selectorClosedException());
}
return ref;
}

static void abortPendingRequests(HttpClientImpl client, Throwable reason) {
Expand Down Expand Up @@ -875,8 +877,9 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
cf = sendAsync(req, responseHandler, null, null);
return cf.get();
} catch (InterruptedException ie) {
if (cf != null )
if (cf != null) {
cf.cancel(true);
}
throw ie;
} catch (ExecutionException e) {
final Throwable throwable = e.getCause();
Expand Down Expand Up @@ -991,19 +994,23 @@ private void debugCompleted(String tag, long startNanos, HttpRequest req) {
(b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));
}

// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
if (exchangeExecutor != null) {
res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
}

// The mexCf is the Cf we need to abort if the SelectorManager thread
// is aborted.
PendingRequest pending = new PendingRequest(id, requestImpl, mexCf, mex, this);
registerPending(pending);
return res;
} catch(Throwable t) {
res = registerPending(pending, res);

if (exchangeExecutor != null) {
// makes sure that any dependent actions happen in the CF default
// executor. This is only needed for sendAsync(...), when
// exchangeExecutor is non-null.
return res.isDone() ? res
: res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);
} else {
// make a defensive copy that can be safely canceled
// by the caller
return res.isDone() ? res : res.copy();
}
} catch (Throwable t) {
requestUnreference();
debugCompleted("ClientImpl (async)", start, userRequest);
throw t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class MultiExchange<T> implements Cancelable {
Exchange<T> previous;
volatile Throwable retryCause;
volatile boolean expiredOnce;
volatile HttpResponse<T> response = null;
volatile HttpResponse<T> response;

// Maximum number of times a request will be retried/redirected
// for any reason
Expand Down Expand Up @@ -274,11 +274,19 @@ public void cancel(IOException cause) {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = this.cancelled;
boolean firstCancel = false;
if (!cancelled && mayInterruptIfRunning) {
if (interrupted.get() == null) {
interrupted.compareAndSet(null,
firstCancel = interrupted.compareAndSet(null,
new CancellationException("Request cancelled"));
}
if (debug.on()) {
if (firstCancel) {
debug.log("multi exchange recording: " + interrupted.get());
} else {
debug.log("multi exchange recorded: " + interrupted.get());
}
}
this.cancelled = true;
var exchange = getExchange();
if (exchange != null) {
Expand Down Expand Up @@ -360,17 +368,30 @@ private CompletableFuture<HttpResponse<T>> handleNoBody(Response r, Exchange<T>
}).exceptionallyCompose(this::whenCancelled);
}

// returns a CancellationExcpetion that wraps the given cause
// if cancel(boolean) was called, the given cause otherwise
private Throwable wrapIfCancelled(Throwable cause) {
CancellationException interrupt = interrupted.get();
if (interrupt == null) return cause;

var cancel = new CancellationException(interrupt.getMessage());
// preserve the stack trace of the original exception to
// show where the call to cancel(boolean) came from
cancel.setStackTrace(interrupt.getStackTrace());
cancel.initCause(Utils.getCancelCause(cause));
return cancel;
}

// if the request failed because the multi exchange was cancelled,
// make sure the reported exception is wrapped in CancellationException
private CompletableFuture<HttpResponse<T>> whenCancelled(Throwable t) {
CancellationException x = interrupted.get();
if (x != null) {
// make sure to fail with CancellationException if cancel(true)
// was called.
t = x.initCause(Utils.getCancelCause(t));
var x = wrapIfCancelled(t);
if (x instanceof CancellationException) {
if (debug.on()) {
debug.log("MultiExchange interrupted with: " + t.getCause());
debug.log("MultiExchange interrupted with: " + x.getCause());
}
}
return MinimalFuture.failedFuture(t);
return MinimalFuture.failedFuture(x);
}

static class NullSubscription implements Flow.Subscription {
Expand Down
Loading