Skip to content

Commit f72ed64

Browse files
petrezeNeeharika-SompalliJivkoKelchevmxtartaglia-slmhess-swl
authored
feat: end the stream with TOO_FAR_BEHIND if the block is missing from the buffer (#21912)
Signed-off-by: Neeharika-Sompalli <[email protected]> Signed-off-by: Petar Tonev <[email protected]> Signed-off-by: Zhivko Kelchev <[email protected]> Signed-off-by: mxtartaglia <[email protected]> Signed-off-by: Matt Hess <[email protected]> Signed-off-by: Anthony Petrov <[email protected]> Signed-off-by: Jendrik Johannes <[email protected]> Signed-off-by: Derek Riley <[email protected]> Signed-off-by: Tim Farber-Newman <[email protected]> Signed-off-by: Michael Heinrichs <[email protected]> Signed-off-by: Michael Tinker <[email protected]> Signed-off-by: Artur Biesiadowski <[email protected]> Signed-off-by: Josh Marinacci <[email protected]> Signed-off-by: Roger Barker <[email protected]> Signed-off-by: ibankov <[email protected]> Signed-off-by: Kelly Greco <[email protected]> Signed-off-by: Eva <[email protected]> Co-authored-by: Neeharika Sompalli <[email protected]> Co-authored-by: Zhivko Kelchev <[email protected]> Co-authored-by: Maxi Tartaglia <[email protected]> Co-authored-by: Matt Hess <[email protected]> Co-authored-by: anthony-swirldslabs <[email protected]> Co-authored-by: Jendrik Johannes <[email protected]> Co-authored-by: Derek Riley <[email protected]> Co-authored-by: Tim Farber-Newman <[email protected]> Co-authored-by: Michael Heinrichs <[email protected]> Co-authored-by: Michael Tinker <[email protected]> Co-authored-by: Artur Biesiadowski <[email protected]> Co-authored-by: Josh Marinacci <[email protected]> Co-authored-by: Roger Barker <[email protected]> Co-authored-by: Ivan Bankov <[email protected]> Co-authored-by: Kelly Greco <[email protected]> Co-authored-by: Evdokia-Georgieva <[email protected]>
1 parent 6833ec1 commit f72ed64

File tree

5 files changed

+61
-8
lines changed

5 files changed

+61
-8
lines changed

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockBufferService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ public void openBlock(final long blockNumber) {
334334
// Create a new block state
335335
final BlockState blockState = new BlockState(blockNumber);
336336
blockBuffer.put(blockNumber, blockState);
337-
// update the earliest block number if this is first block or lower than current earliest
337+
// update the earliest block number if this is the first block or lower than current earliest
338338
earliestBlockNumber.updateAndGet(
339339
current -> current == Long.MIN_VALUE ? blockNumber : Math.min(current, blockNumber));
340340
lastProducedBlockNumber.updateAndGet(old -> Math.max(old, blockNumber));
@@ -592,7 +592,7 @@ block period (e.g. 2 seconds). This gives us an ideal number of blocks in the bu
592592
if (block.blockNumber() > highestBlockAcked) {
593593
++numPendingAck;
594594
}
595-
// Keep track of earliest remaining block
595+
// Keep track of the earliest and the latest remaining blocks
596596
newEarliestBlock = Math.min(newEarliestBlock, blockNum);
597597
newLatestBlock = Math.max(newLatestBlock, blockNum);
598598
}
@@ -602,13 +602,13 @@ block period (e.g. 2 seconds). This gives us an ideal number of blocks in the bu
602602
it.remove();
603603
++numPruned;
604604
} else {
605-
// keep track of earliest remaining block
605+
// Keep track of the earliest and the latest remaining blocks
606606
newEarliestBlock = Math.min(newEarliestBlock, blockNum);
607607
newLatestBlock = Math.max(newLatestBlock, blockNum);
608608
}
609609
} else {
610610
++numPendingAck;
611-
// keep track of earliest remaining block
611+
// Keep track of the earliest and the latest remaining blocks
612612
newEarliestBlock = Math.min(newEarliestBlock, blockNum);
613613
newLatestBlock = Math.max(newLatestBlock, blockNum);
614614
}

hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnection.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
package com.hedera.node.app.blocks.impl.streaming;
33

44
import static java.util.Objects.requireNonNull;
5+
import static org.hiero.block.api.PublishStreamRequest.EndStream.Code.ERROR;
56
import static org.hiero.block.api.PublishStreamRequest.EndStream.Code.RESET;
67
import static org.hiero.block.api.PublishStreamRequest.EndStream.Code.TIMEOUT;
78
import static org.hiero.block.api.PublishStreamRequest.EndStream.Code.TOO_FAR_BEHIND;
@@ -612,7 +613,13 @@ private void handleResendBlock(@NonNull final ResendBlock resendBlock) {
612613
+ "on this consensus node. Closing connection and will retry later.",
613614
this,
614615
resendBlockNumber);
615-
closeAndReschedule(THIRTY_SECONDS, true);
616+
617+
if (resendBlockNumber < blockBufferService.getEarliestAvailableBlockNumber()) {
618+
// Indicate that the block node should catch up from another trustworthy block node
619+
endStreamAndReschedule(TOO_FAR_BEHIND);
620+
} else if (resendBlockNumber > blockBufferService.getLastBlockNumberProduced()) {
621+
endStreamAndReschedule(ERROR);
622+
}
616623
}
617624
}
618625

hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeCommunicationTestBase.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,15 @@ protected static PublishStreamRequest createRequest(final EndStream.Code endCode
8181
return PublishStreamRequest.newBuilder().endStream(endStream).build();
8282
}
8383

84+
@NonNull
85+
protected static PublishStreamRequest createRequest(final EndStream.Code endCode, final long earliestBlockNumber) {
86+
final EndStream endStream = EndStream.newBuilder()
87+
.endCode(endCode)
88+
.earliestBlockNumber(earliestBlockNumber)
89+
.build();
90+
return PublishStreamRequest.newBuilder().endStream(endStream).build();
91+
}
92+
8493
@NonNull
8594
protected static PublishStreamRequest createRequest(final long blockNumber) {
8695
final BlockEnd endOfBlock =

hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/streaming/BlockNodeConnectionTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,13 +592,14 @@ void testOnNext_resendBlock_blockExists() {
592592
}
593593

594594
@Test
595-
void testOnNext_resendBlock_blockDoesNotExist() {
595+
void testOnNext_resendBlock_blockDoesNotExist_TooFarBehind() {
596596
openConnectionAndResetMocks();
597597

598598
final AtomicLong streamingBlockNumber = streamingBlockNumber();
599599
streamingBlockNumber.set(11); // pretend we are currently streaming block 11
600600
final PublishStreamResponse response = createResendBlock(10L);
601601
when(bufferService.getBlockState(10L)).thenReturn(null);
602+
when(bufferService.getEarliestAvailableBlockNumber()).thenReturn(11L);
602603
connection.updateConnectionState(ConnectionState.ACTIVE);
603604

604605
connection.onNext(response);
@@ -607,9 +608,46 @@ void testOnNext_resendBlock_blockDoesNotExist() {
607608
verify(metrics).recordResponseReceived(ResponseOneOfType.RESEND_BLOCK);
608609
verify(metrics).recordConnectionClosed();
609610
verify(metrics).recordActiveConnectionIp(-1L);
611+
verify(metrics).recordRequestLatency(anyLong());
612+
verify(metrics).recordRequestEndStreamSent(EndStream.Code.TOO_FAR_BEHIND);
613+
verify(requestPipeline).onNext(createRequest(EndStream.Code.TOO_FAR_BEHIND, 11L));
610614
verify(requestPipeline).onComplete();
611615
verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true);
616+
verify(connectionManager).notifyConnectionClosed(connection);
612617
verify(bufferService).getBlockState(10L);
618+
verify(bufferService, times(2)).getEarliestAvailableBlockNumber();
619+
verify(bufferService).getHighestAckedBlockNumber();
620+
verifyNoMoreInteractions(metrics);
621+
verifyNoMoreInteractions(requestPipeline);
622+
verifyNoMoreInteractions(connectionManager);
623+
verifyNoMoreInteractions(bufferService);
624+
}
625+
626+
@Test
627+
void testOnNext_resendBlock_blockDoesNotExist_Error() {
628+
openConnectionAndResetMocks();
629+
630+
final AtomicLong streamingBlockNumber = streamingBlockNumber();
631+
streamingBlockNumber.set(11); // pretend we are currently streaming block 11
632+
final PublishStreamResponse response = createResendBlock(10L);
633+
when(bufferService.getBlockState(10L)).thenReturn(null);
634+
connection.updateConnectionState(ConnectionState.ACTIVE);
635+
636+
connection.onNext(response);
637+
638+
verify(metrics).recordLatestBlockResendBlock(10L);
639+
verify(metrics).recordResponseReceived(ResponseOneOfType.RESEND_BLOCK);
640+
verify(metrics).recordConnectionClosed();
641+
verify(metrics).recordActiveConnectionIp(-1L);
642+
verify(metrics).recordRequestLatency(anyLong());
643+
verify(metrics).recordRequestEndStreamSent(EndStream.Code.ERROR);
644+
verify(requestPipeline).onNext(createRequest(EndStream.Code.ERROR));
645+
verify(requestPipeline).onComplete();
646+
verify(connectionManager).rescheduleConnection(connection, Duration.ofSeconds(30), null, true);
647+
verify(bufferService).getBlockState(10L);
648+
verify(bufferService, times(2)).getEarliestAvailableBlockNumber();
649+
verify(bufferService).getLastBlockNumberProduced();
650+
verify(bufferService).getHighestAckedBlockNumber();
613651
verifyNoMoreInteractions(metrics);
614652
verifyNoMoreInteractions(requestPipeline);
615653
verifyNoMoreInteractions(bufferService);

hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/blocknode/BlockNodeSuite.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -919,8 +919,7 @@ final Stream<DynamicTest> testCNReactionToPublishStreamResponses() {
919919
portNumbers.getFirst()),
920920
String.format(
921921
"/localhost:%s/ACTIVE] Block node requested a ResendBlock for block 9223372036854775807 but that block does not exist on this consensus node. Closing connection and will retry later",
922-
portNumbers.getFirst()))),
923-
assertBlockNodeCommsLogDoesNotContain(byNodeId(0), "ERROR", Duration.ofSeconds(5)));
922+
portNumbers.getFirst()))));
924923
}
925924

926925
@NotNull

0 commit comments

Comments
 (0)