diff --git a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainer.java b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainer.java index 71c97386..cc60244d 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainer.java +++ b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainer.java @@ -48,13 +48,22 @@ import java.nio.ByteBuffer; /** - * Interface for a buffer container object. + * BufferContainer is a decorator for {@link ByteBuffer} with an id. */ public interface BufferContainer { + /** + * @return id of the buffer + */ long id(); + /** + * @return encapsulated {@link ByteBuffer}. + */ ByteBuffer buffer(); + /** + * @return is this a stub implementation. + */ boolean isStub(); } diff --git a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainerImpl.java b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainerImpl.java index f3af2d92..af6b7441 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainerImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainerImpl.java @@ -51,16 +51,15 @@ import java.nio.ByteBuffer; /** - * Implementation of the BufferContainer interface. Contains the buffer with a synchronized (lock-free) way of accessing - * it. + * Decorator for {@link ByteBuffer} with a synchronized access for it. */ -public class BufferContainerImpl implements BufferContainer { +final class BufferContainerImpl implements BufferContainer { private static final Logger LOGGER = LoggerFactory.getLogger(BufferContainerImpl.class); private final long id; private final ByteBuffer buffer; - public BufferContainerImpl(long id, ByteBuffer buffer) { + BufferContainerImpl(long id, ByteBuffer buffer) { this.id = id; this.buffer = buffer; } diff --git a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainerStub.java b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainerStub.java index 7813f904..bda97a83 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainerStub.java +++ b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferContainerStub.java @@ -48,9 +48,13 @@ import java.nio.ByteBuffer; /** - * Buffer container stub object. Use isStub() to check. Other methods will result in an IllegalStateException. + * Stub implementation of the {@link BufferContainer}. */ -public class BufferContainerStub implements BufferContainer { +final class BufferContainerStub implements BufferContainer { + + BufferContainerStub() { + + } @Override public long id() { diff --git a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLease.java b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLease.java index 504ac04f..9820cafc 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLease.java +++ b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLease.java @@ -47,19 +47,43 @@ import java.nio.ByteBuffer; +/** + * BufferLease is a decorator for {@link BufferContainer} with reference counter + */ public interface BufferLease { + /** + * @return identity of the decorated {@link BufferContainer}. + */ long id(); + /** + * @return current reference count. + */ long refs(); + /** + * @return encapsulated buffer of the {@link BufferContainer}. + */ ByteBuffer buffer(); - void addRef(); + /** + * Add reference, throws {@link IllegalStateException} if lease has expired. + */ + void addRef() throws IllegalStateException; - void removeRef(); + /** + * Remove reference, throws {@link IllegalStateException} if lease has expired. + */ + void removeRef() throws IllegalStateException; + /** + * @return status of the lease, {@code true} indicates that the lease has expired. + */ boolean isRefCountZero(); + /** + * @return is this a stub implementation. + */ boolean isStub(); } diff --git a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeaseImpl.java b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeaseImpl.java index 910bf913..a5e13359 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeaseImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeaseImpl.java @@ -48,13 +48,18 @@ import java.nio.ByteBuffer; import java.util.concurrent.Phaser; -public class BufferLeaseImpl implements BufferLease { +/** + * Decorator for {@link BufferContainer} that automatically clears (frees) the encapsulated {@link ByteBuffer} and + * returns the {@link BufferContainer} to {@link BufferLeasePool} when reference count hits zero. Starts with one + * initial reference. Internally uses a {@link Phaser} to track reference count in a non-blocking way. + */ +final class BufferLeaseImpl implements BufferLease { private final BufferContainer bufferContainer; private final Phaser phaser; private final BufferLeasePool bufferLeasePool; - public BufferLeaseImpl(BufferContainer bc, BufferLeasePool bufferLeasePool) { + BufferLeaseImpl(BufferContainer bc, BufferLeasePool bufferLeasePool) { this.bufferContainer = bc; this.bufferLeasePool = bufferLeasePool; diff --git a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeasePool.java b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeasePool.java index 68127349..9d1ecfe4 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeasePool.java +++ b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeasePool.java @@ -59,8 +59,12 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; -// FIXME create tests -public class BufferLeasePool { +/** + * Non-blocking pool for {@link BufferContainer} objects. All objects in the pool are {@link ByteBuffer#clear()}ed + * before returning to the pool by {@link BufferLease}. + */ +public final class BufferLeasePool { + // TODO create tests private static final Logger LOGGER = LoggerFactory.getLogger(BufferLeasePool.class); @@ -122,6 +126,10 @@ private BufferLease take() { } + /** + * @param size minimum size of the {@link BufferLease}s requested. + * @return list of {@link BufferLease}s meeting or exceeding the size requested. + */ public List take(long size) { if (close.get()) { return Collections.singletonList(bufferLeaseStub); @@ -140,10 +148,17 @@ public List take(long size) { } + // FIXME remove this, use BufferLease.removeRef() instead directly. public void offer(BufferLease bufferLease) { bufferLease.removeRef(); } + /** + * return {@link BufferContainer} into the pool. + * + * @param bufferContainer {@link BufferContainer} from {@link BufferLease} which has been + * {@link ByteBuffer#clear()}ed. + */ void internalOffer(BufferContainer bufferContainer) { // Add buffer back to pool if it is not a stub object if (!bufferContainer.isStub()) { @@ -169,6 +184,10 @@ void internalOffer(BufferContainer bufferContainer) { } } + /** + * Closes the {@link BufferLeasePool}, deallocating currently residing {@link BufferContainer}s and future ones when + * returned. + */ public void close() { LOGGER.debug("close called"); close.set(true); @@ -178,6 +197,11 @@ public void close() { } + /** + * Estimate the pool size, due to non-blocking nature of the pool, this is only an estimate. + * + * @return estimate of the pool size, counting only the residing buffers. + */ public int estimatedSize() { return queue.size(); } diff --git a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeaseStub.java b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeaseStub.java index 1118d8fc..b9a198a4 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeaseStub.java +++ b/src/main/java/com/teragrep/rlp_03/channel/buffer/BufferLeaseStub.java @@ -47,7 +47,14 @@ import java.nio.ByteBuffer; -public class BufferLeaseStub implements BufferLease { +/** + * Stub implementation of the {@link BufferLease} + */ +final class BufferLeaseStub implements BufferLease { + + BufferLeaseStub() { + + } @Override public long id() { diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/ConnectContext.java b/src/main/java/com/teragrep/rlp_03/channel/context/ConnectContext.java index 9186cfe3..0da32af1 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/ConnectContext.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/ConnectContext.java @@ -45,8 +45,6 @@ */ package com.teragrep.rlp_03.channel.context; -import com.teragrep.rlp_03.channel.InterestOps; -import com.teragrep.rlp_03.channel.InterestOpsImpl; import com.teragrep.rlp_03.channel.socket.SocketFactory; import com.teragrep.rlp_03.frame.delegate.FrameDelegate; import org.slf4j.Logger; @@ -58,7 +56,14 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; -public class ConnectContext implements Context { +/** + * Initiate type {@link Context} that produces an EstablishedContext once it receives an OP_CONNECT type + * {@link SelectionKey} from {@link com.teragrep.rlp_03.EventLoop} and socketChannel.finishConnect() succeeds. Use + * {@link com.teragrep.rlp_03.EventLoop#register(Context)} to register it to the desired + * {@link com.teragrep.rlp_03.EventLoop}, + */ +public final class ConnectContext implements Context { + // TODO should this be named InitiateContext? private static final Logger LOGGER = LoggerFactory.getLogger(ConnectContext.class); @@ -69,7 +74,7 @@ public class ConnectContext implements Context { private final Consumer establishedContextConsumer; - public ConnectContext( + ConnectContext( SocketChannel socketChannel, ExecutorService executorService, SocketFactory socketFactory, diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/ConnectContextFactory.java b/src/main/java/com/teragrep/rlp_03/channel/context/ConnectContextFactory.java index a45e3e23..12b68f48 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/ConnectContextFactory.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/ConnectContextFactory.java @@ -54,16 +54,32 @@ import java.util.concurrent.ExecutorService; import java.util.function.Consumer; -public class ConnectContextFactory { +/** + * Factory for creating {@link ConnectContext}s for initiating new connections. + */ +public final class ConnectContextFactory { private final ExecutorService executorService; private final SocketFactory socketFactory; + /** + * @param executorService {@link ExecutorService} to handle connection's events with + * @param socketFactory {@link SocketFactory} that produces the desired type + * {@link com.teragrep.rlp_03.channel.socket.Socket} for the connection. + */ public ConnectContextFactory(ExecutorService executorService, SocketFactory socketFactory) { this.executorService = executorService; this.socketFactory = socketFactory; } + /** + * @param inetSocketAddress {@link InetSocketAddress} to initiate connection to. + * @param frameDelegate {@link FrameDelegate} for processing received data with. + * @param establishedContextConsumer {@link Consumer} for handling the callback once connection + * is established. + * @return {@link ConnectContext} to be registered with {@link com.teragrep.rlp_03.EventLoop#register(Context)}. + * @throws IOException if underlying {@link SocketChannel} is unable to initiate the connection. + */ public ConnectContext create( InetSocketAddress inetSocketAddress, FrameDelegate frameDelegate, diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/Context.java b/src/main/java/com/teragrep/rlp_03/channel/context/Context.java index 2b6a74c0..a32291a1 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/Context.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/Context.java @@ -49,14 +49,34 @@ import java.nio.channels.SelectionKey; import java.nio.channels.spi.AbstractSelectableChannel; +/** + * Context is a network connection element, it may be one of the: initiate {@link ConnectContext}, listen + * {@link ListenContext} or established {@link EstablishedContext} types. + */ public interface Context extends Closeable { + /** + * Handles this context's {@link SelectionKey} events. Providing a non-related key will result in non-foreseen + * issues, and this is a programming error. + * + * @param selectionKey key of this context to handle an event for. + */ + // TODO add checks for such programming error of providing non-related key void handleEvent(SelectionKey selectionKey); + /** + * Closes the underlying network connection element and frees resources attached to it. + */ @Override void close(); // no exception is thrown + /** + * @return {@link AbstractSelectableChannel} of the network connection element. + */ AbstractSelectableChannel socketChannel(); + /** + * @return initial state of the {@link SelectionKey} which the network connection element starts with. + */ int initialSelectionKey(); } diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContext.java b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContext.java index c1b3bc70..6c6190a7 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContext.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContext.java @@ -45,14 +45,29 @@ */ package com.teragrep.rlp_03.channel.context; -import com.teragrep.rlp_03.channel.InterestOps; import com.teragrep.rlp_03.channel.socket.Socket; +import java.util.List; + +/** + * Established type of {@link Context}. It produces ingress data into the provided + * {@link com.teragrep.rlp_03.frame.delegate.FrameDelegate} via {@link RelpRead}. Egress data can be written via + * {@link RelpWrite#accept(List)}. + */ public interface EstablishedContext extends Context { + /** + * @return current {@link InterestOps} of the underlying {@link java.nio.channels.SelectionKey}. + */ InterestOps interestOps(); + /** + * @return underlying {@link Socket} of the connection. + */ Socket socket(); + /** + * @return RelpWrite of the connection for sending egress data. + */ RelpWrite relpWrite(); } diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextImpl.java b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextImpl.java index 7096d3fd..bfbecf75 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextImpl.java @@ -45,7 +45,6 @@ */ package com.teragrep.rlp_03.channel.context; -import com.teragrep.rlp_03.channel.*; import com.teragrep.rlp_03.frame.delegate.FrameDelegate; import com.teragrep.rlp_03.channel.buffer.BufferLeasePool; import com.teragrep.rlp_03.channel.socket.Socket; @@ -63,9 +62,9 @@ import static java.nio.channels.SelectionKey.OP_WRITE; /** - * A per connection object that handles reading and writing messages from and to the SocketChannel. + * Implementation of the {@link EstablishedContext} */ -public class EstablishedContextImpl implements EstablishedContext { // TODO make package-protected +final class EstablishedContextImpl implements EstablishedContext { // TODO make package-protected private static final Logger LOGGER = LoggerFactory.getLogger(EstablishedContextImpl.class); @@ -78,7 +77,7 @@ public class EstablishedContextImpl implements EstablishedContext { // TODO make private final RelpRead relpRead; private final RelpWrite relpWrite; - public EstablishedContextImpl( + EstablishedContextImpl( ExecutorService executorService, Socket socket, InterestOps interestOps, diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextStub.java b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextStub.java index cb8f9e41..5c3915b9 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextStub.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/EstablishedContextStub.java @@ -45,13 +45,19 @@ */ package com.teragrep.rlp_03.channel.context; -import com.teragrep.rlp_03.channel.InterestOps; import com.teragrep.rlp_03.channel.socket.Socket; import java.nio.channels.SelectionKey; import java.nio.channels.spi.AbstractSelectableChannel; -public class EstablishedContextStub implements EstablishedContext { +/** + * Stub implementation of {@link EstablishedContext} + */ +final class EstablishedContextStub implements EstablishedContext { + + EstablishedContextStub() { + + } @Override public void close() { diff --git a/src/main/java/com/teragrep/rlp_03/channel/InterestOps.java b/src/main/java/com/teragrep/rlp_03/channel/context/InterestOps.java similarity index 90% rename from src/main/java/com/teragrep/rlp_03/channel/InterestOps.java rename to src/main/java/com/teragrep/rlp_03/channel/context/InterestOps.java index 923f690d..1f93fcf0 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/InterestOps.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/InterestOps.java @@ -43,8 +43,13 @@ * Teragrep, the applicable Commercial License may apply to this file if you as * a licensee so wish it. */ -package com.teragrep.rlp_03.channel; +package com.teragrep.rlp_03.channel.context; +import java.nio.channels.SelectionKey; + +/** + * Encapsulates {@link SelectionKey} with stateful {@link SelectionKey#interestOps()} for use within {@link Context} + */ public interface InterestOps { void add(int op); diff --git a/src/main/java/com/teragrep/rlp_03/channel/InterestOpsImpl.java b/src/main/java/com/teragrep/rlp_03/channel/context/InterestOpsImpl.java similarity index 97% rename from src/main/java/com/teragrep/rlp_03/channel/InterestOpsImpl.java rename to src/main/java/com/teragrep/rlp_03/channel/context/InterestOpsImpl.java index d3242998..deebca8a 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/InterestOpsImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/InterestOpsImpl.java @@ -43,14 +43,14 @@ * Teragrep, the applicable Commercial License may apply to this file if you as * a licensee so wish it. */ -package com.teragrep.rlp_03.channel; +package com.teragrep.rlp_03.channel.context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.channels.SelectionKey; -public final class InterestOpsImpl implements InterestOps { +final class InterestOpsImpl implements InterestOps { private static final Logger LOGGER = LoggerFactory.getLogger(InterestOpsImpl.class); @@ -58,7 +58,7 @@ public final class InterestOpsImpl implements InterestOps { private int currentOps; - public InterestOpsImpl(SelectionKey selectionKey) { + InterestOpsImpl(SelectionKey selectionKey) { this.selectionKey = selectionKey; this.currentOps = selectionKey.interestOps(); } diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/ListenContext.java b/src/main/java/com/teragrep/rlp_03/channel/context/ListenContext.java index c7a70c38..14af9f71 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/ListenContext.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/ListenContext.java @@ -45,8 +45,6 @@ */ package com.teragrep.rlp_03.channel.context; -import com.teragrep.rlp_03.channel.InterestOps; -import com.teragrep.rlp_03.channel.InterestOpsImpl; import com.teragrep.rlp_03.channel.socket.Socket; import com.teragrep.rlp_03.channel.socket.SocketFactory; import com.teragrep.rlp_03.frame.delegate.FrameDelegate; @@ -60,7 +58,12 @@ import java.util.concurrent.ExecutorService; import java.util.function.Supplier; -public class ListenContext implements Context { +/** + * Listen type {@link Context} that produces {@link EstablishedContext} for receiving incoming connections. Use + * {@link com.teragrep.rlp_03.EventLoop#register(Context)} to register it to the desired + * {@link com.teragrep.rlp_03.EventLoop}. + */ +public final class ListenContext implements Context { private static final Logger LOGGER = LoggerFactory.getLogger(ListenContext.class); private final ServerSocketChannel serverSocketChannel; @@ -69,7 +72,7 @@ public class ListenContext implements Context { private final Supplier frameDelegateSupplier; private final EstablishedContextStub establishedContextStub; - public ListenContext( + ListenContext( ServerSocketChannel serverSocketChannel, ExecutorService executorService, SocketFactory socketFactory, @@ -82,6 +85,7 @@ public ListenContext( this.establishedContextStub = new EstablishedContextStub(); } + @Override public void handleEvent(SelectionKey selectionKey) { try { if (selectionKey.isAcceptable()) { diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/ListenContextFactory.java b/src/main/java/com/teragrep/rlp_03/channel/context/ListenContextFactory.java index 968b4a4e..5d5013ae 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/ListenContextFactory.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/ListenContextFactory.java @@ -54,7 +54,10 @@ import java.util.concurrent.ExecutorService; import java.util.function.Supplier; -public class ListenContextFactory { +/** + * Factory for creating {@link ListenContext}s for receiving new connections. + */ +public final class ListenContextFactory { private final ExecutorService executorService; private final SocketFactory socketFactory; @@ -70,6 +73,13 @@ public ListenContextFactory( this.frameDelegateSupplier = frameDelegateSupplier; } + /** + * Opens a listening socket + * + * @param inetSocketAddress address to bind to + * @return {@link ListenContext} to be registered into an {@link com.teragrep.rlp_03.EventLoop} + * @throws IOException if unable to bind to the address provided + */ public ListenContext open(InetSocketAddress inetSocketAddress) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); try { diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/RelpRead.java b/src/main/java/com/teragrep/rlp_03/channel/context/RelpRead.java index 628141e9..d7cdf75f 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/RelpRead.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/RelpRead.java @@ -47,6 +47,9 @@ import java.util.concurrent.atomic.AtomicBoolean; +/** + * Ingress {@link com.teragrep.rlp_03.frame.RelpFrame} are handled by this asynchronously. + */ public interface RelpRead extends Runnable { @Override diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/RelpReadImpl.java b/src/main/java/com/teragrep/rlp_03/channel/context/RelpReadImpl.java index 6be56d8f..e88bde1c 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/RelpReadImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/RelpReadImpl.java @@ -70,7 +70,7 @@ import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; -public class RelpReadImpl implements RelpRead { +final class RelpReadImpl implements RelpRead { private static final Logger LOGGER = LoggerFactory.getLogger(RelpReadImpl.class); private final EstablishedContextImpl establishedContext; diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/RelpWrite.java b/src/main/java/com/teragrep/rlp_03/channel/context/RelpWrite.java index 0aab1560..6b60e3b8 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/RelpWrite.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/RelpWrite.java @@ -51,9 +51,16 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +/** + * Egress {@link com.teragrep.rlp_03.frame.RelpFrame} are handled by this + */ public interface RelpWrite extends Consumer>, Runnable { - // this must be thread-safe! + /** + * Sends asynchronously the frames provided. Implementation is required to be thread-safe. + * + * @param relpFrameTXList to send + */ @Override void accept(List relpFrameTXList); diff --git a/src/main/java/com/teragrep/rlp_03/channel/context/RelpWriteImpl.java b/src/main/java/com/teragrep/rlp_03/channel/context/RelpWriteImpl.java index 61a7bc79..03dbd1b0 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/context/RelpWriteImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/context/RelpWriteImpl.java @@ -67,7 +67,7 @@ import static java.nio.channels.SelectionKey.OP_READ; import static java.nio.channels.SelectionKey.OP_WRITE; -public class RelpWriteImpl implements RelpWrite { +final class RelpWriteImpl implements RelpWrite { private static final Logger LOGGER = LoggerFactory.getLogger(RelpWriteImpl.class); diff --git a/src/main/java/com/teragrep/rlp_03/channel/info/EncryptionInfo.java b/src/main/java/com/teragrep/rlp_03/channel/socket/EncryptionInfo.java similarity index 66% rename from src/main/java/com/teragrep/rlp_03/channel/info/EncryptionInfo.java rename to src/main/java/com/teragrep/rlp_03/channel/socket/EncryptionInfo.java index 6ef89bc1..c1fb5132 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/info/EncryptionInfo.java +++ b/src/main/java/com/teragrep/rlp_03/channel/socket/EncryptionInfo.java @@ -43,26 +43,68 @@ * Teragrep, the applicable Commercial License may apply to this file if you as * a licensee so wish it. */ -package com.teragrep.rlp_03.channel.info; +package com.teragrep.rlp_03.channel.socket; import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; import javax.security.cert.X509Certificate; import java.security.Principal; import java.security.cert.Certificate; +/** + * Provides information about a {@link com.teragrep.rlp_03.channel.socket.Socket} encryption status. + */ public interface EncryptionInfo { + /** + * Information if connection is encrypted + * + * @return true if socket is an encrypted type + */ boolean isEncrypted(); + /** + * throws IllegalStateException if isEncrypted returns false + * + * @return see {@link SSLSession#getCipherSuite()} + */ String getSessionCipherSuite(); + /** + * throws IllegalStateException if isEncrypted returns false + * + * @return see {@link SSLSession#getLocalCertificates()} + */ Certificate[] getLocalCertificates(); + /** + * throws IllegalStateException if isEncrypted returns false + * + * @return see {@link SSLSession#getLocalPrincipal()} + */ Principal getLocalPrincipal(); + /** + * throws IllegalStateException if isEncrypted returns false + * + * @return see {@link SSLSession#getPeerCertificateChain()} + * @throws SSLPeerUnverifiedException + */ X509Certificate[] getPeerCertificateChain() throws SSLPeerUnverifiedException; + /** + * throws IllegalStateException if isEncrypted returns false + * + * @return see {@link SSLSession#getPeerCertificates()} + * @throws SSLPeerUnverifiedException + */ Certificate[] getPeerCertificates() throws SSLPeerUnverifiedException; + /** + * throws IllegalStateException if isEncrypted returns false + * + * @return see {@link SSLSession#getPeerPrincipal()} + * @throws SSLPeerUnverifiedException + */ Principal getPeerPrincipal() throws SSLPeerUnverifiedException; } diff --git a/src/main/java/com/teragrep/rlp_03/channel/info/EncryptionInfoStub.java b/src/main/java/com/teragrep/rlp_03/channel/socket/EncryptionInfoStub.java similarity index 92% rename from src/main/java/com/teragrep/rlp_03/channel/info/EncryptionInfoStub.java rename to src/main/java/com/teragrep/rlp_03/channel/socket/EncryptionInfoStub.java index 5ae36939..4aaec98f 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/info/EncryptionInfoStub.java +++ b/src/main/java/com/teragrep/rlp_03/channel/socket/EncryptionInfoStub.java @@ -43,14 +43,21 @@ * Teragrep, the applicable Commercial License may apply to this file if you as * a licensee so wish it. */ -package com.teragrep.rlp_03.channel.info; +package com.teragrep.rlp_03.channel.socket; import javax.net.ssl.SSLPeerUnverifiedException; import javax.security.cert.X509Certificate; import java.security.Principal; import java.security.cert.Certificate; -public class EncryptionInfoStub implements EncryptionInfo { +/** + * Stub implementation of {@link EncryptionInfo} which is used by {@link com.teragrep.rlp_03.channel.socket.PlainSocket} + */ +final class EncryptionInfoStub implements EncryptionInfo { + + EncryptionInfoStub() { + + } @Override public boolean isEncrypted() { diff --git a/src/main/java/com/teragrep/rlp_03/channel/info/EncryptionInfoTLS.java b/src/main/java/com/teragrep/rlp_03/channel/socket/EncryptionInfoTLS.java similarity index 95% rename from src/main/java/com/teragrep/rlp_03/channel/info/EncryptionInfoTLS.java rename to src/main/java/com/teragrep/rlp_03/channel/socket/EncryptionInfoTLS.java index c8ab9759..f76c9aae 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/info/EncryptionInfoTLS.java +++ b/src/main/java/com/teragrep/rlp_03/channel/socket/EncryptionInfoTLS.java @@ -43,7 +43,7 @@ * Teragrep, the applicable Commercial License may apply to this file if you as * a licensee so wish it. */ -package com.teragrep.rlp_03.channel.info; +package com.teragrep.rlp_03.channel.socket; import tlschannel.TlsChannel; @@ -52,11 +52,11 @@ import java.security.Principal; import java.security.cert.Certificate; -final public class EncryptionInfoTLS implements EncryptionInfo { +final class EncryptionInfoTLS implements EncryptionInfo { private final TlsChannel tlsChannel; - public EncryptionInfoTLS(TlsChannel tlsChannel) { + EncryptionInfoTLS(TlsChannel tlsChannel) { this.tlsChannel = tlsChannel; } diff --git a/src/main/java/com/teragrep/rlp_03/channel/socket/PlainFactory.java b/src/main/java/com/teragrep/rlp_03/channel/socket/PlainFactory.java index 51e523fc..a2a0490f 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/socket/PlainFactory.java +++ b/src/main/java/com/teragrep/rlp_03/channel/socket/PlainFactory.java @@ -47,6 +47,9 @@ import java.nio.channels.SocketChannel; +/** + * Factory for creating {@link PlainSocket} + */ public class PlainFactory extends SocketFactory { @Override diff --git a/src/main/java/com/teragrep/rlp_03/channel/socket/PlainSocket.java b/src/main/java/com/teragrep/rlp_03/channel/socket/PlainSocket.java index 67ce52a5..0a1226c6 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/socket/PlainSocket.java +++ b/src/main/java/com/teragrep/rlp_03/channel/socket/PlainSocket.java @@ -45,21 +45,16 @@ */ package com.teragrep.rlp_03.channel.socket; -import com.teragrep.rlp_03.channel.info.EncryptionInfo; -import com.teragrep.rlp_03.channel.info.EncryptionInfoStub; -import com.teragrep.rlp_03.channel.info.TransportInfo; -import com.teragrep.rlp_03.channel.info.TransportInfoImpl; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -public class PlainSocket implements Socket { +final class PlainSocket implements Socket { private final SocketChannel socketChannel; private final TransportInfo transportInfo; - public PlainSocket(SocketChannel socketChannel) { + PlainSocket(SocketChannel socketChannel) { this.socketChannel = socketChannel; EncryptionInfo encryptionInfo = new EncryptionInfoStub(); this.transportInfo = new TransportInfoImpl(socketChannel, encryptionInfo); diff --git a/src/main/java/com/teragrep/rlp_03/channel/socket/Socket.java b/src/main/java/com/teragrep/rlp_03/channel/socket/Socket.java index 9d168cdf..139175a6 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/socket/Socket.java +++ b/src/main/java/com/teragrep/rlp_03/channel/socket/Socket.java @@ -45,8 +45,6 @@ */ package com.teragrep.rlp_03.channel.socket; -import com.teragrep.rlp_03.channel.info.TransportInfo; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; diff --git a/src/main/java/com/teragrep/rlp_03/channel/socket/TLSSocket.java b/src/main/java/com/teragrep/rlp_03/channel/socket/TLSSocket.java index f59f8830..0e12f7e5 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/socket/TLSSocket.java +++ b/src/main/java/com/teragrep/rlp_03/channel/socket/TLSSocket.java @@ -45,23 +45,19 @@ */ package com.teragrep.rlp_03.channel.socket; -import com.teragrep.rlp_03.channel.info.EncryptionInfo; -import com.teragrep.rlp_03.channel.info.EncryptionInfoTLS; -import com.teragrep.rlp_03.channel.info.TransportInfo; -import com.teragrep.rlp_03.channel.info.TransportInfoImpl; import tlschannel.TlsChannel; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -public class TLSSocket implements Socket { +final class TLSSocket implements Socket { private final SocketChannel socketChannel; private final TlsChannel tlsChannel; private final TransportInfo transportInfo; - public TLSSocket(SocketChannel socketChannel, TlsChannel tlsChannel) { + TLSSocket(SocketChannel socketChannel, TlsChannel tlsChannel) { this.socketChannel = socketChannel; this.tlsChannel = tlsChannel; EncryptionInfo encryptionInfo = new EncryptionInfoTLS(tlsChannel); diff --git a/src/main/java/com/teragrep/rlp_03/channel/info/TransportInfo.java b/src/main/java/com/teragrep/rlp_03/channel/socket/TransportInfo.java similarity index 69% rename from src/main/java/com/teragrep/rlp_03/channel/info/TransportInfo.java rename to src/main/java/com/teragrep/rlp_03/channel/socket/TransportInfo.java index 652ebfa9..1b662dcd 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/info/TransportInfo.java +++ b/src/main/java/com/teragrep/rlp_03/channel/socket/TransportInfo.java @@ -43,17 +43,47 @@ * Teragrep, the applicable Commercial License may apply to this file if you as * a licensee so wish it. */ -package com.teragrep.rlp_03.channel.info; +package com.teragrep.rlp_03.channel.socket; +/** + * Provides information about a {@link com.teragrep.rlp_03.channel.socket.Socket} transport. + */ public interface TransportInfo { + /** + * See {@link java.net.Socket#getLocalAddress()} + * + * @return local {@link java.net.InetAddress} as a {@link String} + */ String getLocalAddress(); + /** + * See {@link java.net.Socket#getLocalPort()} + * + * @return local port as int + */ int getLocalPort(); + /** + * See {@link java.net.Socket#getInetAddress()} + * + * @return remote {@link java.net.InetAddress} as a {@link String} + */ String getPeerAddress(); + /** + * See {@link java.net.Socket#getPort()} + * + * @return remote port as int + */ int getPeerPort(); + /** + * {@link EncryptionInfo} provides information of connection encryption detals, use + * {@link EncryptionInfo#isEncrypted()} to verify if the connection is encrypted, otherwise other methods will + * throw. + * + * @return {@link EncryptionInfo} of the connection + */ EncryptionInfo getEncryptionInfo(); } diff --git a/src/main/java/com/teragrep/rlp_03/channel/info/TransportInfoImpl.java b/src/main/java/com/teragrep/rlp_03/channel/socket/TransportInfoImpl.java similarity index 93% rename from src/main/java/com/teragrep/rlp_03/channel/info/TransportInfoImpl.java rename to src/main/java/com/teragrep/rlp_03/channel/socket/TransportInfoImpl.java index 010f2606..077ae880 100644 --- a/src/main/java/com/teragrep/rlp_03/channel/info/TransportInfoImpl.java +++ b/src/main/java/com/teragrep/rlp_03/channel/socket/TransportInfoImpl.java @@ -43,16 +43,16 @@ * Teragrep, the applicable Commercial License may apply to this file if you as * a licensee so wish it. */ -package com.teragrep.rlp_03.channel.info; +package com.teragrep.rlp_03.channel.socket; import java.nio.channels.SocketChannel; -public class TransportInfoImpl implements TransportInfo { +final class TransportInfoImpl implements TransportInfo { private final SocketChannel socketChannel; private final EncryptionInfo encryptionInfo; - public TransportInfoImpl(SocketChannel socketChannel, EncryptionInfo encryptionInfo) { + TransportInfoImpl(SocketChannel socketChannel, EncryptionInfo encryptionInfo) { this.socketChannel = socketChannel; this.encryptionInfo = encryptionInfo; } diff --git a/src/main/java/com/teragrep/rlp_03/client/Client.java b/src/main/java/com/teragrep/rlp_03/client/Client.java index d8dcaa76..a77a42f2 100644 --- a/src/main/java/com/teragrep/rlp_03/client/Client.java +++ b/src/main/java/com/teragrep/rlp_03/client/Client.java @@ -55,7 +55,10 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; -public class Client implements Closeable { +/** + * Simple client with asynchronous transmit and {@link java.util.concurrent.Future} based receive. Other type of + */ +public final class Client implements Closeable { private final EstablishedContext establishedContext; private final ConcurrentHashMap> transactions; @@ -70,8 +73,15 @@ public class Client implements Closeable { this.txnCounter = new AtomicInteger(); } - public CompletableFuture transmit(String command, byte[] data) { - RelpFrameTX relpFrameTX = new RelpFrameTX(command, data); + /** + * Transmits {@link RelpFrame} with automatic {@link RelpFrame#txn()} + * + * @param command {@link RelpFrame#command()} + * @param payload {@link RelpFrame#payload()} + * @return {@link CompletableFuture} response {@link RelpFrame} + */ + public CompletableFuture transmit(String command, byte[] payload) { + RelpFrameTX relpFrameTX = new RelpFrameTX(command, payload); int txn = txnCounter.incrementAndGet(); relpFrameTX.setTransactionNumber(txn); if (transactions.containsKey(txn)) { @@ -84,6 +94,9 @@ public CompletableFuture transmit(String command, byte[] data) { return future; } + /** + * Closes client connection + */ @Override public void close() { establishedContext.close(); diff --git a/src/main/java/com/teragrep/rlp_03/client/ClientDelegate.java b/src/main/java/com/teragrep/rlp_03/client/ClientDelegate.java index 0499e63c..1b84b16f 100644 --- a/src/main/java/com/teragrep/rlp_03/client/ClientDelegate.java +++ b/src/main/java/com/teragrep/rlp_03/client/ClientDelegate.java @@ -55,14 +55,13 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -class ClientDelegate implements FrameDelegate { +/** + * Receive part of {@link Client} + */ +final class ClientDelegate implements FrameDelegate { private static final Logger LOGGER = LoggerFactory.getLogger(ClientDelegate.class); - // TODO perhaps > ? - // TODO what should be Something, RelpFrame is immediately deallocated after FrameDelegate - // TODO design better: Futures are not optimal for multi-complete/disruptor pattern - private final ConcurrentHashMap> transactions; ClientDelegate() { diff --git a/src/main/java/com/teragrep/rlp_03/client/ClientFactory.java b/src/main/java/com/teragrep/rlp_03/client/ClientFactory.java index 6b74675e..e7b93b5b 100644 --- a/src/main/java/com/teragrep/rlp_03/client/ClientFactory.java +++ b/src/main/java/com/teragrep/rlp_03/client/ClientFactory.java @@ -57,17 +57,37 @@ import java.util.concurrent.*; import java.util.function.Consumer; +/** + * Factory for creating {@link Client} + */ public class ClientFactory { private static final Logger LOGGER = LoggerFactory.getLogger(ClientFactory.class); private final ConnectContextFactory connectContextFactory; private final EventLoop eventLoop; + /** + * Main for Constructor for {@link ClientFactory} + * + * @param connectContextFactory {@link ConnectContextFactory} for creating new connections + * @param eventLoop {@link EventLoop} to register new connections with + */ public ClientFactory(ConnectContextFactory connectContextFactory, EventLoop eventLoop) { this.connectContextFactory = connectContextFactory; this.eventLoop = eventLoop; } + /** + * Opens up a new connection. Registers the connection to provided {@link EventLoop}. Note that the + * {@link EventLoop} needs to run in order to proceed with the connection. + * + * @param inetSocketAddress destination {@link InetSocketAddress} to connect to. + * @return {@link Client} once connection succeeds. + * @throws IOException if connection fails + * @throws InterruptedException if {@link Future} is interrupted. + * @throws ExecutionException if {@link Future} fails to complete successfully. + */ + // TODO add timeout for the future so that connection attempt times out public Client open(InetSocketAddress inetSocketAddress) throws IOException, InterruptedException, ExecutionException { // this is for returning ready connection diff --git a/src/test/java/com/teragrep/rlp_03/ManualTest.java b/src/test/java/com/teragrep/rlp_03/ManualTest.java index f3bcae66..7eb353c6 100644 --- a/src/test/java/com/teragrep/rlp_03/ManualTest.java +++ b/src/test/java/com/teragrep/rlp_03/ManualTest.java @@ -45,7 +45,7 @@ */ package com.teragrep.rlp_03; -import com.teragrep.rlp_03.channel.info.EncryptionInfo; +import com.teragrep.rlp_03.channel.socket.EncryptionInfo; import com.teragrep.rlp_03.channel.socket.PlainFactory; import com.teragrep.rlp_03.channel.socket.TLSFactory; import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate; diff --git a/src/test/java/com/teragrep/rlp_03/channel/context/EstablishedContextFake.java b/src/test/java/com/teragrep/rlp_03/channel/context/EstablishedContextFake.java index b4b8b2a2..c804254a 100644 --- a/src/test/java/com/teragrep/rlp_03/channel/context/EstablishedContextFake.java +++ b/src/test/java/com/teragrep/rlp_03/channel/context/EstablishedContextFake.java @@ -45,7 +45,6 @@ */ package com.teragrep.rlp_03.channel.context; -import com.teragrep.rlp_03.channel.InterestOps; import com.teragrep.rlp_03.channel.socket.Socket; import java.nio.channels.SelectionKey; diff --git a/src/test/java/com/teragrep/rlp_03/channel/context/InterestOpsFake.java b/src/test/java/com/teragrep/rlp_03/channel/context/InterestOpsFake.java index fc079ef4..4b354016 100644 --- a/src/test/java/com/teragrep/rlp_03/channel/context/InterestOpsFake.java +++ b/src/test/java/com/teragrep/rlp_03/channel/context/InterestOpsFake.java @@ -45,8 +45,6 @@ */ package com.teragrep.rlp_03.channel.context; -import com.teragrep.rlp_03.channel.InterestOps; - public class InterestOpsFake implements InterestOps { @Override diff --git a/src/test/java/com/teragrep/rlp_03/channel/socket/SocketFake.java b/src/test/java/com/teragrep/rlp_03/channel/socket/SocketFake.java index a8675095..f5d48690 100644 --- a/src/test/java/com/teragrep/rlp_03/channel/socket/SocketFake.java +++ b/src/test/java/com/teragrep/rlp_03/channel/socket/SocketFake.java @@ -45,9 +45,6 @@ */ package com.teragrep.rlp_03.channel.socket; -import com.teragrep.rlp_03.channel.info.TransportInfo; -import com.teragrep.rlp_03.TransportInfoFake; - import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; diff --git a/src/test/java/com/teragrep/rlp_03/TransportInfoFake.java b/src/test/java/com/teragrep/rlp_03/channel/socket/TransportInfoFake.java similarity index 89% rename from src/test/java/com/teragrep/rlp_03/TransportInfoFake.java rename to src/test/java/com/teragrep/rlp_03/channel/socket/TransportInfoFake.java index e8a915cf..6b9ba998 100644 --- a/src/test/java/com/teragrep/rlp_03/TransportInfoFake.java +++ b/src/test/java/com/teragrep/rlp_03/channel/socket/TransportInfoFake.java @@ -43,17 +43,13 @@ * Teragrep, the applicable Commercial License may apply to this file if you as * a licensee so wish it. */ -package com.teragrep.rlp_03; +package com.teragrep.rlp_03.channel.socket; -import com.teragrep.rlp_03.channel.info.EncryptionInfo; -import com.teragrep.rlp_03.channel.info.EncryptionInfoStub; -import com.teragrep.rlp_03.channel.info.TransportInfo; - -public class TransportInfoFake implements TransportInfo { +final class TransportInfoFake implements TransportInfo { private final EncryptionInfo encryptionInfo; - public TransportInfoFake() { + TransportInfoFake() { this.encryptionInfo = new EncryptionInfoStub(); }