
kvserver: load committed entries after mu.Unlock() #143689

Merged 1 commit on Apr 2, 2025
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/logstore/logstore.go
@@ -680,6 +680,9 @@ func LoadTerm(
// which is a serious issue. But if the caller is unsure, they can check the
// LastIndex to distinguish.
//
// The bytesAccount is used to account for and limit the loaded bytes. It can be
// nil when the accounting / limiting is not needed.
//
// TODO(#132114): eliminate both ErrCompacted and ErrUnavailable.
// TODO(pavelkalinnikov): return all entries we've read, consider maxSize a
// target size. Currently we may read one extra entry and drop it.
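
For context, here is a minimal sketch (not part of this diff) of the nil-tolerant accounting contract the comment above describes; the type and method names are assumptions for illustration, not the real logstore API.

package sketch

// bytesAccount is a stand-in for the real accounting type; a nil
// *bytesAccount means "no accounting / limiting requested".
type bytesAccount struct {
	used, limit uint64
}

// grow records n loaded bytes and reports whether the caller may keep
// loading. It is safe to call on a nil receiver, in which case it is a no-op.
func (a *bytesAccount) grow(n uint64) bool {
	if a == nil {
		return true // accounting / limiting disabled
	}
	a.used += n
	return a.limit == 0 || a.used <= a.limit
}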
66 changes: 36 additions & 30 deletions pkg/kv/kvserver/replica_raft.go
@@ -946,14 +946,13 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
}

var hasReady bool
var outboundMsgs []raftpb.Message
var msgStorageAppend raftpb.Message
var toApply []raftpb.Entry
var ready raft.Ready
var logSnapshot raft.LogSnapshot

rac2ModeToUse := r.replicationAdmissionControlModeToUse(ctx)
// Replication AC v2 state that is initialized while holding Replica.mu.
replicaStateInfoMap := r.raftMu.replicaStateScratchForFlowControl
var raftNodeBasicState replica_rac2.RaftNodeBasicState
var logSnapshot raft.LogSnapshot

r.mu.Lock()
rac2ModeForReady := r.mu.currentRACv2Mode
@@ -980,31 +979,7 @@
}
logSnapshot = raftGroup.LogSnapshot()
if hasReady = raftGroup.HasReady(); hasReady {
// Since we are holding raftMu, only the Slice() call below will use
// raftMu.bytesAccount. It tracks memory usage that this Ready incurs.
r.attachRaftEntriesMonitorRaftMuLocked()
// TODO(pav-kv): currently, Slice() only accounts for entry bytes loaded
// from log storage, and ignores the in-memory unstable entries. Pass a
// flow control struct down the stack, and do a more complete accounting
// in raft. This will also eliminate the "side channel" plumbing hack with
// this bytesAccount.
rd := raftGroup.Ready()
if !rd.Committed.Empty() {
// TODO(pav-kv): do this loading when Replica.mu is released. We don't
// want IO under Replica.mu.
if toApply, err = logSnapshot.Slice(
rd.Committed, r.store.cfg.RaftMaxCommittedSizePerReady,
); err != nil {
return false, err
}
}
// We apply committed entries during this handleRaftReady, so it is ok to
// release the corresponding memory tokens at the end of this func. Next
// time we enter this function, the account will be empty again.
defer r.detachRaftEntriesMonitorRaftMuLocked()

logRaftReady(ctx, rd)
outboundMsgs, msgStorageAppend = splitLocalStorageMsgs(rd.Messages)
ready = raftGroup.Ready()
Review comment from the PR author:
There are a bunch of redundant checks between HasReady() and Ready() calls. We could further squash this in the future as:

ready, hasReady = raftGroup.Ready()

}
if switchToPullModeAfterReady {
raftGroup.SetLazyReplication(true)
@@ -1027,7 +1002,7 @@
unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0
return unquiesceAndWakeLeader, nil
})
r.mu.applyingEntries = len(toApply) != 0
r.mu.applyingEntries = !ready.Committed.Empty()
pausedFollowers := r.mu.pausedFollowers
r.mu.Unlock()
if errors.Is(err, errRemoved) {
@@ -1036,6 +1011,13 @@
} else if err != nil {
return stats, errors.Wrap(err, "checking raft group for Ready")
}

var outboundMsgs []raftpb.Message
var msgStorageAppend raftpb.Message
if hasReady {
logRaftReady(ctx, ready)
outboundMsgs, msgStorageAppend = splitLocalStorageMsgs(ready.Messages)
}
// Even if we don't have a Ready, or entries in Ready,
// replica_rac2.Processor may need to do some work.
raftEvent := rac2.RaftEventFromMsgStorageAppendAndMsgApps(
@@ -1073,6 +1055,30 @@
r.traceMessageSends(outboundMsgs, "sending messages")
r.sendRaftMessages(ctx, outboundMsgs, pausedFollowers, true /* willDeliverLocal */)

// Load the committed entries to be applied after releasing Replica.mu, to
// ensure that we don't have IO under this narrow/lightweight mutex. The
// RawNode can be making progress in the meantime, but it will never overwrite
// the committed entries it has been observing during the Ready() call.
//
// Also, do this loading after r.sendRaftMessages so that the outgoing
// messages don't need to wait for the storage interaction.
var toApply []raftpb.Entry
if !ready.Committed.Empty() {
// TODO(pav-kv): currently, Slice() only accounts for entry bytes loaded
// from log storage, and ignores the in-memory unstable entries. Consider a
// more complete flow control mechanism here, and eliminating the plumbing
// hack with the bytesAccount.
r.attachRaftEntriesMonitorRaftMuLocked()
// We apply committed entries during this handleRaftReady, so it is ok to
// release the corresponding memory tokens at the end of this func. Next
// time we enter this function, the account will be empty again.
defer r.detachRaftEntriesMonitorRaftMuLocked()
if toApply, err = logSnapshot.Slice(
ready.Committed, r.store.cfg.RaftMaxCommittedSizePerReady,
); err != nil {
return stats, errors.Wrap(err, "loading committed entries")
}
}
// If the ready struct includes entries that have been committed, these
// entries will be applied to the Replica's replicated state machine down
// below, after appending new entries to the raft log and sending messages
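
As a side note, a minimal sketch of the locking pattern this hunk adopts: capture a cheap, immutable log snapshot while holding the mutex, then do the expensive read after unlocking. The types and function below are assumptions for illustration, not the real raft or kvserver API.

package sketch

import "sync"

// logSnapshot stands in for an immutable view of the raft log that remains
// valid after the mutex is released.
type logSnapshot interface {
	// slice loads committed entries in [lo, hi), up to maxSize bytes.
	slice(lo, hi, maxSize uint64) ([][]byte, error)
}

// loadCommitted takes the snapshot while holding mu, then performs the
// potentially slow storage read without holding it.
func loadCommitted(
	mu *sync.Mutex, snapshot func() logSnapshot, lo, hi, maxSize uint64,
) ([][]byte, error) {
	mu.Lock()
	snap := snapshot() // cheap: no IO under the mutex
	mu.Unlock()
	// The snapshot guarantees that the committed entries it observed are not
	// overwritten, so reading them here is safe even if the raft group makes
	// progress in the meantime.
	return snap.slice(lo, hi, maxSize)
}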
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_raftlog.go
@@ -77,12 +77,11 @@ func (r *replicaLogStorage) entriesLocked(
//
// TODO(pav-kv): we need better safety guardrails here. The log storage type
// can remember the readable bounds, and assert that reads do not cross them.
// TODO(pav-kv): r.raftMu.bytesAccount is broken - can't rely on raftMu here.
entries, _, loadedSize, err := logstore.LoadEntries(
r.AnnotateCtx(context.TODO()),
r.mu.stateLoader.StateLoader, r.store.TODOEngine(), r.RangeID,
r.store.raftEntryCache, r.raftMu.sideloaded, lo, hi, maxBytes,
&r.raftMu.bytesAccount,
nil, // bytesAccount is not used when reading under Replica.mu
)
r.store.metrics.RaftStorageReadBytes.Inc(int64(loadedSize))
return entries, err
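
One possible shape for the guardrail mentioned in the TODO above (the log storage remembers its readable bounds and asserts that reads stay within them); an assumed sketch, not code from this PR.

package sketch

import "fmt"

// boundedReader remembers the readable index bounds of the log and rejects
// reads that cross them.
type boundedReader struct {
	lo, hi uint64 // readable indices are in [lo, hi)
}

// checkRead asserts that the requested span [lo, hi) is readable.
func (b *boundedReader) checkRead(lo, hi uint64) error {
	if lo < b.lo || hi > b.hi {
		return fmt.Errorf("read [%d, %d) outside readable bounds [%d, %d)",
			lo, hi, b.lo, b.hi)
	}
	return nil
}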