Skip to content

Commit a817ebc

Browse files
craig[bot]pav-kv
craig[bot]
andcommitted
Merge #126318
126318: raft: clean-up log conflict search r=miraradeva a=pav-kv Use the new `entryID` and `logSlice` types. Move the preceding entry check into the conflict search method rather than do it outside. Add a bunch of TODOs for optimization: most log append requests can skip the term check scanning or do it more efficiently. Epic: CRDB-37516 Release note: none Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents 1c3b6a6 + 8a7cfe0 commit a817ebc

File tree

2 files changed

+96
-64
lines changed

2 files changed

+96
-64
lines changed

pkg/raft/log.go

+57-41
Original file line numberDiff line numberDiff line change
@@ -106,61 +106,77 @@ func (l *raftLog) String() string {
106106
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
107107
// it returns (last index of new entries, true).
108108
func (l *raftLog) maybeAppend(a logSlice) (lastnewi uint64, ok bool) {
109-
if !l.matchTerm(a.prev) {
109+
match, ok := l.findConflict(a)
110+
if !ok {
110111
return 0, false
111112
}
112-
// TODO(pav-kv): propagate logSlice down the stack. It will be used all the
113-
// way down in unstable, for safety checks, and for useful bookkeeping.
114-
115-
lastnewi = a.prev.index + uint64(len(a.entries))
116-
ci := l.findConflict(a.entries)
117-
switch {
118-
case ci == 0:
119-
case ci <= l.committed:
120-
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
121-
default:
122-
offset := a.prev.index + 1
123-
if ci-offset > uint64(len(a.entries)) {
124-
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(a.entries))
125-
}
126-
l.append(a.entries[ci-offset:]...)
127-
}
128-
return lastnewi, true
113+
114+
// Fast-forward to the first mismatching or missing entry.
115+
// NB: prev.index <= match.index <= a.lastIndex(), so the sub-slicing is safe.
116+
a.entries = a.entries[match.index-a.prev.index:]
117+
a.prev = match
118+
119+
// TODO(pav-kv): pass the logSlice down the stack, for safety checks and
120+
// bookkeeping in the unstable structure.
121+
l.append(a.entries...)
122+
return a.lastIndex(), true
129123
}
130124

131125
func (l *raftLog) append(ents ...pb.Entry) {
132126
if len(ents) == 0 {
133127
return
134128
}
135-
if after := ents[0].Index - 1; after < l.committed {
136-
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
129+
if first := ents[0].Index; first <= l.committed {
130+
l.logger.Panicf("entry %d is already committed [committed(%d)]", first, l.committed)
137131
}
138132
l.unstable.truncateAndAppend(ents)
139133
}
140134

141-
// findConflict finds the index of the conflict.
142-
// It returns the first pair of conflicting entries between the existing
143-
// entries and the given entries, if there are any.
144-
// If there is no conflicting entries, and the existing entries contains
145-
// all the given entries, zero will be returned.
146-
// If there is no conflicting entries, but the given entries contains new
147-
// entries, the index of the first new entry will be returned.
148-
// An entry is considered to be conflicting if it has the same index but
149-
// a different term.
150-
// The index of the given entries MUST be continuously increasing.
151-
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
152-
for i := range ents {
153-
if id := pbEntryID(&ents[i]); !l.matchTerm(id) {
154-
if id.index <= l.lastIndex() {
155-
// TODO(pav-kv): can simply print %+v of the id. This will change the
156-
// log format though.
157-
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
158-
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
159-
}
160-
return id.index
135+
// findConflict finds the last entry in the given log slice that matches the
136+
// log. The next entry either mismatches, or is missing.
137+
//
138+
// If the slice partially/fully matches, this method returns true. The returned
139+
// entryID is the ID of the last matching entry. It can be s.prev if it is the
140+
// only matching entry. It is guaranteed that the returned entryID.index is in
141+
// the [s.prev.index, s.lastIndex()] range.
142+
//
143+
// All the entries up to the returned entryID are already present in the log,
144+
// and do not need to be appended again. The caller can safely fast-forward an
145+
// append request to the next entry after it.
146+
//
147+
// Returns false if the given slice mismatches the log entirely, i.e. the s.prev
148+
// entry has a mismatching entryID.term. In this case an append request can not
149+
// proceed.
150+
func (l *raftLog) findConflict(s logSlice) (entryID, bool) {
151+
if !l.matchTerm(s.prev) {
152+
return entryID{}, false
153+
}
154+
155+
// TODO(pav-kv): add a fast-path here using the Log Matching property of raft.
156+
// Check the term match at min(s.lastIndex(), l.lastIndex()) entry, and fall
157+
// back to conflict search only if it mismatches.
158+
// TODO(pav-kv): also, there should be no mismatch if s.term == l.accTerm, so
159+
// the fast-path can avoid this one check too.
160+
//
161+
// TODO(pav-kv): every matchTerm call in the linear scan below can fall back
162+
// to fetching an entry from storage. This is inefficient, we can improve it.
163+
// Logs that don't match at one index, don't match at all indices above. So we
164+
// can use binary search to find the fork.
165+
match := s.prev
166+
for i := range s.entries {
167+
id := pbEntryID(&s.entries[i])
168+
if l.matchTerm(id) {
169+
match = id
170+
continue
161171
}
172+
if id.index <= l.lastIndex() {
173+
// TODO(pav-kv): should simply print %+v of the id.
174+
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
175+
id.index, l.zeroTermOnOutOfBounds(l.term(id.index)), id.term)
176+
}
177+
return match, true
162178
}
163-
return 0
179+
return match, true // all entries match
164180
}
165181

166182
// findConflictByTerm returns a best guess on where this log ends matching

pkg/raft/log_test.go

+39-23
Original file line numberDiff line numberDiff line change
@@ -27,32 +27,48 @@ import (
2727

2828
func TestFindConflict(t *testing.T) {
2929
previousEnts := index(1).terms(1, 2, 3)
30-
tests := []struct {
31-
ents []pb.Entry
32-
wconflict uint64
30+
ids := make([]entryID, 1, len(previousEnts)+1) // dummy (0, 0) at index 0
31+
for i := range previousEnts {
32+
ids = append(ids, pbEntryID(&previousEnts[i]))
33+
}
34+
for _, tt := range []struct {
35+
prev entryID
36+
ents []pb.Entry
37+
notOk bool
38+
want entryID
3339
}{
34-
// no conflict, empty ent
35-
{nil, 0},
40+
// prev does not match the log
41+
{prev: entryID{term: 10, index: 1}, notOk: true},
42+
{prev: entryID{term: 4, index: 1}, ents: index(2).terms(4, 4), notOk: true},
43+
{prev: entryID{term: 5, index: 2}, ents: index(3).terms(5, 6), notOk: true},
44+
// no conflict, empty entries
45+
{ents: nil, want: ids[0]},
3646
// no conflict
37-
{index(1).terms(1, 2, 3), 0},
38-
{index(2).terms(2, 3), 0},
39-
{index(3).terms(3), 0},
47+
{prev: ids[0], ents: index(1).terms(1, 2, 3), want: ids[3]},
48+
{prev: ids[1], ents: index(2).terms(2, 3), want: ids[3]},
49+
{prev: ids[2], ents: index(3).terms(3), want: ids[3]},
4050
// no conflict, but has new entries
41-
{index(1).terms(1, 2, 3, 4, 4), 4},
42-
{index(2).terms(2, 3, 4, 5), 4},
43-
{index(3).terms(3, 4, 4), 4},
44-
{index(4).terms(4, 4), 4},
45-
// conflicts with existing entries
46-
{index(1).terms(4, 4), 1},
47-
{index(2).terms(1, 4, 4), 2},
48-
{index(3).terms(1, 2, 4, 4), 3},
49-
}
50-
51-
for i, tt := range tests {
52-
t.Run(fmt.Sprint(i), func(t *testing.T) {
53-
raftLog := newLog(NewMemoryStorage(), raftLogger)
54-
raftLog.append(previousEnts...)
55-
require.Equal(t, tt.wconflict, raftLog.findConflict(tt.ents))
51+
{prev: ids[0], ents: index(1).terms(1, 2, 3, 4, 4), want: ids[3]},
52+
{prev: ids[1], ents: index(2).terms(2, 3, 4, 4), want: ids[3]},
53+
{prev: ids[2], ents: index(3).terms(3, 4, 4), want: ids[3]},
54+
{prev: ids[3], ents: index(4).terms(4, 4), want: ids[3]},
55+
// passes prev check, but conflicts with existing entries
56+
{prev: ids[0], ents: index(1).terms(4, 4), want: ids[0]},
57+
{prev: ids[1], ents: index(2).terms(1, 4, 4), want: ids[1]},
58+
{prev: ids[2], ents: index(3).terms(2, 2, 4, 4), want: ids[2]},
59+
// out of bounds
60+
{prev: entryID{term: 3, index: 10}, ents: index(11).terms(3), notOk: true},
61+
// just touching the right bound, but still out of bounds
62+
{prev: entryID{term: 3, index: 4}, ents: index(5).terms(3, 3, 4), notOk: true},
63+
} {
64+
t.Run("", func(t *testing.T) {
65+
log := newLog(NewMemoryStorage(), discardLogger)
66+
log.append(previousEnts...)
67+
app := logSlice{term: 100, prev: tt.prev, entries: tt.ents}
68+
require.NoError(t, app.valid())
69+
match, ok := log.findConflict(app)
70+
require.Equal(t, !tt.notOk, ok)
71+
require.Equal(t, tt.want, match)
5672
})
5773
}
5874
}

0 commit comments

Comments
 (0)