From 51e22def826bae8ba97226c0eca1c76c9b54b6f8 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Tue, 28 Oct 2025 12:32:39 -0500 Subject: [PATCH 01/15] chore: gracefully terminate connections at block boundaries Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 100 ++++++++++++++---- .../streaming/BlockNodeConnectionManager.java | 14 +-- 2 files changed, 88 insertions(+), 26 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 48d28750f4ba..39e4d4d0832a 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -40,6 +40,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; @@ -152,6 +153,8 @@ private record Options(Optional authority, String contentType) implement private final ConfigProvider configProvider; private final BlockNodeClientFactory clientFactory; + private final AtomicReference connectionStartTimestamp = new AtomicReference<>(); + private final AtomicBoolean closeAtNextBlockBoundary = new AtomicBoolean(false); /** * Represents the possible states of a Block Node connection. 
@@ -327,12 +330,7 @@ private boolean updateConnectionState( } if (newState == ConnectionState.ACTIVE) { - scheduleStreamReset(); - // start worker thread to handle sending requests - final Thread workerThread = new Thread(new ConnectionWorkerLoopTask(), "bn-conn-worker-" + connectionId); - if (workerThreadRef.compareAndSet(null, workerThread)) { - workerThread.start(); - } + handleConnectionActive(); } else { cancelStreamReset(); } @@ -340,6 +338,20 @@ private boolean updateConnectionState( return true; } + private void handleConnectionActive() { + connectionStartTimestamp.set(Instant.now()); + scheduleStreamReset(); + // start worker thread to handle sending requests + final Thread workerThread = new Thread(new ConnectionWorkerLoopTask(), "bn-conn-worker-" + connectionId); + if (workerThreadRef.compareAndSet(null, workerThread)) { + workerThread.start(); + } + } + + Instant connectionStartTimestamp() { + return connectionStartTimestamp.get(); + } + /** * Schedules the periodic stream reset task to ensure responsiveness and reliability. */ @@ -710,7 +722,6 @@ public boolean sendRequest(@NonNull final PublishStreamRequest request) { TRACE, this, "[{}] Sending request to block node (type={}, bytes={})", - this, request.request().kind(), request.protobufSize()); } @@ -718,7 +729,7 @@ public boolean sendRequest(@NonNull final PublishStreamRequest request) { pipeline.onNext(request); final long durationMs = System.currentTimeMillis() - startMs; blockStreamMetrics.recordRequestLatency(durationMs); - logWithContext(logger, TRACE, this, "Request took {}ms to send", this, durationMs); + logWithContext(logger, TRACE, this, "Request took {}ms to send", durationMs); if (request.hasEndStream()) { blockStreamMetrics.recordRequestEndStreamSent( @@ -763,7 +774,7 @@ connection is in another state (e.g. CLOSING) then we want to ignore the error. * failure in closing the connection, the error will be logged and not propagated back to the caller. 
* @param callOnComplete whether to call onComplete on the request pipeline */ - public void close(final boolean callOnComplete) { + private void close(final boolean callOnComplete) { final ConnectionState connState = getConnectionState(); if (connState.isTerminal()) { logWithContext(logger, DEBUG, this, "Connection already in terminal state ({}).", connState); @@ -934,6 +945,10 @@ public ConnectionState getConnectionState() { return connectionState.get(); } + public void closeAfterCurrentBlockSent() { + closeAtNextBlockBoundary.set(true); + } + @Override public String toString() { return "[" + connectionId + "/" + blockNodeConfig.address() + ":" + blockNodeConfig.port() + "/" @@ -979,6 +994,7 @@ private class ConnectionWorkerLoopTask implements Runnable { private int itemIndex = 0; private BlockState block; private long lastSendTimeMillis = -1; + private final AtomicInteger requestCtr = new AtomicInteger(1); @Override public void run() { @@ -1074,19 +1090,38 @@ private void doWork() { } } - if (pendingRequestItems.isEmpty() && block.isClosed() && block.itemCount() == itemIndex) { - // We've gathered all block items and have sent them to the block node. No additional work is needed - // for the current block so we can move to the next block. + maybeAdvanceBlock(); + } + + private void maybeAdvanceBlock() { + final boolean finishedWithCurrentBlock = + pendingRequestItems.isEmpty() // no more items ready to send + && block.isClosed() // the block is closed, so no more items are expected + && block.itemCount() == itemIndex; // we've exhausted all items in the block + + if (!finishedWithCurrentBlock) { + return; // still more work to do + } + + /* + We are now done with the current block and have two options. + 1) We advance to the next block (normal case). + 2) We check with the connection manager and determine if we should gracefully close the connection.
This can + happen if we are switching connections, but we wanted to finish streaming the current block before + closing this connection. This allows us to switch connections at block boundaries, instead of mid-block. + */ + + if (closeAtNextBlockBoundary.get()) { + // the connection manager wants us to gracefully stop this connection + endTheStreamWith(RESET); + } else { + // the connection manager hasn't informed us to close this connection, so we are now free to advance to + // the next block final long nextBlockNumber = block.blockNumber() + 1; if (streamingBlockNumber.compareAndSet(block.blockNumber(), nextBlockNumber)) { - logWithContext(logger, TRACE, BlockNodeConnection.this, "Advancing to block {}", nextBlockNumber); + logger.trace("{} Advancing to block {}", BlockNodeConnection.this, nextBlockNumber); } else { - logWithContext( - logger, - TRACE, - BlockNodeConnection.this, - "Tried to advance to block {} but the block to stream was updated externally", - nextBlockNumber); + logger.trace("{} Tried to advance to block {} but the block to stream was updated externally", BlockNodeConnection.this, nextBlockNumber); } } } @@ -1103,6 +1138,31 @@ private boolean sendPendingRequest() { final PublishStreamRequest req = PublishStreamRequest.newBuilder().blockItems(itemSet).build(); + if (logger.isTraceEnabled()) { + // gather information about what type of items are in the request + + int headerIndex = -1; + int itemStartIndex = -1; + int itemEndIndex = -1; + int proofIndex = -1; + for (int i = 0; i < pendingRequestItems.size(); ++i) { + final BlockItem item = pendingRequestItems.get(i); + switch (item.item().kind()) { + case BLOCK_HEADER -> headerIndex = i; + case BLOCK_PROOF -> proofIndex = i; + default -> { + if (itemStartIndex == -1) { + itemStartIndex = i; + } + itemEndIndex = Math.max(itemEndIndex, i); + } + } + } + + logger.trace("{} Request details: block={}, request={}, items={}, headerIndex={}, otherItemsIndexRange=[{}, {}], proofIndex={}", + 
BlockNodeConnection.this, block.blockNumber(), requestCtr.get(), pendingRequestItems.size(), headerIndex, itemStartIndex, itemEndIndex, proofIndex); + } + try { if (sendRequest(req)) { // record that we've sent the request @@ -1111,6 +1171,7 @@ private boolean sendPendingRequest() { // clear the pending request data pendingRequestBytes = BYTES_PADDING; pendingRequestItems.clear(); + requestCtr.incrementAndGet(); return true; } } catch (final UncheckedIOException e) { @@ -1173,6 +1234,7 @@ private void switchBlockIfNeeded() { pendingRequestBytes = BYTES_PADDING; itemIndex = 0; pendingRequestItems.clear(); + requestCtr.set(1); } /** diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 065c2702bde0..9700c1c2354a 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -118,7 +118,7 @@ public class BlockNodeConnectionManager { /** * The directory containing the block node connection configuration file. */ - private Path blockNodeConfigDirectory; + private final Path blockNodeConfigDirectory; /** * The file name of the block node configuration file. 
*/ @@ -268,7 +268,7 @@ private List extractBlockNodesConfigurations(@NonNull final Str * @return whether there is only one block node configured */ public boolean isOnlyOneBlockNodeConfigured() { - int size; + final int size; synchronized (availableBlockNodes) { size = availableBlockNodes.size(); } @@ -405,7 +405,7 @@ private void scheduleConnectionAttempt( } catch (final Exception e) { logger.error(formatLogMessage("Failed to schedule connection task for block node.", newConnection), e); connections.remove(newConnection.getNodeConfig()); - newConnection.close(true); + newConnection.closeAfterCurrentBlockSent(); } } @@ -447,7 +447,7 @@ private void closeAllConnections() { final Map.Entry entry = it.next(); final BlockNodeConnection connection = entry.getValue(); try { - connection.close(true); + connection.closeAfterCurrentBlockSent(); } catch (final RuntimeException e) { logWithContext( logger, @@ -805,7 +805,7 @@ public void run() { DEBUG, "Active connection has equal/higher priority. Ignoring candidate. Active: {}.", activeConnection); - connection.close(true); + connection.closeAfterCurrentBlockSent(); return; } } @@ -831,7 +831,7 @@ public void run() { // close the old active connection try { logWithContext(DEBUG, "Closing current active connection {}.", activeConnection); - activeConnection.close(true); + activeConnection.closeAfterCurrentBlockSent(); } catch (final RuntimeException e) { logger.info( "Failed to shutdown current active connection {} (shutdown reason: another connection was elevated to active).", @@ -893,7 +893,7 @@ private void reschedule() { // If rescheduling fails, close the connection and remove it from the connection map. 
A periodic task // will handle checking if there are no longer any connections connections.remove(connection.getNodeConfig()); - connection.close(true); + connection.closeAfterCurrentBlockSent(); } } } From 2357c9a8382abc569423870407bcf7e12a5186f2 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Tue, 28 Oct 2025 14:43:30 -0500 Subject: [PATCH 02/15] add tests for block boundary connection closing; enhance logging Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 88 +++++----- .../streaming/BlockNodeConnectionManager.java | 12 +- .../BlockNodeConnectionManagerTest.java | 32 ++-- .../streaming/BlockNodeConnectionTest.java | 166 +++++++++++++++++- 4 files changed, 227 insertions(+), 71 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 39e4d4d0832a..740fdccf8eaf 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -7,7 +7,6 @@ import static org.apache.logging.log4j.Level.DEBUG; import static org.apache.logging.log4j.Level.ERROR; import static org.apache.logging.log4j.Level.INFO; -import static org.apache.logging.log4j.Level.TRACE; import static org.apache.logging.log4j.Level.WARN; import static org.hiero.block.api.PublishStreamRequest.EndStream.Code.RESET; import static org.hiero.block.api.PublishStreamRequest.EndStream.Code.TIMEOUT; @@ -348,10 +347,6 @@ private void handleConnectionActive() { } } - Instant connectionStartTimestamp() { - return connectionStartTimestamp.get(); - } - /** * Schedules the periodic stream reset task to ensure responsiveness and reliability. 
*/ @@ -686,7 +681,7 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) { highestAckedBlockNumber); try { sendRequest(endStream); - } catch (RuntimeException e) { + } catch (final RuntimeException e) { logger.warn(formatLogMessage("Error sending EndStream request", this), e); } close(true); @@ -705,31 +700,22 @@ public boolean sendRequest(@NonNull final PublishStreamRequest request) { if (getConnectionState() == ConnectionState.ACTIVE && pipeline != null) { try { - if (logger.isDebugEnabled()) { - logWithContext( - logger, - DEBUG, - this, - "Sending request to block node (type={}).", - request.request().kind()); - } else if (logger.isTraceEnabled()) { + if (logger.isTraceEnabled()) { /* PublishStreamRequest#protobufSize does the size calculation lazily and thus calling this can incur a performance penality. Therefore, we only want to log the byte size at trace level. */ - logWithContext( - logger, - TRACE, - this, - "[{}] Sending request to block node (type={}, bytes={})", - request.request().kind(), - request.protobufSize()); + logger.trace("{} Sending request to block node (type={}, bytes={})", this, request.request().kind(), request.protobufSize()); + } else if (logger.isDebugEnabled()) { + logger.debug("{} Sending request to block node (type={})", this, request.request().kind()); } + final long startMs = System.currentTimeMillis(); pipeline.onNext(request); final long durationMs = System.currentTimeMillis() - startMs; + blockStreamMetrics.recordRequestLatency(durationMs); - logWithContext(logger, TRACE, this, "Request took {}ms to send", durationMs); + logger.trace("{} Request took {}ms to send", this, durationMs); if (request.hasEndStream()) { blockStreamMetrics.recordRequestEndStreamSent( @@ -774,7 +760,7 @@ connection is in another state (e.g. CLOSING) then we want to ignore the error. * failure in closing the connection, the error will be logged and not propagated back to the caller. 
* @param callOnComplete whether to call onComplete on the request pipeline */ - private void close(final boolean callOnComplete) { + void close(final boolean callOnComplete) { final ConnectionState connState = getConnectionState(); if (connState.isTerminal()) { logWithContext(logger, DEBUG, this, "Connection already in terminal state ({}).", connState); @@ -945,7 +931,8 @@ public ConnectionState getConnectionState() { return connectionState.get(); } - public void closeAfterCurrentBlockSent() { + public void closeAtBlockBoundary() { + logger.info("{} Connection will be closed at the next block boundary", this); closeAtNextBlockBoundary.set(true); } @@ -1026,11 +1013,23 @@ public void run() { workerThreadRef.compareAndSet(Thread.currentThread(), null); } + private void closeConnection(final EndStream.Code endCode) { + endTheStreamWith(endCode); + blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this); + } + private void doWork() { switchBlockIfNeeded(); if (block == null) { // The block we want to stream is not available + if (closeAtNextBlockBoundary.get()) { + // The flag to indicate that we should close the connection at a block boundary is set to true + // since no block is available to stream, we are at a safe "boundary" and can close the connection + logger.info("{} Block boundary reached; closing connection (no block available)", BlockNodeConnection.this); + closeConnection(EndStream.Code.RESET); + } + return; } @@ -1038,12 +1037,12 @@ private void doWork() { while ((item = block.blockItem(itemIndex)) != null) { if (itemIndex == 0) { - logWithContext( - logger, - TRACE, - BlockNodeConnection.this, - "Starting to process items for block {}", - block.blockNumber()); + logger.trace("{} Starting to process items for block {}", BlockNodeConnection.this, block.blockNumber()); + if (lastSendTimeMillis == -1) { + // if we've never sent a request and this is the first time we are processing a block, update + // the last send time to the current 
time. this will avoid prematurely sending a request + lastSendTimeMillis = System.currentTimeMillis(); + } } final int itemSize = item.protobufSize(); @@ -1069,8 +1068,7 @@ private void doWork() { itemIndex, newRequestBytes, MAX_BYTES_PER_REQUEST); - endTheStreamWith(EndStream.Code.ERROR); - blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this); + closeConnection(EndStream.Code.ERROR); break; } } else { @@ -1085,7 +1083,9 @@ private void doWork() { // There are pending items to send. Check if enough time has elapsed since the last request was sent. // If so, send the current pending request. final long diffMillis = System.currentTimeMillis() - lastSendTimeMillis; - if (diffMillis >= maxRequestDelayMillis()) { + final long maxDelayMillis = maxRequestDelayMillis(); + if (diffMillis >= maxDelayMillis) { + logger.trace("{} Max delay exceeded (target: {}ms, actual: {}ms) - sending {} item(s)", BlockNodeConnection.this, maxDelayMillis, diffMillis, pendingRequestItems.size()); sendPendingRequest(); } } @@ -1113,7 +1113,8 @@ private void maybeAdvanceBlock() { if (closeAtNextBlockBoundary.get()) { // the connection manager wants us to gracefully stop this connection - endTheStreamWith(RESET); + logger.info("{} Block boundary reached; closing connection (finished sending block)", BlockNodeConnection.this); + closeConnection(EndStream.Code.RESET); } else { // the connection manager hasn't informed us to close this connection, so we are now free to advance to // the next block @@ -1220,21 +1221,20 @@ private void switchBlockIfNeeded() { } // Swap blocks and reset - if (logger.isTraceEnabled()) { - final long oldBlock = block == null ? 
-1 : block.blockNumber(); - logWithContext( - logger, - TRACE, - BlockNodeConnection.this, - "Worker switching from block {} to block {}", - oldBlock, - latestActiveBlockNumber); - } + final BlockState oldBlock = block; block = blockBufferService.getBlockState(latestActiveBlockNumber); pendingRequestBytes = BYTES_PADDING; itemIndex = 0; pendingRequestItems.clear(); requestCtr.set(1); + + if (block == null) { + logger.trace("{} Wanted to switch from block {} to block {}, but it is not available", + BlockNodeConnection.this, (oldBlock == null ? -1 : oldBlock.blockNumber()), latestActiveBlockNumber); + } else { + logger.trace("{} Switched from block {} to block {}", BlockNodeConnection.this, + (oldBlock == null ? -1 : oldBlock.blockNumber()), latestActiveBlockNumber); + } } /** diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 9700c1c2354a..6678dac97c10 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -118,7 +118,7 @@ public class BlockNodeConnectionManager { /** * The directory containing the block node connection configuration file. */ - private final Path blockNodeConfigDirectory; + private Path blockNodeConfigDirectory; /** * The file name of the block node configuration file. 
*/ @@ -405,7 +405,7 @@ private void scheduleConnectionAttempt( } catch (final Exception e) { logger.error(formatLogMessage("Failed to schedule connection task for block node.", newConnection), e); connections.remove(newConnection.getNodeConfig()); - newConnection.closeAfterCurrentBlockSent(); + newConnection.closeAtBlockBoundary(); } } @@ -447,7 +447,7 @@ private void closeAllConnections() { final Map.Entry entry = it.next(); final BlockNodeConnection connection = entry.getValue(); try { - connection.closeAfterCurrentBlockSent(); + connection.closeAtBlockBoundary(); } catch (final RuntimeException e) { logWithContext( logger, @@ -805,7 +805,7 @@ public void run() { DEBUG, "Active connection has equal/higher priority. Ignoring candidate. Active: {}.", activeConnection); - connection.closeAfterCurrentBlockSent(); + connection.closeAtBlockBoundary(); return; } } @@ -831,7 +831,7 @@ public void run() { // close the old active connection try { logWithContext(DEBUG, "Closing current active connection {}.", activeConnection); - activeConnection.closeAfterCurrentBlockSent(); + activeConnection.closeAtBlockBoundary(); } catch (final RuntimeException e) { logger.info( "Failed to shutdown current active connection {} (shutdown reason: another connection was elevated to active).", @@ -893,7 +893,7 @@ private void reschedule() { // If rescheduling fails, close the connection and remove it from the connection map. 
A periodic task // will handle checking if there are no longer any connections connections.remove(connection.getNodeConfig()); - connection.closeAfterCurrentBlockSent(); + connection.closeAtBlockBoundary(); } } } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index 4e978c826539..795599414c26 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -266,7 +266,7 @@ void testShutdown() { connections.put(node3Config, node3Conn); // introduce a failure on one of the connection closes to ensure the shutdown process does not fail prematurely - doThrow(new RuntimeException("oops, I did it again")).when(node2Conn).close(true); + doThrow(new RuntimeException("oops, I did it again")).when(node2Conn).closeAtBlockBoundary(); final AtomicBoolean isActive = isActiveFlag(); isActive.set(true); @@ -286,9 +286,9 @@ void testShutdown() { // and not shutdown the buffer service again connectionManager.shutdown(); - verify(node1Conn).close(true); - verify(node2Conn).close(true); - verify(node3Conn).close(true); + verify(node1Conn).closeAtBlockBoundary(); + verify(node2Conn).closeAtBlockBoundary(); + verify(node3Conn).closeAtBlockBoundary(); verify(bufferService).shutdown(); verifyNoMoreInteractions(node1Conn); verifyNoMoreInteractions(node2Conn); @@ -556,7 +556,7 @@ void testConnectionTask_higherPriorityConnectionExists_withoutForce() { verify(activeConnection).getNodeConfig(); verify(newConnection).getNodeConfig(); - verify(newConnection).close(true); + verify(newConnection).closeAtBlockBoundary(); verifyNoMoreInteractions(activeConnection); verifyNoMoreInteractions(newConnection); @@ -584,7 +584,7 
@@ void testConnectionTask_higherPriorityConnectionExists_withForce() { assertThat(activeConnectionRef).hasValue(newConnection); verify(activeConnection).getNodeConfig(); - verify(activeConnection).close(true); + verify(activeConnection).closeAtBlockBoundary(); verify(newConnection, times(2)).getNodeConfig(); verify(newConnection).createRequestPipeline(); verify(newConnection).updateConnectionState(ConnectionState.ACTIVE); @@ -617,7 +617,7 @@ void testConnectionTask_connectionUninitialized_withActiveLowerPriorityConnectio assertThat(activeConnectionRef).hasValue(newConnection); verify(activeConnection).getNodeConfig(); - verify(activeConnection).close(true); + verify(activeConnection).closeAtBlockBoundary(); verify(newConnection, times(2)).getNodeConfig(); verify(newConnection).createRequestPipeline(); verify(newConnection).updateConnectionState(ConnectionState.ACTIVE); @@ -677,7 +677,7 @@ void testConnectionTask_closeExistingActiveFailed() { doReturn(activeConnectionConfig).when(activeConnection).getNodeConfig(); doThrow(new RuntimeException("why does this always happen to me")) .when(activeConnection) - .close(true); + .closeAtBlockBoundary(); activeConnectionRef.set(activeConnection); final BlockNodeConnection newConnection = mock(BlockNodeConnection.class); @@ -689,7 +689,7 @@ void testConnectionTask_closeExistingActiveFailed() { assertThat(activeConnectionRef).hasValue(newConnection); verify(activeConnection).getNodeConfig(); - verify(activeConnection).close(true); + verify(activeConnection).closeAtBlockBoundary(); verify(newConnection, times(2)).getNodeConfig(); verify(newConnection).createRequestPipeline(); verify(newConnection).updateConnectionState(ConnectionState.ACTIVE); @@ -804,7 +804,7 @@ void testConnectionTask_reschedule_failure() { verify(connection).createRequestPipeline(); verify(executorService).schedule(eq(task), anyLong(), eq(TimeUnit.MILLISECONDS)); verify(connection, atLeast(1)).getNodeConfig(); - verify(connection).close(true); + 
verify(connection).closeAtBlockBoundary(); verify(metrics).recordConnectionCreateFailure(); verifyNoMoreInteractions(connection); @@ -1284,7 +1284,7 @@ void testCloseAllConnections() { invoke_closeAllConnections(); - verify(conn).close(true); + verify(conn).closeAtBlockBoundary(); assertThat(connections()).isEmpty(); } @@ -1297,7 +1297,7 @@ void testCloseAllConnections_whenStreamingDisabled() { invoke_closeAllConnections(); - verify(conn).close(true); + verify(conn).closeAtBlockBoundary(); } @Test @@ -1310,7 +1310,7 @@ void testRefreshAvailableBlockNodes() { invoke_refreshAvailableBlockNodes(); // Verify old connection was closed - verify(conn).close(true); + verify(conn).closeAtBlockBoundary(); } @Test @@ -1334,7 +1334,7 @@ void testRefreshAvailableBlockNodes_shutsDownExecutorAndReloads_whenValid() { invoke_refreshAvailableBlockNodes(); // Old connection closed and executor shut down - verify(existing).close(true); + verify(existing).closeAtBlockBoundary(); // Available nodes should be reloaded from bootstrap JSON (non-empty) assertThat(availableNodes()).isNotEmpty(); @@ -1367,13 +1367,13 @@ void testStartConfigWatcher_reactsToCreateModifyDelete() throws Exception { @Test void testCloseAllConnections_withException() { final BlockNodeConnection conn = mock(BlockNodeConnection.class); - doThrow(new RuntimeException("Close failed")).when(conn).close(true); + doThrow(new RuntimeException("Close failed")).when(conn).closeAtBlockBoundary(); connections().put(newBlockNodeConfig(8080, 1), conn); // Should not throw - exceptions are caught and logged invoke_closeAllConnections(); - verify(conn).close(true); + verify(conn).closeAtBlockBoundary(); assertThat(connections()).isEmpty(); } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 922b74f6abcd..28475f43fea3 100644 --- 
a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -88,7 +88,7 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase { private BlockNodeConnection connection; private BlockNodeConfig nodeConfig; - + private ConfigProvider configProvider; private BlockNodeConnectionManager connectionManager; private BlockBufferService bufferService; private BlockStreamPublishServiceClient grpcServiceClient; @@ -96,11 +96,12 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase { private Pipeline requestPipeline; private ScheduledExecutorService executorService; private BlockNodeStats.HighLatencyResult latencyResult; + private BlockNodeClientFactory clientFactory; @BeforeEach @SuppressWarnings("unchecked") void beforeEach() { - final ConfigProvider configProvider = createConfigProvider(createDefaultConfigProvider()); + configProvider = createConfigProvider(createDefaultConfigProvider()); nodeConfig = newBlockNodeConfig(8080, 1); connectionManager = mock(BlockNodeConnectionManager.class); bufferService = mock(BlockBufferService.class); @@ -110,7 +111,7 @@ void beforeEach() { executorService = mock(ScheduledExecutorService.class); latencyResult = mock(BlockNodeStats.HighLatencyResult.class); - final BlockNodeClientFactory clientFactory = mock(BlockNodeClientFactory.class); + clientFactory = mock(BlockNodeClientFactory.class); lenient() .doReturn(grpcServiceClient) .when(clientFactory) @@ -131,8 +132,6 @@ void beforeEach() { final AtomicReference workerThreadRef = workerThreadRef(); workerThreadRef.set(FAKE_WORKER_THREAD); - // resetMocks(); - lenient().doReturn(requestPipeline).when(grpcServiceClient).publishBlockStream(connection); } @@ -679,6 +678,163 @@ void testSendRequest_activeButPipelineNull() { verifyNoInteractions(bufferService); } + @Test + void 
testCloseAtBlockBoundary_noActiveBlock() throws Exception { + // re-create the connection so we get the worker thread to run + final long blockNumber = 10; + // indicate we want to start with block 10, but don't add the block to the buffer + + connection = new BlockNodeConnection( + configProvider, + nodeConfig, + connectionManager, + bufferService, + metrics, + executorService, + blockNumber, // start streaming with block 10 + clientFactory); + + lenient().doReturn(requestPipeline).when(grpcServiceClient).publishBlockStream(connection); + + connection.createRequestPipeline(); + connection.updateConnectionState(ConnectionState.ACTIVE); // this will start the worker thread + + assertThat(workerThreadRef()).doesNotHaveNullValue(); + + // sleep for a bit to let the worker run + Thread.sleep(250); + + // signal to close at the block boundary + connection.closeAtBlockBoundary(); + + // the worker should determine there is no block available to stream and with the flag enabled to close at the + // nearest block boundary, the connection should be closed without sending any items + + // sleep for a short period to make sure the worker has run after setting the flag + Thread.sleep(100); + + // now the connection should be closed and all the items are sent + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishStreamRequest.class); + + // only one request should be sent and it should be the EndStream message + verify(requestPipeline).onNext(requestCaptor.capture()); + + assertThat(requestCaptor.getAllValues()).hasSize(1); + final PublishStreamRequest req = requestCaptor.getAllValues().getFirst(); + final EndStream endStream = req.endStream(); + assertThat(endStream).isNotNull(); + assertThat(endStream.endCode()).isEqualTo(EndStream.Code.RESET); + + verify(requestPipeline).onComplete(); + verify(bufferService, atLeastOnce()).getBlockState(blockNumber); +
verify(bufferService).getEarliestAvailableBlockNumber(); + verify(bufferService).getHighestAckedBlockNumber(); + verify(metrics).recordConnectionOpened(); + verify(metrics).recordRequestLatency(anyLong()); + verify(metrics).recordRequestEndStreamSent(EndStream.Code.RESET); + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + + verifyNoMoreInteractions(metrics); + verifyNoMoreInteractions(requestPipeline); + verifyNoMoreInteractions(bufferService); + } + + @Test + void testCloseAtBlockBoundary_activeBlock() throws Exception { + // re-create the connection so we get the worker thread to run + final long blockNumber = 10; + final BlockState block = new BlockState(blockNumber); + when(bufferService.getBlockState(blockNumber)) + .thenReturn(block); + + connection = new BlockNodeConnection( + configProvider, + nodeConfig, + connectionManager, + bufferService, + metrics, + executorService, + blockNumber, // start streaming with block 10 + clientFactory); + + lenient().doReturn(requestPipeline).when(grpcServiceClient).publishBlockStream(connection); + + connection.createRequestPipeline(); + connection.updateConnectionState(ConnectionState.ACTIVE); // this will start the worker thread + + assertThat(workerThreadRef()).doesNotHaveNullValue(); + + block.addItem(newBlockHeaderItem(blockNumber)); + block.addItem(newBlockTxItem(1_345)); + + Thread.sleep(500); // sleep for a bit to ensure the items get sent + + // now signal to close the connection at the block boundary + connection.closeAtBlockBoundary(); + + // sleep for a little bit, then add more items including the proof and ensure they are all sent + Thread.sleep(100); + + block.addItem(newBlockTxItem(5_039)); + block.addItem(newBlockTxItem(590)); + block.addItem(newBlockProofItem(blockNumber, 3_501)); + block.closeBlock(); + + Thread.sleep(500); + + // now the connection should be closed and all the items are sent + 
assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishStreamRequest.class); + + verify(requestPipeline, atLeastOnce()).onNext(requestCaptor.capture()); + // Do NOT try to verify the number of requests sent beyond more than 1 + // Due to timing and multiple threads, there could be different number of requests sent. Instead, only validate + // the number of items and end stream items sent, since that should always be the same + + final List items = new ArrayList<>(); + final List endStreams = new ArrayList<>(); + + for (final PublishStreamRequest request : requestCaptor.getAllValues()) { + final BlockItemSet itemSet = request.blockItems(); + if (itemSet != null) { + items.addAll(itemSet.blockItems()); + } + + final EndStream endStream = request.endStream(); + if (endStream != null) { + endStreams.add(endStream); + } + } + + // all of our items should be sent, 1 header + 3 TX items + 1 proof + assertThat(items).hasSize(5); + assertThat(endStreams).hasSize(1); + + // an EndStream request should also be sent with the RESET code + final EndStream endStream = endStreams.getFirst(); + assertThat(endStream.endCode()).isEqualTo(EndStream.Code.RESET); + + verify(requestPipeline).onComplete(); + verify(bufferService).getBlockState(blockNumber); + verify(bufferService).getEarliestAvailableBlockNumber(); + verify(bufferService).getHighestAckedBlockNumber(); + verify(metrics).recordConnectionOpened(); + verify(metrics, atLeastOnce()).recordRequestLatency(anyLong()); + verify(metrics, atLeastOnce()).recordRequestSent(RequestOneOfType.BLOCK_ITEMS); + verify(metrics, atLeastOnce()).recordBlockItemsSent(anyInt()); + verify(metrics).recordRequestEndStreamSent(EndStream.Code.RESET); + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + + verifyNoMoreInteractions(metrics); + verifyNoMoreInteractions(requestPipeline); + 
verifyNoMoreInteractions(bufferService); + } + @Test void testClose() { openConnectionAndResetMocks(); From 4fdfbc9aa266da8248e94c757aa7527eb8a6bab0 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Tue, 28 Oct 2025 14:52:02 -0500 Subject: [PATCH 03/15] cleanup and spotless Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 70 +++++++++++++------ .../streaming/BlockNodeConnectionTest.java | 3 +- 2 files changed, 49 insertions(+), 24 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index e8797b82a474..0b7559524c14 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -2,13 +2,6 @@ package com.hedera.node.app.blocks.impl.streaming; import static java.util.Objects.requireNonNull; -<<<<<<< HEAD -import static org.apache.logging.log4j.Level.DEBUG; -import static org.apache.logging.log4j.Level.ERROR; -import static org.apache.logging.log4j.Level.INFO; -import static org.apache.logging.log4j.Level.WARN; -======= ->>>>>>> main import static org.hiero.block.api.PublishStreamRequest.EndStream.Code.RESET; import static org.hiero.block.api.PublishStreamRequest.EndStream.Code.TIMEOUT; import static org.hiero.block.api.PublishStreamRequest.EndStream.Code.TOO_FAR_BEHIND; @@ -678,9 +671,16 @@ public boolean sendRequest(@NonNull final PublishStreamRequest request) { PublishStreamRequest#protobufSize does the size calculation lazily and thus calling this can incur a performance penality. Therefore, we only want to log the byte size at trace level. 
*/ - logger.trace("{} Sending request to block node (type={}, bytes={})", this, request.request().kind(), request.protobufSize()); + logger.trace( + "{} Sending request to block node (type={}, bytes={})", + this, + request.request().kind(), + request.protobufSize()); } else if (logger.isDebugEnabled()) { - logger.debug("{} Sending request to block node (type={})", this, request.request().kind()); + logger.debug( + "{} Sending request to block node (type={})", + this, + request.request().kind()); } final long startMs = System.currentTimeMillis(); @@ -1000,7 +1000,9 @@ private void doWork() { if (closeAtNextBlockBoundary.get()) { // The flag to indicate that we should close the connection at a block boundary is set to true // since no block is available to stream, we are at a safe "boundary" and can close the connection - logger.info("{} Block boundary reached; closing connection (no block available)", BlockNodeConnection.this); + logger.info( + "{} Block boundary reached; closing connection (no block available)", + BlockNodeConnection.this); closeConnection(EndStream.Code.RESET); } @@ -1011,7 +1013,8 @@ private void doWork() { while ((item = block.blockItem(itemIndex)) != null) { if (itemIndex == 0) { - logger.trace("{} Starting to process items for block {}", BlockNodeConnection.this, block.blockNumber()); + logger.trace( + "{} Starting to process items for block {}", BlockNodeConnection.this, block.blockNumber()); if (lastSendTimeMillis == -1) { // if we've never sent a request and this is the first time we are processing a block, update // the last send time to the current time. 
this will avoid prematurely sending a request @@ -1059,7 +1062,12 @@ private void doWork() { final long diffMillis = System.currentTimeMillis() - lastSendTimeMillis; final long maxDelayMillis = maxRequestDelayMillis(); if (diffMillis >= maxDelayMillis) { - logger.trace("{} Max delay exceeded (target: {}ms, actual: {}ms) - sending {} item(s)", BlockNodeConnection.this, maxDelayMillis, diffMillis, pendingRequestItems.size()); + logger.trace( + "{} Max delay exceeded (target: {}ms, actual: {}ms) - sending {} item(s)", + BlockNodeConnection.this, + maxDelayMillis, + diffMillis, + pendingRequestItems.size()); sendPendingRequest(); } } @@ -1068,8 +1076,7 @@ private void doWork() { } private void maybeAdvanceBlock() { - final boolean finishedWithCurrentBlock = - pendingRequestItems.isEmpty() // no more items ready to send + final boolean finishedWithCurrentBlock = pendingRequestItems.isEmpty() // no more items ready to send && block.isClosed() // the block is closed, so no more items are expected && block.itemCount() == itemIndex; // we've exhausted all items in the block @@ -1087,7 +1094,9 @@ private void maybeAdvanceBlock() { if (closeAtNextBlockBoundary.get()) { // the connection manager wants us to gracefully stop this connection - logger.info("{} Block boundary reached; closing connection (finished sending block)", BlockNodeConnection.this); + logger.info( + "{} Block boundary reached; closing connection (finished sending block)", + BlockNodeConnection.this); closeConnection(EndStream.Code.RESET); } else { // the connection manager hasn't informed us to close this connection, so we are now free to advance to @@ -1096,7 +1105,10 @@ private void maybeAdvanceBlock() { if (streamingBlockNumber.compareAndSet(block.blockNumber(), nextBlockNumber)) { logger.trace("{} Advancing to block {}", BlockNodeConnection.this, nextBlockNumber); } else { - logger.trace("{} Tried to advance to block {} but the block to stream was updated externally", BlockNodeConnection.this, 
nextBlockNumber); + logger.trace( + "{} Tried to advance to block {} but the block to stream was updated externally", + BlockNodeConnection.this, + nextBlockNumber); } } } @@ -1134,8 +1146,16 @@ private boolean sendPendingRequest() { } } - logger.trace("{} Request details: block={}, request={}, items={}, headerIndex={}, otherItemsIndexRange=[{}, {}], proofIndex={}", - BlockNodeConnection.this, block.blockNumber(), requestCtr.get(), pendingRequestItems.size(), headerIndex, itemStartIndex, itemEndIndex, proofIndex); + logger.trace( + "{} Request details: block={}, request={}, items={}, headerIndex={}, otherItemsIndexRange=[{}, {}], proofIndex={}", + BlockNodeConnection.this, + block.blockNumber(), + requestCtr.get(), + pendingRequestItems.size(), + headerIndex, + itemStartIndex, + itemEndIndex, + proofIndex); } try { @@ -1197,11 +1217,17 @@ private void switchBlockIfNeeded() { requestCtr.set(1); if (block == null) { - logger.trace("{} Wanted to switch from block {} to block {}, but it is not available", - BlockNodeConnection.this, (oldBlock == null ? -1 : oldBlock.blockNumber()), latestActiveBlockNumber); + logger.trace( + "{} Wanted to switch from block {} to block {}, but it is not available", + BlockNodeConnection.this, + (oldBlock == null ? -1 : oldBlock.blockNumber()), + latestActiveBlockNumber); } else { - logger.trace("{} Switched from block {} to block {}", BlockNodeConnection.this, - (oldBlock == null ? -1 : oldBlock.blockNumber()), latestActiveBlockNumber); + logger.trace( + "{} Switched from block {} to block {}", + BlockNodeConnection.this, + (oldBlock == null ? 
-1 : oldBlock.blockNumber()), + latestActiveBlockNumber); } } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 4c8d5d5693ca..685a6055824f 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -798,8 +798,7 @@ void testCloseAtBlockBoundary_activeBlock() throws Exception { // re-create the connection so we get the worker thread to run final long blockNumber = 10; final BlockState block = new BlockState(blockNumber); - when(bufferService.getBlockState(blockNumber)) - .thenReturn(block); + when(bufferService.getBlockState(blockNumber)).thenReturn(block); connection = new BlockNodeConnection( configProvider, From afbd7085dc1019b2f950538bfe86ebcc907e807d Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Tue, 28 Oct 2025 15:02:55 -0500 Subject: [PATCH 04/15] update comments Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 46 ++++++++++++++----- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 0b7559524c14..eba5e85f2a1e 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -144,9 +144,15 @@ private record Options(Optional authority, String contentType) implement * Mechanism to retrieve configuration properties related to block-node communication. 
*/ private final ConfigProvider configProvider; - + /** + * Factory used to create the block node clients. + */ private final BlockNodeClientFactory clientFactory; - private final AtomicReference connectionStartTimestamp = new AtomicReference<>(); + /** + * Flag indicating if this connection should be closed at the next block boundary. For example: if set to true while + * the connection is actively streaming a block, then the connection will continue to stream the remaining block and + * once it is finished it will close the connection. + */ private final AtomicBoolean closeAtNextBlockBoundary = new AtomicBoolean(false); /** @@ -323,8 +329,10 @@ private boolean updateConnectionState( return true; } + /** + * Perform necessary setup steps once the connection has entered the ACTIVE state. + */ private void handleConnectionActive() { - connectionStartTimestamp.set(Instant.now()); scheduleStreamReset(); // start worker thread to handle sending requests final Thread workerThread = new Thread(new ConnectionWorkerLoopTask(), "bn-conn-worker-" + connectionId); @@ -905,6 +913,12 @@ public ConnectionState getConnectionState() { return connectionState.get(); } + /** + * Indicates that this connection should be closed at the next block boundary. If this connection is actively + * streaming a block, then the connection will wait until the block is fully sent before closing. If the connection + * is waiting to stream a block that is not available, then the connection will be closed without sending any items + * for the pending block. 
+ */ public void closeAtBlockBoundary() { logger.info("{} Connection will be closed at the next block boundary", this); closeAtNextBlockBoundary.set(true); @@ -987,11 +1001,6 @@ public void run() { workerThreadRef.compareAndSet(Thread.currentThread(), null); } - private void closeConnection(final EndStream.Code endCode) { - endTheStreamWith(endCode); - blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this); - } - private void doWork() { switchBlockIfNeeded(); @@ -1075,6 +1084,11 @@ private void doWork() { maybeAdvanceBlock(); } + /** + * Checks if the current block has all of its items sent. If so, then the next block is loaded into the worker. + * Additionally, if there is a request to close the connection at a block boundary and the current block is + * finished, then this connection will be closed. + */ private void maybeAdvanceBlock() { final boolean finishedWithCurrentBlock = pendingRequestItems.isEmpty() // no more items ready to send && block.isClosed() // the block is closed, so no more items are expected @@ -1087,9 +1101,9 @@ private void maybeAdvanceBlock() { /* We are now down with the current block and have two options. 1) We advance to the next block (normal case). - 2) We check with the connection manager and determine if we should gracefully close the connection. This can - happen if we are switching connections, but we wanted to finish streaming the current block before - closing this connection. This allows us to switch connections at block boundaries, instead of mid-block. + 2) This connection has been marked for closure after we are finished processing the current block. If this + is true, then we will close this connection. This allows us to close the connection at a block boundary + instead of closing the connection mid-block. 
*/ if (closeAtNextBlockBoundary.get()) { @@ -1113,6 +1127,16 @@ private void maybeAdvanceBlock() { } } + /** + * Attempts to send the specified end stream code to the block node, and then closes this connection. + * + * @param endCode the end stream code to attempt to send to the block node before closing + */ + private void closeConnection(final EndStream.Code endCode) { + endTheStreamWith(endCode); + blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this); + } + /** * Attempt to send the pending block items to the block node. * From 03402773f027cf5f5c8510dd402c5b38c154235a Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Wed, 29 Oct 2025 14:28:05 -0500 Subject: [PATCH 05/15] fix tests; fix logging Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 31 +++++++++++-------- .../BlockNodeConnectionManagerTest.java | 4 +-- .../streaming/BlockNodeConnectionTest.java | 6 ++-- .../bdd/suites/blocknode/BlockNodeSuite.java | 2 ++ 4 files changed, 25 insertions(+), 18 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index eba5e85f2a1e..8bbb5c739f26 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -231,7 +231,7 @@ public BlockNodeConnection( if (initialBlockToStream != null) { streamingBlockNumber.set(initialBlockToStream); - logger.info("Block node connection will initially stream with block {}", initialBlockToStream); + logger.info("{} Block node connection will initially stream with block {}", BlockNodeConnection.this, initialBlockToStream); } } @@ -277,7 +277,8 @@ public synchronized void createRequestPipeline() { .build(); if 
(logger.isDebugEnabled()) { logger.debug( - "Created BlockStreamPublishServiceClient for {}:{}.", + "{} Created BlockStreamPublishServiceClient for {}:{}.", + BlockNodeConnection.this, blockNodeConfig.address(), blockNodeConfig.port()); } @@ -668,6 +669,10 @@ public void endTheStreamWith(final PublishStreamRequest.EndStream.Code code) { * @return true if the request was sent, else false */ public boolean sendRequest(@NonNull final PublishStreamRequest request) { + return sendRequest(-1, -1, request); + } + + private boolean sendRequest(final long blockNumber, final int requestNumber, @NonNull final PublishStreamRequest request) { requireNonNull(request, "request must not be null"); final Pipeline pipeline = requestPipelineRef.get(); @@ -680,14 +685,18 @@ public boolean sendRequest(@NonNull final PublishStreamRequest request) { a performance penality. Therefore, we only want to log the byte size at trace level. */ logger.trace( - "{} Sending request to block node (type={}, bytes={})", + "{} Sending request (block={}, requestNumber={}) to block node (type={}, bytes={})", this, + blockNumber, + requestNumber, request.request().kind(), request.protobufSize()); } else if (logger.isDebugEnabled()) { logger.debug( - "{} Sending request to block node (type={})", + "{} Sending request (block={}, requestNumber={}) to block node (type={})", this, + blockNumber, + requestNumber, request.request().kind()); } @@ -696,7 +705,7 @@ public boolean sendRequest(@NonNull final PublishStreamRequest request) { final long durationMs = System.currentTimeMillis() - startMs; blockStreamMetrics.recordRequestLatency(durationMs); - logger.trace("{} Request took {}ms to send", this, durationMs); + logger.trace("{} Request (block={}, requestNumber={}) took {}ms to send", this, blockNumber, requestNumber, durationMs); if (request.hasEndStream()) { blockStreamMetrics.recordRequestEndStreamSent( @@ -870,13 +879,9 @@ public void onError(final Throwable error) { 
blockStreamMetrics.recordConnectionOnError(); if (error instanceof final GrpcException grpcException) { - if (logger.isWarnEnabled()) { - logger.warn("{} Error received (grpcStatus={}).", this, grpcException.status(), grpcException); - } + logger.warn("{} Error received (grpcStatus={}).", this, grpcException.status(), grpcException); } else { - if (logger.isWarnEnabled()) { - logger.warn("{} Error received.", this, error); - } + logger.warn("{} Error received.", this, error); } handleStreamFailure(); @@ -997,7 +1002,7 @@ public void run() { } // if we exit the worker loop, then this thread is over... remove it from the worker thread reference - logger.info("Worker thread exiting"); + logger.info("{} Worker thread exiting", BlockNodeConnection.this); workerThreadRef.compareAndSet(Thread.currentThread(), null); } @@ -1183,7 +1188,7 @@ private boolean sendPendingRequest() { } try { - if (sendRequest(req)) { + if (sendRequest(block.blockNumber(), requestCtr.get(), req)) { // record that we've sent the request lastSendTimeMillis = System.currentTimeMillis(); diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index abed6d71293a..de038c47fdcd 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -1798,7 +1798,7 @@ void testConnectionTask_closeOldActiveConnectionThrowsException() { final BlockNodeConnection oldActive = mock(BlockNodeConnection.class); final BlockNodeConfig oldConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 2); doReturn(oldConfig).when(oldActive).getNodeConfig(); - doThrow(new RuntimeException("Close failed")).when(oldActive).close(true); + doThrow(new 
RuntimeException("Close failed")).when(oldActive).closeAtBlockBoundary(); activeConnectionRef.set(oldActive); final BlockNodeConnection newConnection = mock(BlockNodeConnection.class); @@ -1808,7 +1808,7 @@ void testConnectionTask_closeOldActiveConnectionThrowsException() { // Should handle exception gracefully connectionManager.new BlockNodeConnectionTask(newConnection, Duration.ZERO, false).run(); - verify(oldActive).close(true); + verify(oldActive).closeAtBlockBoundary(); verify(newConnection).createRequestPipeline(); verify(newConnection).updateConnectionState(ConnectionState.ACTIVE); assertThat(activeConnectionRef).hasValue(newConnection); diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 685a6055824f..cdb2c329144e 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -1432,7 +1432,7 @@ void testConnectionWorker_noItemsAvailable() throws Exception { connection.updateConnectionState(ConnectionState.ACTIVE); // sleep to let the worker detect the state change and start doing work - sleep(150); + sleep(250); assertThat(workerThreadRef).doesNotHaveNullValue(); assertThat(streamingBlockNumber).hasValue(10); @@ -1464,7 +1464,7 @@ void testConnectionWorker_blockJump() throws Exception { connection.updateConnectionState(ConnectionState.ACTIVE); // sleep to let the worker detect the state change and start doing work - sleep(150); + sleep(250); // create a skip response to force the connection to jump to block 11 final PublishStreamResponse skipResponse = createSkipBlock(10L); @@ -1511,7 +1511,7 @@ void testConnectionWorker_hugeItem() throws Exception { 
connection.updateConnectionState(ConnectionState.ACTIVE); // sleep to let the worker detect the state change and start doing work - sleep(150); + sleep(250); final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishStreamRequest.class); verify(requestPipeline, times(2)).onNext(requestCaptor.capture()); diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index 9f622cb89aed..acb4fafb1597 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -509,6 +509,8 @@ final Stream testProactiveBlockBufferAction() { // look for the log that shows we are forcing a reconnect to a different block node "Attempting to forcefully switch block node connections due to increasing block buffer saturation")), doingContextual(spec -> timeRef.set(Instant.now())), + // BN 0 may be promoted again since it is a higher priority connection; re-enable acknowledging + blockNode(0).updateSendingBlockAcknowledgements(true), sourcingContextual(spec -> assertBlockNodeCommsLogContainsTimeframe( byNodeId(0), timeRef::get, From 8cf53a7a79db68f3fea36cd649046f44085b7245 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Wed, 29 Oct 2025 20:58:29 -0500 Subject: [PATCH 06/15] update HAPI test; spotless Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 15 ++++++++++++--- .../bdd/suites/blocknode/BlockNodeSuite.java | 6 ++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 8bbb5c739f26..a0cbb2e0869a 100644 --- 
a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -231,7 +231,10 @@ public BlockNodeConnection( if (initialBlockToStream != null) { streamingBlockNumber.set(initialBlockToStream); - logger.info("{} Block node connection will initially stream with block {}", BlockNodeConnection.this, initialBlockToStream); + logger.info( + "{} Block node connection will initially stream with block {}", + BlockNodeConnection.this, + initialBlockToStream); } } @@ -672,7 +675,8 @@ public boolean sendRequest(@NonNull final PublishStreamRequest request) { return sendRequest(-1, -1, request); } - private boolean sendRequest(final long blockNumber, final int requestNumber, @NonNull final PublishStreamRequest request) { + private boolean sendRequest( + final long blockNumber, final int requestNumber, @NonNull final PublishStreamRequest request) { requireNonNull(request, "request must not be null"); final Pipeline pipeline = requestPipelineRef.get(); @@ -705,7 +709,12 @@ private boolean sendRequest(final long blockNumber, final int requestNumber, @No final long durationMs = System.currentTimeMillis() - startMs; blockStreamMetrics.recordRequestLatency(durationMs); - logger.trace("{} Request (block={}, requestNumber={}) took {}ms to send", this, blockNumber, requestNumber, durationMs); + logger.trace( + "{} Request (block={}, requestNumber={}) took {}ms to send", + this, + blockNumber, + requestNumber, + durationMs); if (request.hasEndStream()) { blockStreamMetrics.recordRequestEndStreamSent( diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index acb4fafb1597..f741e61dbd8e 100644 --- 
a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -432,6 +432,12 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { String.format( "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE.", portNumbers.get(1)), + String.format( + "/localhost:%s/ACTIVE] Connection will be closed at the next block boundary", + portNumbers.get(3)), + String.format( + "/localhost:%s/ACTIVE] Block boundary reached; closing connection (finished sending block)", + portNumbers.get(3)), String.format("/localhost:%s/CLOSING] Closing connection.", portNumbers.get(3)), String.format( "/localhost:%s/CLOSING] Connection state transitioned from ACTIVE to CLOSING.", From 03606a44dfe21fbe15205ae78df09599417a4559 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Wed, 29 Oct 2025 21:12:44 -0500 Subject: [PATCH 07/15] update doc Signed-off-by: Tim Farber-Newman --- .../docs/design/app/blocks/BlockNodeConnection.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md index 6f33b08f46d7..f4e3f423592b 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md @@ -100,6 +100,15 @@ the transition to a terminal state, it too will cease operations. To establish a connection back to the block node, a new connection object will need to be created. +### Graceful Connection Close + +When a connection is closed, a best effort attempt to gracefully close the connection will be performed. There are two +aspects to this "graceful close": +1. 
Unless the connection is unstable, or we are notifying the block node it is too far behind, before closing an attempt +will be made to send an EndStream request to the block node with the code `RESET`. +2. If the connection is actively streaming a block, a best effort to stream the rest of the block will be performed +before closing the connection. + ### Connection States - **UNINITIALIZED**: Initial state when a connection object is created. The bidi RequestObserver needs to be created. From 62c916aa7655da205701467d9800c7bf14d768dc Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Thu, 30 Oct 2025 13:30:48 -0500 Subject: [PATCH 08/15] revert graceful shutdown when stopping connection manager Signed-off-by: Tim Farber-Newman --- .../design/app/blocks/BlockNodeConnection.md | 2 ++ .../impl/streaming/BlockNodeConnection.java | 27 +++++++++++-------- .../streaming/BlockNodeConnectionManager.java | 4 ++- .../BlockNodeConnectionManagerTest.java | 20 +++++++------- .../BlockNodeSoftwareUpgradeSuite.java | 2 +- 5 files changed, 32 insertions(+), 23 deletions(-) diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md index f4e3f423592b..5a29b4e59148 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md @@ -109,6 +109,8 @@ will be made to send an EndStream request to the block node with the code `RESET`. 2. If the connection is actively streaming a block, a best effort to stream the rest of the block will be performed before closing the connection. +A caveat to this is if the connection manager is being shut down, then the connections will NOT be closed gracefully. + ### Connection States - **UNINITIALIZED**: Initial state when a connection object is created. The bidi RequestObserver needs to be created. 
diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index a0cbb2e0869a..29f8eb2a7847 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -1080,18 +1080,23 @@ private void doWork() { } if (!pendingRequestItems.isEmpty()) { - // There are pending items to send. Check if enough time has elapsed since the last request was sent. - // If so, send the current pending request. - final long diffMillis = System.currentTimeMillis() - lastSendTimeMillis; - final long maxDelayMillis = maxRequestDelayMillis(); - if (diffMillis >= maxDelayMillis) { - logger.trace( - "{} Max delay exceeded (target: {}ms, actual: {}ms) - sending {} item(s)", - BlockNodeConnection.this, - maxDelayMillis, - diffMillis, - pendingRequestItems.size()); + if (block.isClosed() && block.itemCount() == itemIndex) { + // Send the last pending items of the block sendPendingRequest(); + } else { + // If the duration since the last time of sending a request exceeds the max delay configuration, + // send the pending items + final long diffMillis = System.currentTimeMillis() - lastSendTimeMillis; + final long maxDelayMillis = maxRequestDelayMillis(); + if (diffMillis >= maxDelayMillis) { + logger.trace( + "{} Max delay exceeded (target: {}ms, actual: {}ms) - sending {} item(s)", + BlockNodeConnection.this, + maxDelayMillis, + diffMillis, + pendingRequestItems.size()); + sendPendingRequest(); + } } } diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index dfbd4a26ec8a..90fed4b681fa 100644 
--- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -431,7 +431,9 @@ private void closeAllConnections() { final Map.Entry entry = it.next(); final BlockNodeConnection connection = entry.getValue(); try { - connection.closeAtBlockBoundary(); + // This method is invoked during a shutdown of the connection manager, in which case we don't want + // to gracefully close connections at block boundaries, so just call close immediately. + connection.close(true); } catch (final RuntimeException e) { logger.debug( "{} Error while closing connection during connection manager shutdown. Ignoring.", diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index de038c47fdcd..4d5185664464 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -299,7 +299,7 @@ void testShutdown() { connections.put(node3Config, node3Conn); // introduce a failure on one of the connection closes to ensure the shutdown process does not fail prematurely - doThrow(new RuntimeException("oops, I did it again")).when(node2Conn).closeAtBlockBoundary(); + doThrow(new RuntimeException("oops, I did it again")).when(node2Conn).close(true); final AtomicBoolean isActive = isActiveFlag(); isActive.set(true); @@ -319,9 +319,9 @@ void testShutdown() { // and not shutdown the buffer service again connectionManager.shutdown(); - verify(node1Conn).closeAtBlockBoundary(); - verify(node2Conn).closeAtBlockBoundary(); - verify(node3Conn).closeAtBlockBoundary(); + 
verify(node1Conn).close(true); + verify(node2Conn).close(true); + verify(node3Conn).close(true); verify(bufferService).shutdown(); verifyNoMoreInteractions(node1Conn); verifyNoMoreInteractions(node2Conn); @@ -1317,7 +1317,7 @@ void testCloseAllConnections() { invoke_closeAllConnections(); - verify(conn).closeAtBlockBoundary(); + verify(conn).close(true); assertThat(connections()).isEmpty(); } @@ -1330,7 +1330,7 @@ void testCloseAllConnections_whenStreamingDisabled() { invoke_closeAllConnections(); - verify(conn).closeAtBlockBoundary(); + verify(conn).close(true); } @Test @@ -1343,7 +1343,7 @@ void testRefreshAvailableBlockNodes() { invoke_refreshAvailableBlockNodes(); // Verify old connection was closed - verify(conn).closeAtBlockBoundary(); + verify(conn).close(true); } @Test @@ -1367,7 +1367,7 @@ void testRefreshAvailableBlockNodes_shutsDownExecutorAndReloads_whenValid() { invoke_refreshAvailableBlockNodes(); // Old connection closed and executor shut down - verify(existing).closeAtBlockBoundary(); + verify(existing).close(true); // Available nodes should be reloaded from bootstrap JSON (non-empty) assertThat(availableNodes()).isNotEmpty(); @@ -1455,13 +1455,13 @@ void testStartConfigWatcher_reactsToCreateModifyDelete() throws Exception { @Test void testCloseAllConnections_withException() { final BlockNodeConnection conn = mock(BlockNodeConnection.class); - doThrow(new RuntimeException("Close failed")).when(conn).closeAtBlockBoundary(); + doThrow(new RuntimeException("Close failed")).when(conn).close(true); connections().put(newBlockNodeConfig(8080, 1), conn); // Should not throw - exceptions are caught and logged invoke_closeAllConnections(); - verify(conn).closeAtBlockBoundary(); + verify(conn).close(true); assertThat(connections()).isEmpty(); } diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSoftwareUpgradeSuite.java 
b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSoftwareUpgradeSuite.java index 12888772b089..2fe65835dfba 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSoftwareUpgradeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSoftwareUpgradeSuite.java @@ -227,7 +227,7 @@ final Stream updateAppPropertiesWriterMode() { timeRef::get, Duration.ofMinutes(2), Duration.ofMinutes(2), - String.format("/localhost:%s/ACTIVE] Sending request to block node", portNumbers.getFirst()), + String.format("/localhost:%s/ACTIVE] Sending request", portNumbers.getFirst()), String.format( "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE.", portNumbers.getFirst()))), From db2ca69b92a785a17ab3d3e08841947797197f09 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Thu, 30 Oct 2025 14:37:49 -0500 Subject: [PATCH 09/15] update test Signed-off-by: Tim Farber-Newman --- .../streaming/BlockNodeConnectionTest.java | 72 +++++++++++++------ 1 file changed, 52 insertions(+), 20 deletions(-) diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index a4c2b7176a17..6a2d6cf45c69 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.when; import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.hapi.block.stream.BlockProof; +import com.hedera.hapi.block.stream.output.BlockHeader; import com.hedera.node.app.blocks.impl.streaming.BlockNodeConnection.ConnectionState; import com.hedera.node.app.metrics.BlockStreamMetrics; 
import com.hedera.node.config.ConfigProvider; @@ -49,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.hiero.block.api.BlockEnd; import org.hiero.block.api.BlockItemSet; import org.hiero.block.api.BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient; import org.hiero.block.api.PublishStreamRequest; @@ -840,33 +843,61 @@ void testCloseAtBlockBoundary_activeBlock() throws Exception { final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishStreamRequest.class); - verify(requestPipeline, atLeastOnce()).onNext(requestCaptor.capture()); - // Do NOT try to verify the number of requests sent beyond more than 1 - // Due to timing and multiple threads, there could be different number of requests sent. Instead, only validate - // the number of items and end stream items sent, since that should always be the same + /* + There should be at least 3 requests. 
+ All items, in order are: + 1) Block header <-+ + 2) Signed transaction | + 3) Signed transaction +- 1 or more requests + 4) Signed transaction | + 5) Block proof <-+ + 6) Block end <--- single request + 7) EndStream with RESET <--- single request + */ + + verify(requestPipeline, atLeast(3)).onNext(requestCaptor.capture()); + final List requests = requestCaptor.getAllValues(); - final List items = new ArrayList<>(); - final List endStreams = new ArrayList<>(); + final PublishStreamRequest lastRequest = requests.getLast(); + final EndStream endStream = lastRequest.endStream(); + assertThat(endStream).isNotNull(); + assertThat(endStream.endCode()).isEqualTo(EndStream.Code.RESET); - for (final PublishStreamRequest request : requestCaptor.getAllValues()) { - final BlockItemSet itemSet = request.blockItems(); - if (itemSet != null) { - items.addAll(itemSet.blockItems()); - } + final PublishStreamRequest secondToLastRequest = requests.get(requests.size() - 2); + final BlockEnd blockEnd = secondToLastRequest.endOfBlock(); + assertThat(blockEnd).isNotNull(); + assertThat(blockEnd.blockNumber()).isEqualTo(blockNumber); - final EndStream endStream = request.endStream(); - if (endStream != null) { - endStreams.add(endStream); + // collect the block items + final List items = new ArrayList<>(); + for (int i = 0; i < requests.size() - 2; ++i) { + final PublishStreamRequest request = requests.get(i); + final BlockItemSet bis = request.blockItems(); + if (bis != null) { + items.addAll(bis.blockItems()); } } - // all of our items should be sent, 1 header + 3 TX items + 1 proof + // there should be 5 items assertThat(items).hasSize(5); - assertThat(endStreams).hasSize(1); - - // an EndStream request should also be sent with the RESET code - final EndStream endStream = endStreams.getFirst(); - assertThat(endStream.endCode()).isEqualTo(EndStream.Code.RESET); + for (int i = 0; i < 5; ++i) { + final BlockItem blockItem = items.get(i); + + if (i == 0) { + // the first item should be 
the block header + final BlockHeader header = blockItem.blockHeader(); + assertThat(header).isNotNull(); + assertThat(header.number()).isEqualTo(blockNumber); + } else if (i == 4) { + // the last item should be the block proof + final BlockProof proof = blockItem.blockProof(); + assertThat(proof).isNotNull(); + assertThat(proof.block()).isEqualTo(blockNumber); + } else { + // the other items should all be signed transactions + assertThat(blockItem.signedTransaction()).isNotNull(); + } + } verify(requestPipeline).onComplete(); verify(bufferService).getBlockState(blockNumber); @@ -876,6 +907,7 @@ void testCloseAtBlockBoundary_activeBlock() throws Exception { verify(metrics, atLeastOnce()).recordRequestLatency(anyLong()); verify(metrics, atLeastOnce()).recordRequestSent(RequestOneOfType.BLOCK_ITEMS); verify(metrics, atLeastOnce()).recordBlockItemsSent(anyInt()); + verify(metrics).recordRequestSent(RequestOneOfType.END_OF_BLOCK); verify(metrics).recordRequestEndStreamSent(EndStream.Code.RESET); verify(metrics).recordConnectionClosed(); verify(metrics).recordActiveConnectionIp(-1L); From 1144205296d6aea4e85906d6765e0cc7c3605e94 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Fri, 31 Oct 2025 09:30:57 -0500 Subject: [PATCH 10/15] include block and request id in logs when sending block end Signed-off-by: Tim Farber-Newman --- .../node/app/blocks/impl/streaming/BlockNodeConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index f3f81c6dec3f..e8c21d00b2b8 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -1112,7 +1112,7 @@ private void sendBlockEnd() { 
.endOfBlock(BlockEnd.newBuilder().blockNumber(block.blockNumber())) .build(); try { - sendRequest(endOfBlock); + sendRequest(block.blockNumber(), requestCtr.get(), endOfBlock); } catch (final RuntimeException e) { logger.warn("{} Error sending EndOfBlock request", BlockNodeConnection.this, e); handleStreamFailureWithoutOnComplete(); From 5b639deb1c454435a8bc56cf1c30553bb0bd81ab Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Mon, 3 Nov 2025 13:02:26 -0600 Subject: [PATCH 11/15] remove extra log statement Signed-off-by: Tim Farber-Newman --- .../impl/streaming/BlockNodeConnection.java | 33 ------------------- 1 file changed, 33 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 0c457399794f..883bc4e8e5f8 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -1173,39 +1173,6 @@ private boolean sendPendingRequest() { final PublishStreamRequest req = PublishStreamRequest.newBuilder().blockItems(itemSet).build(); - if (logger.isTraceEnabled()) { - // gather information about what type of items are in the request - - int headerIndex = -1; - int itemStartIndex = -1; - int itemEndIndex = -1; - int proofIndex = -1; - for (int i = 0; i < pendingRequestItems.size(); ++i) { - final BlockItem item = pendingRequestItems.get(i); - switch (item.item().kind()) { - case BLOCK_HEADER -> headerIndex = i; - case BLOCK_PROOF -> proofIndex = i; - default -> { - if (itemStartIndex == -1) { - itemStartIndex = i; - } - itemEndIndex = Math.max(itemEndIndex, i); - } - } - } - - logger.trace( - "{} Request details: block={}, request={}, items={}, headerIndex={}, otherItemsIndexRange=[{}, {}], proofIndex={}", - 
BlockNodeConnection.this, - block.blockNumber(), - requestCtr.get(), - pendingRequestItems.size(), - headerIndex, - itemStartIndex, - itemEndIndex, - proofIndex); - } - logger.trace( "{} Attempting to send request (block={}, request={}, itemCount={}, estimatedBytes={} actualBytes={})", BlockNodeConnection.this, From fbc5f3d2906ab80946ad6d4e785ee3469f32a5f4 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Tue, 4 Nov 2025 13:02:35 -0600 Subject: [PATCH 12/15] fix HAPI test Signed-off-by: Tim Farber-Newman --- .../blocks/impl/streaming/BlockNodeConnectionManager.java | 5 +++-- .../impl/streaming/BlockNodeConnectionManagerTest.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index 2c9e41510d68..a7ee88194d80 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -742,7 +742,8 @@ public void run() { "{} Active connection has equal/higher priority. Ignoring candidate. Active: {}.", connection, activeConnection); - connection.closeAtBlockBoundary(); + // This connection was never initialized so we are safe to call close immediately + connection.close(false); return; } } @@ -794,7 +795,7 @@ public void run() { } } } catch (final Exception e) { - logger.debug("{} Failed to establish connection to block node. Will schedule a retry.", connection); + logger.debug("{} Failed to establish connection to block node. 
Will schedule a retry.", connection, e); blockStreamMetrics.recordConnectionCreateFailure(); reschedule(); selectNewBlockNodeForStreaming(false); diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index f4e709ee4747..0eae33cf8880 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -589,7 +589,7 @@ void testConnectionTask_higherPriorityConnectionExists_withoutForce() { verify(activeConnection).getNodeConfig(); verify(newConnection).getNodeConfig(); - verify(newConnection).closeAtBlockBoundary(); + verify(newConnection).close(false); verifyNoMoreInteractions(activeConnection); verifyNoMoreInteractions(newConnection); From aa148fdefeb30acaa209c78042023ed8fd5b4383 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Tue, 4 Nov 2025 13:21:16 -0600 Subject: [PATCH 13/15] spotless Signed-off-by: Tim Farber-Newman --- .../node/app/blocks/impl/streaming/BlockNodeConnectionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 68d99301ec72..79c36109ee4e 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -115,7 +115,7 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase { @BeforeEach @SuppressWarnings("unchecked") void beforeEach() throws 
Exception { - configProvider = createConfigProvider(createDefaultConfigProvider()); + configProvider = createConfigProvider(createDefaultConfigProvider()); nodeConfig = newBlockNodeConfig(8080, 1); connectionManager = mock(BlockNodeConnectionManager.class); bufferService = mock(BlockBufferService.class); From caad43ff9457f394a53a47ee5f1bce2470b970d7 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Wed, 5 Nov 2025 09:03:56 -0600 Subject: [PATCH 14/15] update code comment Signed-off-by: Tim Farber-Newman --- .../node/app/blocks/impl/streaming/BlockNodeConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 1d96b547cbeb..8abf8b2c001d 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -1227,7 +1227,7 @@ private void maybeAdvanceBlock() { } /* - We are now down with the current block and have two options. + We are now done with the current block and have two options: 1) We advance to the next block (normal case). 2) This connection has been marked for closure after we are finished processing the current block. If this is true, then we will close this connection. 
This allows us to close the connection at a block boundary From 556c58ec88f0e86914be3b4a3d06b0206c1065d9 Mon Sep 17 00:00:00 2001 From: Tim Farber-Newman Date: Mon, 10 Nov 2025 10:08:23 -0600 Subject: [PATCH 15/15] merge cleanup Signed-off-by: Tim Farber-Newman --- .../blocks/impl/streaming/BlockNodeConnection.java | 12 ++++++------ .../streaming/BlockNodeConnectionManagerTest.java | 1 - 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 4b5d81b32ddf..ecfead6021d9 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -321,12 +321,12 @@ public synchronized void createRequestPipeline() { .build())) .connectTimeout(timeoutDuration) .build(); - - logger.debug( - "{} Created BlockStreamPublishServiceClient for {}:{}.", - this, - blockNodeProtocolConfig.blockNodeConfig().address(), - blockNodeProtocolConfig.blockNodeConfig().port()); + + logger.debug( + "{} Created BlockStreamPublishServiceClient for {}:{}.", + this, + blockNodeProtocolConfig.blockNodeConfig().address(), + blockNodeProtocolConfig.blockNodeConfig().port()); return clientFactory.createClient(webClient, grpcConfig, OPTIONS); } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index f733717b9d43..0f3ad823f618 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ 
b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -846,7 +846,6 @@ void testConnectionTask_reschedule_failure() { verify(connection).createRequestPipeline(); verify(executorService).schedule(eq(task), anyLong(), eq(TimeUnit.MILLISECONDS)); - verify(connection, atLeast(1)).getNodeConfig(); verify(connection).closeAtBlockBoundary(); verify(metrics).recordConnectionCreateFailure();