Skip to content

Commit

Permalink
Merge pull request #30 from kortemik/fix-logggin
Browse files Browse the repository at this point in the history
fixes logging to perform better
  • Loading branch information
StrongestNumber9 authored May 11, 2023
2 parents 5616dfe + 1d73b80 commit b61cdf2
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 29 deletions.
5 changes: 2 additions & 3 deletions src/main/java/com/teragrep/rlp_01/RelpClientPlainSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void open (String hostname, int port) throws IOException, TimeoutException {
@Override
void write(ByteBuffer byteBuffer) throws IOException, TimeoutException {
SelectionKey key = this.socketChannel.register(this.poll, SelectionKey.OP_WRITE);
LOGGER.trace("relpConnection.sendRelpRequestAsync> need to write: " + byteBuffer.hasRemaining());
LOGGER.trace("relpConnection.sendRelpRequestAsync> need to write <{}> ", byteBuffer.hasRemaining());

while (byteBuffer.hasRemaining()) {
int nReady = poll.select(this.writeTimeout);
Expand All @@ -147,8 +147,7 @@ void write(ByteBuffer byteBuffer) throws IOException, TimeoutException {
}
eventIter.remove();
}
LOGGER.trace("relpConnection.sendRelpRequestAsync> still need to write: "
+ byteBuffer.hasRemaining());
LOGGER.trace("relpConnection.sendRelpRequestAsync> still need to write <{}>", byteBuffer.hasRemaining());
}
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
Expand Down
9 changes: 3 additions & 6 deletions src/main/java/com/teragrep/rlp_01/RelpClientTlsSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void open(String hostname, int port) throws IOException, TimeoutException {
@Override
void write(ByteBuffer byteBuffer) throws IOException, TimeoutException {
SelectionKey key = this.socketChannel.register(this.selector, SelectionKey.OP_WRITE);
LOGGER.trace("relpConnection.sendRelpRequestAsync> need to write: " + byteBuffer.hasRemaining());
LOGGER.trace("relpConnection.sendRelpRequestAsync> need to write <{}>", byteBuffer.hasRemaining());

while (byteBuffer.hasRemaining()) {
int nReady = selector.select(this.writeTimeout);
Expand All @@ -165,8 +165,7 @@ void write(ByteBuffer byteBuffer) throws IOException, TimeoutException {
SelectionKey currentKey = eventIter.next();
// tlsChannel needs to know about both
if (currentKey.isWritable() || currentKey.isReadable()) {
LOGGER.trace("relpConnection.sendRelpRequestAsync> " +
"became writable");
LOGGER.trace("relpConnection.sendRelpRequestAsync> became writable");
try {
this.tlsChannel.write(byteBuffer);
} catch (NeedsReadException e) {
Expand All @@ -177,9 +176,7 @@ void write(ByteBuffer byteBuffer) throws IOException, TimeoutException {
}
eventIter.remove();
}
LOGGER.trace("relpConnection.sendRelpRequestAsync> still need to " +
"write: "
+ byteBuffer.hasRemaining());
LOGGER.trace("relpConnection.sendRelpRequestAsync> still need to write <{}>", byteBuffer.hasRemaining());
}
}

Expand Down
19 changes: 7 additions & 12 deletions src/main/java/com/teragrep/rlp_01/RelpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public boolean connect(String hostname, int port) throws IOException, IllegalSta
long reqId = connectionOpenBatch.putRequest(relpRequest);
this.sendBatch(connectionOpenBatch);
boolean openSuccess = connectionOpenBatch.verifyTransaction(reqId);
LOGGER.trace("relpConnection.connect> exit with: " + openSuccess);
LOGGER.trace("relpConnection.connect> exit with <{}>", openSuccess);
if (openSuccess) {
this.state = RelpConnectionState.OPEN;
}
Expand Down Expand Up @@ -200,7 +200,7 @@ public boolean disconnect() throws IOException, IllegalStateException, TimeoutEx
if (closeResponse != null && closeResponse.dataLength == 0) {
closeSuccess = true;
}
LOGGER.trace("relpConnection.disconnect> exit with: " + closeSuccess);
LOGGER.trace("relpConnection.disconnect> exit with <{}>", closeSuccess);
if(closeSuccess){
relpClientSocket.close();
this.state = RelpConnectionState.CLOSED;
Expand All @@ -227,7 +227,7 @@ public void commit(RelpBatch relpBatch) throws IOException, IllegalStateExceptio
*/
private void sendBatch(RelpBatch relpBatch) throws IOException, TimeoutException, IllegalStateException {
LOGGER.trace("relpConnection.sendBatch> entry with wq len " + relpBatch.getWorkQueueLength());
LOGGER.trace("relpConnection.sendBatch> entry with wq len <{}>", relpBatch.getWorkQueueLength());
// send a batch of requests..
RelpFrameTX relpRequest;

Expand Down Expand Up @@ -263,7 +263,7 @@ private void readAcks(RelpBatch relpBatch)

readBytes = relpClientSocket.read(preAllocatedRXBuffer);

LOGGER.trace("relpConnection.readAcks> read bytes: " + readBytes);
LOGGER.trace("relpConnection.readAcks> read bytes <{}>", readBytes);

// read from it
preAllocatedRXBuffer.flip();
Expand All @@ -277,8 +277,7 @@ private void readAcks(RelpBatch relpBatch)
parser.parse(preAllocatedRXBuffer.get());

if (parser.isComplete()) {
LOGGER.trace("relpConnection.readAcks> read parser " +
"complete: " + parser.isComplete());
LOGGER.trace("relpConnection.readAcks> read parser complete <{}>", parser.isComplete());
// one response read successfully
int txnId = parser.getTxnId();
if (window.isPending(txnId)) {
Expand Down Expand Up @@ -311,15 +310,11 @@ private void sendRelpRequestAsync(RelpFrameTX relpRequest) throws IOException, T
LOGGER.trace("relpConnection.sendRelpRequestAsync> entry");
ByteBuffer byteBuffer;
if (relpRequest.length() > this.txBufferSize) {
LOGGER.trace("relpConnection.sendRelpRequestAsync> allocate new " +
"txBuffer of size: "
+ relpRequest.length());
LOGGER.trace("relpConnection.sendRelpRequestAsync> allocate new txBuffer of size <{}>", relpRequest.length());
byteBuffer = ByteBuffer.allocateDirect(relpRequest.length());
}
else {
LOGGER.trace("relpConnection.sendRelpRequestAsync> using " +
"preAllocatedTXBuffer for size: "
+ relpRequest.length());
LOGGER.trace("relpConnection.sendRelpRequestAsync> using preAllocatedTXBuffer for size <{}>", relpRequest.length());
byteBuffer = this.preAllocatedTXBuffer;
}
relpRequest.write(byteBuffer);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/teragrep/rlp_01/RelpFrameRX.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public RelpFrameRX(int txID, String command, int dataLength, ByteBuffer src) {
super(txID, command, dataLength);
this.data = new byte[src.remaining()];
src.get(this.data);
LOGGER.trace("relpResponse> RelpFrameRX dataLength: " + dataLength);
LOGGER.trace("relpResponse> RelpFrameRX dataLength <{}>", dataLength);

}

Expand Down
22 changes: 15 additions & 7 deletions src/main/java/com/teragrep/rlp_01/RelpParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void parse(byte b) {
"be >= 0");
}
state = relpParserState.COMMAND;
LOGGER.trace( "relpParser> txnId: " + frameTxnId );
LOGGER.trace( "relpParser> txnId <[{}]>", frameTxnId );
}
else {
frameTxnIdString += new String(new byte[] {b});
Expand All @@ -118,7 +118,7 @@ public void parse(byte b) {
case COMMAND:
if (b == ' '){
state = relpParserState.LENGTH;
LOGGER.trace( "relpParser> command: " + frameCommandString );
LOGGER.trace( "relpParser> command <[{}]>", frameCommandString );
// Spec constraints.
if( frameCommandString.length() > MAX_COMMAND_LENGTH &&
!frameCommandString.equals(RelpCommand.OPEN) &&
Expand Down Expand Up @@ -159,12 +159,14 @@ public void parse(byte b) {
} else {
state = relpParserState.DATA;
}
LOGGER.trace( "relpParser> length: " + frameLengthString );
LOGGER.trace( "relpParser> length <[{}]>", frameLengthString );
if (b == '\n') {
if (frameLength == 0) {
this.isComplete = true;
}
LOGGER.trace( "relpParser> newline after LENGTH: " + new String( new byte[] {b} ) );
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("relpParser> newline after LENGTH <[{}]>", new String(new byte[]{b}));
}
}
}
else {
Expand All @@ -178,20 +180,26 @@ public void parse(byte b) {
if (frameLengthLeft > 0) {
frameData.put(b);
frameLengthLeft--;
LOGGER.trace( "relpParser> data b: " + new String( new byte[] {b} ) + " left: " + frameLengthLeft );
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("relpParser> data b <[{}]> left <{}>", new String(new byte[]{b}), frameLengthLeft);
}
}
if (frameLengthLeft == 0) {
// make ready for consumer
frameData.flip();
state = relpParserState.NL;
LOGGER.trace( "relpParser> data buffer: " + frameData.toString() );

LOGGER.trace("relpParser> data buffer <{}>", frameData);

}
break;
case NL:
if (b == '\n'){
// RELP message always ends with a newline byte.
this.isComplete = true;
LOGGER.trace( "relpParser> newline: " + new String( new byte[] {b} ) );
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("relpParser> newline <[{}]>", new String(new byte[]{b}));
}
}
else {
throw new IllegalStateException("relp frame parsing failure");
Expand Down

0 comments on commit b61cdf2

Please sign in to comment.