Skip to content

Properly handle exceptions in threads created by MongoClient #1764

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.mongodb.event.ServerMonitorListener;
import com.mongodb.event.ServerOpeningEvent;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -53,6 +55,8 @@
*/
@ThreadSafe
final class AsynchronousClusterEventListener implements ClusterListener, ServerListener, ServerMonitorListener {
private static final Logger LOGGER = Loggers.getLogger("cluster");

private final BlockingQueue<Supplier<Boolean>> eventPublishers = new LinkedBlockingQueue<>();
private final ClusterListener clusterListener;
private final ServerListener serverListener;
Expand Down Expand Up @@ -162,16 +166,21 @@ private void addEvent(final Supplier<Boolean> supplier) {
}

private void publishEvents() {
while (true) {
try {
Supplier<Boolean> eventPublisher = eventPublishers.take();
boolean isLastEvent = eventPublisher.get();
if (isLastEvent) {
break;
try {
while (true) {
try {
Supplier<Boolean> eventPublisher = eventPublishers.take();
boolean isLastEvent = eventPublisher.get();
if (isLastEvent) {
break;
}
} catch (Exception e) {
// ignore exceptions thrown from listeners, also ignore interrupts that user code may cause
}
} catch (Exception e) {
// ignore exceptions thrown from listeners, also ignore interrupts that user code may cause
}
} catch (Throwable t) {
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
throw t;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -519,39 +519,44 @@ private final class WaitQueueHandler implements Runnable {
}

public void run() {
while (!isClosed) {
CountDownLatch currentPhase = phase.get();
ClusterDescription curDescription = description;
try {
while (!isClosed) {
CountDownLatch currentPhase = phase.get();
ClusterDescription curDescription = description;

Timeout timeout = Timeout.infinite();
boolean someWaitersNotSatisfied = false;
for (Iterator<ServerSelectionRequest> iter = waitQueue.iterator(); iter.hasNext();) {
ServerSelectionRequest currentRequest = iter.next();
if (handleServerSelectionRequest(currentRequest, currentPhase, curDescription)) {
iter.remove();
} else {
someWaitersNotSatisfied = true;
timeout = Timeout.earliest(
timeout,
currentRequest.getTimeout(),
startMinWaitHeartbeatTimeout());
}
}

Timeout timeout = Timeout.infinite();
boolean someWaitersNotSatisfied = false;
for (Iterator<ServerSelectionRequest> iter = waitQueue.iterator(); iter.hasNext();) {
ServerSelectionRequest currentRequest = iter.next();
if (handleServerSelectionRequest(currentRequest, currentPhase, curDescription)) {
iter.remove();
} else {
someWaitersNotSatisfied = true;
timeout = Timeout.earliest(
timeout,
currentRequest.getTimeout(),
startMinWaitHeartbeatTimeout());
if (someWaitersNotSatisfied) {
connect();
}
}

if (someWaitersNotSatisfied) {
connect();
try {
timeout.awaitOn(currentPhase, () -> "ignored");
} catch (MongoInterruptedException closed) {
// The cluster has been closed and the while loop will exit.
}
}

try {
timeout.awaitOn(currentPhase, () -> "ignored");
} catch (MongoInterruptedException closed) {
// The cluster has been closed and the while loop will exit.
// Notify all remaining waiters that a shutdown is in progress
for (Iterator<ServerSelectionRequest> iter = waitQueue.iterator(); iter.hasNext();) {
iter.next().onResult(null, new MongoClientException("Shutdown in progress"));
iter.remove();
}
}
// Notify all remaining waiters that a shutdown is in progress
for (Iterator<ServerSelectionRequest> iter = waitQueue.iterator(); iter.hasNext();) {
iter.next().onResult(null, new MongoClientException("Shutdown in progress"));
iter.remove();
} catch (Throwable t) {
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
throw t;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ private void runAndLogUncaught(final Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
LOGGER.error("The pool is not going to work correctly from now on. You may want to recreate the MongoClient", t);
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
throw t;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,41 +75,46 @@ private class DnsSrvRecordMonitorRunnable implements Runnable {

@Override
public void run() {
while (!isClosed && shouldContinueMonitoring()) {
try {
List<String> resolvedHostNames = dnsResolver.resolveHostFromSrvRecords(hostName, srvServiceName);
Set<ServerAddress> hosts = createServerAddressSet(resolvedHostNames);

if (isClosed) {
return;
}
try {
while (!isClosed && shouldContinueMonitoring()) {
try {
List<String> resolvedHostNames = dnsResolver.resolveHostFromSrvRecords(hostName, srvServiceName);
Set<ServerAddress> hosts = createServerAddressSet(resolvedHostNames);

if (isClosed) {
return;
}

if (!hosts.equals(currentHosts)) {
try {
dnsSrvRecordInitializer.initialize(unmodifiableSet(hosts));
currentHosts = hosts;
} catch (Exception e) {
LOGGER.warn("Exception in monitor thread during notification of DNS resolution state change", e);
if (!hosts.equals(currentHosts)) {
try {
dnsSrvRecordInitializer.initialize(unmodifiableSet(hosts));
currentHosts = hosts;
} catch (Exception e) {
LOGGER.warn("Exception in monitor thread during notification of DNS resolution state change", e);
}
}
} catch (MongoException e) {
if (currentHosts.isEmpty()) {
dnsSrvRecordInitializer.initialize(e);
}
LOGGER.info("Exception while resolving SRV records", e);
} catch (Exception e) {
if (currentHosts.isEmpty()) {
dnsSrvRecordInitializer.initialize(new MongoInternalException("Unexpected runtime exception", e));
}
LOGGER.info("Unexpected runtime exception while resolving SRV record", e);
}
} catch (MongoException e) {
if (currentHosts.isEmpty()) {
dnsSrvRecordInitializer.initialize(e);
}
LOGGER.info("Exception while resolving SRV records", e);
} catch (Exception e) {
if (currentHosts.isEmpty()) {
dnsSrvRecordInitializer.initialize(new MongoInternalException("Unexpected runtime exception", e));
}
LOGGER.info("Unexpected runtime exception while resolving SRV record", e);
}

try {
Thread.sleep(getRescanFrequencyMillis());
} catch (InterruptedException closed) {
// fall through
try {
Thread.sleep(getRescanFrequencyMillis());
} catch (InterruptedException closed) {
// fall through
}
clusterType = dnsSrvRecordInitializer.getClusterType();
}
clusterType = dnsSrvRecordInitializer.getClusterType();
} catch (Throwable t) {
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
throw t;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,9 @@ public void run() {
}
} catch (InterruptedException | MongoInterruptedException closed) {
// stop the monitor
} catch (RuntimeException e) {
LOGGER.error(format("Server monitor for %s exiting with exception", serverId), e);
} catch (Throwable t) {
LOGGER.error(format("%s for %s stopped working. You may want to recreate the MongoClient", this, serverId), t);
throw t;
} finally {
if (connection != null) {
connection.close();
Expand Down Expand Up @@ -261,7 +262,7 @@ private ServerDescription lookupServerDescription(final ServerDescription curren

// Get existing connection
return doHeartbeat(currentServerDescription, shouldStreamResponses);
} catch (Throwable t) {
} catch (Exception t) {
roundTripTimeSampler.reset();
InternalConnection localConnection = withLock(lock, () -> {
InternalConnection result = connection;
Expand Down Expand Up @@ -532,7 +533,7 @@ public void run() {
} else {
pingServer(connection);
}
} catch (Throwable t) {
} catch (Exception t) {
if (connection != null) {
connection.close();
connection = null;
Expand All @@ -542,6 +543,9 @@ public void run() {
}
} catch (InterruptedException closed) {
// stop the monitor
} catch (Throwable t) {
LOGGER.error(format("%s for %s stopped working. You may want to recreate the MongoClient", this, serverId), t);
throw t;
} finally {
if (connection != null) {
connection.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,55 +363,60 @@ private void notifyWaitQueueHandler(final ServerSelectionRequest request) {

private final class WaitQueueHandler implements Runnable {
public void run() {
List<ServerSelectionRequest> timeoutList = new ArrayList<>();
while (!(isClosed() || initializationCompleted)) {
lock.lock();
try {
if (isClosed() || initializationCompleted) {
break;
}
Timeout waitTimeNanos = Timeout.infinite();

for (Iterator<ServerSelectionRequest> iterator = waitQueue.iterator(); iterator.hasNext();) {
ServerSelectionRequest next = iterator.next();

Timeout nextTimeout = next.getTimeout();
Timeout waitTimeNanosFinal = waitTimeNanos;
waitTimeNanos = nextTimeout.call(NANOSECONDS,
() -> Timeout.earliest(waitTimeNanosFinal, nextTimeout),
(ns) -> Timeout.earliest(waitTimeNanosFinal, nextTimeout),
() -> {
timeoutList.add(next);
iterator.remove();
return waitTimeNanosFinal;
});
}
if (timeoutList.isEmpty()) {
try {
waitTimeNanos.awaitOn(condition, () -> "ignored");
} catch (MongoInterruptedException unexpected) {
fail();
try {
List<ServerSelectionRequest> timeoutList = new ArrayList<>();
while (!(isClosed() || initializationCompleted)) {
lock.lock();
try {
if (isClosed() || initializationCompleted) {
break;
}
Timeout waitTimeNanos = Timeout.infinite();

for (Iterator<ServerSelectionRequest> iterator = waitQueue.iterator(); iterator.hasNext();) {
ServerSelectionRequest next = iterator.next();

Timeout nextTimeout = next.getTimeout();
Timeout waitTimeNanosFinal = waitTimeNanos;
waitTimeNanos = nextTimeout.call(NANOSECONDS,
() -> Timeout.earliest(waitTimeNanosFinal, nextTimeout),
(ns) -> Timeout.earliest(waitTimeNanosFinal, nextTimeout),
() -> {
timeoutList.add(next);
iterator.remove();
return waitTimeNanosFinal;
});
}
if (timeoutList.isEmpty()) {
try {
waitTimeNanos.awaitOn(condition, () -> "ignored");
} catch (MongoInterruptedException unexpected) {
fail();
}
}
} finally {
lock.unlock();
}
} finally {
lock.unlock();
timeoutList.forEach(request -> request.onError(createTimeoutException(request
.getOperationContext()
.getTimeoutContext())));
timeoutList.clear();
}
timeoutList.forEach(request -> request.onError(createTimeoutException(request
.getOperationContext()
.getTimeoutContext())));
timeoutList.clear();
}

// This code is executed either after closing the LoadBalancedCluster or after initializing it. In the latter case,
// waitQueue is guaranteed to be empty (as DnsSrvRecordInitializer.initialize clears it and no thread adds new elements to
// it after that). So shutdownList is not empty iff LoadBalancedCluster is closed, in which case we need to complete the
// requests in it.
List<ServerSelectionRequest> shutdownList = Locks.withLock(lock, () -> {
ArrayList<ServerSelectionRequest> result = new ArrayList<>(waitQueue);
waitQueue.clear();
return result;
});
shutdownList.forEach(request -> request.onError(createShutdownException()));
// This code is executed either after closing the LoadBalancedCluster or after initializing it. In the latter case,
// waitQueue is guaranteed to be empty (as DnsSrvRecordInitializer.initialize clears it and no thread adds new elements to
// it after that). So shutdownList is not empty iff LoadBalancedCluster is closed, in which case we need to complete the
// requests in it.
List<ServerSelectionRequest> shutdownList = Locks.withLock(lock, () -> {
ArrayList<ServerSelectionRequest> result = new ArrayList<>(waitQueue);
waitQueue.clear();
return result;
});
shutdownList.forEach(request -> request.onError(createShutdownException()));
} catch (Throwable t) {
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
throw t;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb.internal.connection;

import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.thread.DaemonThreadFactory;
import org.bson.ByteBuf;
import org.bson.ByteBufNIO;
Expand All @@ -34,6 +36,7 @@
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public class PowerOfTwoBufferPool implements BufferProvider {
private static final Logger LOGGER = Loggers.getLogger("connection");

/**
* The global default pool. Pruning is enabled on this pool. Idle buffers are pruned after one minute.
Expand Down Expand Up @@ -137,7 +140,12 @@ public void release(final ByteBuffer buffer) {
}

private void prune() {
powerOfTwoToPoolMap.values().forEach(BufferPool::prune);
try {
powerOfTwoToPoolMap.values().forEach(BufferPool::prune);
} catch (Throwable t) {
LOGGER.error(this + " stopped pruning idle buffer pools. You may want to recreate the MongoClient", t);
throw t;
}
}

static int log2(final int powerOfTwo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ void start() {
LOGGER.warn("Exception in selector loop", e);
}
}
} catch (Throwable t) {
LOGGER.error(this + " stopped working. You may want to recreate the MongoClient", t);
throw t;
} finally {
try {
selector.close();
Expand Down
Loading