Skip to content

Commit 693ed7b

Browse files
committed
tracker: rename the paused probes flow field
Signed-off-by: Pavel Kalinnikov <[email protected]>
1 parent aacf619 commit 693ed7b

File tree

5 files changed

+58
-53
lines changed

5 files changed

+58
-53
lines changed

raft.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1522,7 +1522,7 @@ func stepLeader(r *raft, m pb.Message) error {
15221522
}
15231523
case pb.MsgHeartbeatResp:
15241524
pr.RecentActive = true
1525-
pr.MsgAppFlowPaused = false
1525+
pr.PauseMsgAppProbes(false)
15261526
r.maybeSendAppend(m.From, pr)
15271527

15281528
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
@@ -1556,7 +1556,7 @@ func stepLeader(r *raft, m pb.Message) error {
15561556
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
15571557
// out the next MsgApp.
15581558
// If snapshot failure, wait for a heartbeat interval before next try
1559-
pr.MsgAppFlowPaused = true
1559+
pr.PauseMsgAppProbes(true)
15601560
case pb.MsgUnreachable:
15611561
// During optimistic replication, if the remote becomes unreachable,
15621562
// there is huge probability that a MsgApp is lost.
@@ -1593,7 +1593,7 @@ func stepLeader(r *raft, m pb.Message) error {
15931593
r.sendTimeoutNow(leadTransferee)
15941594
r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
15951595
} else {
1596-
pr.MsgAppFlowPaused = false
1596+
pr.PauseMsgAppProbes(false)
15971597
r.maybeSendAppend(leadTransferee, pr)
15981598
}
15991599
}

raft_snap_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) {
8383
if sm.trk.Progress[2].Next != 1 {
8484
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
8585
}
86-
if !sm.trk.Progress[2].MsgAppFlowPaused {
87-
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
86+
if !sm.trk.Progress[2].MsgAppProbesPaused {
87+
t.Errorf("msgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
8888
}
8989
}
9090

@@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) {
106106
if sm.trk.Progress[2].Next != 12 {
107107
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
108108
}
109-
if !sm.trk.Progress[2].MsgAppFlowPaused {
110-
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
109+
if !sm.trk.Progress[2].MsgAppProbesPaused {
110+
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
111111
}
112112
}
113113

raft_test.go

+14-14
Original file line numberDiff line numberDiff line change
@@ -128,21 +128,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
128128
r.becomeCandidate()
129129
r.becomeLeader()
130130

131-
r.trk.Progress[2].MsgAppFlowPaused = true
131+
r.trk.Progress[2].PauseMsgAppProbes(true)
132132

133133
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
134-
if !r.trk.Progress[2].MsgAppFlowPaused {
135-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
134+
if !r.trk.Progress[2].MsgAppProbesPaused {
135+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
136136
}
137137

138138
r.trk.Progress[2].BecomeReplicate()
139-
if r.trk.Progress[2].MsgAppFlowPaused {
140-
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused)
139+
if r.trk.Progress[2].MsgAppProbesPaused {
140+
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppProbesPaused)
141141
}
142-
r.trk.Progress[2].MsgAppFlowPaused = true
142+
r.trk.Progress[2].PauseMsgAppProbes(true)
143143
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
144-
if !r.trk.Progress[2].MsgAppFlowPaused {
145-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
144+
if !r.trk.Progress[2].MsgAppProbesPaused {
145+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
146146
}
147147
}
148148

@@ -2784,8 +2784,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27842784
}
27852785
}
27862786

