Skip to content

Commit

Permalink
Add AsyncTransportSettings, ExecutorService (#1489)
Browse files Browse the repository at this point in the history
JAVA-5505

Co-authored-by: Valentin Kovalenko <[email protected]>
  • Loading branch information
katcharov and stIncMale authored Nov 21, 2024
1 parent 600f2c6 commit 1b6f13a
Show file tree
Hide file tree
Showing 17 changed files with 419 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.mongodb.connection;

import com.mongodb.lang.Nullable;

import java.util.concurrent.ExecutorService;

import static com.mongodb.assertions.Assertions.notNull;

/**
* {@link TransportSettings} for a non-<a href="http://netty.io/">Netty</a>-based async transport implementation.
* Shallowly immutable.
*
* @since 5.2
*/
public final class AsyncTransportSettings extends TransportSettings {

private final ExecutorService executorService;

private AsyncTransportSettings(final Builder builder) {
this.executorService = builder.executorService;
}

static Builder builder() {
return new Builder();
}

/**
* A builder for an instance of {@link AsyncTransportSettings}
*/
public static final class Builder {

private ExecutorService executorService;

private Builder() {
}

/**
* The executor service, intended to be used exclusively by the mongo
* client. Closing the mongo client will result in {@linkplain ExecutorService#shutdown() orderly shutdown}
* of the executor service.
*
* <p>When {@linkplain SslSettings#isEnabled() TLS is not enabled}, see
* {@link java.nio.channels.AsynchronousChannelGroup#withThreadPool(ExecutorService)}
* for additional requirements for the executor service.
*
* @param executorService the executor service
* @return this
* @see #getExecutorService()
*/
public Builder executorService(final ExecutorService executorService) {
this.executorService = notNull("executorService", executorService);
return this;
}

/**
* Build an instance of {@link AsyncTransportSettings}
* @return an instance of {@link AsyncTransportSettings}
*/
public AsyncTransportSettings build() {
return new AsyncTransportSettings(this);
}
}

/**
* Gets the executor service
*
* @return the executor service
* @see Builder#executorService(ExecutorService)
*/
@Nullable
public ExecutorService getExecutorService() {
return executorService;
}

@Override
public String toString() {
return "AsyncTransportSettings{"
+ "executorService=" + executorService
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.mongodb.connection;

import com.mongodb.annotations.Immutable;
import com.mongodb.lang.Nullable;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
Expand All @@ -33,10 +32,10 @@

/**
* {@code TransportSettings} for a <a href="http://netty.io/">Netty</a>-based transport implementation.
* Shallowly immutable.
*
* @since 4.11
*/
@Immutable
public final class NettyTransportSettings extends TransportSettings {

private final EventLoopGroup eventLoopGroup;
Expand Down Expand Up @@ -137,7 +136,7 @@ public Builder sslContext(final SslContext sslContext) {
/**
* Build an instance of {@code NettyTransportSettings}.
*
* @return factory for {@code NettyTransportSettings}
* @return an instance of {@code NettyTransportSettings}
*/
public NettyTransportSettings build() {
return new NettyTransportSettings(this);
Expand Down
10 changes: 10 additions & 0 deletions driver-core/src/main/com/mongodb/connection/TransportSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,14 @@ public abstract class TransportSettings {
public static NettyTransportSettings.Builder nettyBuilder() {
return NettyTransportSettings.builder();
}

/**
* A builder for {@link AsyncTransportSettings}.
*
* @return a builder for {@link AsyncTransportSettings}
* @since 5.2
*/
public static AsyncTransportSettings.Builder asyncBuilder() {
return AsyncTransportSettings.builder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.LinkedList;
Expand All @@ -46,13 +47,24 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt
private final ServerAddress serverAddress;
private final InetAddressResolver inetAddressResolver;
private final SocketSettings settings;
@Nullable
private final AsynchronousChannelGroup group;

public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
AsynchronousSocketChannelStream(
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider) {
this(serverAddress, inetAddressResolver, settings, bufferProvider, null);
}

public AsynchronousSocketChannelStream(
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider,
@Nullable final AsynchronousChannelGroup group) {
super(serverAddress, settings, bufferProvider);
this.serverAddress = serverAddress;
this.inetAddressResolver = inetAddressResolver;
this.settings = settings;
this.group = group;
}

@Override
Expand All @@ -77,7 +89,10 @@ private void initializeSocketChannel(final AsyncCompletionHandler<Void> handler,
SocketAddress socketAddress = socketAddressQueue.poll();

try {
AsynchronousSocketChannel attemptConnectionChannel = AsynchronousSocketChannel.open();
AsynchronousSocketChannel attemptConnectionChannel;
attemptConnectionChannel = group == null
? AsynchronousSocketChannel.open()
: AsynchronousSocketChannel.open(group);
attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
if (settings.getReceiveBufferSize() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.mongodb.ServerAddress;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;

import java.nio.channels.AsynchronousChannelGroup;

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.notNull;

Expand All @@ -31,23 +34,34 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
private final PowerOfTwoBufferPool bufferProvider = PowerOfTwoBufferPool.DEFAULT;
private final SocketSettings settings;
private final InetAddressResolver inetAddressResolver;
@Nullable
private final AsynchronousChannelGroup group;

/**
* Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}.
*
* @param settings the settings for the connection to a MongoDB server
* @param sslSettings the settings for connecting via SSL
*/
public AsynchronousSocketChannelStreamFactory(final InetAddressResolver inetAddressResolver, final SocketSettings settings,
public AsynchronousSocketChannelStreamFactory(
final InetAddressResolver inetAddressResolver, final SocketSettings settings,
final SslSettings sslSettings) {
this(inetAddressResolver, settings, sslSettings, null);
}

AsynchronousSocketChannelStreamFactory(
final InetAddressResolver inetAddressResolver, final SocketSettings settings,
final SslSettings sslSettings, @Nullable final AsynchronousChannelGroup group) {
assertFalse(sslSettings.isEnabled());
this.inetAddressResolver = inetAddressResolver;
this.settings = notNull("settings", settings);
this.group = group;
}

@Override
public Stream create(final ServerAddress serverAddress) {
return new AsynchronousSocketChannelStream(serverAddress, inetAddressResolver, settings, bufferProvider);
return new AsynchronousSocketChannelStream(
serverAddress, inetAddressResolver, settings, bufferProvider, group);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,42 @@

import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;

import java.nio.channels.AsynchronousChannelGroup;

/**
* A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams.
*
* @see java.nio.channels.AsynchronousSocketChannel
*/
public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory {
private final InetAddressResolver inetAddressResolver;
@Nullable
private final AsynchronousChannelGroup group;

public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
this(inetAddressResolver, null);
}

AsynchronousSocketChannelStreamFactoryFactory(
final InetAddressResolver inetAddressResolver,
@Nullable final AsynchronousChannelGroup group) {
this.inetAddressResolver = inetAddressResolver;
this.group = group;
}

@Override
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
return new AsynchronousSocketChannelStreamFactory(inetAddressResolver, socketSettings, sslSettings);
return new AsynchronousSocketChannelStreamFactory(
inetAddressResolver, socketSettings, sslSettings, group);
}

@Override
public void close() {
if (group != null) {
group.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,72 @@
package com.mongodb.internal.connection;

import com.mongodb.MongoClientException;
import com.mongodb.MongoClientSettings;
import com.mongodb.connection.AsyncTransportSettings;
import com.mongodb.connection.NettyTransportSettings;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
import com.mongodb.spi.dns.InetAddressResolver;

import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.concurrent.ExecutorService;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public final class StreamFactoryHelper {
public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final TransportSettings transportSettings,

public static StreamFactory getSyncStreamFactory(final MongoClientSettings settings,
final InetAddressResolver inetAddressResolver, final SocketSettings socketSettings) {
TransportSettings transportSettings = settings.getTransportSettings();
if (transportSettings == null) {
return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings());
} else if (transportSettings instanceof AsyncTransportSettings) {
throw new MongoClientException("Unsupported transport settings in sync: " + transportSettings.getClass().getName());
} else if (transportSettings instanceof NettyTransportSettings) {
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings)
.create(socketSettings, settings.getSslSettings());
} else {
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
}
}

public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClientSettings settings,
final InetAddressResolver inetAddressResolver) {
if (transportSettings instanceof NettyTransportSettings) {
return NettyStreamFactoryFactory.builder().applySettings((NettyTransportSettings) transportSettings)
.inetAddressResolver(inetAddressResolver)
.build();
TransportSettings transportSettings = settings.getTransportSettings();
if (transportSettings == null || transportSettings instanceof AsyncTransportSettings) {
ExecutorService executorService = transportSettings == null
? null
: ((AsyncTransportSettings) transportSettings).getExecutorService();
if (settings.getSslSettings().isEnabled()) {
return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService);
}
AsynchronousChannelGroup group = null;
if (executorService != null) {
try {
group = AsynchronousChannelGroup.withThreadPool(executorService);
} catch (IOException e) {
throw new MongoClientException("Unable to create an asynchronous channel group", e);
}
}
return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group);
} else if (transportSettings instanceof NettyTransportSettings) {
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings);
} else {
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
}
}

private static NettyStreamFactoryFactory getNettyStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
final NettyTransportSettings transportSettings) {
return NettyStreamFactoryFactory.builder()
.applySettings(transportSettings)
.inetAddressResolver(inetAddressResolver)
.build();
}

private StreamFactoryHelper() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand All @@ -71,13 +72,18 @@ public class TlsChannelStreamFactoryFactory implements StreamFactoryFactory {
/**
* Construct a new instance
*/
public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
@Nullable final ExecutorService executorService) {
this.inetAddressResolver = inetAddressResolver;
this.group = new AsynchronousTlsChannelGroup();
this.group = new AsynchronousTlsChannelGroup(executorService);
selectorMonitor = new SelectorMonitor();
selectorMonitor.start();
}

public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
this(inetAddressResolver, null);
}

@Override
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
assertTrue(sslSettings.isEnabled());
Expand Down
Loading

0 comments on commit 1b6f13a

Please sign in to comment.