Skip to content

Commit cec977d

Browse files
committed
raft: expose max message size in SendAppend
Epic: none Release note: none
1 parent 766480c commit cec977d

File tree

2 files changed

+10
-13
lines changed

2 files changed

+10
-13
lines changed

pkg/raft/raft.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -606,12 +606,12 @@ func (r *raft) send(m pb.Message) {
606606
// if the follower log and commit index are up-to-date, the flow is paused (for
607607
// reasons like in-flight limits), or the message could not be constructed.
608608
func (r *raft) maybeSendAppend(to uint64) bool {
609-
return r.maybeSendAppendImpl(to, r.enableLazyAppends)
609+
return r.maybeSendAppendImpl(to, r.maxMsgSize, r.enableLazyAppends)
610610
}
611611

612612
// maybeSendAppendImpl is the same as maybeSendAppend, but it supports the lazy
613613
// mode for StateReplicate, in which appends are not sent.
614-
func (r *raft) maybeSendAppendImpl(to uint64, lazy bool) bool {
614+
func (r *raft) maybeSendAppendImpl(to uint64, maxSize entryEncodingSize, lazy bool) bool {
615615
pr := r.trk.Progress[to]
616616
last, commit := r.raftLog.lastIndex(), r.raftLog.committed
617617
msgAppType := pr.ShouldSendMsgApp(last, commit)
@@ -632,7 +632,7 @@ func (r *raft) maybeSendAppendImpl(to uint64, lazy bool) bool {
632632

633633
var entries []pb.Entry
634634
if msgAppType == tracker.MsgAppWithEntries {
635-
if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
635+
if entries, err = r.raftLog.entries(pr.Next, min(maxSize, r.maxMsgSize)); err != nil {
636636
// Send a snapshot if we failed to get the entries.
637637
return r.maybeSendSnapshot(to, pr)
638638
}

pkg/raft/rawnode.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -125,25 +125,22 @@ func (rn *RawNode) Ready() Ready {
125125
return rd
126126
}
127127

128-
// SendAppend a log replication messages to a particular node. The message
128+
// SendAppend sends a log replication messages to a particular node. The message
129129
// will be available via next Ready.
130130
//
131131
// This method is used when Config.EnableLazyAppends is true. Typically, it
132132
// should be called before processing Ready, for all node IDs in StateReplicate
133133
// which have pending replication work.
134134
//
135-
// TODO(pav-kv): this should take and propagate maxBytes down the stack, for
136-
// integration with Admission Control. AC will be calling this method under two
137-
// conditions: the "send queue" is not empty (see EntriesReady), and there are
138-
// some spare send tokens (hence the maxBytes argument).
139-
//
140-
// TODO(pav-kv): as an intermediate step, we should pull Progress.Inflights out
141-
// of internals. This today plays the role of the send tokens tracker.
142-
func (rn *RawNode) SendAppend(to uint64) bool {
135+
// TODO(pav-kv): integrate with Admission Control, which will be calling this
136+
// under two conditions: the node's Next <= raftLog.lastIndex (i.e. the "send
137+
// queue" is not empty), and and there are some spare send tokens (hence the
138+
// maxSize parameter).
139+
func (rn *RawNode) SendAppend(to uint64, maxSize uint64) bool {
143140
if !rn.raft.enableLazyAppends || to == rn.raft.id {
144141
return false
145142
}
146-
return rn.raft.maybeSendAppendImpl(to, false /* lazy */)
143+
return rn.raft.maybeSendAppendImpl(to, entryEncodingSize(maxSize), false /* lazy */)
147144
}
148145

149146
// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there

0 commit comments

Comments
 (0)