diff --git a/client/src/main/java/com/networknt/client/oauth/OauthHelper.java b/client/src/main/java/com/networknt/client/oauth/OauthHelper.java index 49ad6a3254..54031069aa 100644 --- a/client/src/main/java/com/networknt/client/oauth/OauthHelper.java +++ b/client/src/main/java/com/networknt/client/oauth/OauthHelper.java @@ -20,9 +20,9 @@ import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException; import com.networknt.client.ClientConfig; import com.networknt.client.Http2Client; -import com.networknt.client.simplepool.SimpleConnectionHolder; +import com.networknt.client.simplepool.SimpleConnectionState; import com.networknt.client.simplepool.SimpleConnectionPool; -import com.networknt.client.simplepool.undertow.SimpleClientConnectionMaker; +import com.networknt.client.simplepool.undertow.SimpleUndertowConnectionMaker; import com.networknt.cluster.Cluster; import com.networknt.config.Config; import com.networknt.exception.ClientException; @@ -88,7 +88,7 @@ public class OauthHelper { private static final Logger logger = LoggerFactory.getLogger(OauthHelper.class); private static final SimpleConnectionPool pool = new SimpleConnectionPool( - ClientConfig.get().getConnectionExpireTime(), ClientConfig.get().getConnectionPoolSize(), SimpleClientConnectionMaker.instance()); + ClientConfig.get().getConnectionExpireTime(), ClientConfig.get().getConnectionPoolSize(), SimpleUndertowConnectionMaker.instance()); /** * @deprecated As of release 1.5.29, replaced with @link #getTokenResult(TokenRequest tokenRequest) @@ -413,7 +413,7 @@ public static String getKey(KeyRequest keyRequest, String envTag) throws ClientE final Http2Client client = Http2Client.getInstance(); final CountDownLatch latch = new CountDownLatch(1); ClientConnection connection = null; - SimpleConnectionHolder.ConnectionToken borrowToken = null; + SimpleConnectionState.ConnectionToken borrowToken = null; long connectionTimeout = Math.max(2, keyRequest.getKeyConnectionTimeout() / 1000); long populateKeyTimeout = Math.max(2, keyRequest.getPopulateKeyTimeout() / 1000); diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleConnection.java b/client/src/main/java/com/networknt/client/simplepool/SimpleConnection.java index 5167b64fa7..20ff719259 100644 --- a/client/src/main/java/com/networknt/client/simplepool/SimpleConnection.java +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleConnection.java @@ -22,7 +22,7 @@ /*** * SimpleConnection is an interface that contains all the required functions and properties of - * a connection that are needed by the SimpleConnectionHolder, SimpleURIConnectionPool, and + * a connection that are needed by the SimpleConnectionState, SimpleURIConnectionPool, and * SimpleConnectionPool classes. * * Concrete HTTP network connections (like Undertow's ClientConnection class) should be wrapped in diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionMaker.java b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionMaker.java index 2a9bef4542..34a96a0f3f 100644 --- a/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionMaker.java +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionMaker.java @@ -24,10 +24,46 @@ /*** * A factory that creates raw connections and wraps them in SimpleConnection objects. - * SimpleConnectionMakers are used by SimpleConnectionHolders to create connections. + * SimpleConnectionMakers are used by SimpleConnectionStates to create connections. * */ public interface SimpleConnectionMaker { - public SimpleConnection makeConnection(long createConnectionTimeout, boolean isHttp2, final URI uri, final Set allCreatedConnections); - public SimpleConnection reuseConnection(long createConnectionTimeout, SimpleConnection connection) throws RuntimeException; + /*** + * Establishes a new connection to a URI. + * Implementations of SimpleConnectionMaker are used by SimpleConnectionStates as a connection factory. + * + * @param createConnectionTimeout the maximum time in seconds to wait for a connection to be established + * @param isHttp2 if true, SimpleConnectionMaker must attempt to establish an HTTP/2 connection, otherwise it will + * attempt to create an HTTP/1.1 connection + * @param uri the URI to connect to + * @param allCreatedConnections Implementations of SimpleConnectionMaker are used by SimpleConnectionState to create + * connections to arbitrary URIs. In other words, implementations of SimpleConnectionMaker are used by + * SimpleConnectionState as a connection factory. + * + * A SimpleConnectionMaker will add all connections it creates to the Set allCreatedConnections. + * SimpleURIConnectionPool will compare the connections in this Set to those that it is tracking and close + * any untracked connections. + * + * Untracked connections can occur if there is a connection creation timeout. When such a timeout occurs, + * makeConnection() must throw a RuntimeException which will prevent SimpleURIConnectionPool from acquiring + * a SimpleConnection. However, the connection creation callback thread in makeConnection() may continue to + * execute after the timeout and ultimately succeed in creating the connection after the timeout has occurred + * and the exception has been thrown. Connections that are created but were not returned to + * SimpleURIConnectionPool are considered to be 'untracked'. + * + * Despite not being tracked by SimpleURIConnectionPool, all successfully created connections must be added + * to allCreatedConnections. + * + * SimpleURIConnectionPool prevents these untracked connections from accumulating and causing a connection + * leak over time, by periodically closing any open connections in allCreatedConnections that it is not tracking. + * + * Thread Safety: + * allCreatedConnections MUST be a threadsafe Set, or the thread safety of the connection pool cannot + * be guaranteed. + * + * @return A SimpleConnection to the specified URI + * @throws RuntimeException thrown if the connection establishment timeout (createConnectionTimeout) + * expires before a connection to the URI is established, or if there is an error establishing the connection + */ + public SimpleConnection makeConnection(long createConnectionTimeout, boolean isHttp2, final URI uri, final Set allCreatedConnections) throws RuntimeException; } diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionPool.java b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionPool.java index bb0448f61a..6480c1f211 100644 --- a/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionPool.java +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionPool.java @@ -24,7 +24,11 @@ import java.util.concurrent.ConcurrentHashMap; /** - * This is a class through which multiple URI Connection Pools can be accessed + * SimpleConnectionPool is a connection pool, which means that it manages a pool of reusable network connections. + * Threads borrow connections from the pool, use them, and then return them back to the pool. Once returned, these + * connections can then be borrowed by other threads. Since these returned connections are already active / established, + * they can be used immediately without going through the lengthy connection establishment process. This can very + * significantly increase the performance of systems that make many simultaneous outgoing API calls. */ public final class SimpleConnectionPool { private final Map pools = new ConcurrentHashMap<>(); @@ -32,22 +36,51 @@ public final class SimpleConnectionPool { private final long expireTime; private final int poolSize; + /*** + * Creates a SimpleConnectionPool + * + * @param expireTime the length of time in milliseconds a connection is eligible to be borrowed + * @param poolSize the maximum number of unexpired connections the pool can hold at a given time + * @param connectionMaker a class that SimpleConnectionPool uses to create new connections + */ public SimpleConnectionPool(long expireTime, int poolSize, SimpleConnectionMaker connectionMaker) { this.expireTime = expireTime; this.poolSize = poolSize; this.connectionMaker = connectionMaker; } - public SimpleConnectionHolder.ConnectionToken borrow(long createConnectionTimeout, boolean isHttp2, URI uri) - throws RuntimeException + /*** + * Returns a network connection to a URI + * + * @param createConnectionTimeout The maximum time in seconds to wait for a new connection to be established before + * throwing an exception + * @param isHttp2 if true, SimpleURIConnectionPool will attempt to establish an HTTP/2 connection, otherwise it will + * attempt to create an HTTP/1.1 connection + * @return a ConnectionToken object that contains the borrowed connection. The thread using the connection must + * return this connection to the pool when it is done with it by calling the borrow() method with the + * ConnectionToken as the argument + * @throws RuntimeException if connection creation takes longer than createConnectionTimeout seconds, + * or other issues that prevent connection creation + */ + public SimpleConnectionState.ConnectionToken borrow(long createConnectionTimeout, boolean isHttp2, URI uri) + throws RuntimeException { if(!pools.containsKey(uri)) pools.computeIfAbsent(uri, pool -> new SimpleURIConnectionPool(uri, expireTime, poolSize, connectionMaker)); - + return pools.get(uri).borrow(createConnectionTimeout, isHttp2); } - public void restore(SimpleConnectionHolder.ConnectionToken connectionToken) { + /*** + * Restores borrowed connections + * + * NOTE: A connection that unexpectedly closes may be removed from connection pool tracking before all of its + * ConnectionTokens have been restored. This can result in seeing log messages about CLOSED connections + * being restored to the pool that are no longer tracked / known by the connection pool + * + * @param connectionToken the connection token that represents the borrowing of a connection by a thread + */ + public void restore(SimpleConnectionState.ConnectionToken connectionToken) { if(connectionToken == null) return; diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionHolder.java b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionState.java similarity index 67% rename from client/src/main/java/com/networknt/client/simplepool/SimpleConnectionHolder.java rename to client/src/main/java/com/networknt/client/simplepool/SimpleConnectionState.java index 22191ae582..14d70f8259 100644 --- a/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionHolder.java +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleConnectionState.java @@ -28,8 +28,8 @@ import java.util.concurrent.ConcurrentHashMap; /*** - * A SimpleConnectionHolder is a simplified interface for a connection, that also keeps track of the connection's state. - * (In fact--in this document--the state of a connection and the state of its holder are used interchangeably) + * A SimpleConnectionState is a simplified interface for a connection, that also keeps track of the connection's state. + * (In fact--in this document--a connection and its state are used interchangeably) * * Connection States * @@ -84,68 +84,68 @@ * * Not doing so (i.e.: not freezing the time) may allow inconsistent states to be reached. */ -public final class SimpleConnectionHolder { - private static final Logger logger = LoggerFactory.getLogger(SimpleConnectionHolder.class); +public final class SimpleConnectionState { + private static final Logger logger = LoggerFactory.getLogger(SimpleConnectionState.class); - // how long a connection can be eligible to be borrowed + // how long in milliseconds a connection can be eligible to be borrowed private final long EXPIRE_TIME; // the maximum number of borrowed tokens a connection can have at a time private final int MAX_BORROWS; - // the time this connection was created + // the time this connection was created represented as Unix Epoch time in milliseconds private final long startTime; // the URI this connection is connected to private final URI uri; /** - if true, this connection should be treated as CLOSED - note: CLOSED may be true before a connection is actually closed since there may be a delay - between setting close = false, and the network connection actually being fully closed - */ + if true, this connection should be treated as CLOSED + note: CLOSED may be true before a connection is actually closed since there may be a delay + between setting close = false, and the network connection actually being fully closed + */ private volatile boolean closed = false; /** - If the connection is HTTP/1.1, it can only be borrowed by 1 process at a time - If the connection is HTTP/2, it can be borrowed by an unlimited number of processes at a time - */ - private final SimpleConnectionMaker connectionMaker; + If the connection is HTTP/1.1, it can only be borrowed by 1 thread at a time + If the connection is HTTP/2, it can be borrowed by an unlimited number of threads at a time + */ private final SimpleConnection connection; /** a Set containing all borrowed connection tokens */ private final Set borrowedTokens = ConcurrentHashMap.newKeySet(); /*** - * Connections and ConnectionHolders are paired 1-1. For every connection there is a single ConnectionHolder and + * Connections and ConnectionStates are paired 1-1. For every connection there is a single ConnectionState and * vice versa. * - * This is why connections are created at the same time a ConnectionHolder is created (see SimpleConnectionHolder + * This is why connections are created at the same time a ConnectionState is created (see SimpleConnectionState * constructor). * - * The connection holder acts as a simplified interface to the connection, and keeps track of how many - * processes are using it at any given time. The maximum number of processes using it at the same time - * is determined by the connections type: HTTP/1.1 (1 process at a time) or HTTP/2 (multiple processes at a time). + * The SimpleConnectionState acts as a simplified interface to the connection, and keeps track of how many + * threads are using it at any given time. The maximum number of threads that can use it at the same time + * is determined by the connection's type: HTTP/1.1 (1 thread at a time) or HTTP/2 (multiple threads at a time). * - * @param expireTime how long a connection is eligible to be borrowed - * @param connectionCreateTimeout how long it can take a connection be created before an exception thrown + * @param expireTime the length of time in milliseconds a connection is eligible to be borrowed + * @param connectionCreateTimeout The maximum time in seconds to wait for a new connection to be established before + * throwing an exception * @param isHttp2 if true, tries to upgrade to HTTP/2. if false, will try to open an HTTP/1.1 connection * @param uri the URI the connection will try to connect to * @param allCreatedConnections this Set will be passed to the callback thread that creates the connection. - * The connectionMaker will always add every successfully created connection - * to this Set. - * @param connectionMaker a class that SimpleConnectionHolder uses to create new SimpleConnection objects + * The connectionMaker will always add every successfully created connection to this Set. This Set must be + * threadsafe (such as ConcurrentHashMap.newKeySet()) + * @param connectionMaker a class that SimpleConnectionState uses to create new SimpleConnection objects + * @throws RuntimeException thrown if connection cannot be established within connectionCreateTimeout + * seconds or prematurely closes */ - public SimpleConnectionHolder( - long expireTime, - long connectionCreateTimeout, - boolean isHttp2, - URI uri, - Set allCreatedConnections, - SimpleConnectionMaker connectionMaker) + public SimpleConnectionState( + long expireTime, + long connectionCreateTimeout, + boolean isHttp2, + URI uri, + Set allCreatedConnections, + SimpleConnectionMaker connectionMaker) throws RuntimeException { - this.connectionMaker = connectionMaker; - this.uri = uri; EXPIRE_TIME = expireTime; @@ -157,63 +157,62 @@ public SimpleConnectionHolder( // throw exception if connection creation failed if(!connection.isOpen()) { - logger.debug("{} closed connection", logLabel(connection, now)); + if(logger.isDebugEnabled()) logger.debug("{} closed connection", logLabel(connection, now)); throw new RuntimeException("[" + port(connection) + "] Error creating connection to " + uri.toString()); - // start life-timer and determine connection type + // start life-timer and determine connection type } else { startTime = System.currentTimeMillis(); // HTTP/1.1 connections have a MAX_BORROW of 1, while HTTP/2 connections can have > 1 MAX_BORROWS MAX_BORROWS = connection().isMultiplexingSupported() ? Integer.MAX_VALUE : 1; - logger.debug("{} New connection : {}", logLabel(connection, now), MAX_BORROWS > 1 ? "HTTP/2" : "HTTP/1.1"); + if(logger.isDebugEnabled()) + logger.debug("{} New connection : {}", logLabel(connection, now), MAX_BORROWS > 1 ? "HTTP/2" : "HTTP/1.1"); } } - private volatile boolean firstUse = true; /** * State Transition - Borrow * - * @param connectionCreateTimeout the amount of time to wait for a connection to be created before throwing an exception - * @param now the time at which to evaluate whether there are borrowable connections or not - * @return returns a ConnectionToken representing this borrow of the connection - * @throws RuntimeException if connection closed or attempt to borrow after pool is full + * @param connectionCreateTimeout the amount of time in seconds to wait for a connection to be created before + * throwing an exception + * @param now the Unix Epoch time in milliseconds at which to evaluate whether there are borrowable connections or not + * @return returns a ConnectionToken representing the borrowing of the connection + * @throws RuntimeException if the connection is closed + * @throws IllegalStateException if the connection is not borrowable */ public synchronized ConnectionToken borrow(long connectionCreateTimeout, long now) throws RuntimeException { /*** * Connections can only be borrowed when the connection is in a BORROWABLE state. * * This will throw an IllegalStateException if borrow is called when the connection is not borrowable. - * This means that users need to check the state of the connection (i.e.: the state of the ConnectionHolder) + * This means that users need to check the state of the connection (i.e.: the state of the ConnectionState) * before using it, e.g.: * * ConnectionToken connectionToken = null; * long now = System.currentTimeMillis(); * - * if(connectionHolder.borrowable(now)) - * connectionToken = connectionHolder.borrow(connectionCreateTimeout, now); + * if(connectionState.borrowable(now)) + * connectionToken = connectionState.borrow(connectionCreateTimeout, now); * * Also note the use of a single consistent value for the current time ('now'). This ensures * that the state returned in the 'if' statement will still be true in the 'borrow' statement * (as long as the connection does not close between the 'if' and 'borrow'). * */ + ConnectionToken connectionToken; if(borrowable(now)) { - if (firstUse) { - firstUse = false; - connectionToken = new ConnectionToken(connection); - } else { - SimpleConnection reusedConnection = connectionMaker.reuseConnection(connectionCreateTimeout, connection); - connectionToken = new ConnectionToken(reusedConnection); - } + if(closed()) + throw new RuntimeException("Connection was unexpectedly closed"); // add connectionToken to the Set of borrowed tokens - borrowedTokens.add(connectionToken); + borrowedTokens.add( (connectionToken = new ConnectionToken(connection)) ); - logger.debug("{} borrow - connection now has {} borrows", logLabel(connection, now), borrowedTokens.size()); + if(logger.isDebugEnabled()) + logger.debug("{} borrow - connection now has {} borrows", logLabel(connection, now), borrowedTokens.size()); return connectionToken; } @@ -237,31 +236,33 @@ public synchronized void restore(ConnectionToken connectionToken) { borrowedTokens.remove(connectionToken); long now = System.currentTimeMillis(); - logger.debug("{} restore - connection now has {} borrows", logLabel(connection, now), borrowedTokens.size()); + if(logger.isDebugEnabled()) + logger.debug("{} restore - connection now has {} borrows", logLabel(connection, now), borrowedTokens.size()); } /** * State Transition - Close * - * @param now the time at which to evaluate whether this connection is closable or not + * @param now the Unix Epoch time in milliseconds at which to evaluate whether this connection is closable or not * @return true if the connection was closed and false otherwise */ public synchronized boolean safeClose(long now) { - logger.debug("{} close - closing connection with {} borrows...", logLabel(connection, now), borrowedTokens.size()); + if(logger.isDebugEnabled()) + logger.debug("{} close - closing connection with {} borrows...", logLabel(connection, now), borrowedTokens.size()); /** - Connection may still be open even if closed == true - However, for consistency, we treat the connection as closed as soon as closed == true, - even if IoUtils.safeClose(connection) has not completed closing the connection yet - */ + Connection may still be open even if closed == true + However, for consistency, we treat the connection as closed as soon as closed == true, + even if IoUtils.safeClose(connection) has not completed closing the connection yet + */ if(closed()) return true; /** - Ensures that a connection is never closed unless the connection is in the NOT_BORROWED_EXPIRED state - This is vital to ensure that connections are never closed until after all processes that - borrowed them are no longer using them - */ + Ensures that a connection is never closed unless the connection is in the NOT_BORROWED_EXPIRED state + This is vital to ensure that connections are never closed until after all threads that + borrowed them are no longer using them + */ boolean notBorrowedExpired = !borrowed() && expired(now); if(notBorrowedExpired != true) throw new IllegalStateException(); @@ -289,7 +290,7 @@ public synchronized boolean closed() { /** * State Property - isExpired * - * @param now the time at which to evaluate whether this connection has expired or not + * @param now the Unix Epoch time in milliseconds at which to evaluate whether this connection has expired or not * @return true if the connection has expired and false otherwise */ public synchronized boolean expired(long now) { @@ -307,7 +308,7 @@ public synchronized boolean borrowed() { /** * State Property - isAtMaxBorrows - * + * * @return true if the connection is at its maximum number of borrows, and false otherwise */ public synchronized boolean maxBorrowed() { @@ -317,7 +318,7 @@ public synchronized boolean maxBorrowed() { /** * State Property - isBorrowable * - * @param now the time at which to evaluate the borrowability of this connection + * @param now the Unix Epoch time in milliseconds at which to evaluate the borrowability of this connection * @return true if the connection is borrowable and false otherwise */ public synchronized boolean borrowable(long now) { @@ -325,24 +326,24 @@ public synchronized boolean borrowable(long now) { } /** - * Returns the SimpleConnection that SimpleConnectionHolder holds + * Returns the SimpleConnection that SimpleConnectionState holds * - * @return the SimpleConnection that SimpleConnectionHolder holds + * @return the SimpleConnection that SimpleConnectionState holds */ public SimpleConnection connection() { return connection; } public class ConnectionToken { private final SimpleConnection connection; - private final SimpleConnectionHolder holder; + private final SimpleConnectionState connectionState; private final URI uri; ConnectionToken(SimpleConnection connection) { this.connection = connection; - this.holder = SimpleConnectionHolder.this; - this.uri = SimpleConnectionHolder.this.uri; + this.connectionState = SimpleConnectionState.this; + this.uri = SimpleConnectionState.this.uri; } - SimpleConnectionHolder holder() { return holder; } + SimpleConnectionState state() { return connectionState; } SimpleConnection connection() { return connection; } public Object getRawConnection() { return connection.getRawConnection(); } public URI uri() { return uri; } diff --git a/client/src/main/java/com/networknt/client/simplepool/SimpleURIConnectionPool.java b/client/src/main/java/com/networknt/client/simplepool/SimpleURIConnectionPool.java index e06ab5fb7f..6e7b072e3e 100644 --- a/client/src/main/java/com/networknt/client/simplepool/SimpleURIConnectionPool.java +++ b/client/src/main/java/com/networknt/client/simplepool/SimpleURIConnectionPool.java @@ -28,15 +28,21 @@ import java.util.concurrent.ThreadLocalRandom; /*** - A connection pool for a single URI. - Connection pool contains 4 Sets of ConnectionHolders: - - 1. allCreatedConnections all connections created by connection makers are added to this set - 2. allKnownConnections: the set of all connections tracked by the connection pool - 3. Borrowable: connection that can be borrowed from - 4. Borrowed: connections that have borrowed tokens - 5. notBorrowedExpired: connections that have no borrowed tokens -- only these can be closed by the pool -*/ + SimpleURIConnectionPool is a connection pool for a single URI, which means that it manages a pool of reusable + connections to a single URI. + Threads borrow connections from the pool, use them, and then return them back to the pool. Since these returned + connections are already active / established, they can be used immediately without going through the lengthy connection + establishment process. This can very significantly increase the performance of systems that make many simultaneous + outgoing API calls. + + Internally, SimpleURIConnectionPool organizes connections into 4 (possibly overlapping) sets: + + 1. allCreatedConnections all connections created by connection makers are added to this set + 2. trackedConnections: the set of all connections tracked by the connection pool + 3. Borrowable: connection that can be borrowed from + 4. Borrowed: connections that have borrowed tokens + 5. notBorrowedExpired: connections that have no borrowed tokens -- only these can be closed by the pool + */ public final class SimpleURIConnectionPool { private static final Logger logger = LoggerFactory.getLogger(SimpleURIConnectionPool.class); private final SimpleConnectionMaker connectionMaker; @@ -50,15 +56,23 @@ public final class SimpleURIConnectionPool { /** The set of all connections created by the SimpleConnectionMaker for this uri */ private final Set allCreatedConnections = ConcurrentHashMap.newKeySet(); /** The set containing all connections known to this connection pool (It is not considered a state set) */ - private final Set allKnownConnections = new HashSet<>(); + private final Set trackedConnections = new HashSet<>(); /** * State Sets * The existence or non-existence of a connection in one of these sets means that the connection is or is not in * one of these states. A connection can be in multiple state sets at a time (e.g.: a connection can be both borrowed and borrowable) */ - private final Set borrowable = new HashSet<>(); - private final Set borrowed = new HashSet<>(); - private final Set notBorrowedExpired = new HashSet<>(); + private final Set borrowable = new HashSet<>(); + private final Set borrowed = new HashSet<>(); + private final Set notBorrowedExpired = new HashSet<>(); + /*** + * Creates a new SimpleURIConnectionPool + * + * @param uri the URI that this connection pool creates connections to + * @param expireTime the length of time in milliseconds a connection is eligible to be borrowed + * @param poolSize the maximum number of unexpired connections the pool can hold at a given time + * @param connectionMaker a class that SimpleConnectionPool uses to create new connections + */ public SimpleURIConnectionPool(URI uri, long expireTime, int poolSize, SimpleConnectionMaker connectionMaker) { EXPIRY_TIME = expireTime; this.uri = uri; @@ -67,33 +81,42 @@ public SimpleURIConnectionPool(URI uri, long expireTime, int poolSize, SimpleCon } /*** + * Returns a network connection to a URI * - * @param createConnectionTimeout - * @param isHttp2 - * @return - * @throws RuntimeException + * @param createConnectionTimeout The maximum time in seconds to wait for a new connection to be established + * @param isHttp2 if true, SimpleURIConnectionPool will attempt to establish an HTTP/2 connection, otherwise it will + * attempt to create an HTTP/1.1 connection + * @return a ConnectionToken object that contains the borrowed connection. The thread using the connection must + * return this connection to the pool when it is done with it by calling the borrow() method with the + * ConnectionToken as the argument + * @throws RuntimeException if connection creation takes longer than createConnectionTimeout seconds, + * or other issues that prevent connection creation */ - public synchronized SimpleConnectionHolder.ConnectionToken borrow(long createConnectionTimeout, boolean isHttp2) throws RuntimeException { + public synchronized SimpleConnectionState.ConnectionToken borrow(long createConnectionTimeout, boolean isHttp2) throws RuntimeException { long now = System.currentTimeMillis(); - final SimpleConnectionHolder holder; + final SimpleConnectionState connectionState; - readAllConnectionHolders(now); + // update the connection pool's state + applyAllConnectionStates(now); if(borrowable.size() > 0) { - holder = borrowable.toArray(new SimpleConnectionHolder[0])[ThreadLocalRandom.current().nextInt(borrowable.size())]; + connectionState = borrowable.toArray(new SimpleConnectionState[0])[ThreadLocalRandom.current().nextInt(borrowable.size())]; } else { - if (allKnownConnections.size() < poolSize) { - holder = new SimpleConnectionHolder(EXPIRY_TIME, createConnectionTimeout, isHttp2, uri, allCreatedConnections, connectionMaker); - allKnownConnections.add(holder); - } else + if (trackedConnections.size() < poolSize) { + connectionState = new SimpleConnectionState(EXPIRY_TIME, createConnectionTimeout, isHttp2, uri, allCreatedConnections, connectionMaker); + trackedConnections.add(connectionState); + } else { + findAndCloseLeakedConnections(); throw new RuntimeException("An attempt was made to exceed the maximum size was of the " + uri.toString() + " connection pool"); + } } - SimpleConnectionHolder.ConnectionToken connectionToken = holder.borrow(createConnectionTimeout, now); - readConnectionHolder(holder, now, () -> allKnownConnections.remove(holder)); + SimpleConnectionState.ConnectionToken connectionToken = connectionState.borrow(createConnectionTimeout, now); + applyConnectionState(connectionState, now, () -> trackedConnections.remove(connectionState)); - logger.debug(showConnections("borrow")); + if(logger.isDebugEnabled()) logger.debug(showConnections("borrow")); + findAndCloseLeakedConnections(); return connectionToken; } @@ -106,17 +129,20 @@ public synchronized SimpleConnectionHolder.ConnectionToken borrow(long createCon * * @param connectionToken the connection token that represents the borrowing of a connection by a thread */ - public synchronized void restore(SimpleConnectionHolder.ConnectionToken connectionToken) { + public synchronized void restore(SimpleConnectionState.ConnectionToken connectionToken) { if(connectionToken == null) return; - SimpleConnectionHolder holder = connectionToken.holder(); + SimpleConnectionState connectionState = connectionToken.state(); long now = System.currentTimeMillis(); - holder.restore(connectionToken); - readAllConnectionHolders(now); + // update this connection's state + connectionState.restore(connectionToken); + + // update the connection pool's state + applyAllConnectionStates(now); - logger.debug(showConnections("restore")); + if(logger.isDebugEnabled()) logger.debug(showConnections("restore")); } /** @@ -127,51 +153,27 @@ public synchronized void restore(SimpleConnectionHolder.ConnectionToken connecti * This method is private, and is only called either directly or transitively by synchronized * methods in this class. * - * Note: - * 'knownConnectionHolders::remove' is just Java syntactic sugar for '() -> knownConnectionHolders.remove()' - * - * @param now the current time in ms + * @param now the Unix Epoch time in milliseconds at which to evaluate the connection states */ - private void readAllConnectionHolders(long now) + private void applyAllConnectionStates(long now) { /** - * Sweep all known connections and update sets - * - * Remove any connections that have unexpectedly closed - * Move all remaining connections to appropriate sets based on their properties + * Sweep all known connections and apply their state changes to the connection pool's state. Also, close + * any unborrowed expired connections */ - final Iterator knownConnectionHolders = allKnownConnections.iterator(); - while (knownConnectionHolders.hasNext()) { - SimpleConnectionHolder connection = knownConnectionHolders.next(); - - // remove connections that have unexpectedly closed - if (connection.closed()) { - logger.debug("[{}: CLOSED]: Connection unexpectedly closed - Removing from known-connections set", port(connection.connection())); - readConnectionHolder(connection, now, knownConnectionHolders::remove); - } - // else, move connections to correct sets - else { - readConnectionHolder(connection, now, knownConnectionHolders::remove); - - // close and remove connections if they are in a closeable set - if (notBorrowedExpired.contains(connection)) { - connection.safeClose(now); - readConnectionHolder(connection, now, knownConnectionHolders::remove); - } - } - } - - // find and close any leaked connections - findAndCloseLeakedConnections(); + final Iterator trackedConnectionStates = trackedConnections.iterator(); + while (trackedConnectionStates.hasNext()) + applyConnectionState(trackedConnectionStates.next(), now, () -> trackedConnectionStates.remove()); } - private interface RemoveFromAllKnownConnections { void remove(); } + @FunctionalInterface private interface RemoveFromTrackedConnections { void remove(); } /*** * This method reads a connection and moves it to the correct sets based on its properties. - * It will also remove a connection from all sets (i.e.: stop tracking the connection) if it is closed. + * It will remove a connection from all sets (i.e.: stop tracking the connection) if it is closed. + * It will close unborrowed expired connections. * * NOTE: Closing connections and modifying sets - * readConnectionHolder() and findAndCloseLeakedConnections() are the only two methods that close connections + * readConnectionState() and findAndCloseLeakedConnections() are the only two methods that close connections * and modify sets. This can be helpful to know for debugging since the sets comprise the entirety of the * mutable state of this SimpleURIConnectionPool objects * @@ -179,12 +181,12 @@ private interface RemoveFromAllKnownConnections { void remove(); } * This method is private, and is only called either directly or transitively by synchronized * methods in this class. * - * @param connection - * @param now - * @param knownConnections a lambda expression to remove a closed-connection from allKnownConnections, either using - * an Iterator of allKnownConnections, or directly using allKnownConnections.remove() + * @param connection the connection to read and move to the appropriate sets + * @param now the Unix Epoch time in milliseconds at which to evaluate the connection's state + * @param trackedConnectionRemover a lambda expression to remove a closed-connection from trackedConnections, either using + * an Iterator of trackedConnections, or directly using trackedConnections.remove() */ - private void readConnectionHolder(SimpleConnectionHolder connection, long now, RemoveFromAllKnownConnections knownConnections) { + private void applyConnectionState(SimpleConnectionState connection, long now, RemoveFromTrackedConnections trackedConnectionRemover) { /** * Remove all references to closed connections @@ -193,14 +195,10 @@ private void readConnectionHolder(SimpleConnectionHolder connection, long now, R * (and will therefore be garbage collected) */ if(connection.closed()) { - logger.debug("[{}: CLOSED]: Connection closed - Stopping connection tracking", port(connection.connection())); - - allCreatedConnections.remove(connection.connection()); // connection.connection() returns a SimpleConnection - knownConnections.remove(); // this will remove the connection from allKnownConnections directly, or via Iterator + if(logger.isDebugEnabled()) + logger.debug("[{}: CLOSED]: Connection unexpectedly closed - Stopping connection tracking", port(connection.connection())); - borrowable.remove(connection); - borrowed.remove(connection); - notBorrowedExpired.remove(connection); + removeFromConnectionTracking(connection, trackedConnectionRemover); return; } @@ -213,51 +211,78 @@ private void readConnectionHolder(SimpleConnectionHolder connection, long now, R updateSet(borrowable, isBorrowable, connection); updateSet(borrowed, isBorrowed, connection); updateSet(notBorrowedExpired, isNotBorrowedExpired, connection); + + // close and remove connection if it is in a closeable set + if (notBorrowedExpired.contains(connection)) + { + connection.safeClose(now); + removeFromConnectionTracking(connection, trackedConnectionRemover); + + if(logger.isDebugEnabled()) + logger.debug("[{}: CLOSED]: Expired connection was closed - Connection tracking stopped", port(connection.connection())); + } } /*** - * Takes a Set, a boolean, and a connectionHolder - * If the boolean is true, it will add the connectionHolder to the Set, otherwise, it will remove it from the Set + * Removes a connection (and its connection state) from being tracked by the pool + * + * @param connection the connection state (and connection) to remove from connection tracking + * @param trackedConnectionRemover a lamda expression to remove the state that depends on whether it is removed in + * an Iterator loop or directly from the trackedConnections Set + */ + private void removeFromConnectionTracking(SimpleConnectionState connection, RemoveFromTrackedConnections trackedConnectionRemover) + { + allCreatedConnections.remove(connection.connection()); // connection.connection() returns a SimpleConnection + trackedConnectionRemover.remove(); // this will remove the connection from trackedConnections directly, or via Iterator + + borrowable.remove(connection); + borrowed.remove(connection); + notBorrowedExpired.remove(connection); + } + + /*** + * Takes a Set, a boolean, and a connectionState + * If the boolean is true, it will add the connectionState to the Set, otherwise, it will remove it from the Set * * NOTE: Thread Safety * This method is private, and is only called either directly or transitively by synchronized * methods in this class. * - * @param set the set to potentially add or remove the connectionHolder from - * @param isMember if true, it will add connectionHolder to set, otherwise, it will remove connectionHolder from set - * @param connectionHolder the connectionHolder to add or remove from the set + * @param set the set to potentially add or remove the connectionState from + * @param isMember if true, it will add connectionState to set, otherwise, it will remove connectionState from set + * @param connectionState the connectionState to add or remove from the set */ - private void updateSet(Set set, boolean isMember, SimpleConnectionHolder connectionHolder) { + private void updateSet(Set set, boolean isMember, SimpleConnectionState connectionState) { if(isMember) - set.add(connectionHolder); + set.add(connectionState); else - set.remove(connectionHolder); + set.remove(connectionState); } /** * Remove leaked connections * A leaked connection is any connection that was created by a SimpleConnectionMaker, but was not returned by the - * SimpleConnectionHolder.borrow() method. This can happen if an error occurs (specifically, if an exception is - * thrown) during the creation of a SimpleConnectionHolder. A SimpleConnectionHolder can fail to instantiate + * SimpleConnectionState.borrow() method. This can happen if an error occurs (specifically, if an exception is + * thrown) during the creation of a SimpleConnectionState. A SimpleConnectionState can fail to instantiate * (after it has created a new connection) if, for example: * * 1) the connection-creation callback thread finishes creating the connection after a timeout has occurred - * 2) the raw connection unexpectedly closes during the creation of its SimpleConnectionHolder + * 2) the raw connection unexpectedly closes during the creation of its SimpleConnectionState * * NOTE: Closing connection and modifying sets - * readConnectionHolder() and findAndCloseLeakedConnections() are the only two methods that close connections + * readConnectionState() and findAndCloseLeakedConnections() are the only two methods that close connections * and modify sets. This can be helpful to know for debugging since the sets comprise the entirety of the * mutable state of this SimpleURIConnectionPool objects */ private void findAndCloseLeakedConnections() { // remove all connections that the connection pool is tracking, from the set of all created connections - for(SimpleConnectionHolder knownConnection: allKnownConnections) - allCreatedConnections.remove(knownConnection.connection()); + for(SimpleConnectionState trackedConnection: trackedConnections) + allCreatedConnections.remove(trackedConnection.connection()); // any remaining connections are leaks, and can now be safely closed if(allCreatedConnections.size() > 0) { - logger.debug("{} untracked connection found", allCreatedConnections.size()); + if(logger.isDebugEnabled()) logger.debug("{} untracked connection(s) found", allCreatedConnections.size()); Iterator leakedConnections = allCreatedConnections.iterator(); while(leakedConnections.hasNext()) { @@ -265,9 +290,10 @@ private void findAndCloseLeakedConnections() if(leakedConnection.isOpen()) { leakedConnection.safeClose(); - logger.debug("Connection closed {} -> {}", port(leakedConnection), uri.toString()); - } else - logger.debug("Connection was already closed {} -> {}", port(leakedConnection), uri.toString()); + if(logger.isDebugEnabled()) logger.debug("Connection closed {} -> {}", port(leakedConnection), uri.toString()); + } else { + if (logger.isDebugEnabled()) logger.debug("Connection was already closed {} -> {}", port(leakedConnection), uri.toString()); + } leakedConnections.remove(); } @@ -284,14 +310,14 @@ private void findAndCloseLeakedConnections() * * NOTE: Iteration Safety * This method should not be used inside loops that iterate through elements of borrowable, borrowed, - * notBorrowedExpired, or allKnownConnections sets + * notBorrowedExpired, or trackedConnections sets */ private String showConnections(String transitionName) { return "After " + transitionName + " - " + showConnections("BORROWABLE", borrowable) + showConnections("BORROWED", borrowed) + showConnections("NOT_BORROWED_EXPIRED", notBorrowedExpired) + - showConnections("TRACKED", allKnownConnections); + showConnections("TRACKED", trackedConnections); } /*** @@ -301,15 +327,15 @@ private String showConnections(String transitionName) { * This method is private, and is only called either directly or transitively by synchronized * methods in this class. */ - private static String showConnections(String name, Set set) { + private static String showConnections(String name, Set set) { StringBuilder sb = new StringBuilder(); sb.append("[").append(name).append(": "); if(set.size() == 0) sb.append("0"); else { int numCons = set.size(); - for (SimpleConnectionHolder holder : set) { - sb.append(port(holder.connection())); + for (SimpleConnectionState state : set) { + sb.append(port(state.connection())); if (--numCons > 0) sb.append(" "); } } diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/TestConnectionMaker.java b/client/src/main/java/com/networknt/client/simplepool/mock/TestConnectionMaker.java index 09d9029abf..f2604a3824 100644 --- a/client/src/main/java/com/networknt/client/simplepool/mock/TestConnectionMaker.java +++ b/client/src/main/java/com/networknt/client/simplepool/mock/TestConnectionMaker.java @@ -38,21 +38,12 @@ public TestConnectionMaker(Class clas) { @Override public SimpleConnection makeConnection(long createConnectionTimeout, boolean isHttp2, URI uri, Set allConnections) - throws RuntimeException + throws RuntimeException { SimpleConnection connection = instantiateConnection(createConnectionTimeout, isHttp2, allConnections); return connection; } - @Override - public SimpleConnection reuseConnection(long createConnectionTimeout, SimpleConnection connection) throws RuntimeException { - if(connection == null) - return null; - if(!connection.isOpen()) - throw new RuntimeException("Reused-connection has been unexpectedly closed"); - return connection; - } - private SimpleConnection instantiateConnection(long createConnectionTimeout, final boolean isHttp2, final Set allConnections) throws RuntimeException { diff --git a/client/src/main/java/com/networknt/client/simplepool/mock/TestRunner.java b/client/src/main/java/com/networknt/client/simplepool/mock/TestRunner.java index 8ffabca740..36fadf5d73 100644 --- a/client/src/main/java/com/networknt/client/simplepool/mock/TestRunner.java +++ b/client/src/main/java/com/networknt/client/simplepool/mock/TestRunner.java @@ -19,7 +19,7 @@ */ package com.networknt.client.simplepool.mock; -import com.networknt.client.simplepool.SimpleConnectionHolder; +import com.networknt.client.simplepool.SimpleConnectionState; import com.networknt.client.simplepool.SimpleConnectionMaker; import com.networknt.client.simplepool.SimpleURIConnectionPool; import org.slf4j.Logger; @@ -55,32 +55,83 @@ public class TestRunner private boolean isHttp2 = true; /** Test length in seconds. Default 120s */ - public TestRunner setTestLength(long testLength) { this.testLength = testLength; return this; } + public TestRunner setTestLength(long testLength) { + this.testLength = testLength; + return this; + } + /** Number of borrowing threads. Default 2 */ - public TestRunner setNumBorrowerThreads(int numCallers) { this.numCallers = numCallers; return this; } + public TestRunner setNumBorrowerThreads(int numCallers) { + this.numCallers = numCallers; + return this; + } + /** Mock URI. Default https://mock-uri.com */ - public TestRunner setUri(URI uri) { this.uri = uri; return this; } + public TestRunner setUri(URI uri) { + this.uri = uri; + return this; + } + /** Maximum number of connections allowed in the connection pool. Default 100 */ - public TestRunner setConnectionPoolSize(int poolSize) { this.poolSize = poolSize; return this; } + public TestRunner setConnectionPoolSize(int poolSize) { + this.poolSize = poolSize; + return this; + } + /** Connection expiry time in seconds. Default 10s */ - public TestRunner setConnectionExpireTime(long expireTime) { this.expireTime = expireTime; return this; } + public TestRunner setConnectionExpireTime(long expireTime) { + this.expireTime = expireTime; + return this; + } + /** The SimpleConnection class used for connections -- must have a parameterless constructor. * Note: executeTest() will throw an exception if this is not set. */ - public TestRunner setSimpleConnectionClass(Class simpleConnectionClass) { this.simpleConnectionClass = simpleConnectionClass; return this; } + public TestRunner setSimpleConnectionClass(Class simpleConnectionClass) { + this.simpleConnectionClass = simpleConnectionClass; + return this; + } + /** Connection creation timeout in seconds. Default is 5s */ - public TestRunner setCreateConnectionTimeout(long createConnectionTimeout) { this.createConnectionTimeout = createConnectionTimeout; return this; } + public TestRunner setCreateConnectionTimeout(long createConnectionTimeout) { + this.createConnectionTimeout = createConnectionTimeout; + return this; + } + /** Amount of time in seconds that borrower threads hold connections before restoring them. Default 3s */ - public TestRunner setBorrowTimeLength(long borrowTime) { this.borrowTime = borrowTime; return this; } + public TestRunner setBorrowTimeLength(long borrowTime) { + this.borrowTime = borrowTime; + return this; + } + /** Max random additional time in seconds that borrower threads hold connections before restoring them. Default 4s */ - public TestRunner setBorrowTimeLengthJitter(long borrowJitter) { this.borrowJitter = borrowJitter; return this; } + public TestRunner setBorrowTimeLengthJitter(long borrowJitter) { + this.borrowJitter = borrowJitter; + return this; + } + /** Amount of time in seconds that borrower threads waits after returning a connection to borrow again. Default 2s */ - public TestRunner setWaitTimeBeforeReborrow(long reborrowTime) { this.reborrowTime = reborrowTime; return this; } + public TestRunner setWaitTimeBeforeReborrow(long reborrowTime) { + this.reborrowTime = reborrowTime; + return this; + } + /** Max random additional time in seconds that borrower threads waits after returning a connection to borrow again. Default 2s */ - public TestRunner setWaitTimeBeforeReborrowJitter(long reborrowTimeJitter) { this.reborrowTimeJitter = reborrowTimeJitter; return this; } + public TestRunner setWaitTimeBeforeReborrowJitter(long reborrowTimeJitter) { + this.reborrowTimeJitter = reborrowTimeJitter; + return this; + } + /** Max random startup delay in seconds for borrower threads. Default 3s */ - public TestRunner setBorrowerThreadStartJitter(int threadStartJitter) { this.threadStartJitter = threadStartJitter; return this; } + public TestRunner setBorrowerThreadStartJitter(int threadStartJitter) { + this.threadStartJitter = threadStartJitter; + return this; + } + /** Determines whether caller threads request HTTP/2 connections. HTTP/2 means multiple borrows per connection are allowed. Default true */ - public TestRunner setHttp2(boolean http2) { isHttp2 = http2; return this; } + public TestRunner setHttp2(boolean http2) { + isHttp2 = http2; + return this; + } public void executeTest() throws RuntimeException { if(simpleConnectionClass == null) @@ -132,7 +183,7 @@ private void createAndStartCallers( { while(numCallers-- > 0) { new CallerThread( - pool, stopped, createConnectionTimeout, isHttp2, borrowTime, borrowJitter, reborrowTime, reborrowTimeJitter, latch).start(); + pool, stopped, createConnectionTimeout, isHttp2, borrowTime, borrowJitter, reborrowTime, reborrowTimeJitter, latch).start(); if(threadStartJitter > 0) Thread.sleep(ThreadLocalRandom.current().nextLong(threadStartJitter+1) * 1000); } @@ -151,15 +202,15 @@ private static class CallerThread extends Thread { private final long reborrowTimeJitter; public CallerThread( - SimpleURIConnectionPool pool, - AtomicBoolean stopped, - long createConnectionTimeout, - boolean isHttp2, - long borrowTime, - long borrowJitter, - long reborrowTime, - long reborrowTimeJitter, - CountDownLatch latch) + SimpleURIConnectionPool pool, + AtomicBoolean stopped, + long createConnectionTimeout, + boolean isHttp2, + long borrowTime, + long borrowJitter, + long reborrowTime, + long reborrowTimeJitter, + CountDownLatch latch) { this.latch = latch; this.stopped = stopped; @@ -176,7 +227,7 @@ public CallerThread( public void run() { logger.debug("{} Starting", Thread.currentThread().getName()); while(!stopped.get()) { - SimpleConnectionHolder.ConnectionToken connectionToken = null; + SimpleConnectionState.ConnectionToken connectionToken = null; try { logger.debug("{} Borrowing connection", Thread.currentThread().getName()); connectionToken = pool.borrow(createConnectionTimeout, isHttp2); diff --git a/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnection.java b/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleUndertowConnection.java similarity index 91% rename from client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnection.java rename to client/src/main/java/com/networknt/client/simplepool/undertow/SimpleUndertowConnection.java index a75b8081d0..2ae1fa99c3 100644 --- a/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnection.java +++ b/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleUndertowConnection.java @@ -23,10 +23,10 @@ import io.undertow.client.ClientConnection; import org.xnio.IoUtils; -public class SimpleClientConnection implements SimpleConnection { +public class SimpleUndertowConnection implements SimpleConnection { private ClientConnection connection; - public SimpleClientConnection(ClientConnection connection) { + public SimpleUndertowConnection(ClientConnection connection) { this.connection = connection; } diff --git a/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnectionMaker.java b/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleUndertowConnectionMaker.java similarity index 86% rename from client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnectionMaker.java rename to client/src/main/java/com/networknt/client/simplepool/undertow/SimpleUndertowConnectionMaker.java index 7dd53beabe..a0939988a3 100644 --- a/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleClientConnectionMaker.java +++ b/client/src/main/java/com/networknt/client/simplepool/undertow/SimpleUndertowConnectionMaker.java @@ -41,16 +41,24 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -public class SimpleClientConnectionMaker implements SimpleConnectionMaker +public class SimpleUndertowConnectionMaker implements SimpleConnectionMaker { - private static final Logger logger = LoggerFactory.getLogger(SimpleClientConnectionMaker.class); + private static final Logger logger = LoggerFactory.getLogger(SimpleUndertowConnectionMaker.class); private static final ByteBufferPool BUFFER_POOL = new DefaultByteBufferPool(true, ClientConfig.get().getBufferSize() * 1024); - private static SimpleClientConnectionMaker simpleClientConnectionMaker = null; + private static final AtomicReference WORKER = new AtomicReference<>(null); + private static final AtomicReference SSL = new AtomicReference<>(null); + private static volatile SimpleUndertowConnectionMaker simpleUndertowConnectionMaker = null; + + private SimpleUndertowConnectionMaker() {} public static SimpleConnectionMaker instance() { - if(simpleClientConnectionMaker == null) - simpleClientConnectionMaker = new SimpleClientConnectionMaker(); - return simpleClientConnectionMaker; + if(simpleUndertowConnectionMaker == null) { + synchronized (SimpleUndertowConnectionMaker.class) { + if (simpleUndertowConnectionMaker == null) + simpleUndertowConnectionMaker = new SimpleUndertowConnectionMaker(); + } + } + return simpleUndertowConnectionMaker; } @Override @@ -70,8 +78,8 @@ public SimpleConnection makeConnection( ClientCallback connectionCallback = new ClientCallback() { @Override public void completed(ClientConnection connection) { - logger.debug("New connection {} established with {}", port(connection), uri); - SimpleConnection simpleConnection = new SimpleClientConnection(connection); + if(logger.isDebugEnabled()) logger.debug("New connection {} established with {}", port(connection), uri); + SimpleConnection simpleConnection = new SimpleUndertowConnection(connection); // note: its vital that allCreatedConnections and result contain the same SimpleConnection reference allCreatedConnections.add(simpleConnection); @@ -80,7 +88,7 @@ public void completed(ClientConnection connection) { @Override public void failed(IOException e) { - logger.debug("Failed to establish new connection for uri: {}", uri); + if(logger.isDebugEnabled()) logger.debug("Failed to establish new connection for uri: {}", uri); result.setException(e); } }; @@ -112,8 +120,6 @@ private static OptionMap getConnectionOptions(boolean isHttp2) { return isHttp2 ? OptionMap.create(UndertowOptions.ENABLE_HTTP2, true) : OptionMap.EMPTY; } - // TODO: Should worker be re-used? Note: Light-4J Http2Client re-uses it - private static AtomicReference WORKER = new AtomicReference<>(null); private static XnioWorker getWorker(boolean isHttp2) { if(WORKER.get() != null) return WORKER.get(); @@ -139,8 +145,6 @@ private static OptionMap getWorkerOptionMap(boolean isHttp2) return optionBuild.getMap(); } - // TODO: Should SSL be re-used? Note: Light-4J Http2Client re-uses it - private static AtomicReference SSL = new AtomicReference<>(null); private static XnioSsl getSSL(boolean isHttps, boolean isHttp2) { if(!isHttps) diff --git a/consul/src/main/java/com/networknt/consul/client/ConsulClientImpl.java b/consul/src/main/java/com/networknt/consul/client/ConsulClientImpl.java index 87e796e328..02dbd7ede3 100644 --- a/consul/src/main/java/com/networknt/consul/client/ConsulClientImpl.java +++ b/consul/src/main/java/com/networknt/consul/client/ConsulClientImpl.java @@ -19,11 +19,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.networknt.client.ClientConfig; import com.networknt.client.Http2Client; +import com.networknt.client.simplepool.SimpleConnectionState; import com.networknt.config.Config; import com.networknt.consul.*; import com.networknt.httpstring.HttpStringConstants; import com.networknt.utility.StringUtils; -import io.undertow.UndertowOptions; import io.undertow.client.ClientConnection; import io.undertow.client.ClientRequest; import io.undertow.client.ClientResponse; @@ -33,7 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xnio.IoUtils; -import org.xnio.OptionMap; import java.net.URI; import java.net.URISyntaxException; @@ -46,10 +45,9 @@ // use SimpleURIConnectionPool as the connection pool import com.networknt.client.simplepool.SimpleURIConnectionPool; -import com.networknt.client.simplepool.SimpleConnectionHolder; // Use Undertow ClientConnection as raw connection import com.networknt.client.simplepool.SimpleConnectionMaker; -import com.networknt.client.simplepool.undertow.SimpleClientConnectionMaker; +import com.networknt.client.simplepool.undertow.SimpleUndertowConnectionMaker; /** * A client that talks to Consul agent with REST API. @@ -90,7 +88,7 @@ public ConsulClientImpl() { } // create SimpleURIConnection pool - SimpleConnectionMaker undertowConnectionMaker = SimpleClientConnectionMaker.instance(); + SimpleConnectionMaker undertowConnectionMaker = SimpleUndertowConnectionMaker.instance(); pool = new SimpleURIConnectionPool( uri, ClientConfig.get().getConnectionExpireTime(), ClientConfig.get().getConnectionPoolSize(), undertowConnectionMaker); } @@ -100,7 +98,7 @@ public void checkPass(String serviceId, String token) { logger.trace("checkPass serviceId = {}", serviceId); String path = "/v1/agent/check/pass/" + "check-" + serviceId; ClientConnection connection = null; - SimpleConnectionHolder.ConnectionToken connectionToken = null; + SimpleConnectionState.ConnectionToken connectionToken = null; try { logger.debug("Getting connection from pool with {}", uri); @@ -125,7 +123,7 @@ public void checkFail(String serviceId, String token) { logger.trace("checkFail serviceId = {}", serviceId); String path = "/v1/agent/check/fail/" + "check-" + serviceId; ClientConnection connection = null; - SimpleConnectionHolder.ConnectionToken connectionToken = null; + SimpleConnectionState.ConnectionToken connectionToken = null; try { logger.debug("Getting connection from pool with {}", uri); @@ -149,7 +147,7 @@ public void registerService(ConsulService service, String token) { String json = service.toString(); String path = "/v1/agent/service/register"; ClientConnection connection = null; - SimpleConnectionHolder.ConnectionToken connectionToken = null; + SimpleConnectionState.ConnectionToken connectionToken = null; try { logger.debug("Getting connection from pool with {}", uri); @@ -173,7 +171,7 @@ public void registerService(ConsulService service, String token) { public void unregisterService(String serviceId, String token) { String path = "/v1/agent/service/deregister/" + serviceId; ClientConnection connection = null; - SimpleConnectionHolder.ConnectionToken connectionToken = null; + SimpleConnectionState.ConnectionToken connectionToken = null; try { logger.debug("Getting connection from pool with {}", uri); @@ -226,7 +224,7 @@ public ConsulResponse> lookupHealthService(String serviceNam } logger.trace("Consul health service path = {}", path); - SimpleConnectionHolder.ConnectionToken connectionToken = null; + SimpleConnectionState.ConnectionToken connectionToken = null; try { logger.debug("Getting connection from pool with {}", uri); // this will throw a Runtime Exception if creation of Consul connection fails