Skip to content

Commit

Permalink
Update cursors to refresh timeoutMS on close without affecting th…
Browse files Browse the repository at this point in the history
…e timeout of the operation (#1527)

JAVA-5615
  • Loading branch information
stIncMale authored Nov 21, 2024
1 parent 90976e4 commit 600f2c6
Show file tree
Hide file tree
Showing 12 changed files with 399 additions and 125 deletions.
49 changes: 49 additions & 0 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,25 @@

import com.mongodb.MongoClientException;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.internal.async.AsyncRunnable;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.CommandMessage;
import com.mongodb.internal.time.StartTime;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.session.ClientSession;

import java.util.Objects;
import java.util.Optional;
import java.util.function.LongConsumer;

import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_INFINITE;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

Expand Down Expand Up @@ -262,10 +268,53 @@ public int getConnectTimeoutMs() {
() -> throwMongoTimeoutException("The operation exceeded the timeout limit.")));
}

/**
* @see #hasTimeoutMS()
* @see #doWithResetTimeout(Runnable)
* @see #doWithResetTimeout(AsyncRunnable, SingleResultCallback)
*/
public void resetTimeoutIfPresent() {
getAndResetTimeoutIfPresent();
}

/**
* @see #hasTimeoutMS()
* @return A {@linkplain Optional#isPresent() non-empty} previous {@linkplain Timeout} iff {@link #hasTimeoutMS()},
* i.e., iff it was reset.
*/
private Optional<Timeout> getAndResetTimeoutIfPresent() {
Timeout result = timeout;
if (hasTimeoutMS()) {
timeout = startTimeout(timeoutSettings.getTimeoutMS());
return ofNullable(result);
}
return empty();
}

/**
* @see #resetTimeoutIfPresent()
*/
public void doWithResetTimeout(final Runnable action) {
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
try {
action.run();
} finally {
originalTimeout.ifPresent(original -> timeout = original);
}
}

