From 1b6f13a67457e0a2457dc725a3b285401085fcc3 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 21 Nov 2024 08:44:33 -0700 Subject: [PATCH] Add AsyncTransportSettings, ExecutorService (#1489) JAVA-5505 Co-authored-by: Valentin Kovalenko --- .../connection/AsyncTransportSettings.java | 98 +++++++++++++++++++ .../connection/NettyTransportSettings.java | 5 +- .../mongodb/connection/TransportSettings.java | 10 ++ .../AsynchronousSocketChannelStream.java | 19 +++- ...synchronousSocketChannelStreamFactory.java | 18 +++- ...nousSocketChannelStreamFactoryFactory.java | 18 +++- .../connection/StreamFactoryHelper.java | 56 ++++++++++- .../TlsChannelStreamFactoryFactory.java | 10 +- .../async/AsynchronousTlsChannelGroup.java | 34 +++---- .../AsyncTransportSettingsTest.java | 45 +++++++++ .../connection/StreamFactoryHelperTest.java | 8 +- .../reactivestreams/client/MongoClients.java | 25 ++--- .../client/AsyncTransportSettingsTest.java | 76 ++++++++++++++ .../connection/AsyncTransportSettings.scala | 32 ++++++ .../scala/connection/TransportSettings.scala | 8 ++ .../mongodb/scala/connection/package.scala | 7 ++ .../client/internal/MongoClientImpl.java | 12 +-- 17 files changed, 419 insertions(+), 62 deletions(-) create mode 100644 driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java create mode 100644 driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java create mode 100644 driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java create mode 100644 driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala diff --git a/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java new file mode 100644 index 00000000000..8e259392313 --- /dev/null +++ b/driver-core/src/main/com/mongodb/connection/AsyncTransportSettings.java @@ -0,0 +1,98 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * + */ +package com.mongodb.connection; + +import com.mongodb.lang.Nullable; + +import java.util.concurrent.ExecutorService; + +import static com.mongodb.assertions.Assertions.notNull; + +/** + * {@link TransportSettings} for a non-Netty-based async transport implementation. + * Shallowly immutable. + * + * @since 5.2 + */ +public final class AsyncTransportSettings extends TransportSettings { + + private final ExecutorService executorService; + + private AsyncTransportSettings(final Builder builder) { + this.executorService = builder.executorService; + } + + static Builder builder() { + return new Builder(); + } + + /** + * A builder for an instance of {@link AsyncTransportSettings} + */ + public static final class Builder { + + private ExecutorService executorService; + + private Builder() { + } + + /** + * The executor service, intended to be used exclusively by the mongo + * client. Closing the mongo client will result in {@linkplain ExecutorService#shutdown() orderly shutdown} + * of the executor service. + * + *

When {@linkplain SslSettings#isEnabled() TLS is not enabled}, see + * {@link java.nio.channels.AsynchronousChannelGroup#withThreadPool(ExecutorService)} + * for additional requirements for the executor service. + * + * @param executorService the executor service + * @return this + * @see #getExecutorService() + */ + public Builder executorService(final ExecutorService executorService) { + this.executorService = notNull("executorService", executorService); + return this; + } + + /** + * Build an instance of {@link AsyncTransportSettings} + * @return an instance of {@link AsyncTransportSettings} + */ + public AsyncTransportSettings build() { + return new AsyncTransportSettings(this); + } + } + + /** + * Gets the executor service + * + * @return the executor service + * @see Builder#executorService(ExecutorService) + */ + @Nullable + public ExecutorService getExecutorService() { + return executorService; + } + + @Override + public String toString() { + return "AsyncTransportSettings{" + + "executorService=" + executorService + + '}'; + } +} diff --git a/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java b/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java index ef9d68b32b4..cb3a7c7c090 100644 --- a/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/NettyTransportSettings.java @@ -16,7 +16,6 @@ package com.mongodb.connection; -import com.mongodb.annotations.Immutable; import com.mongodb.lang.Nullable; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.EventLoopGroup; @@ -33,10 +32,10 @@ /** * {@code TransportSettings} for a Netty-based transport implementation. + * Shallowly immutable. * * @since 4.11 */ -@Immutable public final class NettyTransportSettings extends TransportSettings { private final EventLoopGroup eventLoopGroup; @@ -137,7 +136,7 @@ public Builder sslContext(final SslContext sslContext) { /** * Build an instance of {@code NettyTransportSettings}. * - * @return factory for {@code NettyTransportSettings} + * @return an instance of {@code NettyTransportSettings} */ public NettyTransportSettings build() { return new NettyTransportSettings(this); diff --git a/driver-core/src/main/com/mongodb/connection/TransportSettings.java b/driver-core/src/main/com/mongodb/connection/TransportSettings.java index f897a481eb4..50797f541f5 100644 --- a/driver-core/src/main/com/mongodb/connection/TransportSettings.java +++ b/driver-core/src/main/com/mongodb/connection/TransportSettings.java @@ -35,4 +35,14 @@ public abstract class TransportSettings { public static NettyTransportSettings.Builder nettyBuilder() { return NettyTransportSettings.builder(); } + + /** + * A builder for {@link AsyncTransportSettings}. + * + * @return a builder for {@link AsyncTransportSettings} + * @since 5.2 + */ + public static AsyncTransportSettings.Builder asyncBuilder() { + return AsyncTransportSettings.builder(); + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java index 4818b1f7ac4..c60981c115e 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java @@ -28,6 +28,7 @@ import java.net.SocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.LinkedList; @@ -46,13 +47,24 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt private final ServerAddress serverAddress; private final InetAddressResolver inetAddressResolver; private final SocketSettings settings; + @Nullable + private final AsynchronousChannelGroup group; - public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, + AsynchronousSocketChannelStream( + final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider) { + this(serverAddress, inetAddressResolver, settings, bufferProvider, null); + } + + public AsynchronousSocketChannelStream( + final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver, + final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider, + @Nullable final AsynchronousChannelGroup group) { super(serverAddress, settings, bufferProvider); this.serverAddress = serverAddress; this.inetAddressResolver = inetAddressResolver; this.settings = settings; + this.group = group; } @Override @@ -77,7 +89,10 @@ private void initializeSocketChannel(final AsyncCompletionHandler handler, SocketAddress socketAddress = socketAddressQueue.poll(); try { - AsynchronousSocketChannel attemptConnectionChannel = AsynchronousSocketChannel.open(); + AsynchronousSocketChannel attemptConnectionChannel; + attemptConnectionChannel = group == null + ? AsynchronousSocketChannel.open() + : AsynchronousSocketChannel.open(group); attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); if (settings.getReceiveBufferSize() > 0) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java index 65dd6194dcd..1ea15abe59d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java @@ -19,8 +19,11 @@ import com.mongodb.ServerAddress; import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; +import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; +import java.nio.channels.AsynchronousChannelGroup; + import static com.mongodb.assertions.Assertions.assertFalse; import static com.mongodb.assertions.Assertions.notNull; @@ -31,6 +34,8 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory { private final PowerOfTwoBufferPool bufferProvider = PowerOfTwoBufferPool.DEFAULT; private final SocketSettings settings; private final InetAddressResolver inetAddressResolver; + @Nullable + private final AsynchronousChannelGroup group; /** * Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}. @@ -38,16 +43,25 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory { * @param settings the settings for the connection to a MongoDB server * @param sslSettings the settings for connecting via SSL */ - public AsynchronousSocketChannelStreamFactory(final InetAddressResolver inetAddressResolver, final SocketSettings settings, + public AsynchronousSocketChannelStreamFactory( + final InetAddressResolver inetAddressResolver, final SocketSettings settings, final SslSettings sslSettings) { + this(inetAddressResolver, settings, sslSettings, null); + } + + AsynchronousSocketChannelStreamFactory( + final InetAddressResolver inetAddressResolver, final SocketSettings settings, + final SslSettings sslSettings, @Nullable final AsynchronousChannelGroup group) { assertFalse(sslSettings.isEnabled()); this.inetAddressResolver = inetAddressResolver; this.settings = notNull("settings", settings); + this.group = group; } @Override public Stream create(final ServerAddress serverAddress) { - return new AsynchronousSocketChannelStream(serverAddress, inetAddressResolver, settings, bufferProvider); + return new AsynchronousSocketChannelStream( + serverAddress, inetAddressResolver, settings, bufferProvider, group); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java index db9166eda64..8c5a8f654c5 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java @@ -18,8 +18,11 @@ import com.mongodb.connection.SocketSettings; import com.mongodb.connection.SslSettings; +import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; +import java.nio.channels.AsynchronousChannelGroup; + /** * A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams. * @@ -27,17 +30,30 @@ */ public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory { private final InetAddressResolver inetAddressResolver; + @Nullable + private final AsynchronousChannelGroup group; public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) { + this(inetAddressResolver, null); + } + + AsynchronousSocketChannelStreamFactoryFactory( + final InetAddressResolver inetAddressResolver, + @Nullable final AsynchronousChannelGroup group) { this.inetAddressResolver = inetAddressResolver; + this.group = group; } @Override public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) { - return new AsynchronousSocketChannelStreamFactory(inetAddressResolver, socketSettings, sslSettings); + return new AsynchronousSocketChannelStreamFactory( + inetAddressResolver, socketSettings, sslSettings, group); } @Override public void close() { + if (group != null) { + group.shutdown(); + } } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java index ef40c164cba..1100a4e27f1 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java @@ -17,26 +17,72 @@ package com.mongodb.internal.connection; import com.mongodb.MongoClientException; +import com.mongodb.MongoClientSettings; +import com.mongodb.connection.AsyncTransportSettings; import com.mongodb.connection.NettyTransportSettings; +import com.mongodb.connection.SocketSettings; import com.mongodb.connection.TransportSettings; import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory; import com.mongodb.spi.dns.InetAddressResolver; +import java.io.IOException; +import java.nio.channels.AsynchronousChannelGroup; +import java.util.concurrent.ExecutorService; + /** *

This class is not part of the public API and may be removed or changed at any time

*/ public final class StreamFactoryHelper { - public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final TransportSettings transportSettings, + + public static StreamFactory getSyncStreamFactory(final MongoClientSettings settings, + final InetAddressResolver inetAddressResolver, final SocketSettings socketSettings) { + TransportSettings transportSettings = settings.getTransportSettings(); + if (transportSettings == null) { + return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings()); + } else if (transportSettings instanceof AsyncTransportSettings) { + throw new MongoClientException("Unsupported transport settings in sync: " + transportSettings.getClass().getName()); + } else if (transportSettings instanceof NettyTransportSettings) { + return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings) + .create(socketSettings, settings.getSslSettings()); + } else { + throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName()); + } + } + + public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClientSettings settings, final InetAddressResolver inetAddressResolver) { - if (transportSettings instanceof NettyTransportSettings) { - return NettyStreamFactoryFactory.builder().applySettings((NettyTransportSettings) transportSettings) - .inetAddressResolver(inetAddressResolver) - .build(); + TransportSettings transportSettings = settings.getTransportSettings(); + if (transportSettings == null || transportSettings instanceof AsyncTransportSettings) { + ExecutorService executorService = transportSettings == null + ? null + : ((AsyncTransportSettings) transportSettings).getExecutorService(); + if (settings.getSslSettings().isEnabled()) { + return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService); + } + AsynchronousChannelGroup group = null; + if (executorService != null) { + try { + group = AsynchronousChannelGroup.withThreadPool(executorService); + } catch (IOException e) { + throw new MongoClientException("Unable to create an asynchronous channel group", e); + } + } + return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group); + } else if (transportSettings instanceof NettyTransportSettings) { + return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings); } else { throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName()); } } + private static NettyStreamFactoryFactory getNettyStreamFactoryFactory(final InetAddressResolver inetAddressResolver, + final NettyTransportSettings transportSettings) { + return NettyStreamFactoryFactory.builder() + .applySettings(transportSettings) + .inetAddressResolver(inetAddressResolver) + .build(); + } + private StreamFactoryHelper() { } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java index 436fccb0996..daf0d8cecdd 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/TlsChannelStreamFactoryFactory.java @@ -46,6 +46,7 @@ import java.security.NoSuchAlgorithmException; import java.util.Iterator; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -71,13 +72,18 @@ public class TlsChannelStreamFactoryFactory implements StreamFactoryFactory { /** * Construct a new instance */ - public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) { + TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver, + @Nullable final ExecutorService executorService) { this.inetAddressResolver = inetAddressResolver; - this.group = new AsynchronousTlsChannelGroup(); + this.group = new AsynchronousTlsChannelGroup(executorService); selectorMonitor = new SelectorMonitor(); selectorMonitor.start(); } + public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) { + this(inetAddressResolver, null); + } + @Override public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) { assertTrue(sslSettings.isEnabled()); diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java index 2b34226ebac..57db0df66e8 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java +++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java @@ -27,6 +27,7 @@ import com.mongodb.internal.connection.tlschannel.util.Util; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; +import com.mongodb.lang.Nullable; import java.io.IOException; import java.nio.channels.CancelledKeyException; @@ -199,35 +200,30 @@ private enum Shutdown { /** * Creates an instance of this class. - * - * @param nThreads number of threads in the executor used to assist the selector loop and run - * completion handlers. */ - public AsynchronousTlsChannelGroup(int nThreads) { + public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorService) { try { selector = Selector.open(); } catch (IOException e) { throw new RuntimeException(e); } timeoutExecutor.setRemoveOnCancelPolicy(true); - this.executor = - new ThreadPoolExecutor( - nThreads, - nThreads, - 0, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(nThreads * queueLengthMultiplier), - runnable -> - new Thread(runnable, format("async-channel-group-%d-handler-executor", id)), - new ThreadPoolExecutor.CallerRunsPolicy()); + if (executorService != null) { + this.executor = executorService; + } else { + int nThreads = Runtime.getRuntime().availableProcessors(); + this.executor = new ThreadPoolExecutor( + nThreads, + nThreads, + 0, + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(nThreads * queueLengthMultiplier), + runnable -> new Thread(runnable, format("async-channel-group-%d-handler-executor", id)), + new ThreadPoolExecutor.CallerRunsPolicy()); + } selectorThread.start(); } - /** Creates an instance of this class, using as many thread as available processors. */ - public AsynchronousTlsChannelGroup() { - this(Runtime.getRuntime().availableProcessors()); - } - void submit(final Runnable r) { executor.submit(r); } diff --git a/driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java b/driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java new file mode 100644 index 00000000000..180894ceb78 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/connection/AsyncTransportSettingsTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.connection; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +class AsyncTransportSettingsTest { + + @Test + public void shouldDefaultAllValuesToNull() { + AsyncTransportSettings settings = TransportSettings.asyncBuilder().build(); + + assertNull(settings.getExecutorService()); + } + + @Test + public void shouldApplySettingsFromBuilder() { + ExecutorService executorService = Executors.newFixedThreadPool(1); + AsyncTransportSettings settings = TransportSettings.asyncBuilder() + .executorService(executorService) + .build(); + + assertEquals(executorService, settings.getExecutorService()); + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java index 90989a8e133..9afd1478fe4 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java @@ -16,6 +16,7 @@ package com.mongodb.internal.connection; +import com.mongodb.MongoClientSettings; import com.mongodb.connection.NettyTransportSettings; import com.mongodb.connection.TransportSettings; import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory; @@ -37,8 +38,13 @@ void streamFactoryFactoryIsDerivedFromTransportSettings() { .allocator(PooledByteBufAllocator.DEFAULT) .socketChannelClass(io.netty.channel.socket.oio.OioSocketChannel.class) .build(); + + MongoClientSettings settings = MongoClientSettings.builder() + .transportSettings(nettyTransportSettings) + .build(); + assertEquals(NettyStreamFactoryFactory.builder().applySettings(nettyTransportSettings) .inetAddressResolver(inetAddressResolver).build(), - StreamFactoryHelper.getStreamFactoryFactoryFromSettings(nettyTransportSettings, inetAddressResolver)); + StreamFactoryHelper.getAsyncStreamFactoryFactory(settings, inetAddressResolver)); } } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java index a2f5fb9d125..57ee076039e 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java @@ -20,15 +20,13 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoClientSettings; import com.mongodb.MongoDriverInformation; -import com.mongodb.connection.TransportSettings; +import com.mongodb.connection.SocketSettings; import com.mongodb.internal.TimeoutSettings; -import com.mongodb.internal.connection.AsynchronousSocketChannelStreamFactoryFactory; import com.mongodb.internal.connection.Cluster; import com.mongodb.internal.connection.DefaultClusterFactory; import com.mongodb.internal.connection.InternalConnectionPoolSettings; import com.mongodb.internal.connection.StreamFactory; import com.mongodb.internal.connection.StreamFactoryFactory; -import com.mongodb.internal.connection.TlsChannelStreamFactoryFactory; import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.internal.MongoClientImpl; import com.mongodb.spi.dns.InetAddressResolver; @@ -36,7 +34,7 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver; -import static com.mongodb.internal.connection.StreamFactoryHelper.getStreamFactoryFactoryFromSettings; +import static com.mongodb.internal.connection.StreamFactoryHelper.getAsyncStreamFactoryFactory; import static com.mongodb.internal.event.EventListenerHelper.getCommandListener; @@ -115,17 +113,8 @@ public static MongoClient create(final MongoClientSettings settings, @Nullable f if (settings.getSocketSettings().getProxySettings().isProxyEnabled()) { throw new MongoClientException("Proxy is not supported for reactive clients"); } - InetAddressResolver inetAddressResolver = getInetAddressResolver(settings); - StreamFactoryFactory streamFactoryFactory; - TransportSettings transportSettings = settings.getTransportSettings(); - if (transportSettings != null) { - streamFactoryFactory = getStreamFactoryFactoryFromSettings(transportSettings, inetAddressResolver); - } else if (settings.getSslSettings().isEnabled()) { - streamFactoryFactory = new TlsChannelStreamFactoryFactory(inetAddressResolver); - } else { - streamFactoryFactory = new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver); - } + StreamFactoryFactory streamFactoryFactory = getAsyncStreamFactoryFactory(settings, inetAddressResolver); StreamFactory streamFactory = getStreamFactory(streamFactoryFactory, settings, false); StreamFactory heartbeatStreamFactory = getStreamFactory(streamFactoryFactory, settings, true); MongoDriverInformation wrappedMongoDriverInformation = wrapMongoDriverInformation(mongoDriverInformation); @@ -161,10 +150,12 @@ private static MongoDriverInformation wrapMongoDriverInformation(@Nullable final .driverName("reactive-streams").build(); } - private static StreamFactory getStreamFactory(final StreamFactoryFactory streamFactoryFactory, final MongoClientSettings settings, + private static StreamFactory getStreamFactory( + final StreamFactoryFactory streamFactoryFactory, final MongoClientSettings settings, final boolean isHeartbeat) { - return streamFactoryFactory.create(isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(), - settings.getSslSettings()); + SocketSettings socketSettings = isHeartbeat + ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(); + return streamFactoryFactory.create(socketSettings, settings.getSslSettings()); } private MongoClients() { diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java new file mode 100644 index 00000000000..95201cc0890 --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/AsyncTransportSettingsTest.java @@ -0,0 +1,76 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client; + +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; +import com.mongodb.connection.AsyncTransportSettings; +import com.mongodb.connection.TransportSettings; +import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +class AsyncTransportSettingsTest { + + @Test + void testAsyncTransportSettings() { + ExecutorService executorService = spy(Executors.newFixedThreadPool(5)); + AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder() + .executorService(executorService) + .build(); + MongoClientSettings mongoClientSettings = getMongoClientSettingsBuilder() + .transportSettings(asyncTransportSettings) + .build(); + + try (MongoClient client = new SyncMongoClient(MongoClients.create(mongoClientSettings))) { + client.listDatabases().first(); + } + verify(executorService, atLeastOnce()).execute(any()); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @SuppressWarnings("try") + void testExternalExecutorWasShutDown(final boolean tlsEnabled) throws InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(5); + AsyncTransportSettings asyncTransportSettings = TransportSettings.asyncBuilder() + .executorService(executorService) + .build(); + MongoClientSettings mongoClientSettings = getMongoClientSettingsBuilder() + .applyToSslSettings(builder -> builder.enabled(tlsEnabled)) + .transportSettings(asyncTransportSettings) + .build(); + + try (MongoClient ignored = new SyncMongoClient(MongoClients.create(mongoClientSettings))) { + // ignored + } + + assertTrue(executorService.awaitTermination(100, TimeUnit.MILLISECONDS)); + } +} diff --git a/driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala b/driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala new file mode 100644 index 00000000000..5157c58501d --- /dev/null +++ b/driver-scala/src/main/scala/org/mongodb/scala/connection/AsyncTransportSettings.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.mongodb.scala.connection + +import com.mongodb.connection.{ AsyncTransportSettings => JAsyncTransportSettings } + +/** + * Async transport settings for the driver. + * + * @since 5.2 + */ +object AsyncTransportSettings { + + /** + * AsyncTransportSettings builder type + */ + type Builder = JAsyncTransportSettings.Builder +} diff --git a/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala b/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala index 3e194ea96ca..c41bc958d84 100644 --- a/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala +++ b/driver-scala/src/main/scala/org/mongodb/scala/connection/TransportSettings.scala @@ -31,4 +31,12 @@ object TransportSettings { * @return a new Builder for creating NettyTransportSettings. */ def nettyBuilder(): NettyTransportSettings.Builder = JTransportSettings.nettyBuilder() + + /** + * Creates a builder for AsyncTransportSettings. + * + * @return a new Builder for creating AsyncTransportSettings. + * @since 5.2 + */ + def asyncBuilder(): AsyncTransportSettings.Builder = JTransportSettings.asyncBuilder() } diff --git a/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala b/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala index adfb8a02c04..e283f4e07be 100644 --- a/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala +++ b/driver-scala/src/main/scala/org/mongodb/scala/connection/package.scala @@ -75,4 +75,11 @@ package object connection { * @since 4.11 */ type NettyTransportSettings = com.mongodb.connection.NettyTransportSettings + + /** + * TransportSettings for an async transport implementation. + * + * @since 5.2 + */ + type AsyncTransportSettings = com.mongodb.connection.AsyncTransportSettings } diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java index 473d8ec4e8e..d7ee2ff64ca 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java @@ -33,12 +33,10 @@ import com.mongodb.client.SynchronousContextProvider; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.SocketSettings; -import com.mongodb.connection.TransportSettings; import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.connection.Cluster; import com.mongodb.internal.connection.DefaultClusterFactory; import com.mongodb.internal.connection.InternalConnectionPoolSettings; -import com.mongodb.internal.connection.SocketStreamFactory; import com.mongodb.internal.connection.StreamFactory; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; @@ -58,7 +56,7 @@ import static com.mongodb.client.internal.Crypts.createCrypt; import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument; import static com.mongodb.internal.connection.ServerAddressHelper.getInetAddressResolver; -import static com.mongodb.internal.connection.StreamFactoryHelper.getStreamFactoryFactoryFromSettings; +import static com.mongodb.internal.connection.StreamFactoryHelper.getSyncStreamFactory; import static com.mongodb.internal.event.EventListenerHelper.getCommandListener; import static java.lang.String.format; import static org.bson.codecs.configuration.CodecRegistries.withUuidRepresentation; @@ -270,14 +268,8 @@ private static Cluster createCluster(final MongoClientSettings settings, private static StreamFactory getStreamFactory(final MongoClientSettings settings, final boolean isHeartbeat) { SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(); - TransportSettings transportSettings = settings.getTransportSettings(); InetAddressResolver inetAddressResolver = getInetAddressResolver(settings); - if (transportSettings == null) { - return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings()); - } else { - return getStreamFactoryFactoryFromSettings(transportSettings, inetAddressResolver) - .create(socketSettings, settings.getSslSettings()); - } + return getSyncStreamFactory(settings, inetAddressResolver, socketSettings); } public Cluster getCluster() {