From 939b46732c2980cc2827144862bdb2ec83077388 Mon Sep 17 00:00:00 2001 From: Mikko Kortelainen Date: Fri, 24 Jan 2025 13:38:07 +0200 Subject: [PATCH] IManagedRelpConnection with void ensureSent(RelpBatch relpBatch);, major version up because interface change (#78) --- .../rlp_01/client/IManagedRelpConnection.java | 2 + .../rlp_01/client/ManagedRelpConnection.java | 12 +++- .../client/ManagedRelpConnectionStub.java | 7 +++ .../client/RebindableRelpConnection.java | 14 ++++- .../client/RenewableRelpConnection.java | 11 +++- .../rlp_01/client/ManagedConnectionTest.java | 58 +++++++++++++++++++ 6 files changed, 98 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java index 07bc935..cdcf16b 100644 --- a/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java +++ b/src/main/java/com/teragrep/rlp_01/client/IManagedRelpConnection.java @@ -16,6 +16,7 @@ */ package com.teragrep.rlp_01.client; +import com.teragrep.rlp_01.RelpBatch; import com.teragrep.rlp_01.pool.Poolable; import java.io.IOException; @@ -26,4 +27,5 @@ public interface IManagedRelpConnection extends Poolable { void connect() throws IOException; void forceReconnect(); void ensureSent(byte[] bytes); + void ensureSent(RelpBatch relpBatch); } diff --git a/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java index 456eff6..d6a8862 100644 --- a/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java +++ b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnection.java @@ -78,14 +78,13 @@ private void tearDown() { } @Override - public void ensureSent(byte[] bytes) { + public void ensureSent(RelpBatch relpBatch) { // avoid unnecessary exception for fresh connections if (!hasConnected) { connect(); } - final RelpBatch relpBatch = new RelpBatch(); - relpBatch.insert(bytes); + boolean notSent = true; while (notSent) { try { @@ -105,6 +104,13 @@ public void ensureSent(byte[] bytes) { } } + @Override + public void ensureSent(byte[] bytes) { + final RelpBatch relpBatch = new RelpBatch(); + relpBatch.insert(bytes); + ensureSent(relpBatch); + } + @Override public boolean isStub() { return false; diff --git a/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java index 4d24793..f8ce0e8 100644 --- a/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java +++ b/src/main/java/com/teragrep/rlp_01/client/ManagedRelpConnectionStub.java @@ -16,6 +16,8 @@ */ package com.teragrep.rlp_01.client; +import com.teragrep.rlp_01.RelpBatch; + import java.io.IOException; public class ManagedRelpConnectionStub implements IManagedRelpConnection { @@ -40,6 +42,11 @@ public void ensureSent(byte[] bytes) { throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); } + @Override + public void ensureSent(RelpBatch relpBatch) { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + @Override public boolean isStub() { return true; diff --git a/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java index 25acd74..b1790de 100644 --- a/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java +++ b/src/main/java/com/teragrep/rlp_01/client/RebindableRelpConnection.java @@ -16,6 +16,8 @@ */ package com.teragrep.rlp_01.client; +import com.teragrep.rlp_01.RelpBatch; + import java.io.IOException; public class RebindableRelpConnection implements IManagedRelpConnection { @@ -46,12 +48,20 @@ public void forceReconnect() { @Override public void ensureSent(byte[] bytes) { + RelpBatch relpBatch = new RelpBatch(); + relpBatch.insert(bytes); + ensureSent(relpBatch); + } + + @Override + public void ensureSent(RelpBatch relpBatch) { if (recordsSent >= rebindRequestAmount) { reconnect(); recordsSent = 0; } - managedRelpConnection.ensureSent(bytes); - recordsSent++; + int batchLength = relpBatch.getWorkQueueLength(); + managedRelpConnection.ensureSent(relpBatch); + recordsSent = recordsSent + batchLength; } @Override diff --git a/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java b/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java index 3556a1a..50e0bf8 100644 --- a/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java +++ b/src/main/java/com/teragrep/rlp_01/client/RenewableRelpConnection.java @@ -16,6 +16,8 @@ */ package com.teragrep.rlp_01.client; +import com.teragrep.rlp_01.RelpBatch; + import java.io.IOException; import java.time.Instant; import java.time.Duration; @@ -52,11 +54,18 @@ public void forceReconnect() { @Override public void ensureSent(byte[] bytes) { + RelpBatch relpBatch = new RelpBatch(); + relpBatch.insert(bytes); + ensureSent(relpBatch); + } + + @Override + public void ensureSent(RelpBatch relpBatch) { if (lastAccess.plus(maxIdle).isBefore(Instant.now())) { forceReconnect(); } lastAccess = Instant.now(); - managedRelpConnection.ensureSent(bytes); + managedRelpConnection.ensureSent(relpBatch); } @Override diff --git a/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java b/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java index 90a3dda..2b16c6e 100644 --- a/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java +++ b/src/test/java/com/teragrep/rlp_01/client/ManagedConnectionTest.java @@ -20,6 +20,7 @@ import com.teragrep.net_01.eventloop.EventLoop; import com.teragrep.net_01.eventloop.EventLoopFactory; import com.teragrep.net_01.server.ServerFactory; +import com.teragrep.rlp_01.RelpBatch; import com.teragrep.rlp_01.pool.Pool; import com.teragrep.rlp_01.pool.UnboundPool; import com.teragrep.rlp_03.frame.FrameDelegationClockFactory; @@ -281,4 +282,61 @@ public void testPooledReboundConnections() { connectionOpenCount.set(0); connectionCleanCloseCount.set(0); } + + @Test + public void testRelpBatchSend() { + RelpConfig relpConfig = new RelpConfig( + hostname, + port, + 500, + 0, + false, + Duration.ZERO, + false + ); + + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + + Pool relpConnectionPool = new UnboundPool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); + + int testCycles = 20; + CountDownLatch countDownLatch = new CountDownLatch(testCycles); + + + + for (int i = 0; i < testCycles; i++) { + final String heyRelp = "hey this is batched relp 0 " + i; + final String heyThisBeRelpToo = "hey this is batched relp 1 " + i; + ForkJoinPool.commonPool().submit(() -> { + RelpBatch relpBatch = new RelpBatch(); + relpBatch.insert(heyRelp.getBytes(StandardCharsets.UTF_8)); + relpBatch.insert(heyThisBeRelpToo.getBytes(StandardCharsets.UTF_8)); + + IManagedRelpConnection connection = relpConnectionPool.get(); + + // will set timer to 5 millis + connection.ensureSent(relpBatch); + + relpConnectionPool.offer(connection); + countDownLatch.countDown(); + }); + } + + Assertions.assertDoesNotThrow(() -> countDownLatch.await()); + + relpConnectionPool.close(); + + Assertions.assertEquals(testCycles*2, messageList.size()); + + Pattern heyPattern = Pattern.compile("hey this is batched relp \\d \\d+"); + while(!messageList.isEmpty()) { + byte[] payload = messageList.removeFirst(); + Assertions.assertTrue(heyPattern.matcher(new String(payload, StandardCharsets.UTF_8)).matches()); + } + + Assertions.assertTrue(connectionOpenCount.get() > 1); + Assertions.assertEquals(connectionOpenCount.get(), connectionCleanCloseCount.get()); + connectionOpenCount.set(0); + connectionCleanCloseCount.set(0); + } }