/**
* @see #resetTimeoutIfPresent()
*/
public void doWithResetTimeout(final AsyncRunnable action, final SingleResultCallback<Void> callback) {
beginAsync().thenRun(c -> {
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
beginAsync().thenRun(c2 -> {
action.finish(c2);
}).thenAlwaysRunAndFinish(() -> {
originalTimeout.ifPresent(original -> timeout = original);
}, c);
}).finish(callback);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.operation;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoNamespace;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.MongoSocketException;
Expand Down Expand Up @@ -51,6 +52,7 @@
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.doesNotThrow;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH;
Expand All @@ -63,16 +65,18 @@
class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {

private final MongoNamespace namespace;
private final long maxTimeMS;
private final Decoder<T> decoder;
@Nullable
private final BsonValue comment;
private final int maxWireVersion;
private final boolean firstBatchEmpty;
private final ResourceManager resourceManager;
private final OperationContext operationContext;
private final TimeoutMode timeoutMode;
private final AtomicBoolean processedInitial = new AtomicBoolean();
private int batchSize;
private volatile CommandCursorResult<T> commandCursorResult;
private boolean resetTimeoutWhenClosing;

AsyncCommandBatchCursor(
final TimeoutMode timeoutMode,
Expand All @@ -86,24 +90,25 @@ class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T>
this.commandCursorResult = toCommandCursorResult(connectionDescription.getServerAddress(), FIRST_BATCH, commandCursorDocument);
this.namespace = commandCursorResult.getNamespace();
this.batchSize = batchSize;
this.maxTimeMS = maxTimeMS;
this.decoder = decoder;
this.comment = comment;
this.maxWireVersion = connectionDescription.getMaxWireVersion();
this.firstBatchEmpty = commandCursorResult.getResults().isEmpty();
operationContext = connectionSource.getOperationContext();
this.timeoutMode = timeoutMode;

connectionSource.getOperationContext().getTimeoutContext().setMaxTimeOverride(maxTimeMS);
operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);

AsyncConnection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER
? connection : null;
resourceManager = new ResourceManager(timeoutMode, namespace, connectionSource, connectionToPin,
commandCursorResult.getServerCursor());
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
resetTimeoutWhenClosing = true;
}

@Override
public void next(final SingleResultCallback<List<T>> callback) {
resourceManager.execute(funcCallback -> {
resourceManager.checkTimeoutModeAndResetTimeoutContextIfIteration();
checkTimeoutModeAndResetTimeoutContextIfIteration();
ServerCursor localServerCursor = resourceManager.getServerCursor();
boolean serverCursorIsNull = localServerCursor == null;
List<T> batchResults = emptyList();
Expand Down Expand Up @@ -168,6 +173,12 @@ public int getMaxWireVersion() {
return maxWireVersion;
}

void checkTimeoutModeAndResetTimeoutContextIfIteration() {
if (timeoutMode == TimeoutMode.ITERATION) {
operationContext.getTimeoutContext().resetTimeoutIfPresent();
}
}

private void getMore(final ServerCursor cursor, final SingleResultCallback<List<T>> callback) {
resourceManager.executeWithConnection((connection, wrappedCallback) ->
getMoreLoop(assertNotNull(connection), cursor, wrappedCallback), callback);
Expand Down Expand Up @@ -216,21 +227,24 @@ private CommandCursorResult<T> toCommandCursorResult(final ServerAddress serverA
return commandCursorResult;
}

void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) {
this.resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset);
/**
* Configures the cursor to {@link #close()}
* without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}.
* This is useful when managing the {@link #close()} behavior externally.
*/
AsyncCommandBatchCursor<T> disableTimeoutResetWhenClosing() {
resetTimeoutWhenClosing = false;
return this;
}

@ThreadSafe
private static final class ResourceManager extends CursorResourceManager<AsyncConnectionSource, AsyncConnection> {

private final class ResourceManager extends CursorResourceManager<AsyncConnectionSource, AsyncConnection> {
ResourceManager(
final TimeoutMode timeoutMode,
final MongoNamespace namespace,
final AsyncConnectionSource connectionSource,
@Nullable final AsyncConnection connectionToPin,
@Nullable final ServerCursor serverCursor) {
super(connectionSource.getOperationContext().getTimeoutContext(), timeoutMode, namespace, connectionSource, connectionToPin,
serverCursor);
super(namespace, connectionSource, connectionToPin, serverCursor);
}

/**
Expand All @@ -244,7 +258,7 @@ <R> void execute(final AsyncCallbackSupplier<R> operation, final SingleResultCal
} else {
operation.whenComplete(() -> {
endOperation();
if (getServerCursor() == null) {
if (super.getServerCursor() == null) {
// At this point all resources have been released,
// but `isClose` may still be returning `false` if `close` have not been called.
// Self-close to update the state managed by `ResourceManger`, and so that `isClosed` return `true`.
Expand All @@ -261,23 +275,41 @@ void markAsPinned(final AsyncConnection connectionToPin, final Connection.Pinnin

@Override
void doClose() {
if (isSkipReleasingServerResourcesOnClose()) {
unsetServerCursor();
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
timeoutContext.resetToDefaultMaxTime();
SingleResultCallback<Void> thenDoNothing = (r, t) -> {};
if (resetTimeoutWhenClosing) {
timeoutContext.doWithResetTimeout(this::releaseResourcesAsync, thenDoNothing);
} else {
releaseResourcesAsync(thenDoNothing);
}
}

resetTimeout();
if (getServerCursor() != null) {
getConnection((connection, t) -> {
if (connection != null) {
releaseServerAndClientResources(connection);
} else {
unsetServerCursor();
releaseClientResources();
}
});
} else {
private void releaseResourcesAsync(final SingleResultCallback<Void> callback) {
beginAsync().thenRunTryCatchAsyncBlocks(c -> {
if (isSkipReleasingServerResourcesOnClose()) {
unsetServerCursor();
}
if (super.getServerCursor() != null) {
beginAsync().<AsyncConnection>thenSupply(c2 -> {
getConnection(c2);
}).thenConsume((connection, c3) -> {
beginAsync().thenRun(c4 -> {
releaseServerResourcesAsync(connection, c4);
}).thenAlwaysRunAndFinish(() -> {
connection.release();
}, c3);
}).finish(c);
} else {
c.complete(c);
}
}, MongoException.class, (e, c5) -> {
c5.complete(c5); // ignore exceptions when releasing server resources
}).thenAlwaysRunAndFinish(() -> {
// guarantee that regardless of exceptions, `serverCursor` is null and client resources are released
unsetServerCursor();
releaseClientResources();
}
}, callback);
}

<R> void executeWithConnection(final AsyncCallableConnectionWithCallback<R> callable, final SingleResultCallback<R> callback) {
Expand Down Expand Up @@ -314,25 +346,21 @@ private void getConnection(final SingleResultCallback<AsyncConnection> callback)
}
}

private void releaseServerAndClientResources(final AsyncConnection connection) {
AsyncCallbackSupplier<Void> callbackSupplier = funcCallback -> {
ServerCursor localServerCursor = getServerCursor();
private void releaseServerResourcesAsync(final AsyncConnection connection, final SingleResultCallback<Void> callback) {
beginAsync().thenRun((c) -> {
ServerCursor localServerCursor = super.getServerCursor();
if (localServerCursor != null) {
killServerCursor(getNamespace(), localServerCursor, connection, funcCallback);
killServerCursorAsync(getNamespace(), localServerCursor, connection, callback);
} else {
c.complete(c);
}
};
callbackSupplier.whenComplete(() -> {
}).thenAlwaysRunAndFinish(() -> {
unsetServerCursor();
releaseClientResources();
}).whenComplete(connection::release).get((r, t) -> { /* do nothing */ });
}, callback);
}

private void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor,
private void killServerCursorAsync(final MongoNamespace namespace, final ServerCursor localServerCursor,
final AsyncConnection localConnection, final SingleResultCallback<Void> callback) {
OperationContext operationContext = assertNotNull(getConnectionSource()).getOperationContext();
TimeoutContext timeoutContext = operationContext.getTimeoutContext();
timeoutContext.resetToDefaultMaxTime();

localConnection.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor),
NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(),
operationContext, (r, t) -> callback.onResult(null, null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ private AggregateOperationImpl<RawBsonDocument> getAggregateOperation(final Time
@Override
public BatchCursor<T> execute(final ReadBinding binding) {
TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext();
CommandBatchCursor<RawBsonDocument> cursor = (CommandBatchCursor<RawBsonDocument>) getAggregateOperation(timeoutContext).execute(binding);
cursor.setCloseWithoutTimeoutReset(true);
CommandBatchCursor<RawBsonDocument> cursor = ((CommandBatchCursor<RawBsonDocument>) getAggregateOperation(timeoutContext).execute(binding))
.disableTimeoutResetWhenClosing();

return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
Expand All @@ -210,8 +210,8 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
if (t != null) {
callback.onResult(null, t);
} else {
AsyncCommandBatchCursor<RawBsonDocument> cursor = (AsyncCommandBatchCursor<RawBsonDocument>) assertNotNull(result);
cursor.setCloseWithoutTimeoutReset(true);
AsyncCommandBatchCursor<RawBsonDocument> cursor = ((AsyncCommandBatchCursor<RawBsonDocument>) assertNotNull(result))
.disableTimeoutResetWhenClosing();

callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding,
setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(),
Expand Down
Loading

0 comments on commit 600f2c6

Please sign in to comment.