Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
51e22de
chore: gracefully terminate connections at block boundaries
timfn-hg Oct 28, 2025
2357c9a
add tests for block boundary connection closing; enhance logging
timfn-hg Oct 28, 2025
2b16411
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Oct 28, 2025
4fdfbc9
cleanup and spotless
timfn-hg Oct 28, 2025
afbd708
update comments
timfn-hg Oct 28, 2025
0340277
fix tests; fix logging
timfn-hg Oct 29, 2025
8cf53a7
update HAPI test; spotless
timfn-hg Oct 30, 2025
03606a4
update doc
timfn-hg Oct 30, 2025
62c916a
revert graceful shutdown when stopping connection manager
timfn-hg Oct 30, 2025
d585700
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Oct 30, 2025
db2ca69
update test
timfn-hg Oct 30, 2025
1144205
include block and request id in logs when sending block end
timfn-hg Oct 31, 2025
4ca37cc
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Oct 31, 2025
3d980d6
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Oct 31, 2025
4190ef8
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Nov 3, 2025
5b639de
remove extra log statement
timfn-hg Nov 3, 2025
fbc5f3d
fix HAPI test
timfn-hg Nov 4, 2025
92a6a0f
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Nov 4, 2025
aa148fd
spotless
timfn-hg Nov 4, 2025
caad43f
update code comment
timfn-hg Nov 5, 2025
c57cddc
Merge branch 'main' into timfn/21878-switch-connections-at-block-boun…
timfn-hg Nov 10, 2025
556c58e
merge cleanup
timfn-hg Nov 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -143,8 +144,16 @@
* Mechanism to retrieve configuration properties related to block-node communication.
*/
private final ConfigProvider configProvider;

/**
* Factory used to create the block node clients.
*/
private final BlockNodeClientFactory clientFactory;
/**
* Flag indicating if this connection should be closed at the next block boundary. For example: if set to true while
* the connection is actively streaming a block, then the connection will finish streaming the remainder of that block
* and then close.
*/
private final AtomicBoolean closeAtNextBlockBoundary = new AtomicBoolean(false);

/**
* Represents the possible states of a Block Node connection.
Expand Down Expand Up @@ -312,19 +321,26 @@
}

if (newState == ConnectionState.ACTIVE) {
scheduleStreamReset();
// start worker thread to handle sending requests
final Thread workerThread = new Thread(new ConnectionWorkerLoopTask(), "bn-conn-worker-" + connectionId);
if (workerThreadRef.compareAndSet(null, workerThread)) {
workerThread.start();
}
handleConnectionActive();
} else {
cancelStreamReset();
}

return true;
}

/**
 * Carries out the setup work required after this connection has entered the ACTIVE state: the periodic stream reset
 * is scheduled, and a worker thread responsible for sending requests is started if none is registered yet.
 */
private void handleConnectionActive() {
    scheduleStreamReset();

    // Build the worker thread up front; it is only started when the worker reference was previously unset, so an
    // already-registered worker is never replaced.
    final Thread candidate = new Thread(new ConnectionWorkerLoopTask(), "bn-conn-worker-" + connectionId);
    final boolean registered = workerThreadRef.compareAndSet(null, candidate);
    if (!registered) {
        return;
    }
    candidate.start();
}

/**
* Schedules the periodic stream reset task to ensure responsiveness and reliability.
*/
Expand Down Expand Up @@ -639,7 +655,7 @@
highestAckedBlockNumber);
try {
sendRequest(endStream);
} catch (RuntimeException e) {
} catch (final RuntimeException e) {

Check warning on line 658 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java#L658

Added line #L658 was not covered by tests
logger.warn("{} Error sending EndStream request", this, e);
}
close(true);
Expand All @@ -658,12 +674,6 @@

