Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 3f9b634

Browse files
committed Mar 6, 2024
tracker: rename the paused probes flow field
Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
1 parent 9d42a69 commit 3f9b634

File tree

5 files changed

+46
-42
lines changed

5 files changed

+46
-42
lines changed
 

‎raft.go

+1-1
Original file line number | Diff line number | Diff line change
@@ -1565,7 +1565,7 @@ func stepLeader(r *raft, m pb.Message) error {
15651565
// If the snapshot finishes, wait for the MsgAppResp from the remote node before sending
15661566
// out the next MsgApp.
15671567
// If the snapshot fails, wait for a heartbeat interval before the next try.
1568-
pr.MsgAppFlowPaused = true
1568+
pr.PauseMsgAppProbes(true)
15691569
case pb.MsgUnreachable:
15701570
// During optimistic replication, if the remote becomes unreachable,
15711571
// there is huge probability that a MsgApp is lost.

‎raft_snap_test.go

+4-4
Original file line number | Diff line number | Diff line change
@@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) {
8383
if sm.trk.Progress[2].Next != 1 {
8484
t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next)
8585
}
86-
if !sm.trk.Progress[2].MsgAppFlowPaused {
87-
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
86+
if !sm.trk.Progress[2].MsgAppProbesPaused {
87+
t.Errorf("msgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
8888
}
8989
}
9090

@@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) {
106106
if sm.trk.Progress[2].Next != 12 {
107107
t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next)
108108
}
109-
if !sm.trk.Progress[2].MsgAppFlowPaused {
110-
t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused)
109+
if !sm.trk.Progress[2].MsgAppProbesPaused {
110+
t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused)
111111
}
112112
}
113113

‎raft_test.go

+14-14
Original file line number | Diff line number | Diff line change
@@ -128,21 +128,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
128128
r.becomeCandidate()
129129
r.becomeLeader()
130130

131-
r.trk.Progress[2].MsgAppFlowPaused = true
131+
r.trk.Progress[2].PauseMsgAppProbes(true)
132132

133133
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
134-
if !r.trk.Progress[2].MsgAppFlowPaused {
135-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
134+
if !r.trk.Progress[2].MsgAppProbesPaused {
135+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
136136
}
137137

138138
r.trk.Progress[2].BecomeReplicate()
139-
if r.trk.Progress[2].MsgAppFlowPaused {
140-
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused)
139+
if r.trk.Progress[2].MsgAppProbesPaused {
140+
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppProbesPaused)
141141
}
142-
r.trk.Progress[2].MsgAppFlowPaused = true
142+
r.trk.Progress[2].PauseMsgAppProbes(true)
143143
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
144-
if !r.trk.Progress[2].MsgAppFlowPaused {
145-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
144+
if !r.trk.Progress[2].MsgAppProbesPaused {
145+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
146146
}
147147
}
148148

@@ -2783,8 +2783,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27832783
}
27842784
}
27852785

