Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit c6d6aad

Browse files
committedJan 30, 2024
raft: use entryID in raftLog.stableTo
Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
1 parent 35b9d34 commit c6d6aad

7 files changed

+21
-23
lines changed
 

‎log.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func (l *raftLog) acceptApplying(i uint64, size entryEncodingSize, allowUnstable
364364
i < l.maxAppliableIndex(allowUnstable)
365365
}
366366

367-
func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
367+
func (l *raftLog) stableTo(id entryID) { l.unstable.stableTo(id) }
368368

369369
func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
370370

‎log_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ func TestHasNextCommittedEnts(t *testing.T) {
400400

401401
raftLog := newLog(storage, raftLogger)
402402
raftLog.append(ents...)
403-
raftLog.stableTo(4, 1)
403+
raftLog.stableTo(entryID{term: 1, index: 4})
404404
raftLog.maybeCommit(5, 1)
405405
raftLog.appliedTo(tt.applied, 0 /* size */)
406406
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
@@ -458,7 +458,7 @@ func TestNextCommittedEnts(t *testing.T) {
458458

459459
raftLog := newLog(storage, raftLogger)
460460
raftLog.append(ents...)
461-
raftLog.stableTo(4, 1)
461+
raftLog.stableTo(entryID{term: 1, index: 4})
462462
raftLog.maybeCommit(5, 1)
463463
raftLog.appliedTo(tt.applied, 0 /* size */)
464464
raftLog.acceptApplying(tt.applying, 0 /* size */, tt.allowUnstable)
@@ -517,7 +517,7 @@ func TestAcceptApplying(t *testing.T) {
517517

518518
raftLog := newLogWithSize(storage, raftLogger, maxSize)
519519
raftLog.append(ents...)
520-
raftLog.stableTo(4, 1)
520+
raftLog.stableTo(entryID{term: 1, index: 4})
521521
raftLog.maybeCommit(5, 1)
522522
raftLog.appliedTo(3, 0 /* size */)
523523

@@ -566,7 +566,7 @@ func TestAppliedTo(t *testing.T) {
566566

567567
raftLog := newLogWithSize(storage, raftLogger, maxSize)
568568
raftLog.append(ents...)
569-
raftLog.stableTo(4, 1)
569+
raftLog.stableTo(entryID{term: 1, index: 4})
570570
raftLog.maybeCommit(5, 1)
571571
raftLog.appliedTo(3, 0 /* size */)
572572
raftLog.acceptApplying(5, maxSize+overshoot, false /* allowUnstable */)
@@ -604,7 +604,7 @@ func TestNextUnstableEnts(t *testing.T) {
604604

605605
ents := raftLog.nextUnstableEnts()
606606
if l := len(ents); l > 0 {
607-
raftLog.stableTo(ents[l-1].Index, ents[l-1].Term)
607+
raftLog.stableTo(entryID{term: ents[l-1].Term, index: ents[l-1].Index})
608608
}
609609
require.Equal(t, tt.wents, ents)
610610
require.Equal(t, previousEnts[len(previousEnts)-1].Index+1, raftLog.unstable.offset)
@@ -655,7 +655,7 @@ func TestStableTo(t *testing.T) {
655655
t.Run(fmt.Sprint(i), func(t *testing.T) {
656656
raftLog := newLog(NewMemoryStorage(), raftLogger)
657657
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
658-
raftLog.stableTo(tt.stablei, tt.stablet)
658+
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
659659
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
660660
})
661661
}
@@ -692,7 +692,7 @@ func TestStableToWithSnap(t *testing.T) {
692692
require.NoError(t, s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}))
693693
raftLog := newLog(s, raftLogger)
694694
raftLog.append(tt.newEnts...)
695-
raftLog.stableTo(tt.stablei, tt.stablet)
695+
raftLog.stableTo(entryID{term: tt.stablet, index: tt.stablei})
696696
require.Equal(t, tt.wunstable, raftLog.unstable.offset)
697697
})
698698

‎log_unstable.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -131,30 +131,30 @@ func (u *unstable) acceptInProgress() {
131131
// The method should only be called when the caller can attest that the entries
132132
// can not be overwritten by an in-progress log append. See the related comment
133133
// in newStorageAppendRespMsg.
134-
func (u *unstable) stableTo(i, t uint64) {
135-
gt, ok := u.maybeTerm(i)
134+
func (u *unstable) stableTo(id entryID) {
135+
gt, ok := u.maybeTerm(id.index)
136136
if !ok {
137137
// Unstable entry missing. Ignore.
138-
u.logger.Infof("entry at index %d missing from unstable log; ignoring", i)
138+
u.logger.Infof("entry at index %d missing from unstable log; ignoring", id.index)
139139
return
140140
}
141-
if i < u.offset {
141+
if id.index < u.offset {
142142
// Index matched unstable snapshot, not unstable entry. Ignore.
143-
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", i)
143+
u.logger.Infof("entry at index %d matched unstable snapshot; ignoring", id.index)
144144
return
145145
}
146-
if gt != t {
146+
if gt != id.term {
147147
// Term mismatch between unstable entry and specified entry. Ignore.
148148
// This is possible if part or all of the unstable log was replaced
149149
// between that time that a set of entries started to be written to
150150
// stable storage and when they finished.
151151
u.logger.Infof("entry at (index,term)=(%d,%d) mismatched with "+
152-
"entry at (%d,%d) in unstable log; ignoring", i, t, i, gt)
152+
"entry at (%d,%d) in unstable log; ignoring", id.index, id.term, id.index, gt)
153153
return
154154
}
155-
num := int(i + 1 - u.offset)
155+
num := int(id.index + 1 - u.offset)
156156
u.entries = u.entries[num:]
157-
u.offset = i + 1
157+
u.offset = id.index + 1
158158
u.offsetInProgress = max(u.offsetInProgress, u.offset)
159159
u.shrinkEntriesArray()
160160
}

‎log_unstable_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ func TestUnstableStableTo(t *testing.T) {
493493
snapshot: tt.snap,
494494
logger: raftLogger,
495495
}
496-
u.stableTo(tt.index, tt.term)
496+
u.stableTo(entryID{term: tt.term, index: tt.index})
497497
require.Equal(t, tt.woffset, u.offset)
498498
require.Equal(t, tt.woffsetInProgress, u.offsetInProgress)
499499
require.Equal(t, tt.wlen, len(u.entries))

‎raft.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1156,7 +1156,7 @@ func (r *raft) Step(m pb.Message) error {
11561156

11571157
case pb.MsgStorageAppendResp:
11581158
if m.Index != 0 {
1159-
r.raftLog.stableTo(m.Index, m.LogTerm)
1159+
r.raftLog.stableTo(entryID{term: m.LogTerm, index: m.Index})
11601160
}
11611161
if m.Snapshot != nil {
11621162
r.appliedSnap(m.Snapshot)

‎raft_paper_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -923,8 +923,7 @@ func commitNoopEntry(r *raft, s *MemoryStorage) {
923923
r.readMessages()
924924
s.Append(r.raftLog.nextUnstableEnts())
925925
r.raftLog.appliedTo(r.raftLog.committed, 0 /* size */)
926-
last := r.raftLog.lastEntryID()
927-
r.raftLog.stableTo(last.index, last.term) // TODO(pav-kv): pass lastEntryID directly
926+
r.raftLog.stableTo(r.raftLog.lastEntryID())
928927
}
929928

930929
func acceptAndReply(m pb.Message) pb.Message {

‎raft_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ import (
3333
func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
3434
// Append unstable entries.
3535
s.Append(r.raftLog.nextUnstableEnts())
36-
last := r.raftLog.lastEntryID()
37-
r.raftLog.stableTo(last.index, last.term) // TODO(pav-kv): pass lastEntryID directly
36+
r.raftLog.stableTo(r.raftLog.lastEntryID())
3837

3938
// Run post-append steps.
4039
r.advanceMessagesAfterAppend()

0 commit comments

Comments
 (0)
Please sign in to comment.