Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
51e22de
chore: gracefully terminate connections at block boundaries
timfn-hg Oct 28, 2025
2357c9a
add tests for block boundary connection closing; enhance logging
timfn-hg Oct 28, 2025
2b16411
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Oct 28, 2025
4fdfbc9
cleanup and spotless
timfn-hg Oct 28, 2025
afbd708
update comments
timfn-hg Oct 28, 2025
0340277
fix tests; fix logging
timfn-hg Oct 29, 2025
8cf53a7
update HAPI test; spotless
timfn-hg Oct 30, 2025
03606a4
update doc
timfn-hg Oct 30, 2025
62c916a
revert graceful shutdown when stopping connection manager
timfn-hg Oct 30, 2025
d585700
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Oct 30, 2025
db2ca69
update test
timfn-hg Oct 30, 2025
1144205
include block and request id in logs when sending block end
timfn-hg Oct 31, 2025
4ca37cc
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Oct 31, 2025
3d980d6
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Oct 31, 2025
4190ef8
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Nov 3, 2025
5b639de
remove extra log statement
timfn-hg Nov 3, 2025
fbc5f3d
fix HAPI test
timfn-hg Nov 4, 2025
92a6a0f
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Nov 4, 2025
aa148fd
spotless
timfn-hg Nov 4, 2025
caad43f
update code comment
timfn-hg Nov 5, 2025
c57cddc
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Nov 10, 2025
556c58e
merge cleanup
timfn-hg Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions hedera-node/docs/design/app/blocks/BlockNodeConnection.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,17 @@ the transition to a terminal state, it too will cease operations.

To establish a connection back to the block node, a new connection object will need to be created.

### Graceful Connection Close

When a connection is closed, a best effort attempt to gracefully close the connection will be performed. There are two
aspects to this "graceful close":
1. Unless the connection is unstable, or we are notifying the block node that it is too far behind, an attempt will be
made to send an EndStream request with the code `RESET` to the block node before closing.
2. If the connection is actively streaming a block, a best effort to stream the rest of the block will be performed
before closing the connection.

A caveat to this is that if the connection manager is being shut down, the connections will NOT be closed gracefully.

### Connection States

- **UNINITIALIZED**: Initial state when a connection object is created. The bidi RequestObserver needs to be created.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,16 @@
* Mechanism to retrieve configuration properties related to block-node communication.
*/
private final ConfigProvider configProvider;

/**
* Factory used to create the block node clients.
*/
private final BlockNodeClientFactory clientFactory;
/**
* Flag indicating if this connection should be closed at the next block boundary. For example: if set to true while
* the connection is actively streaming a block, then the connection will continue to stream the remaining block and
* once it is finished it will close the connection.
*/
private final AtomicBoolean closeAtNextBlockBoundary = new AtomicBoolean(false);

/**
* Represents the possible states of a Block Node connection.
Expand Down Expand Up @@ -246,7 +254,10 @@

if (initialBlockToStream != null) {
streamingBlockNumber.set(initialBlockToStream);
logger.info("Block node connection will initially stream with block {}", initialBlockToStream);
logger.info(
"{} Block node connection will initially stream with block {}",
BlockNodeConnection.this,
initialBlockToStream);
}
}

Expand Down Expand Up @@ -312,7 +323,8 @@
.build();
if (logger.isDebugEnabled()) {
logger.debug(
"Created BlockStreamPublishServiceClient for {}:{}.",
"{} Created BlockStreamPublishServiceClient for {}:{}.",
BlockNodeConnection.this,
blockNodeConfig.address(),
blockNodeConfig.port());
}
Expand Down Expand Up @@ -356,19 +368,26 @@
}

if (newState == ConnectionState.ACTIVE) {
scheduleStreamReset();
// start worker thread to handle sending requests
final Thread workerThread = new Thread(new ConnectionWorkerLoopTask(), "bn-conn-worker-" + connectionId);
if (workerThreadRef.compareAndSet(null, workerThread)) {
workerThread.start();
}
handleConnectionActive();
} else {
cancelStreamReset();
}

return true;
}

/**
 * Performs the setup required after the connection has entered the ACTIVE state: schedules the periodic stream
 * reset and launches the worker thread that handles sending requests.
 */
private void handleConnectionActive() {
    scheduleStreamReset();

    // Build the worker first, then register it atomically; if a worker is already registered, leave it in place
    // and do not start a second one.
    final Thread candidateWorker = new Thread(new ConnectionWorkerLoopTask(), "bn-conn-worker-" + connectionId);
    final boolean registered = workerThreadRef.compareAndSet(null, candidateWorker);
    if (!registered) {
        return;
    }

    candidateWorker.start();
}

/**
* Schedules the periodic stream reset task to ensure responsiveness and reliability.
*/
Expand Down Expand Up @@ -750,6 +769,7 @@
}

