Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit aacf619

Browse files
committedFeb 23, 2024
raft: pass Progress to maybeSendAppend
Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
1 parent 8ccd359 commit aacf619

File tree

2 files changed

+21
-19
lines changed

2 files changed

+21
-19
lines changed
 

‎raft.go

+7-9
Original file line numberDiff line numberDiff line change
@@ -596,9 +596,7 @@ func (r *raft) send(m pb.Message) {
596596
// argument controls whether messages with no entries will be sent
597597
// ("empty" messages are useful to convey updated Commit indexes, but
598598
// are undesirable when we're sending multiple messages in a batch).
599-
func (r *raft) maybeSendAppend(to uint64) bool {
600-
pr := r.trk.Progress[to]
601-
599+
func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
602600
last, commit := r.raftLog.lastIndex(), r.raftLog.committed
603601
if !pr.ShouldSendMsgApp(last, commit) {
604602
return false
@@ -685,11 +683,11 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
685683
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
686684
// according to the progress recorded in r.trk.
687685
func (r *raft) bcastAppend() {
688-
r.trk.Visit(func(id uint64, _ *tracker.Progress) {
686+
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
689687
if id == r.id {
690688
return
691689
}
692-
r.maybeSendAppend(id)
690+
r.maybeSendAppend(id, pr)
693691
})
694692
}
695693

@@ -1467,7 +1465,7 @@ func stepLeader(r *raft, m pb.Message) error {
14671465
if pr.State == tracker.StateReplicate {
14681466
pr.BecomeProbe()
14691467
}
1470-
r.maybeSendAppend(m.From)
1468+
r.maybeSendAppend(m.From, pr)
14711469
}
14721470
} else {
14731471
// We want to update our tracking if the response updates our
@@ -1512,7 +1510,7 @@ func stepLeader(r *raft, m pb.Message) error {
15121510
// transitioning from probe to replicate, or when freeTo() covers
15131511
// multiple messages). Send as many messages as we can.
15141512
if r.id != m.From {
1515-
for r.maybeSendAppend(m.From) {
1513+
for r.maybeSendAppend(m.From, pr) {
15161514
}
15171515
}
15181516
// Transfer leadership is in progress.
@@ -1525,7 +1523,7 @@ func stepLeader(r *raft, m pb.Message) error {
15251523
case pb.MsgHeartbeatResp:
15261524
pr.RecentActive = true
15271525
pr.MsgAppFlowPaused = false
1528-
r.maybeSendAppend(m.From)
1526+
r.maybeSendAppend(m.From, pr)
15291527

15301528
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
15311529
return nil
@@ -1596,7 +1594,7 @@ func stepLeader(r *raft, m pb.Message) error {
15961594
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
15971595
} else {
15981596
pr.MsgAppFlowPaused = false
1599-
r.maybeSendAppend(leadTransferee)
1597+
r.maybeSendAppend(leadTransferee, pr)
16001598
}
16011599
}
16021600
return nil

‎raft_test.go

+14-10
Original file line numberDiff line numberDiff line change
@@ -2764,7 +2764,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27642764
r.becomeCandidate()
27652765
r.becomeLeader()
27662766
r.readMessages()
2767-
r.trk.Progress[2].BecomeProbe()
2767+
pr2 := r.trk.Progress[2]
2768+
pr2.BecomeProbe()
27682769

27692770
// each round is a heartbeat
27702771
for i := 0; i < 3; i++ {
@@ -2773,7 +2774,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27732774
// loop. After that, the follower is paused until a heartbeat response is
27742775
// received.
27752776
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2776-
r.maybeSendAppend(2)
2777+
r.maybeSendAppend(2, pr2)
27772778
msg := r.readMessages()
27782779
if len(msg) != 1 {
27792780
t.Errorf("len(msg) = %d, want %d", len(msg), 1)
@@ -2788,7 +2789,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27882789
}
27892790
for j := 0; j < 10; j++ {
27902791
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2791-
r.maybeSendAppend(2)
2792+
r.maybeSendAppend(2, pr2)
27922793
if l := len(r.readMessages()); l != 0 {
27932794
t.Errorf("len(msg) = %d, want %d", l, 0)
27942795
}
@@ -2831,11 +2832,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
28312832
r.becomeCandidate()
28322833
r.becomeLeader()
28332834
r.readMessages()
2834-
r.trk.Progress[2].BecomeReplicate()
2835+
pr2 := r.trk.Progress[2]
2836+
pr2.BecomeReplicate()
28352837

28362838
for i := 0; i < 10; i++ {
28372839
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2838-
r.maybeSendAppend(2)
2840+
r.maybeSendAppend(2, pr2)
28392841
msgs := r.readMessages()
28402842
if len(msgs) != 1 {
28412843
t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
@@ -2848,11 +2850,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
28482850
r.becomeCandidate()
28492851
r.becomeLeader()
28502852
r.readMessages()
2851-
r.trk.Progress[2].BecomeSnapshot(10)
2853+
pr2 := r.trk.Progress[2]
2854+
pr2.BecomeSnapshot(10)
28522855

28532856
for i := 0; i < 10; i++ {
28542857
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2855-
r.maybeSendAppend(2)
2858+
r.maybeSendAppend(2, pr2)
28562859
msgs := r.readMessages()
28572860
if len(msgs) != 0 {
28582861
t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
@@ -4671,16 +4674,17 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) {
46714674
r1.becomeCandidate()
46724675
r1.becomeLeader()
46734676
r1.readMessages()
4674-
r1.trk.Progress[2].BecomeReplicate()
4677+
pr2 := r1.trk.Progress[2]
4678+
pr2.BecomeReplicate()
46754679

46764680
r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
46774681

46784682
// r1 sends 2 MsgApp messages to r2.
46794683
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
4680-
r1.maybeSendAppend(2)
4684+
r1.maybeSendAppend(2, pr2)
46814685
req1 := expectOneMessage(t, r1)
46824686
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
4683-
r1.maybeSendAppend(2)
4687+
r1.maybeSendAppend(2, pr2)
46844688
req2 := expectOneMessage(t, r1)
46854689

46864690
// r2 receives the second MsgApp first due to reordering.

0 commit comments

Comments
 (0)
Please sign in to comment.