Skip to content

Commit 14394d5

Browse files
committed
refactor internal/topic/topicwriterinternal/queue_test and upload with off gocognit linter
1 parent a2bf458 commit 14394d5

File tree

2 files changed

+34
-18
lines changed

2 files changed

+34
-18
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Refactored `internal/topic/topicwriterinternal/queue_test.go` and extract funcs
12
* Refactored `log/driver.go` and extract funcs
23
* Refactored `internal/table/scanner/scanner.go` and extract funcs
34
* Refactored `log/topic.go` and extract funcs

internal/topic/topicwriterinternal/queue_test.go

+33-18
Original file line numberDiff line numberDiff line change
@@ -175,24 +175,7 @@ func TestMessageQueue_GetMessages(t *testing.T) {
175175

176176
waitTimeout := time.Second * 10
177177
startWait := time.Now()
178-
waitReader:
179-
for {
180-
if lastReadSeqNo.Load() == lastSentSeqNo {
181-
readCancel()
182-
}
183-
select {
184-
case <-readFinished:
185-
break waitReader
186-
case stack := <-fatalChan:
187-
t.Fatal(stack)
188-
default:
189-
}
190-
191-
runtime.Gosched()
192-
if time.Since(startWait) > waitTimeout {
193-
t.Fatal()
194-
}
195-
}
178+
waitReader(&lastReadSeqNo, lastSentSeqNo, readCancel, readFinished, fatalChan, startWait, waitTimeout, t)
196179
})
197180

198181
t.Run("ClosedContext", func(t *testing.T) {
@@ -234,6 +217,38 @@ func TestMessageQueue_GetMessages(t *testing.T) {
234217
})
235218
}
236219

220+
// waitReader waits for a condition where the lastReadSeqNo is equal to the lastSentSeqNo.
221+
// It periodically checks for changes in the lastReadSeqNo and waits for a read to finish or a fatal error to occur.
222+
// If the waitTimeout is reached, a fatal error is triggered.
223+
func waitReader(
224+
lastReadSeqNo *xatomic.Int64,
225+
lastSentSeqNo int64,
226+
readCancel func(),
227+
readFinished <-chan struct{},
228+
fatalChan <-chan string,
229+
startWait time.Time,
230+
waitTimeout time.Duration,
231+
t *testing.T,
232+
) {
233+
waitReader:
234+
for {
235+
if lastReadSeqNo.Load() == lastSentSeqNo {
236+
readCancel()
237+
}
238+
select {
239+
case <-readFinished:
240+
break waitReader
241+
case stack := <-fatalChan:
242+
t.Fatal(stack)
243+
default:
244+
}
245+
runtime.Gosched()
246+
if time.Since(startWait) > waitTimeout {
247+
t.Fatal()
248+
}
249+
}
250+
}
251+
237252
func TestMessageQueue_ResetSentProgress(t *testing.T) {
238253
ctx := context.Background()
239254

0 commit comments

Comments
 (0)