final long durationMs = System.currentTimeMillis() - startMs;

blockStreamMetrics.recordRequestLatency(durationMs);

if (blockNumber == -1 && requestNumber == -1) {
Expand Down Expand Up @@ -808,7 +828,7 @@
* failure in closing the connection, the error will be logged and not propagated back to the caller.
* @param callOnComplete whether to call onComplete on the request pipeline
*/
public void close(final boolean callOnComplete) {
void close(final boolean callOnComplete) {
final ConnectionState connState = getConnectionState();
if (connState.isTerminal()) {
logger.debug("{} Connection already in terminal state ({}).", this, connState);
Expand Down Expand Up @@ -964,13 +984,9 @@
blockStreamMetrics.recordConnectionOnError();

if (error instanceof final GrpcException grpcException) {
if (logger.isWarnEnabled()) {
logger.warn("{} Error received (grpcStatus={}).", this, grpcException.status(), grpcException);
}
logger.warn("{} Error received (grpcStatus={}).", this, grpcException.status(), grpcException);
} else {
if (logger.isWarnEnabled()) {
logger.warn("{} Error received.", this, error);
}
logger.warn("{} Error received.", this, error);
}

handleStreamFailure();
Expand Down Expand Up @@ -1007,6 +1023,17 @@
return connectionState.get();
}

/**
 * Requests that this connection be closed at the next block boundary instead of immediately.
 * <p>
 * If this connection is actively streaming a block, the connection will wait until the block is fully sent before
 * closing. If the connection is waiting to stream a block that is not available, the connection will be closed
 * without sending any items for the pending block.
 * <p>
 * This method only records the request by setting a flag; it does not close the connection itself.
 */
public void closeAtBlockBoundary() {
logger.info("{} Connection will be closed at the next block boundary", this);
// Record the request; the flag is read elsewhere in this class to decide when to end the stream.
closeAtNextBlockBoundary.set(true);
}

@Override
public String toString() {
return "[" + connectionId + "/" + blockNodeConfig.address() + ":" + blockNodeConfig.port() + "/"
Expand Down Expand Up @@ -1080,7 +1107,7 @@
}

// if we exit the worker loop, then this thread is over... remove it from the worker thread reference
logger.info("Worker thread exiting");
logger.info("{} Worker thread exiting", BlockNodeConnection.this);
workerThreadRef.compareAndSet(Thread.currentThread(), null);
}

Expand All @@ -1089,6 +1116,15 @@

if (block == null) {
// The block we want to stream is not available
if (closeAtNextBlockBoundary.get()) {
// The flag indicating that we should close the connection at a block boundary is set. Since no block is
// available to stream, we are at a safe "boundary" and can close the connection.
logger.info(
"{} Block boundary reached; closing connection (no block available)",
BlockNodeConnection.this);
endTheStreamWith(EndStream.Code.RESET);
}

return;
}

Expand All @@ -1098,6 +1134,11 @@
if (itemIndex == 0) {
logger.trace(
"{} Starting to process items for block {}", BlockNodeConnection.this, block.blockNumber());
if (lastSendTimeMillis == -1) {
// if we've never sent a request and this is the first time we are processing a block, update
// the last send time to the current time. this will avoid prematurely sending a request
lastSendTimeMillis = System.currentTimeMillis();
}
}

final int itemSize = item.protobufSize() + 2; // each item has 2 bytes of overhead
Expand Down Expand Up @@ -1135,28 +1176,73 @@
}

if (!pendingRequestItems.isEmpty()) {
// There are pending items to send. Check if enough time has elapsed since the last request was sent.
// If so, send the current pending request.
final long diffMillis = System.currentTimeMillis() - lastSendTimeMillis;
if (diffMillis >= maxRequestDelayMillis()) {
if (block.isClosed() && block.itemCount() == itemIndex) {
// Send the last pending items of the block
sendPendingRequest();
sendBlockEnd();
} else {
// If the duration since the last time of sending a request exceeds the max delay configuration,
// send the pending items
final long diffMillis = System.currentTimeMillis() - lastSendTimeMillis;
final long maxDelayMillis = maxRequestDelayMillis();
if (diffMillis >= maxDelayMillis) {
logger.trace(
"{} Max delay exceeded (target: {}ms, actual: {}ms) - sending {} item(s)",
BlockNodeConnection.this,
maxDelayMillis,
diffMillis,
pendingRequestItems.size());
sendPendingRequest();
}
}
}

if (pendingRequestItems.isEmpty() && block.isClosed() && block.itemCount() == itemIndex) {
// Indicate to the block node that this is the end of the current block
final PublishStreamRequest endOfBlock = PublishStreamRequest.newBuilder()
.endOfBlock(BlockEnd.newBuilder().blockNumber(block.blockNumber()))
.build();
try {
sendRequest(endOfBlock);
} catch (RuntimeException e) {
logger.warn("{} Error sending EndOfBlock request", BlockNodeConnection.this, e);
handleStreamFailureWithoutOnComplete();
}
maybeAdvanceBlock();
}

// We've gathered all block items and have sent them to the block node. No additional work is needed
// for the current block so we can move to the next block.
private void sendBlockEnd() {
final PublishStreamRequest endOfBlock = PublishStreamRequest.newBuilder()
.endOfBlock(BlockEnd.newBuilder().blockNumber(block.blockNumber()))
.build();
try {
sendRequest(block.blockNumber(), requestCtr.get(), endOfBlock);
} catch (final RuntimeException e) {
logger.warn("{} Error sending EndOfBlock request", BlockNodeConnection.this, e);
handleStreamFailureWithoutOnComplete();

Check warning on line 1211 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java#L1209-L1211

Added lines #L1209 - L1211 were not covered by tests
}
}

/**
* Checks if the current block has all of its items sent. If so, then the next block is loaded into the worker.
* Additionally, if there is a request to close the connection at a block boundary and the current block is
* finished, then this connection will be closed.
*/
private void maybeAdvanceBlock() {
final boolean finishedWithCurrentBlock = pendingRequestItems.isEmpty() // no more items ready to send
&& block.isClosed() // the block is closed, so no more items are expected
&& block.itemCount() == itemIndex; // we've exhausted all items in the block

if (!finishedWithCurrentBlock) {
return; // still more work to do
}

/*
We are now done with the current block and have two options:
1) We advance to the next block (normal case).
2) This connection has been marked for closure after we are finished processing the current block. If this
is true, then we will close this connection. This allows us to close the connection at a block boundary
instead of closing the connection mid-block.
*/

if (closeAtNextBlockBoundary.get()) {
// the connection manager wants us to gracefully stop this connection
logger.info(
"{} Block boundary reached; closing connection (finished sending block)",
BlockNodeConnection.this);
endTheStreamWith(EndStream.Code.RESET);
} else {
// the connection manager hasn't informed us to close this connection, so we are now free to advance to
// the next block
final long nextBlockNumber = block.blockNumber() + 1;
if (streamingBlockNumber.compareAndSet(block.blockNumber(), nextBlockNumber)) {
logger.trace("{} Advancing to block {}", BlockNodeConnection.this, nextBlockNumber);
Expand All @@ -1181,16 +1267,14 @@
final PublishStreamRequest req =
PublishStreamRequest.newBuilder().blockItems(itemSet).build();

if (logger.isTraceEnabled()) {
logger.trace(
"{} Attempting to send request (block={}, request={}, itemCount={}, estimatedBytes={} actualBytes={})",
BlockNodeConnection.this,
block.blockNumber(),
requestCtr.get(),
pendingRequestItems.size(),
pendingRequestBytes,
req.protobufSize());
}
logger.trace(
"{} Attempting to send request (block={}, request={}, itemCount={}, estimatedBytes={} actualBytes={})",
BlockNodeConnection.this,
block.blockNumber(),
requestCtr.get(),
pendingRequestItems.size(),
pendingRequestBytes,
req.protobufSize());

try {
if (sendRequest(block.blockNumber(), requestCtr.get(), req)) {
Expand Down Expand Up @@ -1243,25 +1327,36 @@
}

// Swap blocks and reset
if (logger.isTraceEnabled()) {
final long oldBlock = block == null ? -1 : block.blockNumber();
logger.trace(
"{} Worker switching from block {} to block {}",
BlockNodeConnection.this,
oldBlock,
latestActiveBlockNumber);
}

final BlockState oldBlock = block;
block = blockBufferService.getBlockState(latestActiveBlockNumber);

if (block == null && latestActiveBlockNumber < blockBufferService.getEarliestAvailableBlockNumber()) {
// Indicate that the block node should catch up from another trustworthy block node
logger.warn(
"{} Wanted block ({}) is not obtainable; notifying block node it is too far behind and closing connection",
BlockNodeConnection.this,
latestActiveBlockNumber);
endStreamAndReschedule(TOO_FAR_BEHIND);
}

pendingRequestBytes = BYTES_PADDING;
itemIndex = 0;
pendingRequestItems.clear();
requestCtr.set(1);

if (block == null) {
logger.trace(
"{} Wanted to switch from block {} to block {}, but it is not available",
BlockNodeConnection.this,
(oldBlock == null ? -1 : oldBlock.blockNumber()),
latestActiveBlockNumber);
} else {
logger.trace(
"{} Switched from block {} to block {}",
BlockNodeConnection.this,
(oldBlock == null ? -1 : oldBlock.blockNumber()),
latestActiveBlockNumber);
}
}

/**
Expand Down
Loading
Loading