2787-
if !r.trk.Progress[2].MsgAppFlowPaused {
2788-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
2787+
if !r.trk.Progress[2].MsgAppProbesPaused {
2788+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
27892789
}
27902790
for j := 0; j < 10; j++ {
27912791
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@@ -2799,8 +2799,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27992799
for j := 0; j < r.heartbeatTimeout; j++ {
28002800
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
28012801
}
2802-
if !r.trk.Progress[2].MsgAppFlowPaused {
2803-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
2802+
if !r.trk.Progress[2].MsgAppProbesPaused {
2803+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
28042804
}
28052805

28062806
// consume the heartbeat
@@ -2822,8 +2822,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
28222822
if msg[0].Index != 0 {
28232823
t.Errorf("index = %d, want %d", msg[0].Index, 0)
28242824
}
2825-
if !r.trk.Progress[2].MsgAppFlowPaused {
2826-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
2825+
if !r.trk.Progress[2].MsgAppProbesPaused {
2826+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
28272827
}
28282828
}
28292829

tracker/progress.go

+20-15
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,13 @@ type Progress struct {
8787
// This is always true on the leader.
8888
RecentActive bool
8989

90-
// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
91-
// happens in StateProbe, or StateReplicate with saturated Inflights. In both
92-
// cases, we need to continue sending MsgApp once in a while to guarantee
93-
// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
94-
// receiving a heartbeat response), to not overflow the receiver. See
95-
// IsPaused().
96-
MsgAppFlowPaused bool
90+
// MsgAppProbesPaused set to true prevents sending "probe" MsgApp messages to
91+
// this follower. Used in StateProbe, or StateReplicate when all entries are
92+
// in-flight or the in-flight volume exceeds limits. See ShouldSendMsgApp().
93+
//
94+
// TODO(pav-kv): unexport this field. It is used by a few tests, but should be
95+
// replaced by PauseMsgAppProbes() and ShouldSendMsgApp().
96+
MsgAppProbesPaused bool
9797

9898
// Inflights is a sliding window for the inflight messages.
9999
// Each inflight message contains one or more log entries.
@@ -113,7 +113,7 @@ type Progress struct {
113113
IsLearner bool
114114
}
115115

116-
// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
116+
// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused,
117117
// PendingSnapshot, and Inflights.
118118
func (pr *Progress) ResetState(state StateType) {
119119
pr.PauseMsgAppProbes(false)
@@ -169,7 +169,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) {
169169
// PauseMsgAppProbes pauses or unpauses empty MsgApp messages flow, depending on
170170
// the passed-in bool.
171171
func (pr *Progress) PauseMsgAppProbes(pause bool) {
172-
pr.MsgAppFlowPaused = pause
172+
pr.MsgAppProbesPaused = pause
173173
}
174174

175175
// CanSendEntries returns true if the flow control state allows sending at least
@@ -250,12 +250,17 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
250250
// operation, this is false. A throttled node will be contacted less frequently
251251
// until it has reached a state in which it's able to accept a steady stream of
252252
// log entries again.
253+
//
254+
// TODO(pav-kv): this method is deprecated, remove it. It is still used in tests
255+
// and String(), find a way to avoid this. The problem is that the actual flow
256+
// control state depends on the log size and commit index, which are not part of
257+
// this Progress struct - they are passed-in to methods like ShouldSendMsgApp().
253258
func (pr *Progress) IsPaused() bool {
254259
switch pr.State {
255260
case StateProbe:
256-
return pr.MsgAppFlowPaused
261+
return pr.MsgAppProbesPaused
257262
case StateReplicate:
258-
return pr.MsgAppFlowPaused && pr.Inflights.Full()
263+
return pr.MsgAppProbesPaused && pr.Inflights.Full()
259264
case StateSnapshot:
260265
return true
261266
default:
@@ -285,10 +290,10 @@ func (pr *Progress) IsPaused() bool {
285290
func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
286291
switch pr.State {
287292
case StateProbe:
288-
return !pr.MsgAppFlowPaused
293+
return !pr.MsgAppProbesPaused
289294
case StateReplicate:
290295
return pr.CanBumpCommit(commit) ||
291-
pr.Match < last && (!pr.MsgAppFlowPaused || pr.CanSendEntries(last))
296+
pr.Match < last && (!pr.MsgAppProbesPaused || pr.CanSendEntries(last))
292297
case StateSnapshot:
293298
return false
294299
default:
@@ -299,9 +304,9 @@ func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
299304
func (pr *Progress) IsPausedExt(lastIndex uint64) bool {
300305
switch pr.State {
301306
case StateProbe:
302-
return pr.MsgAppFlowPaused
307+
return pr.MsgAppProbesPaused
303308
case StateReplicate:
304-
return pr.Match >= lastIndex || pr.MsgAppFlowPaused && (pr.Inflights.Full() || pr.Next > lastIndex)
309+
return pr.Match >= lastIndex || pr.MsgAppProbesPaused && (pr.Inflights.Full() || pr.Next > lastIndex)
305310
case StateSnapshot:
306311
return true
307312
default:

tracker/progress_test.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ func TestProgressString(t *testing.T) {
2424
ins := NewInflights(1, 0)
2525
ins.Add(123, 1)
2626
pr := &Progress{
27-
Match: 1,
28-
Next: 2,
29-
State: StateSnapshot,
30-
PendingSnapshot: 123,
31-
RecentActive: false,
32-
MsgAppFlowPaused: true,
33-
IsLearner: true,
34-
Inflights: ins,
27+
Match: 1,
28+
Next: 2,
29+
State: StateSnapshot,
30+
PendingSnapshot: 123,
31+
RecentActive: false,
32+
MsgAppProbesPaused: true,
33+
IsLearner: true,
34+
Inflights: ins,
3535
}
3636
const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]`
3737
assert.Equal(t, exp, pr.String())
@@ -53,26 +53,26 @@ func TestProgressIsPaused(t *testing.T) {
5353
}
5454
for i, tt := range tests {
5555
p := &Progress{
56-
State: tt.state,
57-
MsgAppFlowPaused: tt.paused,
58-
Inflights: NewInflights(256, 0),
56+
State: tt.state,
57+
MsgAppProbesPaused: tt.paused,
58+
Inflights: NewInflights(256, 0),
5959
}
6060
assert.Equal(t, tt.w, p.IsPaused(), i)
6161
}
6262
}
6363

6464
// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset
65-
// MsgAppFlowPaused.
65+
// MsgAppProbesPaused.
6666
func TestProgressResume(t *testing.T) {
6767
p := &Progress{
68-
Next: 2,
69-
MsgAppFlowPaused: true,
68+
Next: 2,
69+
MsgAppProbesPaused: true,
7070
}
7171
p.MaybeDecrTo(1, 1)
72-
assert.False(t, p.MsgAppFlowPaused)
73-
p.MsgAppFlowPaused = true
72+
assert.False(t, p.MsgAppProbesPaused)
73+
p.MsgAppProbesPaused = true
7474
p.MaybeUpdate(2)
75-
assert.False(t, p.MsgAppFlowPaused)
75+
assert.False(t, p.MsgAppProbesPaused)
7676
}
7777

7878
func TestProgressBecomeProbe(t *testing.T) {

0 commit comments

Comments
 (0)