Skip to content

Commit b4da150

Browse files
committed
raft: consolidate all append message sending
Signed-off-by: Pavel Kalinnikov <[email protected]>
1 parent 8d90676 commit b4da150

7 files changed

+135
-120
lines changed

raft.go

+83-70
Original file line numberDiff line numberDiff line change
@@ -588,54 +588,60 @@ func (r *raft) send(m pb.Message) {
588588
}
589589
}
590590

591-
// sendAppend sends an append RPC with new entries (if any) and the
592-
// current commit index to the given peer.
593-
func (r *raft) sendAppend(to uint64) {
594-
r.maybeSendAppend(to, true)
595-
}
596-
597-
// maybeSendAppend sends an append RPC with new entries to the given peer,
598-
// if necessary. Returns true if a message was sent. The sendIfEmpty
599-
// argument controls whether messages with no entries will be sent
600-
// ("empty" messages are useful to convey updated Commit indexes, but
601-
// are undesirable when we're sending multiple messages in a batch).
591+
// sendAppend sends an append RPC with new entries to the given peer, if
592+
// necessary. Returns true if a message was sent.
602593
//
603-
// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress
604-
// struct contains all the state necessary for deciding whether to send a
605-
// message.
606-
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
607-
pr := r.trk.Progress[to]
608-
if pr.IsPaused() {
594+
// This may send an empty append message (with no entries) if replication to
595+
// this follower is throttled, or there are no new entries but the commit index
596+
// for the follower can be bumped.
597+
func (r *raft) sendAppend(to uint64, pr *tracker.Progress) bool {
598+
if pr.State == tracker.StateProbe {
599+
return !pr.MsgAppFlowPaused && r.maybeSendAppend(to, pr)
600+
} else if pr.State != tracker.StateReplicate {
609601
return false
610-
}
602+
} // only StateReplicate below
603+
604+
// If there are any pending entries and the inflight tracking is not
605+
// saturated, send a regular append message (or snapshot).
606+
if pr.Next <= r.raftLog.lastIndex() && !pr.Inflights.Full() {
607+
return r.maybeSendAppend(to, pr)
608+
}
609+
// NB: the commit index is periodically sent in the heartbeat messages, so
610+
// technically we don't need the CanBumpCommit clause here to guarantee commit
611+
// index convergence on the follower. However, sending it via MsgApp here
612+
// allows faster (no heartbeat interval delay) convergence in some cases.
613+
if pr.CanBumpCommit(r.raftLog.committed) {
614+
return r.maybeSendEmptyAppend(to, pr)
615+
}
616+
// In a throttled StateReplicate, send an empty append message if we haven't
617+
// done so recently.
618+
//
619+
// We must send periodic appends so that eventually the follower either
620+
// accepts or rejects it. If we don't do so, replication can stall if all the
621+
// in-flight appends are lost/dropped.
622+
return !pr.MsgAppFlowPaused && pr.Match < r.raftLog.lastIndex() &&
623+
r.maybeSendEmptyAppend(to, pr)
624+
}
611625

