Skip to content

Commit

Permalink
IManagedRelpConnection with void ensureSent(RelpBatch relpBatch);, ma…
Browse files Browse the repository at this point in the history
…jor version up because interface change (#78)
  • Loading branch information
kortemik authored Jan 24, 2025
1 parent e8786c2 commit 939b467
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.teragrep.rlp_01.client;

import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.pool.Poolable;

import java.io.IOException;
Expand All @@ -26,4 +27,5 @@ public interface IManagedRelpConnection extends Poolable {
void connect() throws IOException;
void forceReconnect();
void ensureSent(byte[] bytes);
void ensureSent(RelpBatch relpBatch);
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,13 @@ private void tearDown() {
}

@Override
public void ensureSent(byte[] bytes) {
public void ensureSent(RelpBatch relpBatch) {
// avoid unnecessary exception for fresh connections
if (!hasConnected) {
connect();
}

final RelpBatch relpBatch = new RelpBatch();
relpBatch.insert(bytes);

boolean notSent = true;
while (notSent) {
try {
Expand All @@ -105,6 +104,13 @@ public void ensureSent(byte[] bytes) {
}
}

@Override
public void ensureSent(byte[] bytes) {
final RelpBatch relpBatch = new RelpBatch();
relpBatch.insert(bytes);
ensureSent(relpBatch);
}

@Override
public boolean isStub() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.teragrep.rlp_01.client;

import com.teragrep.rlp_01.RelpBatch;

import java.io.IOException;

public class ManagedRelpConnectionStub implements IManagedRelpConnection {
Expand All @@ -40,6 +42,11 @@ public void ensureSent(byte[] bytes) {
throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
}

@Override
public void ensureSent(RelpBatch relpBatch) {
throw new IllegalStateException("ManagedRelpConnectionStub does not support this");
}

@Override
public boolean isStub() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.teragrep.rlp_01.client;

import com.teragrep.rlp_01.RelpBatch;

import java.io.IOException;

public class RebindableRelpConnection implements IManagedRelpConnection {
Expand Down Expand Up @@ -46,12 +48,20 @@ public void forceReconnect() {

@Override
public void ensureSent(byte[] bytes) {
RelpBatch relpBatch = new RelpBatch();
relpBatch.insert(bytes);
ensureSent(relpBatch);
}

@Override
public void ensureSent(RelpBatch relpBatch) {
if (recordsSent >= rebindRequestAmount) {
reconnect();
recordsSent = 0;
}
managedRelpConnection.ensureSent(bytes);
recordsSent++;
int batchLength = relpBatch.getWorkQueueLength();
managedRelpConnection.ensureSent(relpBatch);
recordsSent = recordsSent + batchLength;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.teragrep.rlp_01.client;

import com.teragrep.rlp_01.RelpBatch;

import java.io.IOException;
import java.time.Instant;
import java.time.Duration;
Expand Down Expand Up @@ -52,11 +54,18 @@ public void forceReconnect() {

@Override
public void ensureSent(byte[] bytes) {
RelpBatch relpBatch = new RelpBatch();
relpBatch.insert(bytes);
ensureSent(relpBatch);
}

@Override
public void ensureSent(RelpBatch relpBatch) {
if (lastAccess.plus(maxIdle).isBefore(Instant.now())) {
forceReconnect();
}
lastAccess = Instant.now();
managedRelpConnection.ensureSent(bytes);
managedRelpConnection.ensureSent(relpBatch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.teragrep.net_01.eventloop.EventLoop;
import com.teragrep.net_01.eventloop.EventLoopFactory;
import com.teragrep.net_01.server.ServerFactory;
import com.teragrep.rlp_01.RelpBatch;
import com.teragrep.rlp_01.pool.Pool;
import com.teragrep.rlp_01.pool.UnboundPool;
import com.teragrep.rlp_03.frame.FrameDelegationClockFactory;
Expand Down Expand Up @@ -281,4 +282,61 @@ public void testPooledReboundConnections() {
connectionOpenCount.set(0);
connectionCleanCloseCount.set(0);
}

@Test
public void testRelpBatchSend() {
RelpConfig relpConfig = new RelpConfig(
hostname,
port,
500,
0,
false,
Duration.ZERO,
false
);

RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig);

Pool<IManagedRelpConnection> relpConnectionPool = new UnboundPool<>(relpConnectionFactory, new ManagedRelpConnectionStub());

int testCycles = 20;
CountDownLatch countDownLatch = new CountDownLatch(testCycles);



for (int i = 0; i < testCycles; i++) {
final String heyRelp = "hey this is batched relp 0 " + i;
final String heyThisBeRelpToo = "hey this is batched relp 1 " + i;
ForkJoinPool.commonPool().submit(() -> {
RelpBatch relpBatch = new RelpBatch();
relpBatch.insert(heyRelp.getBytes(StandardCharsets.UTF_8));
relpBatch.insert(heyThisBeRelpToo.getBytes(StandardCharsets.UTF_8));

IManagedRelpConnection connection = relpConnectionPool.get();

// will set timer to 5 millis
connection.ensureSent(relpBatch);

relpConnectionPool.offer(connection);
countDownLatch.countDown();
});
}

Assertions.assertDoesNotThrow(() -> countDownLatch.await());

relpConnectionPool.close();

Assertions.assertEquals(testCycles*2, messageList.size());

Pattern heyPattern = Pattern.compile("hey this is batched relp \\d \\d+");
while(!messageList.isEmpty()) {
byte[] payload = messageList.removeFirst();
Assertions.assertTrue(heyPattern.matcher(new String(payload, StandardCharsets.UTF_8)).matches());
}

Assertions.assertTrue(connectionOpenCount.get() > 1);
Assertions.assertEquals(connectionOpenCount.get(), connectionCleanCloseCount.get());
connectionOpenCount.set(0);
connectionCleanCloseCount.set(0);
}
}

0 comments on commit 939b467

Please sign in to comment.