Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: support lazy replication #131588

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@ type Config struct {
// TODO(#129411): deprecate !AsyncStorageWrites mode as it's not used in
// CRDB.
AsyncStorageWrites bool
// LazyReplication instructs raft to hold off constructing MsgApp messages
// eagerly in reaction to Step() calls.
//
// If LazyReplication is false, a MsgApp can be constructed any time it
// becomes possible, e.g. when a new entry is appended to the leader's log, or
// the in-flight volume to a peer drops below the max-inflight limits.
//
// If LazyReplication is true, MsgApp messages for StateReplicate peers are
// constructed on demand, when requested by the application layer via the
// RawNode.SendMsgApp method.
LazyReplication bool

// MaxSizePerMsg limits the max byte size of each append message. Smaller
// value lowers the raft recovery cost(initial probing and message lost
Expand Down Expand Up @@ -331,6 +342,7 @@ type raft struct {
trk tracker.ProgressTracker
electionTracker tracker.ElectionTracker
fortificationTracker tracker.FortificationTracker
lazyReplication bool

state StateType

Expand Down Expand Up @@ -448,6 +460,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: entryEncodingSize(c.MaxSizePerMsg),
maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize),
lazyReplication: c.LazyReplication,
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
Expand Down Expand Up @@ -671,7 +684,9 @@ func (r *raft) maybeSendAppend(to pb.PeerID) bool {
pr := r.trk.Progress(to)

last, commit := r.raftLog.lastIndex(), r.raftLog.committed
if !pr.ShouldSendMsgApp(last, commit, r.advanceCommitViaMsgAppOnly()) {
sendEntries := pr.ShouldSendEntries(last, r.lazyReplication)
sendProbe := !sendEntries && pr.ShouldSendProbe(last, commit, r.advanceCommitViaMsgAppOnly())
if !sendEntries && !sendProbe {
return false
}

Expand All @@ -683,7 +698,7 @@ func (r *raft) maybeSendAppend(to pb.PeerID) bool {
return r.maybeSendSnapshot(to, pr)
}
var entries []pb.Entry
if pr.CanSendEntries(last) {
if sendEntries {
if entries, err = r.raftLog.entries(prevIndex, r.maxMsgSize); err != nil {
// Send a snapshot if we failed to get the entries.
return r.maybeSendSnapshot(to, pr)
Expand Down
1 change: 1 addition & 0 deletions pkg/raft/rafttest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"interaction_env_handler_raft_log.go",
"interaction_env_handler_raftstate.go",
"interaction_env_handler_report_unreachable.go",
"interaction_env_handler_send_msgapp.go",
"interaction_env_handler_send_snapshot.go",
"interaction_env_handler_set_randomized_election_timeout.go",
"interaction_env_handler_stabilize.go",
Expand Down
6 changes: 6 additions & 0 deletions pkg/raft/rafttest/interaction_env_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ func (env *InteractionEnv) Handle(t *testing.T, d datadriven.TestData) string {
// propose-conf-change 2 v1=true
// v5
err = env.handleProposeConfChange(t, d)

case "send-msg-app":
// Send a MsgApp from the leader to a peer.
// Example: send-msg-app 1 to=2 lo=10 hi=20
err = env.handleSendMsgApp(t, d)

case "report-unreachable":
// Calls <1st>.ReportUnreachable(<2nd>).
//
Expand Down
2 changes: 2 additions & 0 deletions pkg/raft/rafttest/interaction_env_handler_add_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e
arg.Scan(t, i, &snap.Data)
case "async-storage-writes":
arg.Scan(t, i, &cfg.AsyncStorageWrites)
case "lazy-replication":
arg.Scan(t, i, &cfg.LazyReplication)
case "prevote":
arg.Scan(t, i, &cfg.PreVote)
case "checkquorum":
Expand Down
49 changes: 49 additions & 0 deletions pkg/raft/rafttest/interaction_env_handler_send_msgapp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rafttest

import (
"fmt"
"math"
"testing"

"github.com/cockroachdb/cockroach/pkg/raft"
pb "github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

func (env *InteractionEnv) handleSendMsgApp(t *testing.T, d datadriven.TestData) error {
require.Len(t, d.CmdArgs, 4)
from := firstAsNodeIdx(t, d)
var to int
var lo, hi uint64
d.ScanArgs(t, "to", &to)
d.ScanArgs(t, "lo", &lo)
d.ScanArgs(t, "hi", &hi)
return env.SendMsgApp(from, pb.PeerID(to), lo, hi)
}

func (env *InteractionEnv) SendMsgApp(from int, to pb.PeerID, lo, hi uint64) error {
rn := env.Nodes[from].RawNode
snap := rn.LogSnapshot()
ls, err := snap.LogSlice(lo, hi, math.MaxUint64)
if err != nil {
return err
}
if msg, ok := rn.SendMsgApp(to, ls); ok {
env.Output.WriteString(raft.DescribeMessage(msg, defaultEntryFormatter))
env.Messages = append(env.Messages, msg)
} else {
env.Output.WriteString(fmt.Sprintf("could not send MsgApp (%d,%d] to %d", lo, hi, to))
}
return nil
}
155 changes: 155 additions & 0 deletions pkg/raft/testdata/lazy_replication.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# This test demonstrates the "lazy replication" feature. The leader sends MsgApp
# messages to StateReplication peers only when requested explicitly by the
# application.

# Skip logging the boilerplate. Set up a raft group of 3 nodes, and elect node 1
# as the leader. Nodes 2 and 3 are the followers.
log-level none
----
ok

add-nodes 3 voters=(1,2,3) index=10 lazy-replication=true
----
ok

campaign 1
----
ok

stabilize
----
ok

log-level debug
----
ok

# Propose a couple of entries.
propose 1 data-1
----
ok

propose 1 data-2
----
ok

# NB: no entries are sent to the followers yet.
stabilize
----
> 1 handling Ready
Ready MustSync=true:
Entries:
1/12 EntryNormal "data-1"
1/13 EntryNormal "data-2"

# Attempt to send a misaligned MsgApp. No-op.
send-msg-app 1 to=2 lo=10 hi=13
----
could not send MsgApp (10,13] to 2

# Send a MsgApp to node 2, containing both entries.
send-msg-app 1 to=2 lo=11 hi=13
----
1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[
1/12 EntryNormal "data-1"
1/13 EntryNormal "data-2"
]

# Send a MsgApp to node 3, containing only one entry.
send-msg-app 1 to=3 lo=11 hi=12
----
1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data-1"]

# The followers receive the entries and reply to the leader. The leader commits
# both entries, but the replication flow to node 3 still has one pending entry.
stabilize
----
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[
1/12 EntryNormal "data-1"
1/13 EntryNormal "data-2"
]
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data-1"]
> 2 handling Ready
Ready MustSync=true:
Entries:
1/12 EntryNormal "data-1"
1/13 EntryNormal "data-2"
Messages:
2->1 MsgAppResp Term:1 Log:0/13 Commit:11
> 3 handling Ready
Ready MustSync=true:
Entries:
1/12 EntryNormal "data-1"
Messages:
3->1 MsgAppResp Term:1 Log:0/12 Commit:11
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/13 Commit:11
3->1 MsgAppResp Term:1 Log:0/12 Commit:11
> 1 handling Ready
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1
CommittedEntries:
1/12 EntryNormal "data-1"
1/13 EntryNormal "data-2"
Messages:
1->2 MsgApp Term:1 Log:1/13 Commit:13
1->3 MsgApp Term:1 Log:1/12 Commit:13
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/13 Commit:13
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/12 Commit:13
> 2 handling Ready
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1
CommittedEntries:
1/12 EntryNormal "data-1"
1/13 EntryNormal "data-2"
Messages:
2->1 MsgAppResp Term:1 Log:0/13 Commit:13
> 3 handling Ready
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:12 Lead:1 LeadEpoch:1
CommittedEntries:
1/12 EntryNormal "data-1"
Messages:
3->1 MsgAppResp Term:1 Log:0/12 Commit:12
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/13 Commit:13
3->1 MsgAppResp Term:1 Log:0/12 Commit:12

# One entry still to be replicated to node 3.
status 1
----
1: StateReplicate match=13 next=14 sentCommit=11 matchCommit=11
2: StateReplicate match=13 next=14 sentCommit=13 matchCommit=13
3: StateReplicate match=12 next=13 sentCommit=13 matchCommit=12

# Replicate it.
send-msg-app 1 to=3 lo=12 hi=13
----
1->3 MsgApp Term:1 Log:1/12 Commit:13 Entries:[1/13 EntryNormal "data-2"]

stabilize
----
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/12 Commit:13 Entries:[1/13 EntryNormal "data-2"]
> 3 handling Ready
Ready MustSync=true:
HardState Term:1 Vote:1 Commit:13 Lead:1 LeadEpoch:1
Entries:
1/13 EntryNormal "data-2"
CommittedEntries:
1/13 EntryNormal "data-2"
Messages:
3->1 MsgAppResp Term:1 Log:0/13 Commit:13
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/13 Commit:13

# The leader has converged to a fully replicated state.
status 1
----
1: StateReplicate match=13 next=14 sentCommit=11 matchCommit=11
2: StateReplicate match=13 next=14 sentCommit=13 matchCommit=13
3: StateReplicate match=13 next=14 sentCommit=13 matchCommit=13
65 changes: 35 additions & 30 deletions pkg/raft/tracker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type Progress struct {
// cases, we need to continue sending MsgApp once in a while to guarantee
// progress, but we only do so when MsgAppProbesPaused is false (it is reset on
// receiving a heartbeat response), to not overflow the receiver. See
// IsPaused() and ShouldSendMsgApp().
// IsPaused(), ShouldSendEntries(), and ShouldSendProbe().
MsgAppProbesPaused bool

// Inflights is a sliding window for the inflight messages.
Expand Down Expand Up @@ -304,47 +304,52 @@ func (pr *Progress) IsPaused() bool {
}
}

// ShouldSendMsgApp returns true if the leader should send a MsgApp to the
// follower represented by this Progress. The given last and commit index of the
// leader log help determining if there is outstanding workload, and contribute
// to this decision-making.
// ShouldSendEntries returns true if the leader should send a MsgApp with at
// least one entry, to the follower represented by this Progress. The given last
// index of the leader log helps to determine if there is outstanding work.
//
// In StateProbe, a message is sent periodically. The flow is paused after every
// message, and un-paused on a heartbeat response. This ensures that probes are
// not too frequent, and eventually the MsgApp is either accepted or rejected.
//
// In StateReplicate, generally a message is sent if there are log entries that
// are not yet in-flight, and the in-flight limits are not exceeded. Otherwise,
// we don't send a message, or send a "probe" message in a few situations.
//
// A probe message (containing no log entries) is sent if the follower's commit
// index can be updated, or there hasn't been a probe message recently. We must
// send a message periodically even if all log entries are in-flight, in order
// to guarantee that eventually the flow is either accepted or rejected.
// we don't send a message, or send a "probe" message in a few situations (see
// ShouldSendPing). If lazyReplication flag is true, entries sending is disabled
// and delegated to the application layer.
//
// In StateSnapshot, we do not send append messages.
func (pr *Progress) ShouldSendEntries(last uint64, lazyReplication bool) bool {
switch pr.State {
case StateProbe:
return !pr.MsgAppProbesPaused && pr.CanSendEntries(last)
case StateReplicate:
return !lazyReplication && pr.CanSendEntries(last)
case StateSnapshot:
return false
default:
panic("unexpected state")
}
}

// ShouldSendProbe returns true if the leader should send a "probe" MsgApp to
// this peer.
//
// If advanceCommit is true, it means that MsgApp owns the responsibility of
// closing the followers' commit index gap even if some MsgApp messages gets
// dropped. If it's false, it means that the responsibility is on MsgHeartbeat.
func (pr *Progress) ShouldSendMsgApp(last, commit uint64, advanceCommit bool) bool {
// A probe message (containing no log entries) is sent if the peer's Match and
// MatchCommit indices have not converged to the leader's state, and a MsgApp
// has not been sent recently.
//
// We must send a message periodically even if all updates are already in flight
// to this peer, to guarantee that eventually the flow is either accepted or
// rejected.
func (pr *Progress) ShouldSendProbe(last, commit uint64, advanceCommit bool) bool {
switch pr.State {
case StateProbe:
return !pr.MsgAppProbesPaused

case StateReplicate:
// If the in-flight limits are not saturated, and there are pending entries
// (Next <= lastIndex), send a MsgApp with some entries.
if pr.CanSendEntries(last) {
return true
}
// We can't send any entries at this point, but we need to be sending a
// MsgApp periodically, to guarantee liveness of the MsgApp flow: the
// follower eventually will reply with an ack or reject.
//
// If the follower's log is outdated, and we haven't recently sent a MsgApp
// (according to the MsgAppProbesPaused flag), send one now. This is going
// to be an empty "probe" MsgApp.
// (according to the MsgAppProbesPaused flag), send one now.
if pr.Match < last && !pr.MsgAppProbesPaused {
return true
}
Expand All @@ -356,10 +361,10 @@ func (pr *Progress) ShouldSendMsgApp(last, commit uint64, advanceCommit bool) bo
return true
}

// Send an empty MsgApp containing the latest commit index if we know that
// the follower's commit index is stale and we haven't recently sent a
// MsgApp (according to the MsgAppProbesPaused flag).

// Send the latest commit index if we know that the peer's commit index is
// stale, and we haven't recently sent a MsgApp (according to the
// MsgAppProbesPaused flag).
//
// NOTE: This is a different condition than the one above because we only
// send this message if pr.MsgAppProbesPaused is false. After this message,
// pr.MsgAppProbesPaused will be set to true until we receive a heartbeat
Expand Down
Loading