Skip to content

Commit

Permalink
refactor to use scattered reads and reusable buffers (#54)
Browse files Browse the repository at this point in the history
* tests pass, although it does not mean that this is an RC

* it does actually work

* change to debug, remove sout

* remove MessageWriter.java

* re-enable selector wakes after interestOps change

* optimize a bit

* Reporter for ManualTest to log the records per second

* more meaningful messages regarding write errors

* slight tuning of error message

* support resumed writes, support tls

* make sure selector thread does not exit due to cancelledKeyException

* add todo

* implement FrameProcessorPool

* add old tests back, run previous perf test with mvn clean test -DrunServerPerformanceTest=true

* enhance SocketPoll.poll to catch CancelledKeyException for accepted keys as well

* close context if CancelledKeyException is noticed during poll

* implement server.startup.waitForCompletion

* resumed writes now supported

* null is bad

* move comment to proper place

* debug-wip-sync

* tidy up!

* catch those CancelledKeyExceptions

* TODO is now done for InterestOpsImpl

* log about proper instance in FrameProcessorPool.offer close path

* support closure

* make FrameProcessor return List<RelpFrameTX> instead of single frame, now close notifications should work
move FrameProcessorPool to server class, so it's shared across the connections

* better IOException messages in RelpWrite

* avoid null, by using Optional, in RelpWrite, not bad as null but still wicked

* refactor testing, refactor RelpFrameServerRX to include ConnectionContext

* remove obsolete FrameProcessor implementations

* make InterestOps final in ConnectionContextImpl

* refactor SocketPoll into ServerSocket that produces ServerSocketOpen when listening

* stops now correctly

* refactor Server into immutable by using ServerFactory

* refactor FrameProcessorPool out, it was not adequate place for a pool

* make ManualPerformanceTest threads configureable via ServerPerformanceTestThreads

* make ManualPerformanceTest log number of threads

* make ManualPerformanceTest log number of threads

* remove string concat from SocketPoll.poll debug logging

* make ManualPerformanceTest port configurable

* SafeAccess

* some SafeAccess on the FragmentImpl

* refactor SafeAccess to Access and Lease, avoid race condition by locking

* fix function package functions to work properly with partial buffers

* refactor RelpFrame to an interface

* refactor RelpFrameAssembly into RelpFrame

* make RelpFrameAccess immutable

* works, but tests are essential, functions need content validation too!

* crude support for partial message

* test files

* command function does now size check

* EndOfTransfer test

* improve EndOfTransfer parseTest

* PayloadFunctionTest

* PayloadLengthFunction tests

* TransactionFunction tests

* change extra logging to debug or disable

* remove TODO for Access

* RelpFrameTest testConsecutiveFrames

* fix frame functions slice calculation

* function max length calculation fixes

* comment out extra logging in RelpFrameTest

* disable logging in FragmentImpl

* add toString into RelpFrameLeaseful

* RelpFrameImpl stop parsing of partial buffers when frame complete

* wip commit, however it works

* ByteBuffer cast to RelpWriteImpl to ensure java version compatibility

* default to no error in RelpReadImpl readData

* log about throwables in RelpReadImpl

* ByteBuffer and Buffer casts to ensure java version compatibility

* implement non-proper locking for BufferLeasePool

* Make ServerFactory create() public

* restore FrameProcessorPool

* add size() to Fragment

* hardcode maximum command size

* proposal for FragmentByteStream

* pool locking fixed

* create BufferLeasePoolTest

* change debug print behind isDebugEnabled guard

* move locking into BufferLeaseImpl from BufferLeasePool to assign responsibility of self to BufferLeaseImpl

* close BufferLeasePool

* add ConnectionStormTest for 10k connections

* cleanup LOGGER calls

* use Lock instead of Atomic in Access, use volatile isOpen in Lease

* use static final fields in functions for reduced init cost

* remove empty buffer checks from BufferLeasePool.take(), BufferLeaseImpl takes care of clearing them now

* add TODOs about semaphore use

* add AccessTest content

* add content to FragmentTest

---------

Co-authored-by: StrongestNumber9 <[email protected]>
  • Loading branch information
kortemik and StrongestNumber9 authored Feb 16, 2024
1 parent 54014ba commit 60da329
Show file tree
Hide file tree
Showing 92 changed files with 4,250 additions and 1,913 deletions.
57 changes: 0 additions & 57 deletions src/main/java/com/teragrep/rlp_03/ConnectionOperation.java

This file was deleted.

23 changes: 23 additions & 0 deletions src/main/java/com/teragrep/rlp_03/EncryptionInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.teragrep.rlp_03;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.security.Principal;
import java.security.cert.Certificate;

public interface EncryptionInfo {

boolean isEncrypted();

String getSessionCipherSuite();

Certificate[] getLocalCertificates();

Principal getLocalPrincipal();

X509Certificate[] getPeerCertificateChain() throws SSLPeerUnverifiedException;

Certificate[] getPeerCertificates() throws SSLPeerUnverifiedException;

Principal getPeerPrincipal() throws SSLPeerUnverifiedException;
}
44 changes: 44 additions & 0 deletions src/main/java/com/teragrep/rlp_03/EncryptionInfoStub.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.teragrep.rlp_03;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.security.Principal;
import java.security.cert.Certificate;

public class EncryptionInfoStub implements EncryptionInfo {

@Override
public boolean isEncrypted() {
return false;
}

@Override
public String getSessionCipherSuite() {
throw new IllegalStateException("not encrypted");
}

@Override
public Certificate[] getLocalCertificates() {
throw new IllegalStateException("not encrypted");
}

@Override
public Principal getLocalPrincipal() {
throw new IllegalStateException("not encrypted");
}

@Override
public X509Certificate[] getPeerCertificateChain() throws SSLPeerUnverifiedException {
throw new IllegalStateException("not encrypted");
}

@Override
public Certificate[] getPeerCertificates() throws SSLPeerUnverifiedException {
throw new IllegalStateException("not encrypted");
}

@Override
public Principal getPeerPrincipal() throws SSLPeerUnverifiedException {
throw new IllegalStateException("not encrypted");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,48 @@

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.security.cert.Certificate;

public class TlsTransportInfo extends TransportInfo {
final public class EncryptionInfoTLS implements EncryptionInfo {

private final TlsChannel tlsChannel;

TlsTransportInfo(SocketChannel socketChannel, TlsChannel tlsChannel) {
super(socketChannel);
public EncryptionInfoTLS(TlsChannel tlsChannel) {
this.tlsChannel = tlsChannel;
}

@Override
public boolean isEncrypted() {
return true;
}

@Override
public String getSessionCipherSuite() {
return tlsChannel.getSslEngine().getSession().getCipherSuite();
}

@Override
public Certificate[] getLocalCertificates() {
return tlsChannel.getSslEngine().getSession().getLocalCertificates();
}

@Override
public Principal getLocalPrincipal() {
return tlsChannel.getSslEngine().getSession().getLocalPrincipal();
}

@Override
public X509Certificate[] getPeerCertificateChain() throws SSLPeerUnverifiedException {
return tlsChannel.getSslEngine().getSession().getPeerCertificateChain();
}

@Override
public Certificate[] getPeerCertificates() throws SSLPeerUnverifiedException {
return tlsChannel.getSslEngine().getSession().getPeerCertificates();
}

@Override
public Principal getPeerPrincipal() throws SSLPeerUnverifiedException {
return tlsChannel.getSslEngine().getSession().getPeerPrincipal();
}
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/com/teragrep/rlp_03/FrameContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.teragrep.rlp_03;

import com.teragrep.rlp_03.context.ConnectionContext;
import com.teragrep.rlp_03.context.frame.RelpFrame;

public class FrameContext {
private final ConnectionContext connectionContext;
private final RelpFrame relpFrame;
public FrameContext(ConnectionContext connectionContext, RelpFrame relpFrame) {
this.connectionContext = connectionContext;
this.relpFrame = relpFrame;
}

public ConnectionContext connectionContext() {
return connectionContext;
}

public RelpFrame relpFrame() {
return relpFrame;
}
}
12 changes: 6 additions & 6 deletions src/main/java/com/teragrep/rlp_03/FrameProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@

package com.teragrep.rlp_03;

import java.util.Deque;

import com.teragrep.rlp_01.RelpFrameTX;
import java.util.function.Consumer;

/*
* Simple queue interface for processing incoming requests and producing responses for them.
* FrameProcessor is responsible for processing RelpFrames
*/
public interface FrameProcessor {
Deque<RelpFrameTX> process(Deque<RelpFrameServerRX> rxDeque);
public interface FrameProcessor extends Consumer<FrameContext>, AutoCloseable {
void accept(FrameContext frameServerRX);

void close() throws Exception;

boolean isStub();
}

86 changes: 86 additions & 0 deletions src/main/java/com/teragrep/rlp_03/FrameProcessorPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.teragrep.rlp_03;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class FrameProcessorPool {
private static final Logger LOGGER = LoggerFactory.getLogger(FrameProcessorPool.class);

private final Supplier<FrameProcessor> frameProcessorSupplier;

private final ConcurrentLinkedQueue<FrameProcessor> queue;

private final FrameProcessor frameProcessorStub;

private final Lock lock = new ReentrantLock();

private final AtomicBoolean close;

public FrameProcessorPool(final Supplier<FrameProcessor> frameProcessorSupplier) {
this.frameProcessorSupplier = frameProcessorSupplier;
this.queue = new ConcurrentLinkedQueue<>();
this.frameProcessorStub = new FrameProcessorStub();
this.close = new AtomicBoolean();

// TODO maximum number of available frameProcessors should be perhaps limited?
}

public FrameProcessor take() {
FrameProcessor frameProcessor;
if (close.get()) {
frameProcessor = frameProcessorStub;
}
else {
// get or create
frameProcessor = queue.poll();
if (frameProcessor == null) {
frameProcessor = frameProcessorSupplier.get();
}
}

return frameProcessor;
}

public void offer(FrameProcessor frameProcessor) {
if (!frameProcessor.isStub()) {
queue.add(frameProcessor);
}

if (close.get()) {
while (queue.peek() != null) {
if (lock.tryLock()) {
while (true) {
FrameProcessor queuedFrameProcessor = queue.poll();
if (queuedFrameProcessor == null) {
break;
} else {
try {
LOGGER.debug("Closing frameProcessor <{}>", queuedFrameProcessor);
queuedFrameProcessor.close();
LOGGER.debug("Closed frameProcessor <{}>", queuedFrameProcessor);
} catch (Exception exception) {
LOGGER.warn("Exception <{}> while closing frameProcessor <{}>", exception.getMessage(), queuedFrameProcessor);
}
}
}
lock.unlock();
} else {
break;
}
}
}
}

public void close() {
close.set(true);

// close all that are in the pool right now
offer(frameProcessorStub);
}
}
20 changes: 20 additions & 0 deletions src/main/java/com/teragrep/rlp_03/FrameProcessorStub.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.teragrep.rlp_03;


public class FrameProcessorStub implements FrameProcessor {

@Override
public void accept(FrameContext frameServerRX) {

}

@Override
public void close() throws Exception {
throw new IllegalArgumentException("FrameProcessorStub can not close");
}

@Override
public boolean isStub() {
return true;
}
}
Loading

0 comments on commit 60da329

Please sign in to comment.