diff --git a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java
new file mode 100644
index 00000000000..8e259392313
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+package com.mongodb.connection;
+
+import com.mongodb.lang.Nullable;
+
+import java.util.concurrent.ExecutorService;
+
+import static com.mongodb.assertions.Assertions.notNull;
+
+/**
+ * {@link TransportSettings} for a non-Netty-based async transport implementation.
+ * Shallowly immutable.
+ *
+ * @since 5.2
+ */
+public final class AsyncTransportSettings extends TransportSettings {
+
+ private final ExecutorService executorService;
+
+ private AsyncTransportSettings(final Builder builder) {
+ this.executorService = builder.executorService;
+ }
+
+ static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * A builder for an instance of {@link AsyncTransportSettings}
+ */
+ public static final class Builder {
+
+ private ExecutorService executorService;
+
+ private Builder() {
+ }
+
+ /**
+ * The executor service, intended to be used exclusively by the mongo
+ * client. Closing the mongo client will result in {@linkplain ExecutorService#shutdown() orderly shutdown}
+ * of the executor service.
+ *
+ *
When {@linkplain SslSettings#isEnabled() TLS is not enabled}, see
+ * {@link java.nio.channels.AsynchronousChannelGroup#withThreadPool(ExecutorService)}
+ * for additional requirements for the executor service.
+ *
+ * @param executorService the executor service
+ * @return this
+ * @see #getExecutorService()
+ */
+ public Builder executorService(final ExecutorService executorService) {
+ this.executorService = notNull("executorService", executorService);
+ return this;
+ }
+
+ /**
+ * Build an instance of {@link AsyncTransportSettings}
+ * @return an instance of {@link AsyncTransportSettings}
+ */
+ public AsyncTransportSettings build() {
+ return new AsyncTransportSettings(this);
+ }
+ }
+
+ /**
+ * Gets the executor service
+ *
+ * @return the executor service
+ * @see Builder#executorService(ExecutorService)
+ */
+ @Nullable
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ @Override
+ public String toString() {
+ return "AsyncTransportSettings{"
+ + "executorService=" + executorService
+ + '}';
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java b/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java
index ef9d68b32b4..cb3a7c7c090 100644
--- a/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java
+++ b/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java
@@ -16,7 +16,6 @@
package com.mongodb.connection;
-import com.mongodb.annotations.Immutable;
import com.mongodb.lang.Nullable;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
@@ -33,10 +32,10 @@
/**
* {@code TransportSettings} for a Netty-based transport implementation.
+ * Shallowly immutable.
*
* @since 4.11
*/
-@Immutable
public final class NettyTransportSettings extends TransportSettings {
private final EventLoopGroup eventLoopGroup;
@@ -137,7 +136,7 @@ public Builder sslContext(final SslContext sslContext) {
/**
* Build an instance of {@code NettyTransportSettings}.
*
- * @return factory for {@code NettyTransportSettings}
+ * @return an instance of {@code NettyTransportSettings}
*/
public NettyTransportSettings build() {
return new NettyTransportSettings(this);
diff --git a/driver-core/src/main/com/mongodb/connection/TransportSettings.java b/driver-core/src/main/com/mongodb/connection/TransportSettings.java
index f897a481eb4..50797f541f5 100644
--- a/driver-core/src/main/com/mongodb/connection/TransportSettings.java
+++ b/driver-core/src/main/com/mongodb/connection/TransportSettings.java
@@ -35,4 +35,14 @@ public abstract class TransportSettings {
public static NettyTransportSettings.Builder nettyBuilder() {
return NettyTransportSettings.builder();
}
+
+ /**
+ * A builder for {@link AsyncTransportSettings}.
+ *
+ * @return a builder for {@link AsyncTransportSettings}
+ * @since 5.2
+ */
+ public static AsyncTransportSettings.Builder asyncBuilder() {
+ return AsyncTransportSettings.builder();
+ }
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java
index 4818b1f7ac4..c60981c115e 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java
@@ -28,6 +28,7 @@
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.LinkedList;
@@ -46,13 +47,24 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt
private final ServerAddress serverAddress;
private final InetAddressResolver inetAddressResolver;
private final SocketSettings settings;
+ @Nullable
+ private final AsynchronousChannelGroup group;
- public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
+ AsynchronousSocketChannelStream(
+ final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider) {
+ this(serverAddress, inetAddressResolver, settings, bufferProvider, null);
+ }
+
+ public AsynchronousSocketChannelStream(
+ final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
+ final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider,
+ @Nullable final AsynchronousChannelGroup group) {
super(serverAddress, settings, bufferProvider);
this.serverAddress = serverAddress;
this.inetAddressResolver = inetAddressResolver;
this.settings = settings;
+ this.group = group;
}
@Override
@@ -77,7 +89,10 @@ private void initializeSocketChannel(final AsyncCompletionHandler handler,
SocketAddress socketAddress = socketAddressQueue.poll();
try {
- AsynchronousSocketChannel attemptConnectionChannel = AsynchronousSocketChannel.open();
+ AsynchronousSocketChannel attemptConnectionChannel;
+ attemptConnectionChannel = group == null
+ ? AsynchronousSocketChannel.open()
+ : AsynchronousSocketChannel.open(group);
attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
if (settings.getReceiveBufferSize() > 0) {
diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java
index 65dd6194dcd..1ea15abe59d 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java
@@ -19,8 +19,11 @@
import com.mongodb.ServerAddress;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
+import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;
+import java.nio.channels.AsynchronousChannelGroup;
+
import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.notNull;
@@ -31,6 +34,8 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
private final PowerOfTwoBufferPool bufferProvider = PowerOfTwoBufferPool.DEFAULT;
private final SocketSettings settings;
private final InetAddressResolver inetAddressResolver;
+ @Nullable
+ private final AsynchronousChannelGroup group;
/**
* Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}.
@@ -38,16 +43,25 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
* @param settings the settings for the connection to a MongoDB server
* @param sslSettings the settings for connecting via SSL
*/
- public AsynchronousSocketChannelStreamFactory(final InetAddressResolver inetAddressResolver, final SocketSettings settings,
+ public AsynchronousSocketChannelStreamFactory(
+ final InetAddressResolver inetAddressResolver, final SocketSettings settings,
final SslSettings sslSettings) {
+ this(inetAddressResolver, settings, sslSettings, null);
+ }
+
+ AsynchronousSocketChannelStreamFactory(
+ final InetAddressResolver inetAddressResolver, final SocketSettings settings,
+ final SslSettings sslSettings, @Nullable final AsynchronousChannelGroup group) {
assertFalse(sslSettings.isEnabled());
this.inetAddressResolver = inetAddressResolver;
this.settings = notNull("settings", settings);
+ this.group = group;
}
@Override
public Stream create(final ServerAddress serverAddress) {
- return new AsynchronousSocketChannelStream(serverAddress, inetAddressResolver, settings, bufferProvider);
+ return new AsynchronousSocketChannelStream(
+ serverAddress, inetAddressResolver, settings, bufferProvider, group);
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java
index db9166eda64..8c5a8f654c5 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java
@@ -18,8 +18,11 @@
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
+import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;
+import java.nio.channels.AsynchronousChannelGroup;
+
/**
* A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams.
*
@@ -27,17 +30,30 @@
*/
public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory {
private final InetAddressResolver inetAddressResolver;
+ @Nullable
+ private final AsynchronousChannelGroup group;
public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
+ this(inetAddressResolver, null);
+ }
+
+ AsynchronousSocketChannelStreamFactoryFactory(
+ final InetAddressResolver inetAddressResolver,
+ @Nullable final AsynchronousChannelGroup group) {
this.inetAddressResolver = inetAddressResolver;
+ this.group = group;
}
@Override
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
- return new AsynchronousSocketChannelStreamFactory(inetAddressResolver, socketSettings, sslSettings);
+ return new AsynchronousSocketChannelStreamFactory(
+ inetAddressResolver, socketSettings, sslSettings, group);
}
@Override
public void close() {
+ if (group != null) {
+ group.shutdown();
+ }
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java
index ef40c164cba..1100a4e27f1 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java
@@ -17,26 +17,72 @@
package com.mongodb.internal.connection;
import com.mongodb.MongoClientException;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.connection.AsyncTransportSettings;
import com.mongodb.connection.NettyTransportSettings;
+import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
import com.mongodb.spi.dns.InetAddressResolver;
+import java.io.IOException;
+import java.nio.channels.AsynchronousChannelGroup;
+import java.util.concurrent.ExecutorService;
+
/**
* This class is not part of the public API and may be removed or changed at any time
*/
public final class StreamFactoryHelper {
- public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final TransportSettings transportSettings,
+
+ public static StreamFactory getSyncStreamFactory(final MongoClientSettings settings,
+ final InetAddressResolver inetAddressResolver, final SocketSettings socketSettings) {
+ TransportSettings transportSettings = settings.getTransportSettings();
+ if (transportSettings == null) {
+ return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings());
+ } else if (transportSettings instanceof AsyncTransportSettings) {
+ throw new MongoClientException("Unsupported transport settings in sync: " + transportSettings.getClass().getName());
+ } else if (transportSettings instanceof NettyTransportSettings) {
+ return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings)
+ .create(socketSettings, settings.getSslSettings());
+ } else {
+ throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
+ }
+ }
+
+ public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClientSettings settings,
final InetAddressResolver inetAddressResolver) {
- if (transportSettings instanceof NettyTransportSettings) {
- return NettyStreamFactoryFactory.builder().applySettings((NettyTransportSettings) transportSettings)
- .inetAddressResolver(inetAddressResolver)
- .build();
+ TransportSettings transportSettings = settings.getTransportSettings();
+ if (transportSettings == null || transportSettings instanceof AsyncTransportSettings) {
+ ExecutorService executorService = transportSettings == null
+ ? null
+ : ((AsyncTransportSettings) transportSettings).getExecutorService();
+ if (settings.getSslSettings().isEnabled()) {
+ return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService);
+ }
+ AsynchronousChannelGroup group = null;
+ if (executorService != null) {
+ try {
+ group = AsynchronousChannelGroup.withThreadPool(executorService);
+ } catch (IOException e) {
+ throw new MongoClientException("Unable to create an asynchronous channel group", e);
+ }
+ }
+ return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group);
+ } else if (transportSettings instanceof NettyTransportSettings) {
+ return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings);
} else {
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
}
}
+ private static NettyStreamFactoryFactory getNettyStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
+ final NettyTransportSettings transportSettings) {
+ return NettyStreamFactoryFactory.builder()
+ .applySettings(transportSettings)
+ .inetAddressResolver(inetAddressResolver)
+ .build();
+ }
+
private StreamFactoryHelper() {
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java
index 436fccb0996..daf0d8cecdd 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java
@@ -46,6 +46,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -71,13 +72,18 @@ public class TlsChannelStreamFactoryFactory implements StreamFactoryFactory {
/**
* Construct a new instance
*/
- public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
+ TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
+ @Nullable final ExecutorService executorService) {
this.inetAddressResolver = inetAddressResolver;
- this.group = new AsynchronousTlsChannelGroup();
+ this.group = new AsynchronousTlsChannelGroup(executorService);
selectorMonitor = new SelectorMonitor();
selectorMonitor.start();
}
+ public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
+ this(inetAddressResolver, null);
+ }
+
@Override
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
assertTrue(sslSettings.isEnabled());
diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java
index 2b34226ebac..57db0df66e8 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java
@@ -27,6 +27,7 @@
import com.mongodb.internal.connection.tlschannel.util.Util;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
+import com.mongodb.lang.Nullable;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
@@ -199,35 +200,30 @@ private enum Shutdown {
/**
* Creates an instance of this class.
- *
- * @param nThreads number of threads in the executor used to assist the selector loop and run
- * completion handlers.
*/
- public AsynchronousTlsChannelGroup(int nThreads) {
+ public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorService) {
try {
selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
timeoutExecutor.setRemoveOnCancelPolicy(true);
- this.executor =
- new ThreadPoolExecutor(
- nThreads,
- nThreads,
- 0,
- TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(nThreads * queueLengthMultiplier),
- runnable ->
- new Thread(runnable, format("async-channel-group-%d-handler-executor", id)),
- new ThreadPoolExecutor.CallerRunsPolicy());
+ if (executorService != null) {
+ this.executor = executorService;
+ } else {
+ int nThreads = Runtime.getRuntime().availableProcessors();
+ this.executor = new ThreadPoolExecutor(
+ nThreads,
+ nThreads,
+ 0,
+ TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(nThreads * queueLengthMultiplier),
+ runnable -> new Thread(runnable, format("async-channel-group-%d-handler-executor", id)),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ }
selectorThread.start();
}
- /** Creates an instance of this class, using as many thread as available processors. */
- public AsynchronousTlsChannelGroup() {
- this(Runtime.getRuntime().availableProcessors());
- }
-
void submit(final Runnable r) {
executor.submit(r);
}
diff --git a/driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java b/driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java
new file mode 100644
index 00000000000..180894ceb78
--- /dev/null
+++ b/driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.connection;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class AsyncTransportSettingsTest {
+
+ @Test
+ public void shouldDefaultAllValuesToNull() {
+ AsyncTransportSettings settings = TransportSettings.asyncBuilder().build();
+
+ assertNull(settings.getExecutorService());
+ }
+
+ @Test
+ public void shouldApplySettingsFromBuilder() {
+ ExecutorService executorService = Executors.newFixedThreadPool(1);
+ AsyncTransportSettings settings = TransportSettings.asyncBuilder()
+ .executorService(executorService)
+ .build();
+
+ assertEquals(executorService, settings.getExecutorService());
+ }
+}
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java
index 90989a8e133..9afd1478fe4 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java
@@ -16,6 +16,7 @@
package com.mongodb.internal.connection;
+import com.mongodb.MongoClientSettings;
import com.mongodb.connection.NettyTransportSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
@@ -37,8 +38,13 @@ void streamFactoryFactoryIsDerivedFromTransportSettings() {
.allocator(PooledByteBufAllocator.DEFAULT)
.socketChannelClass(io.netty.channel.socket.oio.OioSocketChannel.class)
.build();
+
+ MongoClientSettings settings = MongoClientSettings.builder()
+ .transportSettings(nettyTransportSettings)
+ .build();
+
assertEquals(NettyStreamFactoryFactory.builder().applySettings(nettyTransportSettings)
.inetAddressResolver(inetAddressResolver).build(),
- StreamFactoryHelper.getStreamFactoryFactoryFromSettings(nettyTransportSettings, inetAddressResolver));
+ StreamFactoryHelper.getAsyncStreamFactoryFactory(settings, inetAddressResolver));
}
}
diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java
index a2f5fb9d125..57ee076039e 100644
--- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java
+++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java
@@ -20,15 +20,13 @@
import com.mongodb.MongoClientException;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoDriverInformation;
-import com.mongodb.connection.TransportSettings;
+import com.mongodb.connection.SocketSettings;
import com.mongodb.internal.TimeoutSettings;
-import com.mongodb.internal.connection.AsynchronousSocketChannelStreamFactoryFactory;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.DefaultClusterFactory;
import com.mongodb.internal.connection.InternalConnectionPoolSettings;
import com.mongodb.internal.connection.StreamFactory;
import com.mongodb.internal.connection.StreamFactoryFactory;
-import com.mongodb.internal.connection.TlsChannelStreamFactoryFactory;
import com.mongodb.lang.Nullable;
import com.mongodb.reactivestreams.client.internal.MongoClientImpl;
import com.mongodb.spi.dns.InetAddressResolver;
@@ -36,7 +34,7 @@
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver;
-import static com.mongodb.internal.connection.StreamFactoryHelper.getStreamFactoryFactoryFromSettings;
+import static com.mongodb.internal.connection.StreamFactoryHelper.getAsyncStreamFactoryFactory;
import static com.mongodb.internal.event.EventListenerHelper.getCommandListener;
@@ -115,17 +113,8 @@ public static MongoClient create(final MongoClientSettings settings, @Nullable f
if (settings.getSocketSettings().getProxySettings().isProxyEnabled()) {
throw new MongoClientException("Proxy is not supported for reactive clients");
}
-
InetAddressResolver inetAddressResolver = getInetAddressResolver(settings);
- StreamFactoryFactory streamFactoryFactory;
- TransportSettings transportSettings = settings.getTransportSettings();
- if (transportSettings != null) {
- streamFactoryFactory = getStreamFactoryFactoryFromSettings(transportSettings, inetAddressResolver);
- } else if (settings.getSslSettings().isEnabled()) {
- streamFactoryFactory = new TlsChannelStreamFactoryFactory(inetAddressResolver);
- } else {
- streamFactoryFactory = new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver);
- }
+ StreamFactoryFactory streamFactoryFactory = getAsyncStreamFactoryFactory(settings, inetAddressResolver);
StreamFactory streamFactory = getStreamFactory(streamFactoryFactory, settings, false);
StreamFactory heartbeatStreamFactory = getStreamFactory(streamFactoryFactory, settings, true);
MongoDriverInformation wrappedMongoDriverInformation = wrapMongoDriverInformation(mongoDriverInformation);
@@ -161,10 +150,12 @@ private static MongoDriverInformation wrapMongoDriverInformation(@Nullable final
.driverName("reactive-streams").build();
}
- private static StreamFactory getStreamFactory(final StreamFactoryFactory streamFactoryFactory, final MongoClientSettings settings,
+ private static StreamFactory getStreamFactory(
+ final StreamFactoryFactory streamFactoryFactory, final MongoClientSettings settings,
final boolean isHeartbeat) {
- return streamFactoryFactory.create(isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(),
- settings.getSslSettings());
+ SocketSettings socketSettings = isHeartbeat
+ ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings();
+ return streamFactoryFactory.create(socketSettings, settings.getSslSettings());
}
private MongoClients() {
diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java
new file mode 100644
index 00000000000..95201cc0890
--- /dev/null
+++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.mongodb.reactivestreams.client;
+
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
+import com.mongodb.connection.AsyncTransportSettings;
+import com.mongodb.connection.TransportSettings;
+import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static com.mongodb.assertions.Assertions.assertTrue;
+import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+class AsyncTransportSettingsTest {
+
+ @Test
+ void testAsyncTransportSettings() {
+ ExecutorService executorService = spy(Executors.newFixedThreadPool(5));
+ AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder()
+ .executorService(executorService)
+ .build();
+ MongoClientSettings mongoClientSettings = getMongoClientSettingsBuilder()
+ .transportSettings(asyncTransportSettings)
+ .build();
+
+ try (MongoClient client = new SyncMongoClient(MongoClients.create(mongoClientSettings))) {
+ client.listDatabases().first();
+ }
+ verify(executorService, atLeastOnce()).execute(any());
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @SuppressWarnings("try")
+ void testExternalExecutorWasShutDown(final boolean tlsEnabled) throws InterruptedException {
+ ExecutorService executorService = Executors.newFixedThreadPool(5);
+ AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder()
+ .executorService(executorService)
+ .build();
+ MongoClientSettings mongoClientSettings = getMongoClientSettingsBuilder()
+ .applyToSslSettings(builder -> builder.enabled(tlsEnabled))
+ .transportSettings(asyncTransportSettings)
+ .build();
+
+ try (MongoClient ignored = new SyncMongoClient(MongoClients.create(mongoClientSettings))) {
+ // ignored
+ }
+
+ assertTrue(executorService.awaitTermination(100, TimeUnit.MILLISECONDS));
+ }
+}
diff --git a/driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala b/driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala
new file mode 100644
index 00000000000..5157c58501d
--- /dev/null
+++ b/driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.mongodb.scala.connection
+
+import com.mongodb.connection.{ AsyncTransportSettings => JAsyncTransportSettings }
+
+/**
+ * Async transport settings for the driver.
+ *
+ * @since 5.2
+ */
+object AsyncTransportSettings {
+
+ /**
+ * AsyncTransportSettings builder type
+ */
+ type Builder = JAsyncTransportSettings.Builder
+}
diff --git a/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala b/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala
index 3e194ea96ca..c41bc958d84 100644
--- a/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala
+++ b/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala
@@ -31,4 +31,12 @@ object TransportSettings {
* @return a new Builder for creating NettyTransportSettings.
*/
def nettyBuilder(): NettyTransportSettings.Builder = JTransportSettings.nettyBuilder()
+
+ /**
+ * Creates a builder for AsyncTransportSettings.
+ *
+ * @return a new Builder for creating AsyncTransportSettings.
+ * @since 5.2
+ */
+ def asyncBuilder(): AsyncTransportSettings.Builder = JTransportSettings.asyncBuilder()
}
diff --git a/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala b/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala
index adfb8a02c04..e283f4e07be 100644
--- a/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala
+++ b/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala
@@ -75,4 +75,11 @@ package object connection {
* @since 4.11
*/
type NettyTransportSettings = com.mongodb.connection.NettyTransportSettings
+
+ /**
+ * TransportSettings for an async transport implementation.
+ *
+ * @since 5.2
+ */
+ type AsyncTransportSettings = com.mongodb.connection.AsyncTransportSettings
}
diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java
index 473d8ec4e8e..d7ee2ff64ca 100644
--- a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java
+++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java
@@ -33,12 +33,10 @@
import com.mongodb.client.SynchronousContextProvider;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.SocketSettings;
-import com.mongodb.connection.TransportSettings;
import com.mongodb.internal.TimeoutSettings;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.DefaultClusterFactory;
import com.mongodb.internal.connection.InternalConnectionPoolSettings;
-import com.mongodb.internal.connection.SocketStreamFactory;
import com.mongodb.internal.connection.StreamFactory;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
@@ -58,7 +56,7 @@
import static com.mongodb.client.internal.Crypts.createCrypt;
import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument;
import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver;
-import static com.mongodb.internal.connection.StreamFactoryHelper.getStreamFactoryFactoryFromSettings;
+import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactory;
import static com.mongodb.internal.event.EventListenerHelper.getCommandListener;
import static java.lang.String.format;
import static org.bson.codecs.configuration.CodecRegistries.withUuidRepresentation;
@@ -270,14 +268,8 @@ private static Cluster createCluster(final MongoClientSettings settings,
private static StreamFactory getStreamFactory(final MongoClientSettings settings, final boolean isHeartbeat) {
SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings();
- TransportSettings transportSettings = settings.getTransportSettings();
InetAddressResolver inetAddressResolver = getInetAddressResolver(settings);
- if (transportSettings == null) {
- return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings());
- } else {
- return getStreamFactoryFactoryFromSettings(transportSettings, inetAddressResolver)
- .create(socketSettings, settings.getSslSettings());
- }
+ return getSyncStreamFactory(settings, inetAddressResolver, socketSettings);
}
public Cluster getCluster() {