Skip to content

raft: support lazy appends #125002

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,14 @@ type Config struct {
// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
// to 2.5 MB/s. See Little's law to understand the maths behind.
MaxInflightBytes uint64
// EnableLazyAppends makes raft hold off constructing log append messages in
// response to Step() calls, for the StateReplicate followers. The messages
// can be triggered via a separate RawNode.SendAppend method.
//
// This way, the application has better control over when raft may call
// Storage methods and allocate memory for entries and messages. This provides
// flow control for leader->follower log replication streams.
EnableLazyAppends bool

// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
Expand Down Expand Up @@ -342,6 +350,10 @@ type raft struct {
// Messages in this list have the type MsgAppResp, MsgVoteResp, or
// MsgPreVoteResp. See the comment in raft.send for details.
msgsAfterAppend []pb.Message
// enableLazyAppends delays append message construction and sending until the
// RawNode is explicitly requested to do so. This provides control over the
// follower replication flows.
enableLazyAppends bool

// the leader id
lead uint64
Expand Down Expand Up @@ -425,6 +437,7 @@ func newRaft(c *Config) *raft {
maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize),
trk: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
enableLazyAppends: c.EnableLazyAppends,
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
Expand Down Expand Up @@ -593,10 +606,19 @@ func (r *raft) send(m pb.Message) {
// if the follower log and commit index are up-to-date, the flow is paused (for
// reasons like in-flight limits), or the message could not be constructed.
func (r *raft) maybeSendAppend(to uint64) bool {
	// Delegate to the shared implementation using the configured mode. When
	// enableLazyAppends is set, maybeSendAppendImpl does not send a MsgApp
	// carrying entries to a follower in StateReplicate and returns false;
	// such appends are triggered explicitly through RawNode.SendAppend.
	//
	// The Progress lookup is done by maybeSendAppendImpl itself, so no local
	// copy is needed here (an unused local would not compile).
	return r.maybeSendAppendImpl(to, r.enableLazyAppends)
}

// maybeSendAppendImpl is the same as maybeSendAppend, but it supports the lazy
// mode for StateReplicate, in which appends are not sent.
func (r *raft) maybeSendAppendImpl(to uint64, lazy bool) bool {
pr := r.trk.Progress[to]
last, commit := r.raftLog.lastIndex(), r.raftLog.committed
if !pr.ShouldSendMsgApp(last, commit) {
msgAppType := pr.ShouldSendMsgApp(last, commit)
if msgAppType == tracker.MsgAppNone {
return false
}
if lazy && pr.State == tracker.StateReplicate && msgAppType == tracker.MsgAppWithEntries {
return false
}

Expand All @@ -609,7 +631,7 @@ func (r *raft) maybeSendAppend(to uint64) bool {
}

var entries []pb.Entry
if pr.CanSendEntries(last) {
if msgAppType == tracker.MsgAppWithEntries {
if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil {
// Send a snapshot if we failed to get the entries.
return r.maybeSendSnapshot(to, pr)
Expand Down
32 changes: 32 additions & 0 deletions pkg/raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,38 @@ func (rn *RawNode) Ready() Ready {
return rd
}

// SendAppend sends a log replication (MsgApp) message to a particular node. The
// message will be available via next Ready.
//
// It may also send a snapshot (MsgSnap) instead, if the log can't be read (e.g.
// it was already compacted at the follower's Next index).
//
// The total size of log entries in the message is <= Config.MaxSizePerMsg, with
// the exception that the last entry can bring a size < maxSize to a size
// exceeding maxSize.
//
// This method is used when Config.EnableLazyAppends is true. Typically, it
// should be called before processing Ready, for all node IDs in StateReplicate
// which have pending replication work.
//
// TODO(pav-kv): deprecate Config.MaxSizePerMsg, and pass the maxSize in, for an
// accurate and flexible size control.
//
// TODO(pav-kv): integrate with Admission Control, which will be calling this
// under two conditions: the node's Next <= raftLog.lastIndex (i.e. the "send
// queue" is not empty), and there are some acquired "send tokens" (hence the
// need for maxSize parameter).
//
// TODO(pav-kv): since the caller is tracking the replication state, it knows
// Next, and might also have a good idea on the number and sizes of entries to
// send. Pass additional parameters, and do safety checks.
func (rn *RawNode) SendAppend(to uint64) bool {
	r := rn.raft
	// Explicit appends are only meaningful in lazy mode, and a node never
	// sends an append to itself.
	if r.enableLazyAppends && to != r.id {
		// Bypass the lazy gate: the caller has explicitly asked for the message.
		return r.maybeSendAppendImpl(to, false /* lazy */)
	}
	return false
}

// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
// is no obligation that the Ready must be handled.
func (rn *RawNode) readyWithoutAccept() Ready {
Expand Down
50 changes: 30 additions & 20 deletions pkg/raft/tracker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,6 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) {
pr.MsgAppProbesPaused = true
}

// CanSendEntries reports whether at least one log entry may be sent to this
// follower right now: there must be an entry at or after Next (Next <=
// lastIndex), and the follower must either be in StateProbe or have room left
// in its in-flight window.
//
// Must be used with StateProbe or StateReplicate.
func (pr *Progress) CanSendEntries(lastIndex uint64) bool {
	// Nothing pending for this follower.
	if pr.Next > lastIndex {
		return false
	}
	// In StateProbe the in-flight window is not consulted; otherwise it must
	// not be full.
	if pr.State == StateProbe {
		return true
	}
	return !pr.Inflights.Full()
}

// CanBumpCommit returns true if sending the given commit index can potentially
// advance the follower's commit index.
func (pr *Progress) CanBumpCommit(index uint64) bool {
Expand Down Expand Up @@ -274,6 +266,14 @@ func (pr *Progress) IsPaused() bool {
}
}

// MsgAppType is the result of Progress.ShouldSendMsgApp: it says whether a
// MsgApp should be sent to a follower and, if so, which kind.
type MsgAppType uint8

const (
// MsgAppNone means no MsgApp should be sent, e.g. probes are paused, the
// follower is in StateSnapshot, or there is nothing new to communicate.
MsgAppNone MsgAppType = iota
// MsgAppWithEntries means a MsgApp carrying log entries should be sent: the
// follower's Next does not exceed the leader's last index and, in
// StateReplicate, the in-flight limits are not saturated.
MsgAppWithEntries
// MsgAppProbe means a MsgApp without entries should be sent, either to keep
// the MsgApp flow alive or to communicate a newer commit index.
MsgAppProbe
)

// ShouldSendMsgApp returns the type of MsgApp the leader should send to the
// follower represented by this Progress (MsgAppNone if none). The given last
// and commit index of the leader log help determining if there is outstanding
// workload, and contribute
Expand All @@ -293,35 +293,45 @@ func (pr *Progress) IsPaused() bool {
// to guarantee that eventually the flow is either accepted or rejected.
//
// In StateSnapshot, we do not send append messages.
func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
func (pr *Progress) ShouldSendMsgApp(last, commit uint64) MsgAppType {
switch pr.State {
case StateProbe:
return !pr.MsgAppProbesPaused
switch {
case pr.MsgAppProbesPaused:
return MsgAppNone
case pr.Next <= last:
return MsgAppWithEntries
default:
return MsgAppProbe
}

case StateReplicate:
// If the in-flight limits are not saturated, and there are pending entries
// (Next <= lastIndex), send a MsgApp with some entries.
if pr.CanSendEntries(last) {
return true
}
switch {
// If there are pending entries, and the in-flight limits are not saturated,
// send a MsgApp with some entries.
case pr.Next <= last && !pr.Inflights.Full():
return MsgAppWithEntries
// We can't send any entries at this point, but we need to be sending a
// MsgApp periodically, to guarantee liveness of the MsgApp flow: the
// follower eventually will reply with an ack or reject.
//
// If the follower's log is outdated, and we haven't recently sent a MsgApp
// (according to the MsgAppProbesPaused flag), send one now. This is going
// to be an empty "probe" MsgApp.
if pr.Match < last && !pr.MsgAppProbesPaused {
return true
}
case pr.Match < last && !pr.MsgAppProbesPaused:
return MsgAppProbe
// Send an empty MsgApp containing the latest commit index if:
// - our commit index exceeds the in-flight commit index, and
// - sending it can commit at least one of the follower's entries
// (including the ones still in flight to it).
return pr.CanBumpCommit(commit)
case pr.CanBumpCommit(commit):
return MsgAppProbe
default:
return MsgAppNone
}

case StateSnapshot:
return false
return MsgAppNone
default:
panic("unexpected state")
}
Expand Down
Loading