Skip to content

Commit

Permalink
Drain loop implementation of channel pool (#595)
Browse files Browse the repository at this point in the history
* Channel pool counter fix

* Fix nuke next channel logic

* Attempt for drain loop and uri indexed pool

* Change default keep-alive to 60 sec

* Update counter before a channel is being closed

* Use LIFO

* Implement LRU in concurrent deque map

* Improve wip check

* Adjust synchronized blocks

* Fix synchronized block in deque map

* Minor fixes

* Close channels failing in available pool

* Checkstyle

* Fix tests

* Fix tests

* Checkstyle
  • Loading branch information
jianghaolu authored Mar 27, 2019
1 parent da98b74 commit f30c79b
Show file tree
Hide file tree
Showing 6 changed files with 504 additions and 189 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/

package com.microsoft.rest.v2.http;

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

/**
* A thread-safe multi map where the values for a certain key are FIFO organized.
* @param <K> the key type
* @param <V> the value type
*/
public class ConcurrentMultiDequeMap<K, V> {
private final Map<K, ConcurrentLinkedDeque<V>> data;
// Size is the total number of elements in all ConcurrentLinkedQueues in the Map.
private final AtomicInteger size;
// least recently updated keys
private final LinkedList<K> lru;

/**
* Create a concurrent multi hash map.
*/
public ConcurrentMultiDequeMap() {
this.data = Collections.synchronizedMap(new ConcurrentHashMap<K, ConcurrentLinkedDeque<V>>(16, 0.75f));
this.size = new AtomicInteger(0);
this.lru = new LinkedList<>();
}

/**
* Add a new key value pair to the multimap.
*
* @param key the key to put
* @param value the value to put
* @return the added value
*/
public V put(K key, V value) {
assert key != null;
synchronized (size) {
if (!data.containsKey(key)) {
data.put(key, new ConcurrentLinkedDeque<V>());
lru.addLast(key);
} else {
lru.remove(key);
lru.addLast(key);
}
data.get(key).add(value);
size.incrementAndGet();
return value;
}
}

/**
* Returns the queue associated with the given key.
*
* @param key the key to query
* @return the queue associated with the key
*/
public ConcurrentLinkedDeque<V> get(K key) {
return data.get(key);
}

/**
* Retrieves and removes one item from the multi map. The item is from
* the least recently used key set.
* @return the item removed from the map
*/
public V poll() {
K key;
synchronized (size) {
if (size.get() == 0) {
return null;
} else {
key = lru.getFirst();
}
}
return poll(key);
}
/**
* Retrieves and removes one item from the multi map. The item is from
* the most recently used key set.
* @return the item removed from the map
*/
public V pop() {
K key;
synchronized (size) {
if (size.get() == 0) {
return null;
} else {
key = lru.getLast();
}
}
return pop(key);
}

/**
* Retrieves the least recently used item in the deque for the given key.
*
* @param key the key to poll an item
* @return the least recently used item for the key
*/
public V poll(K key) {
if (!data.containsKey(key)) {
return null;
} else {
ConcurrentLinkedDeque<V> queue = data.get(key);
V ret;
synchronized (size) {
if (queue == null || queue.isEmpty()) {
throw new NoSuchElementException("no items under key " + key);
}
size.decrementAndGet();
ret = queue.poll();
if (queue.isEmpty()) {
data.remove(key);
lru.remove(key);
}
}
return ret;
}
}

/**
* Retrieves the most recently used item in the deque for the given key.
*
* @param key the key to poll an item
* @return the most recently used item for the key
*/
public V pop(K key) {
if (!data.containsKey(key)) {
return null;
} else {
ConcurrentLinkedDeque<V> queue = data.get(key);
V ret;
synchronized (size) {
if (queue == null || queue.isEmpty()) {
throw new NoSuchElementException("no items under key " + key);
}
size.decrementAndGet();
ret = queue.pop();
if (queue.isEmpty()) {
data.remove(key);
lru.remove(key);
}
}
return ret;
}
}

/**
* @return the size of the multimap.
*/
public int size() {
return size.get();
}

/**
* Checks if there are values associated with a key in the multimap.
*
* @param key the key to check
* @return true if there are values associated
*/
public boolean containsKey(K key) {
return data.containsKey(key) && data.get(key).size() > 0;
}

/**
* @return the set of keys with which there are values associated
*/
public Set<K> keys() {
Set<K> keys = new HashSet<>();
for (K key : data.keySet()) {
if (data.get(key).size() > 0) {
keys.add(key);
}
}
return keys;
}

/**
* @return the set of all values for all keys in the multimap.
*/
public Set<V> values() {
Set<V> values = new HashSet<>();
for (K k : keys()) {
values.addAll(data.get(k));
}
return values;
}

/**
* Removes a key value pair in the multimap. If there's no such key value
* pair then this returns false. Otherwise this method removes it and
* returns true.
*
* @param key the key to remove
* @param value the value to remove
* @return true if an item is removed
*/
public boolean remove(K key, V value) {
if (!data.containsKey(key)) {
return false;
}
ConcurrentLinkedDeque<V> queue = data.get(key);
boolean removed;
synchronized (size) {
removed = queue.remove(value);
if (removed) {
size.decrementAndGet();
}
if (queue.isEmpty()) {
data.remove(key);
lru.remove(key);
}
}
return removed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
* A thread-safe multi map where the values for a certain key are FIFO organized.
* @param <K> the key type
* @param <V> the value type
*
* @deprecated Use {@link ConcurrentMultiDequeMap} instead
*/
@Deprecated
public class ConcurrentMultiHashMap<K, V> {
private final Map<K, ConcurrentLinkedQueue<V>> data;
// Size is the total number of elements in all ConcurrentLinkedQueues in the Map.
Expand All @@ -29,7 +32,7 @@ public class ConcurrentMultiHashMap<K, V> {
* Create a concurrent multi hash map.
*/
public ConcurrentMultiHashMap() {
this.data = Collections.synchronizedMap(new LinkedHashMap<K, ConcurrentLinkedQueue<V>>(16, 0.75f, true));
this.data = Collections.synchronizedMap(new LinkedHashMap<>(16, 0.75f, true));
this.size = new AtomicInteger(0);
}

Expand Down Expand Up @@ -161,8 +164,10 @@ public boolean remove(K key, V value) {
ConcurrentLinkedQueue<V> queue = data.get(key);
boolean removed;
synchronized (size) {
size.decrementAndGet();
removed = queue.remove(value);
if (removed) {
size.decrementAndGet();
}
}
if (queue.isEmpty()) {
data.remove(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,33 +154,34 @@ private static TransportConfig loadTransport(int groupSize) {
}

private static SharedChannelPool createChannelPool(Bootstrap bootstrap, TransportConfig config,
int poolSize, SslContext sslContext) {
SharedChannelPoolOptions options, SslContext sslContext) {
bootstrap.group(config.eventLoopGroup);
bootstrap.channel(config.channelClass);
bootstrap.option(ChannelOption.AUTO_READ, false);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) TimeUnit.MINUTES.toMillis(3L));
return new SharedChannelPool(bootstrap, new AbstractChannelPoolHandler() {
return new SharedChannelPool(bootstrap, config.eventLoopGroup, new AbstractChannelPoolHandler() {
@Override
public synchronized void channelCreated(Channel ch) throws Exception {
// Why is it necessary to have "synchronized" to prevent NRE in pipeline().get(Class<T>)?
// Is channelCreated not run on the eventLoop assigned to the channel?
ch.pipeline().addLast("HttpClientCodec", new HttpClientCodec());
ch.pipeline().addLast("HttpClientInboundHandler", new HttpClientInboundHandler());
}
}, poolSize, new SharedChannelPoolOptions(), sslContext);
}, options, sslContext);
}

private NettyAdapter() {
TransportConfig config = loadTransport(0);
this.eventLoopGroup = config.eventLoopGroup;
this.channelPool = createChannelPool(new Bootstrap(), config, eventLoopGroup.executorCount() * 16, null);
this.channelPool = createChannelPool(new Bootstrap(), config,
new SharedChannelPoolOptions().withPoolSize(eventLoopGroup.executorCount() * 16).withIdleChannelKeepAliveDurationInSec(60), null);
}

private NettyAdapter(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolSize, SslContext sslContext) {
private NettyAdapter(Bootstrap baseBootstrap, int eventLoopGroupSize, SharedChannelPoolOptions options, SslContext sslContext) {
TransportConfig config = loadTransport(eventLoopGroupSize);
this.eventLoopGroup = config.eventLoopGroup;
this.channelPool = createChannelPool(baseBootstrap, config, channelPoolSize, sslContext);
this.channelPool = createChannelPool(baseBootstrap, config, options, sslContext);
}

private Single<HttpResponse> sendRequestInternalAsync(final HttpRequest request, final HttpClientConfiguration configuration) {
Expand Down Expand Up @@ -461,6 +462,7 @@ void emitError(Throwable throwable) {
}
} else {
LOGGER.debug("Channel disposed at state {}", s);
closeAndReleaseChannel();
break;
}
}
Expand Down Expand Up @@ -950,14 +952,22 @@ public void dumpChannelPool() {
public static class Factory implements HttpClientFactory {
private final NettyAdapter adapter;


/**
* Create a Netty client factory with default settings.
*/
public Factory() {
this.adapter = new NettyAdapter();
}

/**
* Create a Netty client factory, specifying the channel pool options.
*
* @param options the options to configure the channel pool
*/
public Factory(SharedChannelPoolOptions options) {
this(new Bootstrap(), 0, options, null);
}

/**
* Create a Netty client factory, specifying the event loop group size and the
* channel pool size.
Expand All @@ -972,11 +982,11 @@ public Factory() {
public Factory(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolSize) {
this(baseBootstrap.clone(), eventLoopGroupSize, channelPoolSize, null);
}

/**
* Create a Netty client factory, specifying the event loop group size and the
* channel pool size.
*
*
* @param baseBootstrap
* a channel Bootstrap to use as a basis for channel creation
* @param eventLoopGroupSize
Expand All @@ -987,7 +997,24 @@ public Factory(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolS
* An SslContext, can be null.
*/
public Factory(Bootstrap baseBootstrap, int eventLoopGroupSize, int channelPoolSize, SslContext sslContext) {
this.adapter = new NettyAdapter(baseBootstrap.clone(), eventLoopGroupSize, channelPoolSize, sslContext);
this(baseBootstrap, eventLoopGroupSize, new SharedChannelPoolOptions().withPoolSize(channelPoolSize), sslContext);
}

/**
* Create a Netty client factory, specifying the event loop group size and the
* channel pool options.
*
* @param baseBootstrap
* a channel Bootstrap to use as a basis for channel creation
* @param eventLoopGroupSize
* the number of event loop executors
* @param options
* the options to configure the channel pool
* @param sslContext
* An SslContext, can be null.
*/
public Factory(Bootstrap baseBootstrap, int eventLoopGroupSize, SharedChannelPoolOptions options, SslContext sslContext) {
this.adapter = new NettyAdapter(baseBootstrap.clone(), eventLoopGroupSize, options, sslContext);
}

@Override
Expand Down
Loading

0 comments on commit f30c79b

Please sign in to comment.