if (getConnectionState() == ConnectionState.ACTIVE && pipeline != null) {
try {
if (logger.isDebugEnabled()) {
logger.debug(
"{} Sending request to block node (type={}).",
this,
request.request().kind());
}
if (logger.isTraceEnabled()) {
/*
PublishStreamRequest#protobufSize does the size calculation lazily and thus calling this can incur
Expand All @@ -674,10 +684,17 @@
this,
request.request().kind(),
request.protobufSize());
} else if (logger.isDebugEnabled()) {
logger.debug(

Check warning on line 688 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java#L688

Added line #L688 was not covered by tests
"{} Sending request to block node (type={})",
this,
request.request().kind());

Check warning on line 691 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java#L691

Added line #L691 was not covered by tests
}

final long startMs = System.currentTimeMillis();
pipeline.onNext(request);
final long durationMs = System.currentTimeMillis() - startMs;

blockStreamMetrics.recordRequestLatency(durationMs);
logger.trace("{} Request took {}ms to send", this, durationMs);

Expand Down Expand Up @@ -724,7 +741,7 @@
* failure in closing the connection, the error will be logged and not propagated back to the caller.
* @param callOnComplete whether to call onComplete on the request pipeline
*/
public void close(final boolean callOnComplete) {
void close(final boolean callOnComplete) {
final ConnectionState connState = getConnectionState();
if (connState.isTerminal()) {
logger.debug("{} Connection already in terminal state ({}).", this, connState);
Expand Down Expand Up @@ -896,6 +913,17 @@
return connectionState.get();
}

/**
* Requests that this connection be closed at the next block boundary instead of immediately. This method only logs
* the request and sets the {@code closeAtNextBlockBoundary} flag; it does not close anything itself. The flag is
* read by the connection worker:
* <ul>
* <li>if the worker is in the middle of streaming a block, it keeps sending until the block is fully sent and
* then closes the connection;</li>
* <li>if the block the worker wants to stream is not available, the connection is closed without sending any items
* for the pending block.</li>
* </ul>
* NOTE(review): in the visible code the flag is only consulted by the worker loop, which is started once the
* connection becomes ACTIVE. Confirm that callers do not rely on this method to release connections that never
* reached the ACTIVE state.
*/
public void closeAtBlockBoundary() {
logger.info("{} Connection will be closed at the next block boundary", this);
// only mark the connection; the worker performs the actual close once it reaches a block boundary
closeAtNextBlockBoundary.set(true);
}

@Override
public String toString() {
return "[" + connectionId + "/" + blockNodeConfig.address() + ":" + blockNodeConfig.port() + "/"
Expand Down Expand Up @@ -941,6 +969,7 @@
private int itemIndex = 0;
private BlockState block;
private long lastSendTimeMillis = -1;
private final AtomicInteger requestCtr = new AtomicInteger(1);

@Override
public void run() {
Expand Down Expand Up @@ -977,6 +1006,15 @@

if (block == null) {
// The block we want to stream is not available
if (closeAtNextBlockBoundary.get()) {
// The close-at-block-boundary flag is set. Since no block is available to stream, we are at a safe
// "boundary" and can close the connection.
logger.info(
"{} Block boundary reached; closing connection (no block available)",
BlockNodeConnection.this);
closeConnection(EndStream.Code.RESET);
}

return;
}

Expand All @@ -986,6 +1024,11 @@
if (itemIndex == 0) {
logger.trace(
"{} Starting to process items for block {}", BlockNodeConnection.this, block.blockNumber());
if (lastSendTimeMillis == -1) {
// if we've never sent a request and this is the first time we are processing a block, update
// the last send time to the current time. this will avoid prematurely sending a request
lastSendTimeMillis = System.currentTimeMillis();
}
}

final int itemSize = item.protobufSize();
Expand All @@ -1011,8 +1054,7 @@
itemIndex,
newRequestBytes,
MAX_BYTES_PER_REQUEST);
endTheStreamWith(EndStream.Code.ERROR);
blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this);
closeConnection(EndStream.Code.ERROR);
break;
}
} else {
Expand All @@ -1027,14 +1069,52 @@
// There are pending items to send. Check if enough time has elapsed since the last request was sent.
// If so, send the current pending request.
final long diffMillis = System.currentTimeMillis() - lastSendTimeMillis;
if (diffMillis >= maxRequestDelayMillis()) {
final long maxDelayMillis = maxRequestDelayMillis();
if (diffMillis >= maxDelayMillis) {
logger.trace(
"{} Max delay exceeded (target: {}ms, actual: {}ms) - sending {} item(s)",
BlockNodeConnection.this,
maxDelayMillis,
diffMillis,
pendingRequestItems.size());
sendPendingRequest();
}
}

if (pendingRequestItems.isEmpty() && block.isClosed() && block.itemCount() == itemIndex) {
// We've gathered all block items and have sent them to the block node. No additional work is needed
// for the current block so we can move to the next block.
maybeAdvanceBlock();
}

/**
* Checks if the current block has all of its items sent. If so, then the next block is loaded into the worker.
* Additionally, if there is a request to close the connection at a block boundary and the current block is
* finished, then this connection will be closed.
*/
private void maybeAdvanceBlock() {
final boolean finishedWithCurrentBlock = pendingRequestItems.isEmpty() // no more items ready to send
&& block.isClosed() // the block is closed, so no more items are expected
&& block.itemCount() == itemIndex; // we've exhausted all items in the block

if (!finishedWithCurrentBlock) {
return; // still more work to do
}

/*
We are now done with the current block and have two options.
1) We advance to the next block (normal case).
2) This connection has been marked for closure after we are finished processing the current block. If this
is true, then we will close this connection. This allows us to close the connection at a block boundary
instead of closing the connection mid-block.
*/

if (closeAtNextBlockBoundary.get()) {
// the connection manager wants us to gracefully stop this connection
logger.info(
"{} Block boundary reached; closing connection (finished sending block)",
BlockNodeConnection.this);
closeConnection(EndStream.Code.RESET);
} else {
// the connection manager hasn't informed us to close this connection, so we are now free to advance to
// the next block
final long nextBlockNumber = block.blockNumber() + 1;
if (streamingBlockNumber.compareAndSet(block.blockNumber(), nextBlockNumber)) {
logger.trace("{} Advancing to block {}", BlockNodeConnection.this, nextBlockNumber);
Expand All @@ -1047,6 +1127,16 @@
}
}

/**
* Ends the stream with the specified code and then reports the stream reset to the connection manager. Used by the
* worker both for graceful closure at a block boundary (with {@code RESET}) and when a single block item is too
* large to fit in a request (with {@code ERROR}).
*
* @param endCode the end stream code to attempt to send to the block node before closing
*/
private void closeConnection(final EndStream.Code endCode) {
// first try to tell the block node why the stream is ending
endTheStreamWith(endCode);
// then hand this connection back to the manager; presumably this lets it select or reschedule a connection -
// confirm against BlockNodeConnectionManager#connectionResetsTheStream
blockNodeConnectionManager.connectionResetsTheStream(BlockNodeConnection.this);
}

/**
* Attempt to send the pending block items to the block node.
*
Expand All @@ -1059,6 +1149,39 @@
final PublishStreamRequest req =
PublishStreamRequest.newBuilder().blockItems(itemSet).build();

if (logger.isTraceEnabled()) {
// gather information about what type of items are in the request

int headerIndex = -1;
int itemStartIndex = -1;
int itemEndIndex = -1;
int proofIndex = -1;
for (int i = 0; i < pendingRequestItems.size(); ++i) {
final BlockItem item = pendingRequestItems.get(i);
switch (item.item().kind()) {
case BLOCK_HEADER -> headerIndex = i;
case BLOCK_PROOF -> proofIndex = i;
default -> {
if (itemStartIndex == -1) {
itemStartIndex = i;
}
itemEndIndex = Math.max(itemEndIndex, i);
}
}
}

logger.trace(
"{} Request details: block={}, request={}, items={}, headerIndex={}, otherItemsIndexRange=[{}, {}], proofIndex={}",
BlockNodeConnection.this,
block.blockNumber(),
requestCtr.get(),
pendingRequestItems.size(),
headerIndex,
itemStartIndex,
itemEndIndex,
proofIndex);
}

try {
if (sendRequest(req)) {
// record that we've sent the request
Expand All @@ -1067,6 +1190,7 @@
// clear the pending request data
pendingRequestBytes = BYTES_PADDING;
pendingRequestItems.clear();
requestCtr.incrementAndGet();
return true;
}
} catch (final UncheckedIOException e) {
Expand Down Expand Up @@ -1109,18 +1233,26 @@
}

// Swap blocks and reset
if (logger.isTraceEnabled()) {
final long oldBlock = block == null ? -1 : block.blockNumber();
logger.trace(
"{} Worker switching from block {} to block {}",
BlockNodeConnection.this,
oldBlock,
latestActiveBlockNumber);
}
final BlockState oldBlock = block;
block = blockBufferService.getBlockState(latestActiveBlockNumber);
pendingRequestBytes = BYTES_PADDING;
itemIndex = 0;
pendingRequestItems.clear();
requestCtr.set(1);

if (block == null) {
logger.trace(
"{} Wanted to switch from block {} to block {}, but it is not available",
BlockNodeConnection.this,
(oldBlock == null ? -1 : oldBlock.blockNumber()),
latestActiveBlockNumber);
} else {
logger.trace(
"{} Switched from block {} to block {}",
BlockNodeConnection.this,
(oldBlock == null ? -1 : oldBlock.blockNumber()),
latestActiveBlockNumber);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@
* @return whether there is only one block node configured
*/
public boolean isOnlyOneBlockNodeConfigured() {
int size;
final int size;
synchronized (availableBlockNodes) {
size = availableBlockNodes.size();
}
Expand Down Expand Up @@ -389,7 +389,7 @@
} catch (final Exception e) {
logger.error("{} Failed to schedule connection task for block node.", newConnection, e);
connections.remove(newConnection.getNodeConfig());
newConnection.close(true);
newConnection.closeAtBlockBoundary();

Check warning on line 392 in hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java

View check run for this annotation

Codecov / codecov/patch

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionManager.java#L392

Added line #L392 was not covered by tests
}
}

Expand Down Expand Up @@ -431,7 +431,7 @@
final Map.Entry<BlockNodeConfig, BlockNodeConnection> entry = it.next();
final BlockNodeConnection connection = entry.getValue();
try {
connection.close(true);
connection.closeAtBlockBoundary();
} catch (final RuntimeException e) {
logger.debug(
"{} Error while closing connection during connection manager shutdown. Ignoring.",
Expand Down Expand Up @@ -772,7 +772,7 @@
"{} Active connection has equal/higher priority. Ignoring candidate. Active: {}.",
connection,
activeConnection);
connection.close(true);
connection.closeAtBlockBoundary();
return;
}
}
Expand All @@ -798,7 +798,7 @@
// close the old active connection
try {
logger.debug("{} Closing current active connection {}.", connection, activeConnection);
activeConnection.close(true);
activeConnection.closeAtBlockBoundary();
} catch (final RuntimeException e) {
logger.info(
"Failed to shutdown current active connection {} (shutdown reason: another connection was elevated to active).",
Expand Down Expand Up @@ -860,7 +860,7 @@
// If rescheduling fails, close the connection and remove it from the connection map. A periodic task
// will handle checking if there are no longer any connections
connections.remove(connection.getNodeConfig());
connection.close(true);
connection.closeAtBlockBoundary();
}
}
}
Expand Down
Loading
Loading