626+
// maybeSendAppend sends a non-empty append message to the given follower. It
627+
// may send a snapshot instead if the required section of the log is no longer
628+
// available in this leader's log. Returns true if a message was sent.
629+
func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool {
630+
// TODO(pav-kv): when pr.Next is updated, we always know the term of entry
631+
// pr.Next-1, because the previous append message contains it. We should store
632+
// (Next-1, Term) in Progress, instead of just Next. Then we don't have to
633+
// fetch the term here, and may avoid an unnecessary snapshot.
612634
prevIndex := pr.Next - 1
613635
prevTerm, err := r.raftLog.term(prevIndex)
614636
if err != nil {
615637
// The log probably got truncated at >= pr.Next, so we can't catch up the
616638
// follower log anymore. Send a snapshot instead.
617639
return r.maybeSendSnapshot(to, pr)
618640
}
619-
620-
var ents []pb.Entry
621-
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
622-
// Otherwise, if we had a full Inflights and all inflight messages were in
623-
// fact dropped, replication to that follower would stall. Instead, an empty
624-
// MsgApp will eventually reach the follower (heartbeats responses prompt the
625-
// leader to send an append), allowing it to be acked or rejected, both of
626-
// which will clear out Inflights.
627-
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
628-
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
629-
}
630-
if len(ents) == 0 && !sendIfEmpty {
631-
return false
632-
}
633-
// TODO(pav-kv): move this check up to where err is returned.
641+
ents, err := r.raftLog.entries(pr.Next, r.maxMsgSize)
634642
if err != nil { // send a snapshot if we failed to get the entries
635643
return r.maybeSendSnapshot(to, pr)
636644
}
637-
638-
// Send the actual MsgApp otherwise, and update the progress accordingly.
639645
r.send(pb.Message{
640646
To: to,
641647
Type: pb.MsgApp,
@@ -649,6 +655,29 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
649655
return true
650656
}
651657

658+
func (r *raft) maybeSendEmptyAppend(to uint64, pr *tracker.Progress) bool {
659+
// TODO(pav-kv): when pr.Next is updated, we always know the term of entry
660+
// pr.Next-1, because the append message contains it. Store (Next-1, Term) in
661+
// Progress, instead of just Next. Then we don't have to fetch the term and
662+
// send a potentially unnecessary snapshot here.
663+
prevTerm, err := r.raftLog.term(pr.Next - 1)
664+
if err != nil {
665+
// The log probably got truncated at >= pr.Next, so we can't catch up the
666+
// follower log anymore. Send a snapshot instead.
667+
return r.maybeSendSnapshot(to, pr)
668+
}
669+
r.send(pb.Message{
670+
To: to,
671+
Type: pb.MsgApp,
672+
Index: pr.Next - 1,
673+
LogTerm: prevTerm,
674+
Commit: r.raftLog.committed,
675+
})
676+
pr.SentEntries(0, 0)
677+
pr.SentCommit(r.raftLog.committed)
678+
return true
679+
}
680+
652681
// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
653682
// node. Returns true iff the snapshot message has been emitted successfully.
654683
func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
@@ -700,11 +729,11 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
700729
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
701730
// according to the progress recorded in r.trk.
702731
func (r *raft) bcastAppend() {
703-
r.trk.Visit(func(id uint64, _ *tracker.Progress) {
732+
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
704733
if id == r.id {
705734
return
706735
}
707-
r.sendAppend(id)
736+
r.sendAppend(id, pr)
708737
})
709738
}
710739

@@ -1482,7 +1511,7 @@ func stepLeader(r *raft, m pb.Message) error {
14821511
if pr.State == tracker.StateReplicate {
14831512
pr.BecomeProbe()
14841513
}
1485-
r.sendAppend(m.From)
1514+
r.sendAppend(m.From, pr)
14861515
}
14871516
} else {
14881517
// We want to update our tracking if the response updates our
@@ -1521,21 +1550,13 @@ func stepLeader(r *raft, m pb.Message) error {
15211550
// to respond to pending read index requests
15221551
releasePendingReadIndexMessages(r)
15231552
r.bcastAppend()
1524-
} else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) {
1525-
// This node may be missing the latest commit index, so send it.
1526-
// NB: this is not strictly necessary because the periodic heartbeat
1527-
// messages deliver commit indices too. However, a message sent now
1528-
// may arrive earlier than the next heartbeat fires.
1529-
r.sendAppend(m.From)
15301553
}
1531-
// We've updated flow control information above, which may
1532-
// allow us to send multiple (size-limited) in-flight messages
1533-
// at once (such as when transitioning from probe to
1534-
// replicate, or when freeTo() covers multiple messages). If
1535-
// we have more entries to send, send as many messages as we
1536-
// can (without sending empty messages for the commit index)
1554+
// We've updated flow control information above, which may allow us to
1555+
// send multiple (size-limited) in-flight messages at once (such as when
1556+
// transitioning from StateProbe to StateReplicate). Send as many
1557+
// messages as we can.
15371558
if r.id != m.From {
1538-
for r.maybeSendAppend(m.From, false /* sendIfEmpty */) {
1559+
for r.sendAppend(m.From, pr) {
15391560
}
15401561
}
15411562
// Transfer leadership is in progress.
@@ -1562,9 +1583,7 @@ func stepLeader(r *raft, m pb.Message) error {
15621583
// Note that StateSnapshot typically satisfies pr.Match < lastIndex, but
15631584
// `pr.Paused()` is always true for StateSnapshot, so sendAppend is a
15641585
// no-op.
1565-
if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe {
1566-
r.sendAppend(m.From)
1567-
}
1586+
r.sendAppend(m.From, pr)
15681587

15691588
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
15701589
return nil
@@ -1634,7 +1653,8 @@ func stepLeader(r *raft, m pb.Message) error {
16341653
r.sendTimeoutNow(leadTransferee)
16351654
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
16361655
} else {
1637-
r.sendAppend(leadTransferee)
1656+
pr.MsgAppFlowPaused = false // force a MsgApp even if paused
1657+
r.sendAppend(leadTransferee, pr)
16381658
}
16391659
}
16401660
return nil
@@ -1985,21 +2005,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co
19852005
return cs
19862006
}
19872007

