Skip to content

Commit 008b99b

Browse files
feat(20242): Improve Reconnect orchestration (#21599)
Signed-off-by: mxtartaglia <[email protected]>
1 parent ba1f3df commit 008b99b

File tree

61 files changed

+3531
-2078
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+3531
-2078
lines changed

platform-sdk/consensus-gossip/src/main/java/module-info.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,5 @@
22
module org.hiero.consensus.gossip {
33
exports org.hiero.consensus.gossip;
44

5-
requires transitive org.hiero.consensus.model;
65
requires static transitive com.github.spotbugs.annotations;
76
}

platform-sdk/consensus-gossip/src/main/java/org/hiero/consensus/gossip/FallenBehindManager.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

platform-sdk/consensus-otter-tests/src/test/java/org/hiero/otter/fixtures/turtle/gossip/SimulatedGossipTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ void randomDataTest(final int networkSize) {
127127
mock(BindableInputWire.class),
128128
mock(BindableInputWire.class),
129129
mock(BindableInputWire.class),
130+
mock(BindableInputWire.class),
131+
mock(BindableInputWire.class),
130132
mock(StandardOutputWire.class));
131133
model.start();
132134
}

platform-sdk/consensus-otter-tests/src/testFixtures/java/org/hiero/otter/fixtures/Network.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.hedera.hapi.node.state.roster.Roster;
66
import com.swirlds.common.test.fixtures.WeightGenerator;
77
import com.swirlds.common.test.fixtures.WeightGenerators;
8+
import com.swirlds.platform.reconnect.FallenBehindStatus;
89
import edu.umd.cs.findbugs.annotations.NonNull;
910
import edu.umd.cs.findbugs.annotations.Nullable;
1011
import java.nio.file.Path;
@@ -453,7 +454,7 @@ default Partition createNetworkPartition(@NonNull final Node node0, @NonNull fin
453454
*
454455
* @param maybeBehindNode the node to check behind status for
455456
* @return {@code true} if the node is behind by node weight, {@code false} otherwise
456-
* @see com.swirlds.platform.gossip.shadowgraph.SyncFallenBehindStatus
457+
* @see FallenBehindStatus
457458
*/
458459
boolean nodeIsBehindByNodeWeight(@NonNull Node maybeBehindNode);
459460

@@ -478,7 +479,7 @@ default boolean nodeIsBehindByNodeCount(@NonNull Node maybeBehindNode) {
478479
* @param maybeBehindNode the node to check behind status for
479480
* @param otherNodes additional nodes to consider for the behind check (optional)
480481
* @return {@code true} if the node is behind by the specified fraction of peers, {@code false} otherwise
481-
* @see com.swirlds.platform.gossip.shadowgraph.SyncFallenBehindStatus
482+
* @see FallenBehindStatus
482483
*/
483484
boolean nodesAreBehindByNodeCount(@NonNull Node maybeBehindNode, @Nullable Node... otherNodes);
484485

platform-sdk/consensus-otter-tests/src/testFixtures/java/org/hiero/otter/fixtures/internal/AbstractNetwork.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import com.swirlds.common.test.fixtures.WeightGenerators;
1919
import com.swirlds.common.utility.Threshold;
2020
import com.swirlds.platform.crypto.CryptoStatic;
21-
import com.swirlds.platform.gossip.shadowgraph.SyncFallenBehindStatus;
21+
import com.swirlds.platform.reconnect.FallenBehindStatus;
2222
import edu.umd.cs.findbugs.annotations.NonNull;
2323
import edu.umd.cs.findbugs.annotations.Nullable;
2424
import java.nio.file.Path;
@@ -824,8 +824,8 @@ public boolean nodeIsBehindByNodeWeight(@NonNull final Node maybeBehindNode) {
824824
maybeAheadNode.newConsensusResult().getLatestEventWindow();
825825

826826
// If any peer in the required list says the "self" node is not behind, the node is not behind.
827-
if (SyncFallenBehindStatus.getStatus(selfEventWindow, peerEventWindow)
828-
!= SyncFallenBehindStatus.SELF_FALLEN_BEHIND) {
827+
if (FallenBehindStatus.getStatus(selfEventWindow, peerEventWindow)
828+
!= FallenBehindStatus.SELF_FALLEN_BEHIND) {
829829
weightOfAheadNodes += maybeAheadNode.weight();
830830
}
831831
}
@@ -861,8 +861,8 @@ public boolean nodesAreBehindByNodeCount(
861861
- 5)); // add buffer to account for unpropagated event windows
862862

863863
// If any peer in the required list says the "self" node is behind, it is ahead so add it to the count
864-
if (SyncFallenBehindStatus.getStatus(selfEventWindow, peerEventWindowWithBuffer)
865-
== SyncFallenBehindStatus.SELF_FALLEN_BEHIND) {
864+
if (FallenBehindStatus.getStatus(selfEventWindow, peerEventWindowWithBuffer)
865+
== FallenBehindStatus.SELF_FALLEN_BEHIND) {
866866
numNodesAhead++;
867867
}
868868
}

platform-sdk/consensus-otter-tests/src/testFixtures/java/org/hiero/otter/fixtures/turtle/gossip/SimulatedGossip.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public void bind(
7474
@NonNull final BindableInputWire<NoInput, Void> startInput,
7575
@NonNull final BindableInputWire<NoInput, Void> stopInput,
7676
@NonNull final BindableInputWire<NoInput, Void> clearInput,
77+
@NonNull final BindableInputWire<NoInput, Void> pauseInput,
78+
@NonNull final BindableInputWire<NoInput, Void> resumeInput,
7779
@NonNull final BindableInputWire<Duration, Void> systemHealthInput,
7880
@NonNull final BindableInputWire<PlatformStatus, Void> platformStatusInput,
7981
@NonNull final StandardOutputWire<Double> syncLagOutput) {
@@ -86,6 +88,8 @@ public void bind(
8688
startInput.bindConsumer(ignored -> {});
8789
stopInput.bindConsumer(ignored -> {});
8890
clearInput.bindConsumer(ignored -> {});
91+
pauseInput.bindConsumer(ignored -> {});
92+
resumeInput.bindConsumer(ignored -> {});
8993
systemHealthInput.bindConsumer(ignored -> {});
9094
platformStatusInput.bindConsumer(ignored -> {});
9195
}

platform-sdk/consensus-otter-tests/src/testOtter/java/org/hiero/otter/test/ReconnectTest.java

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import static org.hiero.otter.fixtures.assertions.StatusProgressionStep.target;
1414

1515
import com.swirlds.common.merkle.synchronization.config.ReconnectConfig_;
16+
import com.swirlds.logging.legacy.payload.ReconnectStartPayload;
1617
import com.swirlds.platform.consensus.ConsensusConfig_;
1718
import com.swirlds.platform.wiring.PlatformSchedulersConfig_;
1819
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -31,7 +32,6 @@
3132
import org.hiero.otter.fixtures.result.MultipleNodePlatformStatusResults;
3233
import org.hiero.otter.fixtures.result.SingleNodePlatformStatusResult;
3334
import org.hiero.otter.fixtures.result.SubscriberAction;
34-
import org.junit.jupiter.api.Disabled;
3535

3636
/**
3737
* Tests the reconnect functionality of a node that has fallen behind in the consensus rounds. The test ensures that the
@@ -42,6 +42,8 @@ public class ReconnectTest {
4242
/** Reducing the number of rounds non-expired will allow nodes to require a reconnect faster. */
4343
private static final long ROUNDS_EXPIRED = 100L;
4444

45+
public static final Duration BEHIND_WAIT_TIME = Duration.ofSeconds(240L);
46+
4547
/**
4648
* Tests that a node which is killed, kept down until it is behind, and then restarted is able to reconnect to the
4749
* network and catch up with consensus.
@@ -249,7 +251,6 @@ private void disableSyntheticBottleneck(@NonNull final Node... nodesToThrottle)
249251
* @param env the test environment
250252
*/
251253
@OtterTest(requires = Capability.RECONNECT)
252-
@Disabled
253254
void testReconnectSucceedsAfterFailure(@NonNull final TestEnvironment env) {
254255
final Network network = env.network();
255256
final TimeManager timeManager = env.timeManager();
@@ -263,7 +264,9 @@ void testReconnectSucceedsAfterFailure(@NonNull final TestEnvironment env) {
263264
PlatformSchedulersConfig_.TRANSACTION_HANDLER,
264265
"SEQUENTIAL_THREAD CAPACITY(100) FLUSHABLE SQUELCHABLE")
265266
.withConfigValue(ConsensusConfig_.ROUNDS_EXPIRED, ROUNDS_EXPIRED)
266-
.withConfigValue(ReconnectConfig_.ASYNC_STREAM_TIMEOUT, Duration.ofSeconds(1));
267+
.withConfigValue(ReconnectConfig_.ASYNC_STREAM_TIMEOUT, Duration.ofSeconds(1))
268+
.withConfigValue(ReconnectConfig_.MAXIMUM_RECONNECT_FAILURES_BEFORE_SHUTDOWN, 2)
269+
.withConfigValue(ReconnectConfig_.MINIMUM_TIME_BETWEEN_RECONNECTS, Duration.ofMillis(10));
267270

268271
network.start();
269272

@@ -280,7 +283,7 @@ void testReconnectSucceedsAfterFailure(@NonNull final TestEnvironment env) {
280283
enableSyntheticBottleneck(Duration.ofMinutes(10), nodeToReconnect);
281284
timeManager.waitForCondition(
282285
nodeToReconnect::isBehind,
283-
Duration.ofSeconds(120L),
286+
BEHIND_WAIT_TIME,
284287
"Node did not enter BEHIND status within the expected time "
285288
+ "frame after synthetic bottleneck was enabled");
286289

@@ -308,7 +311,6 @@ void testReconnectSucceedsAfterFailure(@NonNull final TestEnvironment env) {
308311
* @param env the test environment
309312
*/
310313
@OtterTest(requires = {Capability.RECONNECT, Capability.SINGLE_NODE_JVM_SHUTDOWN})
311-
@Disabled
312314
void testNodeShutsDownAfterMaxFailedReconnects(@NonNull final TestEnvironment env) {
313315
final Network network = env.network();
314316
final TimeManager timeManager = env.timeManager();
@@ -325,24 +327,29 @@ void testNodeShutsDownAfterMaxFailedReconnects(@NonNull final TestEnvironment en
325327
"SEQUENTIAL_THREAD CAPACITY(100) FLUSHABLE SQUELCHABLE")
326328
.withConfigValue(ConsensusConfig_.ROUNDS_EXPIRED, ROUNDS_EXPIRED)
327329
.withConfigValue(ReconnectConfig_.ASYNC_STREAM_TIMEOUT, Duration.ofSeconds(1))
328-
.withConfigValue(ReconnectConfig_.MAXIMUM_RECONNECT_FAILURES_BEFORE_SHUTDOWN, maxFailedReconnects);
330+
.withConfigValue(ReconnectConfig_.MAXIMUM_RECONNECT_FAILURES_BEFORE_SHUTDOWN, maxFailedReconnects)
331+
.withConfigValue(ReconnectConfig_.MINIMUM_TIME_BETWEEN_RECONNECTS, Duration.ofMillis(10));
329332

330333
network.start();
331334

332335
final Node nodeToReconnect = network.nodes().getLast();
333336

334-
nodeToReconnect.newLogResult().subscribe(logEntry -> {
335-
if (logEntry.message().contains("Starting reconnect in role of the receiver.")) {
336-
network.isolate(nodeToReconnect);
337-
return SubscriberAction.UNSUBSCRIBE;
337+
network.newReconnectResults().subscribe(notification -> {
338+
if (notification.payload().getClass().equals(ReconnectStartPayload.class)) {
339+
final ReconnectStartPayload payload = (ReconnectStartPayload) notification.payload();
340+
final Node node = network.nodes().stream()
341+
.filter(n -> n.selfId().id() == payload.getOtherNodeId())
342+
.findFirst()
343+
.orElse(nodeToReconnect);
344+
network.isolate(node);
338345
}
339346
return SubscriberAction.CONTINUE;
340347
});
341348

342349
enableSyntheticBottleneck(Duration.ofMinutes(10), nodeToReconnect);
343350
timeManager.waitForCondition(
344351
nodeToReconnect::isBehind,
345-
Duration.ofSeconds(120L),
352+
BEHIND_WAIT_TIME,
346353
"Node did not enter BEHIND status within the expected time "
347354
+ "frame after synthetic bottleneck was enabled");
348355

@@ -359,4 +366,59 @@ void testNodeShutsDownAfterMaxFailedReconnects(@NonNull final TestEnvironment en
359366
Duration.ofMinutes(2L),
360367
"Node did not shut down within the expected time frame after exceeding the maximum number of failed reconnects.");
361368
}
369+
370+
@OtterTest(requires = {Capability.RECONNECT, Capability.SINGLE_NODE_JVM_SHUTDOWN})
371+
void testIsolateNodeWhileReconnectingAndRestore(@NonNull final TestEnvironment env) {
372+
final Network network = env.network();
373+
final TimeManager timeManager = env.timeManager();
374+
375+
network.addNodes(5);
376+
377+
final int maxFailedReconnects = 3;
378+
379+
// For this test to work, we need to lower the limit for the transaction handler component
380+
// With the new limit set, once the transaction handler has 100 pending transactions, the node will stop
381+
// gossipping and stop creating events. This will cause the node to go into the checking state.
382+
network.withConfigValue(
383+
PlatformSchedulersConfig_.TRANSACTION_HANDLER,
384+
"SEQUENTIAL_THREAD CAPACITY(100) FLUSHABLE SQUELCHABLE")
385+
.withConfigValue(ConsensusConfig_.ROUNDS_EXPIRED, ROUNDS_EXPIRED)
386+
.withConfigValue(ReconnectConfig_.ASYNC_STREAM_TIMEOUT, Duration.ofSeconds(1))
387+
.withConfigValue(ReconnectConfig_.MAXIMUM_RECONNECT_FAILURES_BEFORE_SHUTDOWN, maxFailedReconnects)
388+
.withConfigValue(ReconnectConfig_.MINIMUM_TIME_BETWEEN_RECONNECTS, Duration.ofMillis(10));
389+
390+
network.start();
391+
392+
final Node nodeToReconnect = network.nodes().getLast();
393+
394+
network.newReconnectResults().subscribe(notification -> {
395+
if (notification.payload().getClass().equals(ReconnectStartPayload.class)) {
396+
network.isolate(nodeToReconnect);
397+
return SubscriberAction.UNSUBSCRIBE;
398+
}
399+
return SubscriberAction.CONTINUE;
400+
});
401+
402+
enableSyntheticBottleneck(Duration.ofMinutes(10), nodeToReconnect);
403+
timeManager.waitForCondition(
404+
nodeToReconnect::isBehind,
405+
BEHIND_WAIT_TIME,
406+
"Node did not enter BEHIND status within the expected time "
407+
+ "frame after synthetic bottleneck was enabled");
408+
409+
network.setBandwidthForAllConnections(nodeToReconnect, BandwidthLimit.ofKilobytesPerSecond(1));
410+
411+
disableSyntheticBottleneck(nodeToReconnect);
412+
413+
timeManager.waitForCondition(
414+
() -> nodeToReconnect.newReconnectResult().numFailedReconnects() > 0,
415+
Duration.ofMinutes(2L),
416+
"Node did not record the expected number of failed reconnect attempts within the expected time frame.");
417+
timeManager.waitFor(Duration.ofSeconds(30));
418+
network.restoreConnectivity();
419+
timeManager.waitForCondition(
420+
nodeToReconnect::isActive,
421+
Duration.ofMinutes(2L),
422+
"Node did not become ACTIVE within the expected time frame after restoring normal connectivity");
423+
}
362424
}

platform-sdk/swirlds-logging/src/main/java/com/swirlds/logging/legacy/payload/ReconnectFinishPayload.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ public class ReconnectFinishPayload extends AbstractLogPayload {
77
private long nodeId;
88
private long otherNodeId;
99
private long round;
10-
private boolean success;
1110

1211
public ReconnectFinishPayload() {}
1312

@@ -68,12 +67,4 @@ public long getRound() {
6867
public void setRound(long round) {
6968
this.round = round;
7069
}
71-
72-
public boolean isSuccess() {
73-
return success;
74-
}
75-
76-
public void setSuccess(final boolean success) {
77-
this.success = success;
78-
}
7970
}

0 commit comments

Comments
 (0)