Skip to content

Commit 9dbaf80

Browse files
committed
tracker: track in-flight commit index
Signed-off-by: Pavel Kalinnikov <[email protected]>
1 parent 604f2c7 commit 9dbaf80

File tree

4 files changed

+37
-25
lines changed

4 files changed

+37
-25
lines changed

raft.go

+15-9
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,10 @@ func (r *raft) sendAppend(to uint64) {
599599
// argument controls whether messages with no entries will be sent
600600
// ("empty" messages are useful to convey updated Commit indexes, but
601601
// are undesirable when we're sending multiple messages in a batch).
602+
//
603+
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
604+
// struct contains all the state necessary for deciding whether to send a
605+
// message.
602606
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
603607
pr := r.trk.Progress[to]
604608
if pr.IsPaused() {
@@ -641,6 +645,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
641645
Commit: r.raftLog.committed,
642646
})
643647
pr.SentEntries(len(ents), uint64(payloadsSize(ents)))
648+
pr.SentCommit(r.raftLog.committed)
644649
return true
645650
}
646651

@@ -675,21 +680,21 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
675680

676681
// sendHeartbeat sends a heartbeat RPC to the given peer.
677682
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
683+
pr := r.trk.Progress[to]
678684
// Attach the commit as min(to.matched, r.committed).
679685
// When the leader sends out a heartbeat message,
680686
// the receiver (follower) might not be matched with the leader
681687
// or it might not have all the committed entries.
682688
// The leader MUST NOT forward the follower's commit to
683689
// an unmatched index.
684-
commit := min(r.trk.Progress[to].Match, r.raftLog.committed)
685-
m := pb.Message{
690+
commit := min(pr.Match, r.raftLog.committed)
691+
r.send(pb.Message{
686692
To: to,
687693
Type: pb.MsgHeartbeat,
688694
Commit: commit,
689695
Context: ctx,
690-
}
691-
692-
r.send(m)
696+
})
697+
pr.SentCommit(commit)
693698
}
694699

695700
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
@@ -1480,7 +1485,6 @@ func stepLeader(r *raft, m pb.Message) error {
14801485
r.sendAppend(m.From)
14811486
}
14821487
} else {
1483-
oldPaused := pr.IsPaused()
14841488
// We want to update our tracking if the response updates our
14851489
// matched index or if the response can move a probing peer back
14861490
// into StateReplicate (see heartbeat_rep_recovers_from_probing.txt
@@ -1517,9 +1521,11 @@ func stepLeader(r *raft, m pb.Message) error {
15171521
// to respond to pending read index requests
15181522
releasePendingReadIndexMessages(r)
15191523
r.bcastAppend()
1520-
} else if oldPaused {
1521-
// If we were paused before, this node may be missing the
1522-
// latest commit index, so send it.
1524+
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
1525+
// This node may be missing the latest commit index, so send it.
1526+
// NB: this is not strictly necessary because the periodic heartbeat
1527+
// messages deliver commit indices too. However, a message sent now
1528+
// may arrive earlier than the next heartbeat fires.
15231529
r.sendAppend(m.From)
15241530
}
15251531
// We've updated flow control information above, which may

testdata/confchange_v2_replace_leader.txt

-4
Original file line numberDiff line numberDiff line change
@@ -282,12 +282,10 @@ stabilize
282282
CommittedEntries:
283283
2/5 EntryNormal ""
284284
Messages:
285-
4->1 MsgApp Term:2 Log:2/5 Commit:4
286285
4->1 MsgApp Term:2 Log:2/5 Commit:5
287286
4->2 MsgApp Term:2 Log:2/5 Commit:5
288287
4->3 MsgApp Term:2 Log:2/5 Commit:5
289288
> 1 receiving messages
290-
4->1 MsgApp Term:2 Log:2/5 Commit:4
291289
4->1 MsgApp Term:2 Log:2/5 Commit:5
292290
> 2 receiving messages
293291
4->2 MsgApp Term:2 Log:2/5 Commit:5
@@ -300,7 +298,6 @@ stabilize
300298
2/5 EntryNormal ""
301299
Messages:
302300
1->4 MsgAppResp Term:2 Log:0/5
303-
1->4 MsgAppResp Term:2 Log:0/5
304301
> 2 handling Ready
305302
Ready MustSync=false:
306303
HardState Term:2 Vote:4 Commit:5
@@ -316,7 +313,6 @@ stabilize
316313
Messages:
317314
3->4 MsgAppResp Term:2 Log:0/5
318315
> 4 receiving messages
319-
1->4 MsgAppResp Term:2 Log:0/5
320316
1->4 MsgAppResp Term:2 Log:0/5
321317
2->4 MsgAppResp Term:2 Log:0/5
322318
3->4 MsgAppResp Term:2 Log:0/5

testdata/probe_and_replicate.txt

-12
Original file line numberDiff line numberDiff line change
@@ -507,18 +507,6 @@ stabilize 1 2
507507
2->1 MsgAppResp Term:8 Log:0/21
508508
> 1 receiving messages
509509
2->1 MsgAppResp Term:8 Log:0/21
510-
> 1 handling Ready
511-
Ready MustSync=false:
512-
Messages:
513-
1->2 MsgApp Term:8 Log:8/21 Commit:18
514-
> 2 receiving messages
515-
1->2 MsgApp Term:8 Log:8/21 Commit:18
516-
> 2 handling Ready
517-
Ready MustSync=false:
518-
Messages:
519-
2->1 MsgAppResp Term:8 Log:0/21
520-
> 1 receiving messages
521-
2->1 MsgAppResp Term:8 Log:0/21
522510

523511
stabilize 1 3
524512
----

tracker/progress.go

+22
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ type Progress struct {
3636
// Invariant: 0 <= Match < Next.
3737
Next uint64
3838

39+
// pendingCommit is the highest commit index in flight to the follower.
40+
//
41+
// Invariant: 0 <= pendingCommit < Next.
42+
pendingCommit uint64
43+
3944
// State defines how the leader should interact with the follower.
4045
//
4146
// When in StateProbe, leader sends at most one replication message
@@ -126,13 +131,15 @@ func (pr *Progress) BecomeProbe() {
126131
} else {
127132
pr.ResetState(StateProbe)
128133
pr.Next = pr.Match + 1
134+
pr.pendingCommit = min(pr.pendingCommit, pr.Match)
129135
}
130136
}
131137

132138
// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
133139
func (pr *Progress) BecomeReplicate() {
134140
pr.ResetState(StateReplicate)
135141
pr.Next = pr.Match + 1
142+
pr.pendingCommit = min(pr.pendingCommit, pr.Match)
136143
}
137144

138145
// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
@@ -169,6 +176,19 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) {
169176
}
170177
}
171178

179+
// CanBumpCommit returns true if sending the given commit index can potentially
180+
// advance the follower's commit index.
181+
func (pr *Progress) CanBumpCommit(index uint64) bool {
182+
return pr.pendingCommit < min(index, pr.Next-1)
183+
}
184+
185+
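// SentCommit records that the given commit index was sent to the follower: if it exceeds pendingCommit, pendingCommit becomes min(commit, Next-1).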
186+
func (pr *Progress) SentCommit(commit uint64) {
187+
if commit > pr.pendingCommit {
188+
pr.pendingCommit = min(commit, pr.Next-1)
189+
}
190+
}
191+
172192
// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
173193
// index acked by it. The method returns false if the given n index comes from
174194
// an outdated message. Otherwise it updates the progress and returns true.
@@ -205,6 +225,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
205225
//
206226
// TODO(tbg): why not use matchHint if it's larger?
207227
pr.Next = pr.Match + 1
228+
pr.pendingCommit = min(pr.pendingCommit, pr.Match)
208229
return true
209230
}
210231

@@ -217,6 +238,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
217238

218239
// Next index shall always be larger than match index.
219240
pr.Next = max(min(rejected, matchHint+1), pr.Match+1)
241+
pr.pendingCommit = min(pr.pendingCommit, pr.Next-1)
220242
pr.MsgAppFlowPaused = false
221243
return true
222244
}

0 commit comments

Comments
 (0)