diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/ConcurrentMultiDequeMap.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/ConcurrentMultiDequeMap.java
new file mode 100644
index 0000000000..3957409891
--- /dev/null
+++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/ConcurrentMultiDequeMap.java
@@ -0,0 +1,228 @@
+/**
+ * Copyright (c) Microsoft Corporation. All rights reserved.
+ * Licensed under the MIT License. See License.txt in the project root for
+ * license information.
+ */
+
+package com.microsoft.rest.v2.http;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A thread-safe multi map where the values for a certain key are FIFO organized.
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class ConcurrentMultiDequeMap<K, V> {
+    private final Map<K, ConcurrentLinkedDeque<V>> data;
+    // Size is the total number of elements in all ConcurrentLinkedDeques in the Map.
+    private final AtomicInteger size;
+    // least recently updated keys
+    private final LinkedList<K> lru;
+
+    /**
+     * Creates a concurrent multi deque map.
+     */
+    public ConcurrentMultiDequeMap() {
+        this.data = Collections.synchronizedMap(new ConcurrentHashMap<K, ConcurrentLinkedDeque<V>>(16, 0.75f));
+        this.size = new AtomicInteger(0);
+        this.lru = new LinkedList<>();
+    }
+
+    /**
+     * Add a new key value pair to the multimap.
+     *
+     * @param key the key to put
+     * @param value the value to put
+     * @return the added value
+     */
+    public V put(K key, V value) {
+        assert key != null;
+        synchronized (size) {
+            if (!data.containsKey(key)) {
+                data.put(key, new ConcurrentLinkedDeque<V>());
+                lru.addLast(key);
+            } else {
+                lru.remove(key);
+                lru.addLast(key);
+            }
+            data.get(key).add(value);
+            size.incrementAndGet();
+            return value;
+        }
+    }
+
+    /**
+     * Returns the deque associated with the given key.
+     *
+     * @param key the key to query
+     * @return the deque associated with the key
+     */
+    public ConcurrentLinkedDeque<V> get(K key) {
+        return data.get(key);
+    }
+
+    /**
+     * Retrieves and removes one item from the multi map. The item is from
+     * the least recently used key set.
+     * @return the item removed from the map
+     */
+    public V poll() {
+        K key;
+        synchronized (size) {
+            if (size.get() == 0) {
+                return null;
+            } else {
+                key = lru.getFirst();
+            }
+        }
+        return poll(key);
+    }
+
+    /**
+     * Retrieves and removes one item from the multi map. The item is from
+     * the most recently used key set.
+     * @return the item removed from the map
+     */
+    public V pop() {
+        K key;
+        synchronized (size) {
+            if (size.get() == 0) {
+                return null;
+            } else {
+                key = lru.getLast();
+            }
+        }
+        return pop(key);
+    }
+
+    /**
+     * Retrieves the least recently used item in the deque for the given key.
+     *
+     * @param key the key to poll an item
+     * @return the least recently used item for the key
+     */
+    public V poll(K key) {
+        if (!data.containsKey(key)) {
+            return null;
+        } else {
+            ConcurrentLinkedDeque<V> queue = data.get(key);
+            V ret;
+            synchronized (size) {
+                if (queue == null || queue.isEmpty()) {
+                    throw new NoSuchElementException("no items under key " + key);
+                }
+                size.decrementAndGet();
+                ret = queue.poll();
+                if (queue.isEmpty()) {
+                    data.remove(key);
+                    lru.remove(key);
+                }
+            }
+            return ret;
+        }
+    }
+
+    /**
+     * Retrieves the most recently used item in the deque for the given key.
+     *
+     * @param key the key to poll an item
+     * @return the most recently used item for the key
+     */
+    public V pop(K key) {
+        if (!data.containsKey(key)) {
+            return null;
+        } else {
+            ConcurrentLinkedDeque<V> queue = data.get(key);
+            V ret;
+            synchronized (size) {
+                if (queue == null || queue.isEmpty()) {
+                    throw new NoSuchElementException("no items under key " + key);
+                }
+                size.decrementAndGet();
+                ret = queue.pop();
+                if (queue.isEmpty()) {
+                    data.remove(key);
+                    lru.remove(key);
+                }
+            }
+            return ret;
+        }
+    }
+
+    /**
+     * @return the size of the multimap.
+     */
+    public int size() {
+        return size.get();
+    }
+
+    /**
+     * Checks if there are values associated with a key in the multimap.
+     *
+     * @param key the key to check
+     * @return true if there are values associated
+     */
+    public boolean containsKey(K key) {
+        return data.containsKey(key) && data.get(key).size() > 0;
+    }
+
+    /**
+     * @return the set of keys with which there are values associated
+     */
+    public Set<K> keys() {
+        Set<K> keys = new HashSet<>();
+        for (K key : data.keySet()) {
+            if (data.get(key).size() > 0) {
+                keys.add(key);
+            }
+        }
+        return keys;
+    }
+
+    /**
+     * @return the set of all values for all keys in the multimap.
+     */
+    public Set<V> values() {
+        Set<V> values = new HashSet<>();
+        for (K k : keys()) {
+            values.addAll(data.get(k));
+        }
+        return values;
+    }
+
+    /**
+     * Removes a key value pair in the multimap. If there's no such key value
+     * pair then this returns false. Otherwise this method removes it and
+     * returns true.
+     *
+     * @param key the key to remove
+     * @param value the value to remove
+     * @return true if an item is removed
+     */
+    public boolean remove(K key, V value) {
+        if (!data.containsKey(key)) {
+            return false;
+        }
+        ConcurrentLinkedDeque<V> queue = data.get(key);
+        boolean removed;
+        synchronized (size) {
+            removed = queue.remove(value);
+            if (removed) {
+                size.decrementAndGet();
+            }
+            if (queue.isEmpty()) {
+                data.remove(key);
+                lru.remove(key);
+            }
+        }
+        return removed;
+    }
+}
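Reviewer sketch (not part of the patch; String/Integer types and the values are arbitrary): the no-argument poll() and pop() differ in which key they draw from (least vs. most recently updated), while values within a single key always come out in FIFO order.

    import com.microsoft.rest.v2.http.ConcurrentMultiDequeMap;

    public class MultiDequeMapSketch {
        public static void main(String[] args) {
            ConcurrentMultiDequeMap<String, Integer> map = new ConcurrentMultiDequeMap<>();
            map.put("a", 1);
            map.put("a", 2);
            map.put("b", 10);                  // "b" is now the most recently updated key

            Integer fromLruKey = map.poll();   // draws from "a", the least recently updated key -> 1
            Integer fromMruKey = map.pop();    // draws from "b", the most recently updated key -> 10
            Integer next = map.poll("a");      // within a key, values come out FIFO -> 2

            System.out.println(fromLruKey + " " + fromMruKey + " " + next + " size=" + map.size()); // 1 10 2 size=0
        }
    }
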
diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/ConcurrentMultiHashMap.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/ConcurrentMultiHashMap.java
index 7f82927bd7..22167c175b 100644
--- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/ConcurrentMultiHashMap.java
+++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/ConcurrentMultiHashMap.java
@@ -19,7 +19,10 @@
  * A thread-safe multi map where the values for a certain key are FIFO organized.
  * @param <K> the key type
  * @param <V> the value type
+ *
+ * @deprecated Use {@link ConcurrentMultiDequeMap} instead
  */
+@Deprecated
 public class ConcurrentMultiHashMap<K, V> {
     private final Map<K, ConcurrentLinkedQueue<V>> data;
     // Size is the total number of elements in all ConcurrentLinkedQueues in the Map.
@@ -29,7 +32,7 @@ public class ConcurrentMultiHashMap<K, V> {
      * Create a concurrent multi hash map.
      */
     public ConcurrentMultiHashMap() {
-        this.data = Collections.synchronizedMap(new LinkedHashMap<K, ConcurrentLinkedQueue<V>>(16, 0.75f, true));
+        this.data = Collections.synchronizedMap(new LinkedHashMap<>(16, 0.75f, true));
         this.size = new AtomicInteger(0);
     }
 
@@ -161,8 +164,10 @@ public boolean remove(K key, V value) {
         ConcurrentLinkedQueue<V> queue = data.get(key);
         boolean removed;
         synchronized (size) {
-            size.decrementAndGet();
             removed = queue.remove(value);
+            if (removed) {
+                size.decrementAndGet();
+            }
         }
         if (queue.isEmpty()) {
             data.remove(key);
diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/NettyClient.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/NettyClient.java
index a797c78fce..0d4cbf1507 100644
--- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/NettyClient.java
+++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/NettyClient.java
@@ -154,13 +154,13 @@ private static TransportConfig loadTransport(int groupSize) {
     }
 
     private static SharedChannelPool createChannelPool(Bootstrap bootstrap, TransportConfig config,
-            int poolSize, SslContext sslContext) {
+            SharedChannelPoolOptions options, SslContext sslContext) {
         bootstrap.group(config.eventLoopGroup);
         bootstrap.channel(config.channelClass);
         bootstrap.option(ChannelOption.AUTO_READ, false);
         bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) TimeUnit.MINUTES.toMillis(3L));
-        return new SharedChannelPool(bootstrap, new AbstractChannelPoolHandler() {
+        return new SharedChannelPool(bootstrap, config.eventLoopGroup, new AbstractChannelPoolHandler() {
             @Override
             public synchronized void channelCreated(Channel ch) throws Exception {
                 // Why is it necessary to have "synchronized" to prevent NRE in pipeline().get(Class)?
@@ -168,19 +168,20 @@ public synchronized void channelCreated(Channel ch) throws Exception {
                 ch.pipeline().addLast("HttpClientCodec", new HttpClientCodec());
                 ch.pipeline().addLast("HttpClientInboundHandler", new HttpClientInboundHandler());
             }
-        }, poolSize, new SharedChannelPoolOptions(), sslContext);
+        }, options, sslContext);
     }
 
     private NettyAdapter() {
         TransportConfig config = loadTransport(0);
         this.eventLoopGroup = config.eventLoopGroup;
-        this.channelPool = createChannelPool(new Bootstrap(), config, eventLoopGroup.executorCount() * 16, null);
+        this.channelPool = createChannelPool(new Bootstrap(), config,
+                new SharedChannelPoolOptions().withPoolSize(eventLoopGroup.executorCount() * 16).withIdleChannelKeepAliveDurationInSec(60), null);
     }
 
-    private NettyAdapter(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolSize, SslContext sslContext) {
+    private NettyAdapter(Bootstrap baseBootstrap, int eventLoopGroupSize, SharedChannelPoolOptions options, SslContext sslContext) {
         TransportConfig config = loadTransport(eventLoopGroupSize);
         this.eventLoopGroup = config.eventLoopGroup;
-        this.channelPool = createChannelPool(baseBootstrap, config, channelPoolSize, sslContext);
+        this.channelPool = createChannelPool(baseBootstrap, config, options, sslContext);
     }
 
     private Single<HttpResponse> sendRequestInternalAsync(final HttpRequest request, final HttpClientConfiguration configuration) {
@@ -461,6 +462,7 @@ void emitError(Throwable throwable) {
                 }
             } else {
                 LOGGER.debug("Channel disposed at state {}", s);
+                closeAndReleaseChannel();
                 break;
             }
         }
@@ -950,7 +952,6 @@ public void dumpChannelPool() {
 
     public static class Factory implements HttpClientFactory {
         private final NettyAdapter adapter;
-
         /**
          * Create a Netty client factory with default settings.
         */
@@ -958,6 +959,15 @@ public Factory() {
             this.adapter = new NettyAdapter();
         }
 
+        /**
+         * Create a Netty client factory, specifying the channel pool options.
+         *
+         * @param options the options to configure the channel pool
+         */
+        public Factory(SharedChannelPoolOptions options) {
+            this(new Bootstrap(), 0, options, null);
+        }
+
         /**
          * Create a Netty client factory, specifying the event loop group size and the
          * channel pool size.
@@ -972,11 +982,11 @@ public Factory() {
         public Factory(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolSize) {
             this(baseBootstrap.clone(), eventLoopGroupSize, channelPoolSize, null);
         }
-        
+
         /**
         * Create a Netty client factory, specifying the event loop group size and the
         * channel pool size.
-         * 
+         *
         * @param baseBootstrap
         *            a channel Bootstrap to use as a basis for channel creation
         * @param eventLoopGroupSize
         *            the number of event loop executors
         * @param channelPoolSize
@@ -987,7 +997,24 @@ public Factory(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolS
         *            An SslContext, can be null.
         */
         public Factory(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolSize, SslContext sslContext) {
-            this.adapter = new NettyAdapter(baseBootstrap.clone(), eventLoopGroupSize, channelPoolSize, sslContext);
+            this(baseBootstrap, eventLoopGroupSize, new SharedChannelPoolOptions().withPoolSize(channelPoolSize), sslContext);
+        }
+
+        /**
+         * Create a Netty client factory, specifying the event loop group size and the
+         * channel pool options.
+         *
+         * @param baseBootstrap
+         *            a channel Bootstrap to use as a basis for channel creation
+         * @param eventLoopGroupSize
+         *            the number of event loop executors
+         * @param options
+         *            the options to configure the channel pool
+         * @param sslContext
+         *            An SslContext, can be null.
+         */
+        public Factory(Bootstrap baseBootstrap, int eventLoopGroupSize, SharedChannelPoolOptions options, SslContext sslContext) {
+            this.adapter = new NettyAdapter(baseBootstrap.clone(), eventLoopGroupSize, options, sslContext);
        }
 
         @Override
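How a caller would pick this up (illustrative sketch, not part of the patch; the pool size of 64 and the 30-second keep-alive are arbitrary example values, not recommendations):

    import com.microsoft.rest.v2.http.NettyClient;
    import com.microsoft.rest.v2.http.SharedChannelPoolOptions;

    public class FactoryWiringSketch {
        public static void main(String[] args) {
            SharedChannelPoolOptions options = new SharedChannelPoolOptions()
                    .withPoolSize(64)                           // upper bound on pooled channels
                    .withIdleChannelKeepAliveDurationInSec(30); // idle TTL before a pooled channel is closed

            // The new single-argument constructor added in this patch delegates to the
            // (Bootstrap, int, SharedChannelPoolOptions, SslContext) overload.
            NettyClient.Factory factory = new NettyClient.Factory(options);

            // The factory is then handed to the client runtime wherever an HttpClientFactory
            // is expected; that wiring is unchanged by this patch.
        }
    }
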
diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPool.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPool.java
index 0a2b8d76f4..b254a81272 100644
--- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPool.java
+++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPool.java
@@ -10,14 +10,17 @@
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.pool.ChannelPool;
 import io.netty.channel.pool.ChannelPoolHandler;
 import io.netty.handler.proxy.HttpProxyHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.FailedFuture;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.SucceededFuture;
 import io.reactivex.annotations.Nullable;
 import io.reactivex.exceptions.Exceptions;
 import org.slf4j.Logger;
@@ -33,11 +36,7 @@
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -55,18 +54,19 @@ class SharedChannelPool implements ChannelPool {
     private static final AttributeKey<ZonedDateTime> CHANNEL_CREATED_SINCE = AttributeKey.newInstance("channel-created-since");
     private static final AttributeKey<ZonedDateTime> CHANNEL_CLOSED_SINCE = AttributeKey.newInstance("channel-closed-since");
     private final Bootstrap bootstrap;
+    private final EventLoopGroup eventLoopGroup;
     private final ChannelPoolHandler handler;
     private final int poolSize;
     private final AtomicInteger channelCount = new AtomicInteger(0);
     private final SharedChannelPoolOptions poolOptions;
-    private final Queue<ChannelRequest> requests;
-    private final ConcurrentMultiHashMap<URI, Channel> available;
-    private final ConcurrentMultiHashMap<URI, Channel> leased;
+    private final ConcurrentMultiDequeMap<URI, ChannelRequest> requests;
+    private final ConcurrentMultiDequeMap<URI, Channel> available;
+    private final ConcurrentMultiDequeMap<URI, Channel> leased;
     private final Object sync = new Object();
     private final SslContext sslContext;
-    private final ExecutorService executor;
     private volatile boolean closed = false;
     private final Logger logger = LoggerFactory.getLogger(SharedChannelPool.class);
+    AtomicInteger wip = new AtomicInteger(0);
 
     private boolean isChannelHealthy(Channel channel) {
         try {
@@ -91,17 +91,18 @@ private boolean isChannelHealthy(Channel channel) {
      * Creates an instance of the shared channel pool.
      * @param bootstrap the bootstrap to create channels
      * @param handler the handler to apply to the channels on creation, acquisition and release
-     * @param size the upper limit of total number of channels
      * @param options optional settings for the pool
+     * @param sslContext the SSL Context for the connections
      */
-    SharedChannelPool(final Bootstrap bootstrap, final ChannelPoolHandler handler, int size, SharedChannelPoolOptions options, SslContext sslContext) {
+    SharedChannelPool(final Bootstrap bootstrap, final EventLoopGroup eventLoopGroup, final ChannelPoolHandler handler, SharedChannelPoolOptions options, SslContext sslContext) {
         this.poolOptions = options.clone();
         this.bootstrap = bootstrap.clone();
+        this.eventLoopGroup = eventLoopGroup;
         this.handler = handler;
-        this.poolSize = size;
-        this.requests = new ConcurrentLinkedDeque<>();
-        this.available = new ConcurrentMultiHashMap<>();
-        this.leased = new ConcurrentMultiHashMap<>();
+        this.poolSize = options.poolSize();
+        this.requests = new ConcurrentMultiDequeMap<>();
+        this.available = new ConcurrentMultiDequeMap<>();
+        this.leased = new ConcurrentMultiDequeMap<>();
         try {
             if (sslContext == null) {
                 this.sslContext = SslContextBuilder.forClient().build();
@@ -111,111 +112,99 @@ private boolean isChannelHealthy(Channel channel) {
         } catch (SSLException e) {
             throw new RuntimeException(e);
         }
-        this.executor = Executors.newSingleThreadExecutor(runnable -> {
-            Thread thread = new Thread(runnable, "SharedChannelPool-worker");
-            thread.setDaemon(true);
-            return thread;
-        });
-
-        executor.submit(() -> {
-            while (!closed) {
-                try {
-                    final ChannelRequest request;
-                    // Synchronizing just to be notified when requests is non-empty
-                    synchronized (requests) {
-                        while (requests.isEmpty() && !closed) {
-                            requests.wait();
-                        }
-                    }
-                    // requests must be non-empty based on the above condition
-                    request = requests.remove();
-
-                    synchronized (sync) {
-                        while (channelCount.get() >= poolSize && available.size() == 0 && !closed) {
-                            sync.wait();
-                        }
+    }
 
-                        if (closed) {
-                            break;
-                        }
+    private void drain(URI preferredUri) {
+        if (!wip.compareAndSet(0, 1)) {
+            return;
+        }
+        while (!closed && wip.updateAndGet(x -> requests.size()) != 0) {
+            if (channelCount.get() >= poolSize && available.size() == 0) {
+                wip.set(0);
+                break;
+            }
+            // requests must be non-empty based on the above condition
+            ChannelRequest request;
+            if (preferredUri != null && requests.containsKey(preferredUri)) {
+                request = requests.poll(preferredUri);
+            } else {
+                request = requests.poll();
+            }
 
-                        // Try to retrieve a healthy channel from pool
-                        boolean foundHealthyChannelInPool = false;
-                        while (available.containsKey(request.channelURI)) {
-                            Channel channel = available.poll(request.channelURI);
-                            if (isChannelHealthy(channel)) {
-                                handler.channelAcquired(channel);
-                                request.promise.setSuccess(channel);
-                                leased.put(request.channelURI, channel);
-                                foundHealthyChannelInPool = true;
-                                channel.attr(CHANNEL_LEASED_SINCE).set(ZonedDateTime.now(ZoneOffset.UTC));
-                                logger.debug("Channel picked up from pool: {}", channel.id());
-                                break;
-                            } else {
-                                logger.debug("Channel disposed from pool due to timeout or half closure: {}", channel.id());
-                                closeChannel(channel);
-                            }
-                        }
+            boolean foundHealthyChannelInPool = false;
+            // Try to retrieve a healthy channel from pool
+            if (available.containsKey(request.channelURI)) {
+                Channel channel = available.pop(request.channelURI); // try most recently used
+                if (isChannelHealthy(channel)) {
+                    logger.debug("Channel picked up from pool: {}", channel.id());
+                    leased.put(request.channelURI, channel);
+                    foundHealthyChannelInPool = true;
+                    channel.attr(CHANNEL_LEASED_SINCE).set(ZonedDateTime.now(ZoneOffset.UTC));
+                    request.promise.setSuccess(channel);
+                    try {
+                        handler.channelAcquired(channel);
+                    } catch (Exception e) {
+                        throw Exceptions.propagate(e);
+                    }
+                } else {
+                    logger.debug("Channel disposed from pool due to timeout or half closure: {}", channel.id());
+                    closeChannel(channel);
+                    channelCount.decrementAndGet();
+                    // Delete all channels created before this
+                    while (available.containsKey(request.channelURI)) {
+                        Channel broken = available.pop(request.channelURI);
+                        logger.debug("Channel disposed from pool due to timeout or half closure: {}", broken.id());
+                        closeChannel(broken);
+                        channelCount.decrementAndGet();
+                    }
+                }
+            }
+            if (!foundHealthyChannelInPool) {
+                // Not found a healthy channel in pool. Create a new channel - remove an available one if size overflows
+                if (channelCount.get() >= poolSize) {
+                    Channel nextAvailable = available.poll(); // Dispose least recently used
+                    logger.debug("Channel disposed due to overflow: {}", nextAvailable.id());
+                    closeChannel(nextAvailable);
+                    channelCount.decrementAndGet();
+                }
+                int port;
+                if (request.destinationURI.getPort() < 0) {
+                    port = "https".equals(request.destinationURI.getScheme()) ? 443 : 80;
+                } else {
+                    port = request.destinationURI.getPort();
+                }
+                channelCount.incrementAndGet();
+                SharedChannelPool.this.bootstrap.clone().handler(new ChannelInitializer<Channel>() {
+                    @Override
+                    protected void initChannel(Channel ch) throws Exception {
+                        assert ch.eventLoop().inEventLoop();
+                        if (request.proxy != null) {
+                            ch.pipeline().addFirst("HttpProxyHandler", new HttpProxyHandler(request.proxy.address()));
+                        }
-                        if (!foundHealthyChannelInPool) {
-                            // Not found a healthy channel in pool. Create a new channel - remove an available one if size overflows
-                            while (available.size() > 0 && channelCount.get() >= poolSize) {
-                                Channel nextAvailable = available.poll();
-                                logger.debug("Channel disposed due to overflow: {}", nextAvailable.id());
-                                closeChannel(nextAvailable);
-                            }
-                            int port;
-                            if (request.destinationURI.getPort() < 0) {
-                                port = "https".equals(request.destinationURI.getScheme()) ? 443 : 80;
-                            } else {
-                                port = request.destinationURI.getPort();
-                            }
-                            channelCount.incrementAndGet();
-                            SharedChannelPool.this.bootstrap.clone().handler(new ChannelInitializer<Channel>() {
-                                @Override
-                                protected void initChannel(Channel ch) throws Exception {
-                                    assert ch.eventLoop().inEventLoop();
-                                    if (request.proxy != null) {
-                                        ch.pipeline().addFirst("HttpProxyHandler", new HttpProxyHandler(request.proxy.address()));
-                                    }
-                                    handler.channelCreated(ch);
-                                }
-                            }).connect(request.destinationURI.getHost(), port).addListener((ChannelFuture f) -> {
-                                if (f.isSuccess()) {
-                                    Channel channel = f.channel();
-                                    channel.attr(CHANNEL_URI).set(request.channelURI);
-
-                                    // Apply SSL handler for https connections
-                                    if ("https".equalsIgnoreCase(request.destinationURI.getScheme())) {
-                                        channel.pipeline().addBefore("HttpClientCodec", "SslHandler", this.sslContext.newHandler(channel.alloc(), request.destinationURI.getHost(), port));
-                                    }
+                        handler.channelCreated(ch);
+                    }
+                }).connect(request.destinationURI.getHost(), port).addListener((ChannelFuture f) -> {
+                    if (f.isSuccess()) {
+                        Channel channel = f.channel();
+                        channel.attr(CHANNEL_URI).set(request.channelURI);
 
-                                    leased.put(request.channelURI, channel);
-                                    channel.attr(CHANNEL_CREATED_SINCE).set(ZonedDateTime.now(ZoneOffset.UTC));
-                                    channel.attr(CHANNEL_LEASED_SINCE).set(ZonedDateTime.now(ZoneOffset.UTC));
-                                    logger.debug("Channel created: {}", channel.id());
-                                    handler.channelAcquired(channel);
-                                    request.promise.setSuccess(channel);
-                                } else {
-                                    request.promise.setFailure(f.cause());
-                                }
-                            });
-                        }
+                        // Apply SSL handler for https connections
+                        if ("https".equalsIgnoreCase(request.destinationURI.getScheme())) {
+                            channel.pipeline().addBefore("HttpClientCodec", "SslHandler", this.sslContext.newHandler(channel.alloc(), request.destinationURI.getHost(), port));
+                        }
+                        leased.put(request.channelURI, channel);
+                        channel.attr(CHANNEL_CREATED_SINCE).set(ZonedDateTime.now(ZoneOffset.UTC));
+                        channel.attr(CHANNEL_LEASED_SINCE).set(ZonedDateTime.now(ZoneOffset.UTC));
+                        logger.debug("Channel created: {}", channel.id());
+                        handler.channelAcquired(channel);
+                        request.promise.setSuccess(channel);
+                    } else {
+                        request.promise.setFailure(f.cause());
+                        channelCount.decrementAndGet();
                     }
-                } catch (Exception e) {
-                    throw Exceptions.propagate(e);
-                }
+                });
             }
-        });
-    }
-
-    /**
-     * Creates an instance of the shared channel pool.
-     * @param bootstrap the bootstrap to create channels
-     * @param handler the handler to apply to the channels on creation, acquisition and release
-     * @param size the upper limit of total number of channels
-     */
-    SharedChannelPool(final Bootstrap bootstrap, final ChannelPoolHandler handler, int size) {
-        this(bootstrap, handler, size, new SharedChannelPoolOptions(), null);
+        }
     }
 
     /**
@@ -257,10 +246,8 @@ public Future<Channel> acquire(URI uri, @Nullable Proxy proxy, final Promise<Channel> promise) {
     }
 
     public Future<Channel> acquire(Promise<Channel> promise) {
     }
 
     private Future<Void> closeChannel(final Channel channel) {
+        if (!channel.isOpen()) {
+            return new SucceededFuture<>(eventLoopGroup.next(), null);
+        }
         channel.attr(CHANNEL_CLOSED_SINCE).set(ZonedDateTime.now(ZoneOffset.UTC));
         logger.debug("Channel initiated to close: " + channel.id());
         // Closing a channel doesn't change the channel count
-        return channel.close().addListener(f -> {
-            if (!f.isSuccess()) {
-                logger.warn("Possible channel leak: failed to close " + channel.id(), f.cause());
-            }
-        });
+        try {
+            return channel.close().addListener(f -> {
+                if (!f.isSuccess()) {
+                    logger.warn("Possible channel leak: failed to close " + channel.id(), f.cause());
+                }
+            });
+        } catch (Exception e) {
+            logger.warn("Possible channel leak: failed to close " + channel.id(), e);
+            return new FailedFuture<>(eventLoopGroup.next(), e);
+        }
     }
 
     /**
@@ -294,32 +289,36 @@ private Future<Void> closeChannel(final Channel channel) {
      * @return a Future representing the operation.
     */
     public Future<Void> closeAndRelease(final Channel channel) {
-        return closeChannel(channel).addListener(future -> {
-            synchronized (sync) {
-                leased.remove(channel.attr(CHANNEL_URI).get(), channel);
-                channelCount.decrementAndGet();
-                logger.debug("Channel closed and released out of pool: " + channel.id());
-                sync.notify();
-            }
-        });
+        try {
+            Future<Void> closeFuture = closeChannel(channel).addListener(future -> {
+                URI channelUri = channel.attr(CHANNEL_URI).get();
+                if (leased.remove(channelUri, channel) || available.remove(channelUri, channel)) {
+                    channelCount.decrementAndGet();
+                    logger.debug("Channel closed and released out of pool: " + channel.id());
+                }
+                drain(channelUri);
+            });
+            return closeFuture;
+        } catch (Exception e) {
+            return bootstrap.config().group().next().newFailedFuture(e);
+        }
     }
 
     @Override
     public Future<Void> release(final Channel channel) {
         try {
             handler.channelReleased(channel);
-            synchronized (sync) {
-                leased.remove(channel.attr(CHANNEL_URI).get(), channel);
-                if (isChannelHealthy(channel)) {
-                    available.put(channel.attr(CHANNEL_URI).get(), channel);
-                    channel.attr(CHANNEL_AVAILABLE_SINCE).set(ZonedDateTime.now(ZoneOffset.UTC));
-                    logger.debug("Channel released to pool: " + channel.id());
-                } else {
-                    channelCount.decrementAndGet();
-                    logger.debug("Channel broken on release, dispose: " + channel.id());
-                }
-                sync.notify();
+            URI channelUri = channel.attr(CHANNEL_URI).get();
+            leased.remove(channelUri, channel);
+            if (isChannelHealthy(channel)) {
+                available.put(channelUri, channel);
+                channel.attr(CHANNEL_AVAILABLE_SINCE).set(ZonedDateTime.now(ZoneOffset.UTC));
+                logger.debug("Channel released to pool: " + channel.id());
+            } else {
+                channelCount.decrementAndGet();
+                logger.debug("Channel broken on release, dispose: " + channel.id());
             }
+            drain(channelUri);
         } catch (Exception e) {
             return bootstrap.config().group().next().newFailedFuture(e);
         }
@@ -340,11 +339,8 @@ public Future<Void> release(final Channel channel, final Promise<Void> promise)
     @Override
     public void close() {
         closed = true;
-        executor.shutdownNow();
-        synchronized (requests) {
-            while (!requests.isEmpty()) {
-                requests.remove().promise.setFailure(new CancellationException("Channel pool was closed"));
-            }
+        while (requests.size() != 0) {
+            requests.poll().promise.setFailure(new CancellationException("Channel pool was closed"));
         }
     }
 
@@ -359,35 +355,33 @@ private static class ChannelRequest {
     /**
     * Used to print a current overview of the channels in this pool.
    */
    public void dump() {
-        synchronized (sync) {
-            logger.info(String.format("---- %s: size %d, keep alive (sec) %d ----", toString(), poolSize, poolOptions.idleChannelKeepAliveDurationInSec()));
-            logger.info("Channel\tState\tFor\tAge\tURL");
-            List<Channel> closed = new ArrayList<>();
-            ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
-            for (Channel channel : leased.values()) {
-                if (channel.hasAttr(CHANNEL_CLOSED_SINCE)) {
-                    closed.add(channel);
-                    continue;
-                }
-                long stateFor = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_LEASED_SINCE).get(), now);
-                long age = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_CREATED_SINCE).get(), now);
-                logger.info(String.format("%s\tLEASE\t%ds\t%ds\t%s", channel.id(), stateFor, age, channel.attr(CHANNEL_URI).get()));
-            }
-            for (Channel channel : available.values()) {
-                if (channel.hasAttr(CHANNEL_CLOSED_SINCE)) {
-                    closed.add(channel);
-                    continue;
-                }
-                long stateFor = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_AVAILABLE_SINCE).get(), now);
-                long age = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_CREATED_SINCE).get(), now);
-                logger.info(String.format("%s\tAVAIL\t%ds\t%ds\t%s", channel.id(), stateFor, age, channel.attr(CHANNEL_URI).get()));
-            }
-            for (Channel channel : closed) {
-                long stateFor = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_CLOSED_SINCE).get(), now);
-                long age = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_CREATED_SINCE).get(), now);
-                logger.info(String.format("%s\tCLOSE\t%ds\t%ds\t%s", channel.id(), stateFor, age, channel.attr(CHANNEL_URI).get()));
+        logger.info(String.format("---- %s: size %d, keep alive (sec) %d ----", toString(), poolSize, poolOptions.idleChannelKeepAliveDurationInSec()));
+        logger.info("Channel\tState\tFor\tAge\tURL");
+        List<Channel> closed = new ArrayList<>();
+        ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+        for (Channel channel : leased.values()) {
+            if (channel.hasAttr(CHANNEL_CLOSED_SINCE)) {
+                closed.add(channel);
+                continue;
             }
-            logger.info("Active channels: " + channelCount.get() + " Leaked channels: " + (channelCount.get() - leased.size() - available.size()));
+            long stateFor = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_LEASED_SINCE).get(), now);
+            long age = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_CREATED_SINCE).get(), now);
+            logger.info(String.format("%s\tLEASE\t%ds\t%ds\t%s", channel.id(), stateFor, age, channel.attr(CHANNEL_URI).get()));
+        }
+        for (Channel channel : available.values()) {
+            if (channel.hasAttr(CHANNEL_CLOSED_SINCE)) {
+                closed.add(channel);
+                continue;
+            }
+            long stateFor = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_AVAILABLE_SINCE).get(), now);
+            long age = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_CREATED_SINCE).get(), now);
+            logger.info(String.format("%s\tAVAIL\t%ds\t%ds\t%s", channel.id(), stateFor, age, channel.attr(CHANNEL_URI).get()));
+        }
+        for (Channel channel : closed) {
+            long stateFor = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_CLOSED_SINCE).get(), now);
+            long age = ChronoUnit.SECONDS.between(channel.attr(CHANNEL_CREATED_SINCE).get(), now);
+            logger.info(String.format("%s\tCLOSE\t%ds\t%ds\t%s", channel.id(), stateFor, age, channel.attr(CHANNEL_URI).get()));
         }
+        logger.info("Active channels: " + channelCount.get() + " Leaked or being initialized channels: " + (channelCount.get() - leased.size() - available.size()));
     }
 }
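The dedicated "SharedChannelPool-worker" thread is gone; acquire, release and closeAndRelease now call drain(), which uses the wip counter as a work-in-progress guard so that only one caller at a time processes queued requests. A simplified standalone sketch of that pattern (illustration only, not the pool code itself; a plain Runnable queue stands in for ChannelRequest handling):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    class DrainSketch {
        private final Queue<Runnable> work = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(0);

        void submit(Runnable task) {
            work.add(task);
            drain();
        }

        private void drain() {
            // Only the caller that flips wip from 0 to 1 gets to drain; everyone else returns.
            if (!wip.compareAndSet(0, 1)) {
                return;
            }
            // Re-read the backlog size into wip on each pass; stop when it observes zero.
            while (wip.updateAndGet(x -> work.size()) != 0) {
                Runnable task = work.poll();
                if (task != null) {
                    task.run();
                }
            }
        }
    }
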
+ logger.info("Active channels: " + channelCount.get() + " Leaked or being initialized channels: " + (channelCount.get() - leased.size() - available.size())); } } diff --git a/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPoolOptions.java b/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPoolOptions.java index 89a392f3a1..d7fa0e4709 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPoolOptions.java +++ b/client-runtime/src/main/java/com/microsoft/rest/v2/http/SharedChannelPoolOptions.java @@ -9,15 +9,16 @@ /** * Optional configurations for http channel pool. */ -class SharedChannelPoolOptions { +public class SharedChannelPoolOptions { // Default duration in sec to keep the connection alive in available pool before closing it. - private static final long DEFAULT_TTL_OF_IDLE_CHANNEL = 5 * 60; + private static final long DEFAULT_TTL_OF_IDLE_CHANNEL = 60; private long idleChannelKeepAliveDurationInSec; + private int poolSize; /** * Creates SharedChannelPoolOptions. */ - SharedChannelPoolOptions() { + public SharedChannelPoolOptions() { this.idleChannelKeepAliveDurationInSec = DEFAULT_TTL_OF_IDLE_CHANNEL; } @@ -27,7 +28,7 @@ class SharedChannelPoolOptions { * @param ttlDurationInSec the duration * @return SharedChannelPoolOptions */ - SharedChannelPoolOptions withIdleChannelKeepAliveDurationInSec(long ttlDurationInSec) { + public SharedChannelPoolOptions withIdleChannelKeepAliveDurationInSec(long ttlDurationInSec) { this.idleChannelKeepAliveDurationInSec = ttlDurationInSec; return this; } @@ -35,10 +36,27 @@ SharedChannelPoolOptions withIdleChannelKeepAliveDurationInSec(long ttlDurationI /** * @return gets duration in sec the connection alive in available pool before closing it. */ - long idleChannelKeepAliveDurationInSec() { + public long idleChannelKeepAliveDurationInSec() { return this.idleChannelKeepAliveDurationInSec; } + /** + * Sets the max number of connections allowed in the pool. 
diff --git a/client-runtime/src/test/java/com/microsoft/rest/v2/ConcurrentMultiDequeMapTests.java b/client-runtime/src/test/java/com/microsoft/rest/v2/ConcurrentMultiDequeMapTests.java
new file mode 100644
index 0000000000..e889be4edb
--- /dev/null
+++ b/client-runtime/src/test/java/com/microsoft/rest/v2/ConcurrentMultiDequeMapTests.java
@@ -0,0 +1,43 @@
+package com.microsoft.rest.v2;
+
+import com.microsoft.rest.v2.http.ConcurrentMultiDequeMap;
+import com.microsoft.rest.v2.http.ConcurrentMultiHashMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ConcurrentMultiDequeMapTests {
+
+    @Test
+    public void testConcurrentMultiHashMap() {
+        ConcurrentMultiDequeMap<String, String> map = new ConcurrentMultiDequeMap<>();
+
+        // Populate
+        map.put("a", "0");
+        map.put("a", "1");
+        map.put("a", "2");
+        map.put("a", "3");
+        map.put("b", "10");
+        map.put("b", "11");
+        map.put("b", "12");
+        map.put("c", "100");
+        map.put("c", "101");
+
+        Assert.assertEquals(9, map.size());
+
+        // Poll by key
+        Assert.assertEquals("10", map.poll("b"));
+        Assert.assertEquals("0", map.poll("a"));
+
+        // Poll by LRU
+        Assert.assertEquals("100", map.pop());
+        Assert.assertEquals("1", map.poll());
+        Assert.assertEquals("2", map.poll());
+        Assert.assertEquals("101", map.pop());
+
+        // ContainsKey
+        Assert.assertFalse(map.containsKey("c"));
+
+        // Size
+        Assert.assertEquals(3, map.size());
+    }
+}
\ No newline at end of file
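Beyond the LRU/FIFO assertions above, a multi-threaded smoke test could be worth adding. A possible sketch (not part of this patch; JUnit 4 as above, and the class name, thread count and iteration count are arbitrary):

    package com.microsoft.rest.v2;

    import com.microsoft.rest.v2.http.ConcurrentMultiDequeMap;
    import org.junit.Assert;
    import org.junit.Test;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ConcurrentMultiDequeMapSmokeTests {

        @Test
        public void putAndPollFromManyThreads() throws Exception {
            final ConcurrentMultiDequeMap<String, Integer> map = new ConcurrentMultiDequeMap<>();
            final int threads = 8;
            final int perThread = 1000;
            final AtomicInteger polled = new AtomicInteger();
            final CountDownLatch done = new CountDownLatch(threads);
            ExecutorService pool = Executors.newFixedThreadPool(threads);

            for (int t = 0; t < threads; t++) {
                final String key = "key-" + t; // one key per thread; the shared LRU list and size counter are still exercised
                pool.submit(() -> {
                    for (int i = 0; i < perThread; i++) {
                        map.put(key, i);
                        if (i % 2 == 0) {
                            map.poll(key); // only this thread removes from its own key
                            polled.incrementAndGet();
                        }
                    }
                    done.countDown();
                });
            }

            Assert.assertTrue(done.await(30, TimeUnit.SECONDS));
            pool.shutdown();
            // Every put is either still in the map or was polled exactly once.
            Assert.assertEquals(threads * perThread, map.size() + polled.get());
        }
    }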