Skip to content

Resend Replication Requests After a Timeout #143

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 41 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
4dfa2f2
initial boilerplate for timeout
samliok Apr 10, 2025
a986248
implement timeout handler methods
samliok Apr 10, 2025
54d0e16
add boilerplate timeout code in replication
samliok Apr 14, 2025
d8de1ca
go fmt
samliok Apr 14, 2025
9473941
add removal of tasks when recieved
samliok Apr 14, 2025
4334459
working with all old existing tests
samliok Apr 14, 2025
d5a9e6d
add simple test for timeouts
samliok Apr 14, 2025
58080fd
race condition fix
samliok Apr 14, 2025
cb8cdf6
run tasks in the same go-routine
samliok Apr 14, 2025
3dd0ea9
tests for replication timeout
samliok Apr 14, 2025
e6f8a7b
release lock before handling task
samliok Apr 14, 2025
f1b04c5
add logging to handler
samliok Apr 14, 2025
1b23f15
fmt
samliok Apr 14, 2025
2d17bfa
nits from self review
samliok Apr 14, 2025
320c117
license headers and rename
samliok Apr 15, 2025
80ee1aa
another rename to deadline
samliok Apr 15, 2025
b1c0ec1
move tick into seperate go routine
samliok Apr 15, 2025
0176cb7
fix bug when setting timeouthandler
samliok Apr 15, 2025
824ca24
initial split the replication requests, tests needed
samliok Apr 15, 2025
6da0069
add table tests for FindTask
samliok Apr 15, 2025
4a3d8eb
comments
samliok Apr 15, 2025
aaa96db
fix delete
samliok Apr 16, 2025
f5dee35
add test for incomplete responses
samliok Apr 16, 2025
6037499
nits and cleanup
samliok Apr 16, 2025
865c97c
add comment
samliok Apr 16, 2025
3b8bac9
add non blocking reads from tick
samliok Apr 16, 2025
746ffe2
buff channel
samliok Apr 16, 2025
d2614fc
typos + nits + logging from reivew
samliok Apr 21, 2025
f9aa002
remove goto label
samliok Apr 21, 2025
2d0b25a
remove peep func
samliok Apr 21, 2025
44330ba
close handler at the end of tests
samliok Apr 21, 2025
027652f
add lock to find task
samliok Apr 21, 2025
b2ff67d
separate into helper
samliok Apr 22, 2025
0579e43
check task order
samliok Apr 22, 2025
a5c297d
set buffered channel back to one
samliok Apr 22, 2025
ba57ca8
Merge branch 'main' into timeout
samliok Apr 22, 2025
6c54a8e
trigger new block after test
samliok Apr 22, 2025
f4f6825
typos and log
samliok Apr 24, 2025
8d72f71
add single execution check to timeout
samliok Apr 24, 2025
e6cea38
fix race condition
samliok Apr 24, 2025
07c075f
Merge branch 'main' into timeout
samliok Apr 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
DefaultMaxRoundWindow = 10
DefaultMaxPendingBlocks = 20

DefaultMaxProposalWaitTime = 5 * time.Second
DefaultMaxProposalWaitTime = 5 * time.Second
DefaultReplicationRequestTimeout = 5 * time.Second
)