1988-
if r.maybeCommit() {
1989-
// If the configuration change means that more entries are committed now,
1990-
// broadcast/append to everyone in the updated config.
1991-
r.bcastAppend()
1992-
} else {
1993-
// Otherwise, still probe the newly added replicas; there's no reason to
1994-
// let them wait out a heartbeat interval (or the next incoming
1995-
// proposal).
1996-
r.trk.Visit(func(id uint64, pr *tracker.Progress) {
1997-
if id == r.id {
1998-
return
1999-
}
2000-
r.maybeSendAppend(id, false /* sendIfEmpty */)
2001-
})
2002-
}
2008+
// If the configuration change means that more entries are committed now,
2009+
// broadcast/append to everyone in the updated config.
2010+
//
2011+
// Otherwise, still probe the newly added replicas; there's no reason to let
2012+
// them wait out a heartbeat interval (or the next incoming proposal).
2013+
r.maybeCommit()
2014+
r.bcastAppend()
2015+
20032016
// If the leadTransferee was removed or demoted, abort the leadership transfer.
20042017
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
20052018
r.abortLeaderTransfer()

raft_paper_test.go

+11-16
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ import (
3333
"testing"
3434

3535
pb "go.etcd.io/raft/v3/raftpb"
36+
37+
"github.com/stretchr/testify/assert"
38+
"github.com/stretchr/testify/require"
3639
)
3740

3841
func TestFollowerUpdateTermFromMessage(t *testing.T) {
@@ -448,25 +451,17 @@ func TestLeaderCommitEntry(t *testing.T) {
448451
r.Step(acceptAndReply(m))
449452
}
450453

451-
if g := r.raftLog.committed; g != li+1 {
452-
t.Errorf("committed = %d, want %d", g, li+1)
453-
}
454-
wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
455-
if g := r.raftLog.nextCommittedEnts(true); !reflect.DeepEqual(g, wents) {
456-
t.Errorf("nextCommittedEnts = %+v, want %+v", g, wents)
457-
}
454+
require.Equal(t, li+1, r.raftLog.committed)
455+
require.Equal(t, []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}},
456+
r.raftLog.nextCommittedEnts(true))
457+
458458
msgs := r.readMessages()
459+
require.Len(t, msgs, 2)
459460
sort.Sort(messageSlice(msgs))
460461
for i, m := range msgs {
461-
if w := uint64(i + 2); m.To != w {
462-
t.Errorf("to = %x, want %x", m.To, w)
463-
}
464-
if m.Type != pb.MsgApp {
465-
t.Errorf("type = %v, want %v", m.Type, pb.MsgApp)
466-
}
467-
if m.Commit != li+1 {
468-
t.Errorf("commit = %d, want %d", m.Commit, li+1)
469-
}
462+
assert.Equal(t, pb.MsgApp, m.Type)
463+
assert.Equal(t, uint64(i+2), m.To)
464+
assert.Equal(t, li+1, m.Commit)
470465
}
471466
}
472467

raft_test.go

+29-25
Original file line numberDiff line numberDiff line change
@@ -128,22 +128,24 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
128128
r.becomeCandidate()
129129
r.becomeLeader()
130130

131-
r.trk.Progress[2].MsgAppFlowPaused = true
131+
pr := r.trk.Progress[2]
132+
pr.MsgAppFlowPaused = true
132133

133134
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
134-
if !r.trk.Progress[2].MsgAppFlowPaused {
135-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
136-
}
135+
require.True(t, pr.MsgAppFlowPaused)
136+
pr.BecomeReplicate()
137+
require.False(t, pr.MsgAppFlowPaused)
138+
pr.MsgAppFlowPaused = true
139+
_ = r.readMessages()
137140

