diff --git a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md index 8b47306882ec..8014e11ee852 100644 --- a/hedera-node/docs/design/app/blocks/BlockNodeConnection.md +++ b/hedera-node/docs/design/app/blocks/BlockNodeConnection.md @@ -105,6 +105,17 @@ the transition to a terminal state, it too will cease operations. To establish a connection back to the block node, a new connection object will need to be created. +### Graceful Connection Close + +When a connection is closed, a best effort attempt to gracefully close the connection will be performed. There are two +aspects to this "graceful close": +1. Unless the connection is unstable, or we are notifying the block node it is too far behind, before closing an attempt +will be made to send an EndStream request to the block node with the code `RESET`. +2. If the connection is actively streaming a block, a best effort to stream the rest of the block will be performed +before closing the connection. + +A caveat to this is if the connection manager is being shut down, then the connections will NOT be closed gracefully. + ### Connection States - **UNINITIALIZED**: Initial state when a connection object is created. The bidi RequestObserver needs to be created. diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java index 0f0a7195daa7..ecfead6021d9 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java @@ -160,8 +160,16 @@ private record Options(Optional authority, String contentType) implement * Mechanism to retrieve configuration properties related to block-node communication. 
*/ private final ConfigProvider configProvider; - + /** + * Factory used to create the block node clients. + */ private final BlockNodeClientFactory clientFactory; + /** + * Flag indicating if this connection should be closed at the next block boundary. For example: if set to true while + * the connection is actively streaming a block, then the connection will continue to stream the remaining block and + * once it is finished it will close the connection. + */ + private final AtomicBoolean closeAtNextBlockBoundary = new AtomicBoolean(false); /** * Represents the possible states of a Block Node connection. @@ -245,7 +253,10 @@ public BlockNodeConnection( if (initialBlockToStream != null) { streamingBlockNumber.set(initialBlockToStream); - logger.info("Block node connection will initially stream with block {}", initialBlockToStream); + logger.info( + "{} Block node connection will initially stream with block {}", + BlockNodeConnection.this, + initialBlockToStream); } } @@ -310,12 +321,12 @@ public synchronized void createRequestPipeline() { .build())) .connectTimeout(timeoutDuration) .build(); - if (logger.isDebugEnabled()) { - logger.debug( - "Created BlockStreamPublishServiceClient for {}:{}.", - blockNodeProtocolConfig.blockNodeConfig().address(), - blockNodeProtocolConfig.blockNodeConfig().port()); - } + + logger.debug( + "{} Created BlockStreamPublishServiceClient for {}:{}.", + this, + blockNodeProtocolConfig.blockNodeConfig().address(), + blockNodeProtocolConfig.blockNodeConfig().port()); return clientFactory.createClient(webClient, grpcConfig, OPTIONS); } @@ -356,12 +367,7 @@ private boolean updateConnectionState( } if (newState == ConnectionState.ACTIVE) { - scheduleStreamReset(); - // start worker thread to handle sending requests - final Thread workerThread = new Thread(new ConnectionWorkerLoopTask(), "bn-conn-worker-" + connectionId); - if (workerThreadRef.compareAndSet(null, workerThread)) { - workerThread.start(); - } + handleConnectionActive(); } else 
{ cancelStreamReset(); } @@ -369,6 +375,18 @@ private boolean updateConnectionState( return true; } + /** + * Perform necessary setup steps once the connection has entered the ACTIVE state. + */ + private void handleConnectionActive() { + scheduleStreamReset(); + // start worker thread to handle sending requests + final Thread workerThread = new Thread(new ConnectionWorkerLoopTask(), "bn-conn-worker-" + connectionId); + if (workerThreadRef.compareAndSet(null, workerThread)) { + workerThread.start(); + } + } + /** * Schedules the periodic stream reset task to ensure responsiveness and reliability. */ @@ -751,6 +769,7 @@ private boolean sendRequest( } final long durationMs = System.currentTimeMillis() - startMs; + blockStreamMetrics.recordRequestLatency(durationMs); if (blockNumber == -1 && requestNumber == -1) { @@ -809,7 +828,7 @@ connection is in another state (e.g. CLOSING) then we want to ignore the error. * failure in closing the connection, the error will be logged and not propagated back to the caller. 
* @param callOnComplete whether to call onComplete on the request pipeline */ - public void close(final boolean callOnComplete) { + void close(final boolean callOnComplete) { final ConnectionState connState = getConnectionState(); if (connState.isTerminal()) { logger.debug("{} Connection already in terminal state ({}).", this, connState); @@ -973,13 +992,9 @@ public void onError(final Throwable error) { blockStreamMetrics.recordConnectionOnError(); if (error instanceof final GrpcException grpcException) { - if (logger.isWarnEnabled()) { - logger.warn("{} Error received (grpcStatus={}).", this, grpcException.status(), grpcException); - } + logger.warn("{} Error received (grpcStatus={}).", this, grpcException.status(), grpcException); } else { - if (logger.isWarnEnabled()) { - logger.warn("{} Error received.", this, error); - } + logger.warn("{} Error received.", this, error); } handleStreamFailure(); @@ -1016,6 +1031,17 @@ public ConnectionState getConnectionState() { return connectionState.get(); } + /** + * Indicates that this connection should be closed at the next block boundary. If this connection is actively + * streaming a block, then the connection will wait until the block is fully sent before closing. If the connection + * is waiting to stream a block that is not available, then the connection will be closed without sending any items + * for the pending block. + */ + public void closeAtBlockBoundary() { + logger.info("{} Connection will be closed at the next block boundary", this); + closeAtNextBlockBoundary.set(true); + } + @Override public String toString() { return "[" + connectionId + "/" @@ -1103,7 +1129,7 @@ public void run() { } // if we exit the worker loop, then this thread is over... 
remove it from the worker thread reference - logger.info("Worker thread exiting"); + logger.info("{} Worker thread exiting", BlockNodeConnection.this); workerThreadRef.compareAndSet(Thread.currentThread(), null); } @@ -1112,6 +1138,15 @@ private void doWork() { if (block == null) { // The block we want to stream is not available + if (closeAtNextBlockBoundary.get()) { + // The flag to indicate that we should close the connection at a block boundary is set to true + // since no block is available to stream, we are at a safe "boundary" and can close the connection + logger.info( + "{} Block boundary reached; closing connection (no block available)", + BlockNodeConnection.this); + endTheStreamWith(EndStream.Code.RESET); + } + return; } @@ -1121,6 +1156,11 @@ private void doWork() { if (itemIndex == 0) { logger.trace( "{} Starting to process items for block {}", BlockNodeConnection.this, block.blockNumber()); + if (lastSendTimeMillis == -1) { + // if we've never sent a request and this is the first time we are processing a block, update + // the last send time to the current time. this will avoid prematurely sending a request + lastSendTimeMillis = System.currentTimeMillis(); + } } final int itemSize = item.protobufSize() + 5; // add an extra 5 bytes to account for potential overhead @@ -1158,28 +1198,73 @@ private void doWork() { } if (!pendingRequestItems.isEmpty()) { - // There are pending items to send. Check if enough time has elapsed since the last request was sent. - // If so, send the current pending request. 
- final long diffMillis = System.currentTimeMillis() - lastSendTimeMillis; - if (diffMillis >= maxRequestDelayMillis()) { + if (block.isClosed() && block.itemCount() == itemIndex) { + // Send the last pending items of the block sendPendingRequest(); + sendBlockEnd(); + } else { + // If the duration since the last time of sending a request exceeds the max delay configuration, + // send the pending items + final long diffMillis = System.currentTimeMillis() - lastSendTimeMillis; + final long maxDelayMillis = maxRequestDelayMillis(); + if (diffMillis >= maxDelayMillis) { + logger.trace( + "{} Max delay exceeded (target: {}ms, actual: {}ms) - sending {} item(s)", + BlockNodeConnection.this, + maxDelayMillis, + diffMillis, + pendingRequestItems.size()); + sendPendingRequest(); + } } } - if (pendingRequestItems.isEmpty() && block.isClosed() && block.itemCount() == itemIndex) { - // Indicate to the block node that this is the end of the current block - final PublishStreamRequest endOfBlock = PublishStreamRequest.newBuilder() - .endOfBlock(BlockEnd.newBuilder().blockNumber(block.blockNumber())) - .build(); - try { - sendRequest(endOfBlock); - } catch (RuntimeException e) { - logger.warn("{} Error sending EndOfBlock request", BlockNodeConnection.this, e); - handleStreamFailureWithoutOnComplete(); - } + maybeAdvanceBlock(); + } + + private void sendBlockEnd() { + final PublishStreamRequest endOfBlock = PublishStreamRequest.newBuilder() + .endOfBlock(BlockEnd.newBuilder().blockNumber(block.blockNumber())) + .build(); + try { + sendRequest(block.blockNumber(), requestCtr.get(), endOfBlock); + } catch (final RuntimeException e) { + logger.warn("{} Error sending EndOfBlock request", BlockNodeConnection.this, e); + handleStreamFailureWithoutOnComplete(); + } + } + + /** + * Checks if the current block has all of its items sent. If so, then the next block is loaded into the worker. 
+ * Additionally, if there is a request to close the connection at a block boundary and the current block is + * finished, then this connection will be closed. + */ + private void maybeAdvanceBlock() { + final boolean finishedWithCurrentBlock = pendingRequestItems.isEmpty() // no more items ready to send + && block.isClosed() // the block is closed, so no more items are expected + && block.itemCount() == itemIndex; // we've exhausted all items in the block + + if (!finishedWithCurrentBlock) { + return; // still more work to do + } - // We've gathered all block items and have sent them to the block node. No additional work is needed - // for the current block so we can move to the next block. + /* + We are now done with the current block and have two options: + 1) We advance to the next block (normal case). + 2) This connection has been marked for closure after we are finished processing the current block. If this + is true, then we will close this connection. This allows us to close the connection at a block boundary + instead of closing the connection mid-block. 
+ */ + + if (closeAtNextBlockBoundary.get()) { + // the connection manager wants us to gracefully stop this connection + logger.info( + "{} Block boundary reached; closing connection (finished sending block)", + BlockNodeConnection.this); + endTheStreamWith(EndStream.Code.RESET); + } else { + // the connection manager hasn't informed us to close this connection, so we are now free to advance to + // the next block final long nextBlockNumber = block.blockNumber() + 1; if (streamingBlockNumber.compareAndSet(block.blockNumber(), nextBlockNumber)) { logger.trace("{} Advancing to block {}", BlockNodeConnection.this, nextBlockNumber); @@ -1204,16 +1289,14 @@ private boolean sendPendingRequest() { final PublishStreamRequest req = PublishStreamRequest.newBuilder().blockItems(itemSet).build(); - if (logger.isTraceEnabled()) { - logger.trace( - "{} Attempting to send request (block={}, request={}, itemCount={}, estimatedBytes={} actualBytes={})", - BlockNodeConnection.this, - block.blockNumber(), - requestCtr.get(), - pendingRequestItems.size(), - pendingRequestBytes, - req.protobufSize()); - } + logger.trace( + "{} Attempting to send request (block={}, request={}, itemCount={}, estimatedBytes={} actualBytes={})", + BlockNodeConnection.this, + block.blockNumber(), + requestCtr.get(), + pendingRequestItems.size(), + pendingRequestBytes, + req.protobufSize()); try { if (sendRequest(block.blockNumber(), requestCtr.get(), req)) { @@ -1266,18 +1349,15 @@ private void switchBlockIfNeeded() { } // Swap blocks and reset - if (logger.isTraceEnabled()) { - final long oldBlock = block == null ? 
-1 : block.blockNumber(); - logger.trace( - "{} Worker switching from block {} to block {}", - BlockNodeConnection.this, - oldBlock, - latestActiveBlockNumber); - } - + final BlockState oldBlock = block; block = blockBufferService.getBlockState(latestActiveBlockNumber); + if (block == null && latestActiveBlockNumber < blockBufferService.getEarliestAvailableBlockNumber()) { // Indicate that the block node should catch up from another trustworthy block node + logger.warn( + "{} Wanted block ({}) is not obtainable; notifying block node it is too far behind and closing connection", + BlockNodeConnection.this, + latestActiveBlockNumber); endStreamAndReschedule(TOO_FAR_BEHIND); } @@ -1285,6 +1365,20 @@ private void switchBlockIfNeeded() { itemIndex = 0; pendingRequestItems.clear(); requestCtr.set(1); + + if (block == null) { + logger.trace( + "{} Wanted to switch from block {} to block {}, but it is not available", + BlockNodeConnection.this, + (oldBlock == null ? -1 : oldBlock.blockNumber()), + latestActiveBlockNumber); + } else { + logger.trace( + "{} Switched from block {} to block {}", + BlockNodeConnection.this, + (oldBlock == null ? 
-1 : oldBlock.blockNumber()), + latestActiveBlockNumber); + } } /** diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java index dd1d540eaf09..f8db884e6116 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java @@ -260,7 +260,7 @@ private List extractBlockNodesConfigurations(@NonNull f * @return whether there is only one block node configured */ public boolean isOnlyOneBlockNodeConfigured() { - int size; + final int size; synchronized (availableBlockNodes) { size = availableBlockNodes.size(); } @@ -358,7 +358,7 @@ private void scheduleConnectionAttempt( logger.debug("{} Successfully scheduled reconnection task.", newConnection); } catch (final Exception e) { logger.error("{} Failed to schedule connection task for block node.", newConnection, e); - newConnection.close(true); + newConnection.closeAtBlockBoundary(); } } @@ -400,6 +400,8 @@ private void closeAllConnections() { final Map.Entry entry = iterator.next(); final BlockNodeConnection connection = entry.getValue(); try { + // This method is invoked during a shutdown of the connection manager, in which case we don't want + // to gracefully close connections at block boundaries, so just call close immediately. connection.close(true); } catch (final RuntimeException e) { logger.debug( @@ -745,7 +747,8 @@ public void run() { "{} Active connection has equal/higher priority. Ignoring candidate. 
Active: {}.", connection, activeConnection); - connection.close(true); + // This connection was never initialized so we are safe to call close immediately + connection.close(false); return; } } @@ -771,7 +774,8 @@ public void run() { // close the old active connection try { logger.debug("{} Closing current active connection {}.", connection, activeConnection); - activeConnection.close(true); + activeConnection.closeAtBlockBoundary(); + // For a forced switch, reschedule the previously active connection to try again later if (force) { try { @@ -797,7 +801,7 @@ public void run() { } } } catch (final Exception e) { - logger.debug("{} Failed to establish connection to block node. Will schedule a retry.", connection); + logger.debug("{} Failed to establish connection to block node. Will schedule a retry.", connection, e); blockStreamMetrics.recordConnectionCreateFailure(); reschedule(); selectNewBlockNodeForStreaming(false); @@ -848,7 +852,7 @@ private void reschedule() { logger.info("{} Rescheduled connection attempt (delayMillis={}).", connection, jitteredDelayMs); } catch (final Exception e) { logger.error("{} Failed to reschedule connection attempt. 
Removing from retry map.", connection, e); - connection.close(true); + connection.closeAtBlockBoundary(); } } } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java index 082b62f0ae6e..0f3ad823f618 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManagerTest.java @@ -602,7 +602,7 @@ void testConnectionTask_higherPriorityConnectionExists_withoutForce() { verify(activeConnection).getNodeConfig(); verify(newConnection).getNodeConfig(); - verify(newConnection).close(true); + verify(newConnection).close(false); verifyNoMoreInteractions(activeConnection); verifyNoMoreInteractions(newConnection); @@ -630,7 +630,7 @@ void testConnectionTask_higherPriorityConnectionExists_withForce() { assertThat(activeConnectionRef).hasValue(newConnection); verify(activeConnection, times(2)).getNodeConfig(); - verify(activeConnection).close(true); + verify(activeConnection).closeAtBlockBoundary(); verify(newConnection, times(2)).getNodeConfig(); verify(newConnection).createRequestPipeline(); verify(newConnection).updateConnectionState(ConnectionState.ACTIVE); @@ -661,7 +661,7 @@ void testConnectionTask_connectionUninitialized_withActiveLowerPriorityConnectio assertThat(activeConnectionRef).hasValue(newConnection); verify(activeConnection).getNodeConfig(); - verify(activeConnection).close(true); + verify(activeConnection).closeAtBlockBoundary(); verify(newConnection, times(2)).getNodeConfig(); verify(newConnection).createRequestPipeline(); verify(newConnection).updateConnectionState(ConnectionState.ACTIVE); @@ -721,7 +721,7 @@ void testConnectionTask_closeExistingActiveFailed() { 
doReturn(activeConnectionConfig).when(activeConnection).getNodeConfig(); doThrow(new RuntimeException("why does this always happen to me")) .when(activeConnection) - .close(true); + .closeAtBlockBoundary(); activeConnectionRef.set(activeConnection); final BlockNodeConnection newConnection = mock(BlockNodeConnection.class); @@ -733,7 +733,7 @@ void testConnectionTask_closeExistingActiveFailed() { assertThat(activeConnectionRef).hasValue(newConnection); verify(activeConnection).getNodeConfig(); - verify(activeConnection).close(true); + verify(activeConnection).closeAtBlockBoundary(); verify(newConnection, times(2)).getNodeConfig(); verify(newConnection).createRequestPipeline(); verify(newConnection).updateConnectionState(ConnectionState.ACTIVE); @@ -846,7 +846,7 @@ void testConnectionTask_reschedule_failure() { verify(connection).createRequestPipeline(); verify(executorService).schedule(eq(task), anyLong(), eq(TimeUnit.MILLISECONDS)); - verify(connection).close(true); + verify(connection).closeAtBlockBoundary(); verify(metrics).recordConnectionCreateFailure(); verifyNoMoreInteractions(connection); @@ -1756,7 +1756,7 @@ void testConnectionTask_closeOldActiveConnectionThrowsException() { final BlockNodeConnection oldActive = mock(BlockNodeConnection.class); final BlockNodeConfig oldConfig = newBlockNodeConfig(PBJ_UNIT_TEST_HOST, 8080, 2); doReturn(oldConfig).when(oldActive).getNodeConfig(); - doThrow(new RuntimeException("Close failed")).when(oldActive).close(true); + doThrow(new RuntimeException("Close failed")).when(oldActive).closeAtBlockBoundary(); activeConnectionRef.set(oldActive); final BlockNodeConnection newConnection = mock(BlockNodeConnection.class); @@ -1766,7 +1766,7 @@ void testConnectionTask_closeOldActiveConnectionThrowsException() { // Should handle exception gracefully connectionManager.new BlockNodeConnectionTask(newConnection, Duration.ZERO, false).run(); - verify(oldActive).close(true); + verify(oldActive).closeAtBlockBoundary(); 
verify(newConnection).createRequestPipeline(); verify(newConnection).updateConnectionState(ConnectionState.ACTIVE); assertThat(activeConnectionRef).hasValue(newConnection); diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java index 60be8be24d0b..2fb783f5f60f 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.when; import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.hapi.block.stream.BlockProof; +import com.hedera.hapi.block.stream.output.BlockHeader; import com.hedera.node.app.blocks.impl.streaming.BlockNodeConnection.ConnectionState; import com.hedera.node.app.metrics.BlockStreamMetrics; import com.hedera.node.config.ConfigProvider; @@ -53,6 +55,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import org.hiero.block.api.BlockEnd; import org.hiero.block.api.BlockItemSet; import org.hiero.block.api.BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient; import org.hiero.block.api.PublishStreamRequest; @@ -94,8 +97,8 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase { } private BlockNodeConnection connection; + private ConfigProvider configProvider; private BlockNodeProtocolConfig nodeConfig; - private BlockNodeConnectionManager connectionManager; private BlockBufferService bufferService; private BlockStreamPublishServiceClient grpcServiceClient; @@ -106,10 +109,12 @@ class BlockNodeConnectionTest extends BlockNodeCommunicationTestBase { private BlockNodeStats.HighLatencyResult latencyResult; private 
BlockNodeClientFactory clientFactory; + private ExecutorService realExecutor; + @BeforeEach @SuppressWarnings("unchecked") void beforeEach() throws Exception { - final ConfigProvider configProvider = createConfigProvider(createDefaultConfigProvider()); + configProvider = createConfigProvider(createDefaultConfigProvider()); nodeConfig = newBlockNodeConfig(8080, 1); connectionManager = mock(BlockNodeConnectionManager.class); bufferService = mock(BlockBufferService.class); @@ -123,10 +128,10 @@ void beforeEach() throws Exception { // Set up default behavior for pipelineExecutor using a real executor // This ensures proper Future semantics while still being fast for tests // Individual tests can override this with their own specific mocks for timeout scenarios - final ExecutorService realExecutor = Executors.newCachedThreadPool(); + realExecutor = Executors.newCachedThreadPool(); lenient() .doAnswer(invocation -> { - Runnable runnable = invocation.getArgument(0); + final Runnable runnable = invocation.getArgument(0); return realExecutor.submit(runnable); }) .when(pipelineExecutor) @@ -143,8 +148,8 @@ void beforeEach() throws Exception { lenient() .doAnswer(invocation -> { - long timeout = invocation.getArgument(0); - TimeUnit unit = invocation.getArgument(1); + final long timeout = invocation.getArgument(0); + final TimeUnit unit = invocation.getArgument(1); return realExecutor.awaitTermination(timeout, unit); }) .when(pipelineExecutor) @@ -176,6 +181,10 @@ void beforeEach() throws Exception { @AfterEach void afterEach() throws Exception { + if (realExecutor != null) { + realExecutor.shutdownNow(); + } + // set the connection to closed so the worker thread stops gracefully connection.updateConnectionState(ConnectionState.CLOSED); final AtomicReference workerThreadRef = workerThreadRef(); @@ -218,10 +227,8 @@ void testCreateRequestPipeline_alreadyExists() { @Test void testConstructorWithInitialBlock() { - final ConfigProvider configProvider = 
createConfigProvider(createDefaultConfigProvider()); - // Create connection with initial block number - final BlockNodeConnection connectionWithInitialBlock = new BlockNodeConnection( + connection = new BlockNodeConnection( configProvider, nodeConfig, connectionManager, @@ -233,7 +240,7 @@ void testConstructorWithInitialBlock() { clientFactory); // Verify the streamingBlockNumber was set - final AtomicLong streamingBlockNumber = streamingBlockNumber(connectionWithInitialBlock); + final AtomicLong streamingBlockNumber = streamingBlockNumber(); assertThat(streamingBlockNumber).hasValue(100L); } @@ -896,6 +903,193 @@ void testSendRequest_activeButPipelineNull() { verifyNoInteractions(bufferService); } + @Test + void testCloseAtBlockBoundary_noActiveBlock() throws Exception { + // re-create the connection so we get the worker thread to run + final long blockNumber = 10; + // indicate we want to start with block 10, but don't add the block to the buffer + + connection = new BlockNodeConnection( + configProvider, + nodeConfig, + connectionManager, + bufferService, + metrics, + executorService, + pipelineExecutor, + blockNumber, // start streaming with block 10 + clientFactory); + + lenient().doReturn(requestPipeline).when(grpcServiceClient).publishBlockStream(connection); + + connection.createRequestPipeline(); + connection.updateConnectionState(ConnectionState.ACTIVE); // this will start the worker thread + + assertThat(workerThreadRef()).doesNotHaveNullValue(); + + // sleep for a bit to let the worker run + Thread.sleep(250); + + // signal to close at the block boundary + connection.closeAtBlockBoundary(); + + // the worker should determine there is no block available to stream and with the flag enabled to close at the + // nearest block boundary, the connection should be closed without sending any items + + // sleep for a short period to make sure the worker as run after setting the flag + Thread.sleep(100); + + // now the connection should be closed and all the items 
are sent + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishStreamRequest.class); + + // only one request should be sent and it should be the EndStream message + verify(requestPipeline).onNext(requestCaptor.capture()); + + assertThat(requestCaptor.getAllValues()).hasSize(1); + final PublishStreamRequest req = requestCaptor.getAllValues().getFirst(); + final EndStream endStream = req.endStream(); + assertThat(endStream).isNotNull(); + assertThat(endStream.endCode()).isEqualTo(EndStream.Code.RESET); + + verify(requestPipeline).onComplete(); + verify(bufferService, atLeastOnce()).getBlockState(blockNumber); + verify(bufferService, atLeastOnce()).getEarliestAvailableBlockNumber(); + verify(bufferService).getHighestAckedBlockNumber(); + verify(metrics).recordConnectionOpened(); + verify(metrics).recordRequestLatency(anyLong()); + verify(metrics).recordRequestEndStreamSent(EndStream.Code.RESET); + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + + verifyNoMoreInteractions(metrics); + verifyNoMoreInteractions(requestPipeline); + verifyNoMoreInteractions(bufferService); + } + + @Test + void testCloseAtBlockBoundary_activeBlock() throws Exception { + // re-create the connection so we get the worker thread to run + final long blockNumber = 10; + final BlockState block = new BlockState(blockNumber); + when(bufferService.getBlockState(blockNumber)).thenReturn(block); + + connection = new BlockNodeConnection( + configProvider, + nodeConfig, + connectionManager, + bufferService, + metrics, + executorService, + pipelineExecutor, + blockNumber, // start streaming with block 10 + clientFactory); + + lenient().doReturn(requestPipeline).when(grpcServiceClient).publishBlockStream(connection); + + connection.createRequestPipeline(); + connection.updateConnectionState(ConnectionState.ACTIVE); // this will start the worker thread + + 
assertThat(workerThreadRef()).doesNotHaveNullValue(); + + block.addItem(newBlockHeaderItem(blockNumber)); + block.addItem(newBlockTxItem(1_345)); + + Thread.sleep(500); // sleep for a bit to ensure the items get sent + + // now signal to close the connection at the block boundary + connection.closeAtBlockBoundary(); + + // sleep for a little bit, then add more items including the proof and ensure they are all sent + Thread.sleep(100); + + block.addItem(newBlockTxItem(5_039)); + block.addItem(newBlockTxItem(590)); + block.addItem(newBlockProofItem(blockNumber, 3_501)); + block.closeBlock(); + + Thread.sleep(500); + + // now the connection should be closed and all the items are sent + assertThat(connection.getConnectionState()).isEqualTo(ConnectionState.CLOSED); + + final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishStreamRequest.class); + + /* + There should be at least 3 requests. + All items, in order are: + 1) Block header <-+ + 2) Signed transaction | + 3) Signed transaction +- 1 or more requests + 4) Signed transaction | + 5) Block proof <-+ + 6) Block end <--- single request + 7) EndStream with RESET <--- single request + */ + + verify(requestPipeline, atLeast(3)).onNext(requestCaptor.capture()); + final List requests = requestCaptor.getAllValues(); + + final PublishStreamRequest lastRequest = requests.getLast(); + final EndStream endStream = lastRequest.endStream(); + assertThat(endStream).isNotNull(); + assertThat(endStream.endCode()).isEqualTo(EndStream.Code.RESET); + + final PublishStreamRequest secondToLastRequest = requests.get(requests.size() - 2); + final BlockEnd blockEnd = secondToLastRequest.endOfBlock(); + assertThat(blockEnd).isNotNull(); + assertThat(blockEnd.blockNumber()).isEqualTo(blockNumber); + + // collect the block items + final List items = new ArrayList<>(); + for (int i = 0; i < requests.size() - 2; ++i) { + final PublishStreamRequest request = requests.get(i); + final BlockItemSet bis = request.blockItems(); + if (bis 
!= null) { + items.addAll(bis.blockItems()); + } + } + + // there should be 5 items + assertThat(items).hasSize(5); + for (int i = 0; i < 5; ++i) { + final BlockItem blockItem = items.get(i); + + if (i == 0) { + // the first item should be the block header + final BlockHeader header = blockItem.blockHeader(); + assertThat(header).isNotNull(); + assertThat(header.number()).isEqualTo(blockNumber); + } else if (i == 4) { + // the last item should be the block proof + final BlockProof proof = blockItem.blockProof(); + assertThat(proof).isNotNull(); + assertThat(proof.block()).isEqualTo(blockNumber); + } else { + // the other items should all be signed transactions + assertThat(blockItem.signedTransaction()).isNotNull(); + } + } + + verify(requestPipeline).onComplete(); + verify(bufferService).getBlockState(blockNumber); + verify(bufferService).getEarliestAvailableBlockNumber(); + verify(bufferService).getHighestAckedBlockNumber(); + verify(metrics).recordConnectionOpened(); + verify(metrics, atLeastOnce()).recordRequestLatency(anyLong()); + verify(metrics, atLeastOnce()).recordRequestSent(RequestOneOfType.BLOCK_ITEMS); + verify(metrics, atLeastOnce()).recordBlockItemsSent(anyInt()); + verify(metrics).recordRequestSent(RequestOneOfType.END_OF_BLOCK); + verify(metrics).recordRequestEndStreamSent(EndStream.Code.RESET); + verify(metrics).recordConnectionClosed(); + verify(metrics).recordActiveConnectionIp(-1L); + + verifyNoMoreInteractions(metrics); + verifyNoMoreInteractions(requestPipeline); + verifyNoMoreInteractions(bufferService); + } + @Test void testClose() { openConnectionAndResetMocks(); @@ -1441,7 +1635,7 @@ void testConnectionWorker_noItemsAvailable() throws Exception { connection.updateConnectionState(ConnectionState.ACTIVE); // sleep to let the worker detect the state change and start doing work - sleep(150); + sleep(250); assertThat(workerThreadRef).doesNotHaveNullValue(); assertThat(streamingBlockNumber).hasValue(10); @@ -1509,7 +1703,7 @@ void 
testConnectionWorker_blockJump() throws Exception { connection.updateConnectionState(ConnectionState.ACTIVE); // sleep to let the worker detect the state change and start doing work - sleep(150); + sleep(250); // create a skip response to force the connection to jump to block 11 final PublishStreamResponse skipResponse = createSkipBlock(10L); @@ -1557,7 +1751,7 @@ void testConnectionWorker_hugeItem() throws Exception { connection.updateConnectionState(ConnectionState.ACTIVE); // sleep to let the worker detect the state change and start doing work - sleep(150); + sleep(250); final ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishStreamRequest.class); verify(requestPipeline, times(2)).onNext(requestCaptor.capture()); @@ -2481,10 +2675,6 @@ private AtomicLong streamingBlockNumber() { return (AtomicLong) streamingBlockNumberHandle.get(connection); } - private AtomicLong streamingBlockNumber(final BlockNodeConnection conn) { - return (AtomicLong) streamingBlockNumberHandle.get(conn); - } - @SuppressWarnings("unchecked") private AtomicReference workerThreadRef() { return (AtomicReference) workerThreadRefHandle.get(connection); diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java index bf16fe1f160b..73d356806196 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java @@ -433,6 +433,12 @@ final Stream node0StreamingBlockNodeConnectionDropsTrickle() { String.format( "/localhost:%s/ACTIVE] Connection state transitioned from PENDING to ACTIVE.", portNumbers.get(1)), + String.format( + "/localhost:%s/ACTIVE] Connection will be closed at the next block boundary", + portNumbers.get(3)), + String.format( + "/localhost:%s/ACTIVE] Block boundary 
reached; closing connection (finished sending block)", + portNumbers.get(3)), String.format("/localhost:%s/CLOSING] Closing connection.", portNumbers.get(3)), String.format( "/localhost:%s/CLOSING] Connection state transitioned from ACTIVE to CLOSING.", @@ -1043,8 +1049,6 @@ private Stream validateHappyPath(final int blocksToWait) { assertBlockNodeCommsLogDoesNotContain( byNodeId(0), "Block node has exceeded high latency threshold", Duration.ofSeconds(0)), assertBlockNodeCommsLogContains( - byNodeId(0), - "Sending ad hoc request to block node (type=END_OF_BLOCK)", - Duration.ofSeconds(0))); + byNodeId(0), "Sending request to block node (type=END_OF_BLOCK)", Duration.ofSeconds(0))); } }