2786-
if !r.trk.Progress[2].MsgAppFlowPaused {
2787-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
2786+
if !r.trk.Progress[2].MsgAppProbesPaused {
2787+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
27882788
}
27892789
for j := 0; j < 10; j++ {
27902790
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@@ -2798,8 +2798,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
27982798
for j := 0; j < r.heartbeatTimeout; j++ {
27992799
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
28002800
}
2801-
if !r.trk.Progress[2].MsgAppFlowPaused {
2802-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
2801+
if !r.trk.Progress[2].MsgAppProbesPaused {
2802+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
28032803
}
28042804

28052805
// consume the heartbeat
@@ -2821,8 +2821,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
28212821
if msg[0].Index != 0 {
28222822
t.Errorf("index = %d, want %d", msg[0].Index, 0)
28232823
}
2824-
if !r.trk.Progress[2].MsgAppFlowPaused {
2825-
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused)
2824+
if !r.trk.Progress[2].MsgAppProbesPaused {
2825+
t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused)
28262826
}
28272827
}
28282828

‎tracker/progress.go

+18-13
Original file line number | Diff line number | Diff line change
@@ -100,13 +100,13 @@ type Progress struct {
100100
// This is always true on the leader.
101101
RecentActive bool
102102

103-
// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
104-
// happens in StateProbe, or StateReplicate with saturated Inflights. In both
105-
// cases, we need to continue sending MsgApp once in a while to guarantee
106-
// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
107-
// receiving a heartbeat response), to not overflow the receiver. See
108-
// IsPaused().
109-
MsgAppFlowPaused bool
103+
// MsgAppProbesPaused set to true prevents sending "probe" MsgApp messages to
104+
// this follower. Used in StateProbe, or StateReplicate when all entries are
105+
// in-flight or the in-flight volume exceeds limits. See ShouldSendMsgApp().
106+
//
107+
// TODO(pav-kv): unexport this field. It is used by a few tests, but should be
108+
// replaced by PauseMsgAppProbes() and ShouldSendMsgApp().
109+
MsgAppProbesPaused bool
110110

111111
// Inflights is a sliding window for the inflight messages.
112112
// Each inflight message contains one or more log entries.
@@ -126,7 +126,7 @@ type Progress struct {
126126
IsLearner bool
127127
}
128128

129-
// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
129+
// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused,
130130
// PendingSnapshot, and Inflights.
131131
func (pr *Progress) ResetState(state StateType) {
132132
pr.PauseMsgAppProbes(false)
@@ -183,7 +183,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) {
183183
// PauseMsgAppProbes pauses or unpauses the flow of empty MsgApp messages, depending on
184184
// the passed-in bool.
185185
func (pr *Progress) PauseMsgAppProbes(pause bool) {
186-
pr.MsgAppFlowPaused = pause
186+
pr.MsgAppProbesPaused = pause
187187
}
188188

189189
// CanSendEntries returns true if the flow control state allows sending at least
@@ -273,12 +273,17 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
273273
// operation, this is false. A throttled node will be contacted less frequently
274274
// until it has reached a state in which it's able to accept a steady stream of
275275
// log entries again.
276+
//
277+
// TODO(pav-kv): this method is deprecated, remove it. It is still used in tests
278+
// and String(), find a way to avoid this. The problem is that the actual flow
279+
// control state depends on the log size and commit index, which are not part of
280+
// this Progress struct - they are passed-in to methods like ShouldSendMsgApp().
276281
func (pr *Progress) IsPaused() bool {
277282
switch pr.State {
278283
case StateProbe:
279-
return pr.MsgAppFlowPaused
284+
return pr.MsgAppProbesPaused
280285
case StateReplicate:
281-
return pr.MsgAppFlowPaused && pr.Inflights.Full()
286+
return pr.MsgAppProbesPaused && pr.Inflights.Full()
282287
case StateSnapshot:
283288
return true
284289
default:
@@ -308,10 +313,10 @@ func (pr *Progress) IsPaused() bool {
308313
func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool {
309314
switch pr.State {
310315
case StateProbe:
311-
return !pr.MsgAppFlowPaused
316+
return !pr.MsgAppProbesPaused
312317
case StateReplicate:
313318
return pr.CanBumpCommit(commit) ||
314-
pr.Match < last && (!pr.MsgAppFlowPaused || pr.CanSendEntries(last))
319+
pr.Match < last && (!pr.MsgAppProbesPaused || pr.CanSendEntries(last))
315320
case StateSnapshot:
316321
return false
317322
default:

‎tracker/progress_test.go

+9-10
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,6 @@ func TestProgressString(t *testing.T) {
3030
State: StateSnapshot,
3131
PendingSnapshot: 123,
3232
RecentActive: false,
33-
MsgAppFlowPaused: true,
3433
IsLearner: true,
3534
Inflights: ins,
3635
}
@@ -54,29 +53,29 @@ func TestProgressIsPaused(t *testing.T) {
5453
}
5554
for i, tt := range tests {
5655
p := &Progress{
57-
State: tt.state,
58-
MsgAppFlowPaused: tt.paused,
59-
Inflights: NewInflights(256, 0),
56+
State: tt.state,
57+
MsgAppProbesPaused: tt.paused,
58+
Inflights: NewInflights(256, 0),
6059
}
6160
assert.Equal(t, tt.w, p.IsPaused(), i)
6261
}
6362
}
6463

65-
// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and
64+
// TestProgressResume ensures that MaybeDecrTo resets MsgAppProbesPaused, and
6665
// MaybeUpdate does not.
6766
//
6867
// TODO(pav-kv): there is little sense in testing these micro-behaviours in the
6968
// struct. We should test the visible behaviour instead.
7069
func TestProgressResume(t *testing.T) {
7170
p := &Progress{
72-
Next: 2,
73-
MsgAppFlowPaused: true,
71+
Next: 2,
72+
MsgAppProbesPaused: true,
7473
}
7574
p.MaybeDecrTo(1, 1)
76-
assert.False(t, p.MsgAppFlowPaused)
77-
p.MsgAppFlowPaused = true
75+
assert.False(t, p.MsgAppProbesPaused)
76+
p.MsgAppProbesPaused = true
7877
p.MaybeUpdate(2)
79-
assert.True(t, p.MsgAppFlowPaused)
78+
assert.True(t, p.MsgAppProbesPaused)
8079
}
8180

8281
func TestProgressBecomeProbe(t *testing.T) {

0 commit comments

Comments
 (0)
Please sign in to comment.