type EmptyVoteSet struct {
Expand Down Expand Up @@ -105,6 +106,7 @@ func NewEpoch(conf EpochConfig) (*Epoch, error) {
// AdvanceTime hints the engine that time has advanced to t.
// The time is forwarded first to the monitor and then to the replication
// state, so that both can act on whatever they have scheduled.
func (e *Epoch) AdvanceTime(t time.Time) {
e.monitor.AdvanceTime(t)
e.replicationState.AdvanceTime(t)
}

// HandleMessage notifies the engine about a reception of a message.
Expand Down Expand Up @@ -173,7 +175,7 @@ func (e *Epoch) init() error {
e.maxPendingBlocks = DefaultMaxPendingBlocks
e.eligibleNodeIDs = make(map[string]struct{}, len(e.nodes))
e.futureMessages = make(messagesFromNode, len(e.nodes))
e.replicationState = NewReplicationState(e.Logger, e.Comm, e.ID, e.maxRoundWindow, e.ReplicationEnabled)
e.replicationState = NewReplicationState(e.Logger, e.Comm, e.ID, e.maxRoundWindow, e.ReplicationEnabled, e.StartTime)

for _, node := range e.nodes {
e.futureMessages[string(node)] = make(map[uint64]*messagesForRound)
Expand Down Expand Up @@ -2351,6 +2353,8 @@ func (e *Epoch) handleReplicationResponse(resp *ReplicationResponse, from NodeID
return nil
}

e.replicationState.receivedReplicationResponse(resp.Data, from)

return e.processReplicationState()
}

Expand Down
1 change: 1 addition & 0 deletions epoch_multinode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func (tw *testWAL) containsEmptyNotarization(round uint64) bool {

// messageFilter defines a function that decides whether a given message
// may be sent or broadcast. It receives the message and a NodeID, and
// should return true if the message is allowed to be sent.
type messageFilter func(*Message, NodeID) bool

// allowAllMessages allows every message to be sent
Expand Down
7 changes: 4 additions & 3 deletions monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
package simplex

import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func TestMonitorDoubleClose(t *testing.T) {
Expand Down
103 changes: 97 additions & 6 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package simplex
import (
"fmt"
"math"
"slices"
"time"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -55,19 +57,26 @@ type ReplicationState struct {

// request iterator
requestIterator int

timeoutHandler *TimeoutHandler
}

func NewReplicationState(logger Logger, comm Communication, id NodeID, maxRoundWindow uint64, enabled bool) *ReplicationState {
// NewReplicationState builds a ReplicationState from the supplied logger,
// communication layer, node ID, round window and enabled flag. The map of
// received quorum rounds starts out empty, and the timeout handler is
// created from logger, start and the nodes reported by comm.ListNodes().
func NewReplicationState(logger Logger, comm Communication, id NodeID, maxRoundWindow uint64, enabled bool, start time.Time) *ReplicationState {
	state := &ReplicationState{
		logger:         logger,
		enabled:        enabled,
		comm:           comm,
		id:             id,
		maxRoundWindow: maxRoundWindow,
	}

	state.receivedQuorumRounds = make(map[uint64]QuorumRound)
	state.timeoutHandler = NewTimeoutHandler(logger, start, comm.ListNodes())

	return state
}

// AdvanceTime passes the given time on to the timeout handler by calling
// its Tick method.
func (r *ReplicationState) AdvanceTime(now time.Time) {
r.timeoutHandler.Tick(now)
}

// isReplicationComplete returns true if we have finished the replication process.
// The process is considered finished once [currentRound] has caught up to the highest round received.
func (r *ReplicationState) isReplicationComplete(nextSeqToCommit uint64, currentRound uint64) bool {
Expand Down Expand Up @@ -111,6 +120,7 @@ func (r *ReplicationState) sendReplicationRequests(start uint64, end uint64) {
numSeqs := end + 1 - start
seqsPerNode := numSeqs / uint64(numNodes)

r.logger.Debug("Distributing replication requests", zap.Uint64("start", start), zap.Uint64("end", end), zap.Stringer("nodes", NodeIDs(nodes)))
// Distribute sequences evenly among nodes in round-robin fashion
for i := range numNodes {
nodeIndex := (r.requestIterator + i) % numNodes
Expand All @@ -122,16 +132,20 @@ func (r *ReplicationState) sendReplicationRequests(start uint64, end uint64) {
nodeEnd = end
}

r.sendRequestToNode(nodeStart, nodeEnd, nodes[nodeIndex])
r.sendRequestToNode(nodeStart, nodeEnd, nodes, nodeIndex)
}

r.lastSequenceRequested = end
// next time we send requests, we start with a different permutation
r.requestIterator++
}

func (r *ReplicationState) sendRequestToNode(start uint64, end uint64, node NodeID) {
// sendRequestToNode requests the sequences [start, end] from nodes[index].
// It also registers a timeout task so that, if nodes[index] does not
// respond, the request is re-sent to the next node in nodes.
func (r *ReplicationState) sendRequestToNode(start uint64, end uint64, nodes []NodeID, index int) {
r.logger.Debug("Requesting missing finalization certificates ",
zap.Stringer("from", node),
zap.Stringer("from", nodes[index]),
zap.Uint64("start", start),
zap.Uint64("end", end))
seqs := make([]uint64, (end+1)-start)
Expand All @@ -144,8 +158,83 @@ func (r *ReplicationState) sendRequestToNode(start uint64, end uint64, node Node
}
msg := &Message{ReplicationRequest: request}

r.lastSequenceRequested = end
r.comm.SendMessage(msg, node)
task := r.createReplicationTimeoutTask(start, end, nodes, index)

r.timeoutHandler.AddTask(task)

r.comm.SendMessage(msg, nodes[index])
}

// createReplicationTimeoutTask returns a TimeoutTask covering the sequences
// [start, end] requested from nodes[index]. The task's function re-issues the
// same request to the following node in nodes (wrapping around to the first),
// and its deadline is the timeout handler's current time plus
// DefaultReplicationRequestTimeout.
func (r *ReplicationState) createReplicationTimeoutTask(start, end uint64, nodes []NodeID, index int) *TimeoutTask {
	// Retry against the next node in the list, wrapping at the end.
	resend := func() {
		next := (index + 1) % len(nodes)
		r.sendRequestToNode(start, end, nodes, next)
	}

	target := nodes[index]
	id := getTimeoutID(start, end)
	deadline := r.timeoutHandler.GetTime().Add(DefaultReplicationRequestTimeout)

	return &TimeoutTask{
		Start:    start,
		End:      end,
		NodeID:   target,
		TaskID:   id,
		Task:     resend,
		Deadline: deadline,
	}
}

// receivedReplicationResponse notifies the timeout handler that a replication
// response carrying data was received from node.
//
// The timeout task matching the node and the returned sequences is looked up
// and removed. If the response was incomplete (meaning the task expected more
// sequences than were returned), new timeout tasks are registered for the
// missing sequences, distributed round-robin over the nodes obtained from
// r.highestSequenceObserved.signers.Remove(r.id). No request is sent from
// here: each new task's function is what sends the request (see
// createReplicationTimeoutTask).
//
// If no matching task is found, or there are no nodes to assign the missing
// sequences to, the function logs and returns without registering anything.
func (r *ReplicationState) receivedReplicationResponse(data []QuorumRound, node NodeID) {
	received := make([]uint64, 0, len(data))
	for _, qr := range data {
		received = append(received, qr.GetSequence())
	}

	slices.Sort(received)

	task := r.timeoutHandler.FindTask(node, received)
	if task == nil {
		r.logger.Debug("Could not find a timeout task associated with the replication response", zap.Stringer("from", node))
		return
	}
	r.timeoutHandler.RemoveTask(node, task.TaskID)

	// we found the timeout, now make sure all seqs were returned
	missing := findMissingNumbersInRange(task.Start, task.End, received)
	if len(missing) == 0 {
		return
	}

	// if not all sequences were returned, create new timeouts
	r.logger.Debug("Received missing sequences in the replication response", zap.Stringer("from", node), zap.Any("missing", missing))
	nodes := r.highestSequenceObserved.signers.Remove(r.id)
	numNodes := len(nodes)
	if numNodes == 0 {
		// Without this guard the modulo below would panic with an integer
		// divide by zero.
		r.logger.Debug("No nodes available to request the missing sequences from", zap.Any("missing", missing))
		return
	}

	for i, segment := range CompressSequences(missing) {
		index := i % numNodes
		newTask := r.createReplicationTimeoutTask(segment.Start, segment.End, nodes, index)
		r.timeoutHandler.AddTask(newTask)
	}
}

// findMissingNumbersInRange returns, in ascending order, every number in the
// inclusive range [start, end] that does not appear in nums.
// ex. (3, 10, [1,2,3,4,5,6]) -> [7,8,9,10]
//
// nums does not need to be sorted and may contain values outside the range.
// A nil slice is returned when nothing is missing or when start > end.
func findMissingNumbersInRange(start, end uint64, nums []uint64) []uint64 {
	if start > end {
		return nil
	}

	present := make(map[uint64]struct{}, len(nums))
	for _, num := range nums {
		present[num] = struct{}{}
	}

	var missing []uint64

	// The loop exits on i == end instead of using `i <= end` as its condition:
	// with end == math.MaxUint64 that condition is always true, i wraps around
	// to 0 and the loop never terminates.
	for i := start; ; i++ {
		if _, exists := present[i]; !exists {
			missing = append(missing, i)
		}
		if i == end {
			break
		}
	}

	return missing
}

func (r *ReplicationState) replicateBlocks(fCert *FinalizationCertificate, nextSeqToCommit uint64) {
Expand Down Expand Up @@ -197,6 +286,8 @@ func (r *ReplicationState) StoreQuorumRound(round QuorumRound) {

r.highestSequenceObserved = signedSeq
}

r.logger.Debug("Stored quorum round ", zap.Stringer("qr", &round))
r.receivedQuorumRounds[round.GetRound()] = round
}

Expand Down
2 changes: 1 addition & 1 deletion replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func testReplication(t *testing.T, startSeq uint64, nodes []simplex.NodeID) {
bb := newTestControlledBlockBuilder(t)
net := newInMemNetwork(t, nodes)

// initiate a network with 4 nodes. one node is behind by 8 blocks
// initiate a network with 4 nodes. one node is behind by startSeq blocks
storageData := createBlocks(t, nodes, &bb.testBlockBuilder, startSeq)
testEpochConfig := &testNodeConfig{
initialStorage: storageData,
Expand Down
Loading