138-
r.trk.Progress[2].BecomeReplicate()
139-
if r.trk.Progress[2].MsgAppFlowPaused {
140-
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused)
141-
}
142-
r.trk.Progress[2].MsgAppFlowPaused = true
143141
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
144-
if r.trk.Progress[2].MsgAppFlowPaused {
145-
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused)
146-
}
142+
msgs := r.readMessages()
143+
require.Len(t, msgs, 1)
144+
require.Equal(t, pb.MsgApp, msgs[0].Type)
145+
require.Equal(t, uint64(2), msgs[0].To)
146+
require.Len(t, msgs[0].Entries, 1)
147+
148+
require.True(t, pr.MsgAppFlowPaused)
147149
}
148150

149151
func TestProgressPaused(t *testing.T) {
@@ -2764,7 +2766,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27642766
r.becomeCandidate()
27652767
r.becomeLeader()
27662768
r.readMessages()
2767-
r.trk.Progress[2].BecomeProbe()
2769+
pr := r.trk.Progress[2]
2770+
pr.BecomeProbe()
27682771

27692772
// each round is a heartbeat
27702773
for i := 0; i < 3; i++ {
@@ -2773,7 +2776,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27732776
// loop. After that, the follower is paused until a heartbeat response is
27742777
// received.
27752778
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2776-
r.sendAppend(2)
2779+
r.sendAppend(2, pr)
27772780
msg := r.readMessages()
27782781
if len(msg) != 1 {
27792782
t.Errorf("len(msg) = %d, want %d", len(msg), 1)
@@ -2788,7 +2791,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27882791
}
27892792
for j := 0; j < 10; j++ {
27902793
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2791-
r.sendAppend(2)
2794+
r.sendAppend(2, pr)
27922795
if l := len(r.readMessages()); l != 0 {
27932796
t.Errorf("len(msg) = %d, want %d", l, 0)
27942797
}
@@ -2831,11 +2834,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
28312834
r.becomeCandidate()
28322835
r.becomeLeader()
28332836
r.readMessages()
2834-
r.trk.Progress[2].BecomeReplicate()
2837+
pr := r.trk.Progress[2]
2838+
pr.BecomeReplicate()
28352839

28362840
for i := 0; i < 10; i++ {
28372841
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2838-
r.sendAppend(2)
2842+
r.sendAppend(2, pr)
28392843
msgs := r.readMessages()
28402844
if len(msgs) != 1 {
28412845
t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
@@ -2848,11 +2852,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
28482852
r.becomeCandidate()
28492853
r.becomeLeader()
28502854
r.readMessages()
2851-
r.trk.Progress[2].BecomeSnapshot(10)
2855+
pr := r.trk.Progress[2]
2856+
pr.BecomeSnapshot(10)
28522857

28532858
for i := 0; i < 10; i++ {
28542859
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2855-
r.sendAppend(2)
2860+
r.sendAppend(2, pr)
28562861
msgs := r.readMessages()
28572862
if len(msgs) != 0 {
28582863
t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
@@ -3678,9 +3683,7 @@ func TestLeaderTransferToSlowFollower(t *testing.T) {
36783683

36793684
nt.recover()
36803685
lead := nt.peers[1].(*raft)
3681-
if lead.trk.Progress[3].Match != 1 {
3682-
t.Fatalf("node 1 has match %x for node 3, want %x", lead.trk.Progress[3].Match, 1)
3683-
}
3686+
require.Equal(t, uint64(1), lead.trk.Progress[3].Match)
36843687

36853688
// Transfer leadership to 3 when node 3 is lack of log.
36863689
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
@@ -4671,16 +4674,17 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) {
46714674
r1.becomeCandidate()
46724675
r1.becomeLeader()
46734676
r1.readMessages()
4674-
r1.trk.Progress[2].BecomeReplicate()
4677+
pr := r1.trk.Progress[2]
4678+
pr.BecomeReplicate()
46754679

46764680
r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
46774681

46784682
// r1 sends 2 MsgApp messages to r2.
46794683
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
4680-
r1.sendAppend(2)
4684+
r1.sendAppend(2, pr)
46814685
req1 := expectOneMessage(t, r1)
46824686
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")})
4683-
r1.sendAppend(2)
4687+
r1.sendAppend(2, pr)
46844688
req2 := expectOneMessage(t, r1)
46854689

46864690
// r2 receives the second MsgApp first due to reordering.

0 commit comments

Comments
 (0)