Skip to content

Commit a7bd1b8

Browse files
authored
Merge branch 'master' into skip-rollback
2 parents 80be127 + 51e23f0 commit a7bd1b8

15 files changed

+140
-58
lines changed

Diff for: CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
* Skipped explicit `Rollback` of transaction on errors (the server automatically rolls back transactions on errors)
2+
* Fixed a race when stopping internal processes on topic writer close
3+
* Fixed a goroutine leak in the topic reader on network problems
24

35
## v3.67.0
46
* Added `ydb.WithNodeAddressMutator` experimental option to mutate node addresses from `discovery.ListEndpoints` response

Diff for: internal/background/worker.go

+15-6
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package background
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"runtime/pprof"
78
"sync"
89

@@ -20,6 +21,7 @@ var (
2021
// A Worker must not be copied after first use
2122
type Worker struct {
2223
ctx context.Context //nolint:containedctx
24+
name string
2325
workers sync.WaitGroup
2426
closeReason error
2527
tasksCompleted empty.Chan
@@ -32,8 +34,10 @@ type Worker struct {
3234

3335
type CallbackFunc func(ctx context.Context)
3436

35-
func NewWorker(parent context.Context) *Worker {
36-
w := Worker{}
37+
func NewWorker(parent context.Context, name string) *Worker {
38+
w := Worker{
39+
name: name,
40+
}
3741
w.ctx, w.stop = xcontext.WithCancel(parent)
3842

3943
return &w
@@ -72,7 +76,9 @@ func (b *Worker) Close(ctx context.Context, err error) error {
7276
var resErr error
7377
b.m.WithLock(func() {
7478
if b.closed {
75-
resErr = xerrors.WithStackTrace(ErrAlreadyClosed)
79+
// Close was called a second time: the returned error is ErrAlreadyClosed; the reason of the previous close is appended only to describe it, for easier debugging
80+
//nolint:errorlint
81+
resErr = xerrors.WithStackTrace(fmt.Errorf("%w with reason: %+v", ErrAlreadyClosed, b.closeReason))
7682

7783
return
7884
}
@@ -122,11 +128,14 @@ func (b *Worker) init() {
122128
}
123129
b.tasks = make(chan backgroundTask)
124130
b.tasksCompleted = make(empty.Chan)
125-
go b.starterLoop()
131+
132+
pprof.Do(b.ctx, pprof.Labels("worker-name", b.name), func(ctx context.Context) {
133+
go b.starterLoop(ctx)
134+
})
126135
})
127136
}
128137

129-
func (b *Worker) starterLoop() {
138+
func (b *Worker) starterLoop(ctx context.Context) {
130139
defer close(b.tasksCompleted)
131140

132141
for bgTask := range b.tasks {
@@ -135,7 +144,7 @@ func (b *Worker) starterLoop() {
135144
go func(task backgroundTask) {
136145
defer b.workers.Done()
137146

138-
pprof.Do(b.ctx, pprof.Labels("background", task.name), task.callback)
147+
pprof.Do(ctx, pprof.Labels("background", task.name), task.callback)
139148
}(bgTask)
140149
}
141150
}

Diff for: internal/background/worker_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func TestWorkerContext(t *testing.T) {
2525
t.Run("Dedicated", func(t *testing.T) {
2626
type ctxkey struct{}
2727
ctx := context.WithValue(context.Background(), ctxkey{}, "2")
28-
w := NewWorker(ctx)
28+
w := NewWorker(ctx, "test-worker, "+t.Name())
2929
require.Equal(t, "2", w.Context().Value(ctxkey{}))
3030
})
3131

@@ -41,7 +41,7 @@ func TestWorkerContext(t *testing.T) {
4141

4242
func TestWorkerStart(t *testing.T) {
4343
t.Run("Started", func(t *testing.T) {
44-
w := NewWorker(xtest.Context(t))
44+
w := NewWorker(xtest.Context(t), "test-worker, "+t.Name())
4545
started := make(empty.Chan)
4646
w.Start("test", func(ctx context.Context) {
4747
close(started)
@@ -50,7 +50,7 @@ func TestWorkerStart(t *testing.T) {
5050
})
5151
t.Run("Stopped", func(t *testing.T) {
5252
ctx := xtest.Context(t)
53-
w := NewWorker(ctx)
53+
w := NewWorker(ctx, "test-worker, "+t.Name())
5454
_ = w.Close(ctx, nil)
5555

5656
started := make(empty.Chan)
@@ -72,7 +72,7 @@ func TestWorkerStart(t *testing.T) {
7272
func TestWorkerClose(t *testing.T) {
7373
t.Run("StopBackground", func(t *testing.T) {
7474
ctx := xtest.Context(t)
75-
w := NewWorker(ctx)
75+
w := NewWorker(ctx, "test-worker, "+t.Name())
7676

7777
started := make(empty.Chan)
7878
stopped := atomic.Bool{}
@@ -89,7 +89,7 @@ func TestWorkerClose(t *testing.T) {
8989

9090
t.Run("DoubleClose", func(t *testing.T) {
9191
ctx := xtest.Context(t)
92-
w := NewWorker(ctx)
92+
w := NewWorker(ctx, "test-worker, "+t.Name())
9393
require.NoError(t, w.Close(ctx, nil))
9494
require.Error(t, w.Close(ctx, nil))
9595
})
@@ -104,7 +104,7 @@ func TestWorkerConcurrentStartAndClose(t *testing.T) {
104104
var counter atomic.Int64
105105

106106
ctx := xtest.Context(t)
107-
w := NewWorker(ctx)
107+
w := NewWorker(ctx, "test-worker, "+t.Name())
108108

109109
stopNewStarts := atomic.Bool{}
110110
var wgStarters sync.WaitGroup
@@ -144,7 +144,7 @@ func TestWorkerConcurrentStartAndClose(t *testing.T) {
144144
func TestWorkerStartCompletedWhileLongWait(t *testing.T) {
145145
xtest.TestManyTimes(t, func(t testing.TB) {
146146
ctx := xtest.Context(t)
147-
w := NewWorker(ctx)
147+
w := NewWorker(ctx, "test-worker, "+t.Name())
148148

149149
allowStop := make(empty.Chan)
150150
closeStarted := make(empty.Chan)

Diff for: internal/topic/topicreaderinternal/committer.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package topicreaderinternal
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"sync/atomic"
78
"time"
89

@@ -52,16 +53,15 @@ type committer struct {
5253
commits CommitRanges
5354
}
5455

55-
func newCommitter(tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc) *committer { //nolint:lll,revive
56+
func newCommitterStopped(tracer *trace.Topic, lifeContext context.Context, mode PublicCommitMode, send sendMessageToServerFunc, readerID int64) *committer { //nolint:lll,revive
5657
res := &committer{
5758
mode: mode,
5859
clock: clockwork.NewRealClock(),
5960
send: send,
60-
backgroundWorker: *background.NewWorker(lifeContext),
61+
backgroundWorker: *background.NewWorker(lifeContext, fmt.Sprintf("ydb-topic-reader-committer: %v", readerID)),
6162
tracer: tracer,
6263
}
6364
res.initChannels()
64-
res.start()
6565

6666
return res
6767
}
@@ -70,7 +70,7 @@ func (c *committer) initChannels() {
7070
c.commitLoopSignal = make(empty.Chan, 1)
7171
}
7272

73-
func (c *committer) start() {
73+
func (c *committer) Start() {
7474
c.backgroundWorker.Start("commit pusher", c.pushCommitsLoop)
7575
}
7676

Diff for: internal/topic/topicreaderinternal/committer_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -377,11 +377,12 @@ func TestCommitterBuffer(t *testing.T) {
377377
}
378378

379379
func newTestCommitter(ctx context.Context, t testing.TB) *committer {
380-
res := newCommitter(&trace.Topic{}, ctx, CommitModeAsync, func(msg rawtopicreader.ClientMessage) error {
380+
res := newCommitterStopped(&trace.Topic{}, ctx, CommitModeAsync, func(msg rawtopicreader.ClientMessage) error {
381381
return nil
382-
})
382+
}, -1)
383+
res.Start()
383384
t.Cleanup(func() {
384-
if err := res.Close(ctx, errors.New("test comitter closed")); err != nil {
385+
if err := res.Close(ctx, errors.New("test committer closed")); err != nil {
385386
require.ErrorIs(t, err, background.ErrAlreadyClosed)
386387
}
387388
})

Diff for: internal/topic/topicreaderinternal/stream_reader_impl.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func newTopicStreamReader(
123123
if err = reader.initSession(); err != nil {
124124
return nil, err
125125
}
126-
if err = reader.startLoops(); err != nil {
126+
if err = reader.startBackgroundWorkers(); err != nil {
127127
return nil, err
128128
}
129129

@@ -150,13 +150,17 @@ func newTopicStreamReaderStopped(
150150
stream: &syncedStream{stream: stream},
151151
cancel: cancel,
152152
batcher: newBatcher(),
153-
backgroundWorkers: *background.NewWorker(stopPump),
154153
readConnectionID: "preinitID-" + readerConnectionID.String(),
155154
readerID: readerID,
156155
rawMessagesFromBuffer: make(chan rawtopicreader.ServerMessage, 1),
157156
}
158157

159-
res.committer = newCommitter(cfg.Trace, labeledContext, cfg.CommitMode, res.send)
158+
res.backgroundWorkers = *background.NewWorker(stopPump, fmt.Sprintf(
159+
"topic-reader-stream-background: %v",
160+
res.readerID,
161+
))
162+
163+
res.committer = newCommitterStopped(cfg.Trace, labeledContext, cfg.CommitMode, res.send, res.readerID)
160164
res.committer.BufferTimeLagTrigger = cfg.CommitterBatchTimeLag
161165
res.committer.BufferCountTrigger = cfg.CommitterBatchCounterTrigger
162166
res.sessionController.init()
@@ -413,11 +417,13 @@ func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error {
413417
return err
414418
}
415419

416-
func (r *topicStreamReaderImpl) startLoops() error {
420+
func (r *topicStreamReaderImpl) startBackgroundWorkers() error {
417421
if err := r.setStarted(); err != nil {
418422
return err
419423
}
420424

425+
r.committer.Start()
426+
421427
r.backgroundWorkers.Start("readMessagesLoop", r.readMessagesLoop)
422428
r.backgroundWorkers.Start("dataRequestLoop", r.dataRequestLoop)
423429
r.backgroundWorkers.Start("updateTokenLoop", r.updateTokenLoop)

Diff for: internal/topic/topicreaderinternal/stream_reader_impl_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1055,7 +1055,7 @@ func newTopicReaderTestEnv(t testing.TB) streamEnv {
10551055
}
10561056

10571057
func (e *streamEnv) Start() {
1058-
require.NoError(e.t, e.reader.startLoops())
1058+
require.NoError(e.t, e.reader.startBackgroundWorkers())
10591059
xtest.SpinWaitCondition(e.t, nil, func() bool {
10601060
return e.reader.restBufferSizeBytes.Load() == e.initialBufferSizeBytes
10611061
})

Diff for: internal/topic/topicreaderinternal/stream_reconnector.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/value"
18-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1918
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
2019
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2120
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -24,6 +23,7 @@ import (
2423
var (
2524
errReconnectRequestOutdated = xerrors.Wrap(errors.New("ydb: reconnect request outdated"))
2625
errReconnect = xerrors.Wrap(errors.New("ydb: reconnect to topic grpc stream"))
26+
errConnectionTimeout = xerrors.Wrap(errors.New("ydb: topic reader connection timeout for stream"))
2727
)
2828

2929
type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error)
@@ -33,6 +33,7 @@ type readerReconnector struct {
3333
clock clockwork.Clock
3434
retrySettings topic.RetrySettings
3535
streamVal batchedStreamReader
36+
streamContextCancel context.CancelCauseFunc
3637
streamErr error
3738
closedErr error
3839
initErr error
@@ -148,6 +149,7 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, err error) error
148149

149150
if r.streamVal != nil {
150151
streamCloseErr := r.streamVal.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed))
152+
r.streamContextCancel(errReaderClosed)
151153
if closeErr == nil {
152154
closeErr = streamCloseErr
153155
}
@@ -267,7 +269,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
267269
_ = oldReader.CloseWithError(ctx, xerrors.WithStackTrace(errReconnect))
268270
}
269271

270-
newStream, err := r.connectWithTimeout()
272+
newStream, newStreamClose, err := r.connectWithTimeout()
271273

272274
if r.isRetriableError(err) {
273275
go func(reason error) {
@@ -281,6 +283,7 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
281283
r.streamErr = err
282284
if err == nil {
283285
r.streamVal = newStream
286+
r.streamContextCancel = newStreamClose
284287
if !r.initDone {
285288
r.initDone = true
286289
close(r.initDoneCh)
@@ -304,14 +307,14 @@ func (r *readerReconnector) checkErrRetryMode(err error, retriesDuration time.Du
304307
return topic.CheckRetryMode(err, r.retrySettings, retriesDuration)
305308
}
306309

307-
func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err error) {
310+
func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, _ context.CancelCauseFunc, err error) {
308311
bgContext := r.background.Context()
309312

310313
if err = bgContext.Err(); err != nil {
311-
return nil, err
314+
return nil, nil, err
312315
}
313316

314-
connectionContext, cancel := xcontext.WithCancel(context.Background())
317+
connectionContext, cancel := context.WithCancelCause(context.WithoutCancel(bgContext))
315318

316319
type connectResult struct {
317320
stream batchedStreamReader
@@ -332,17 +335,17 @@ func (r *readerReconnector) connectWithTimeout() (_ batchedStreamReader, err err
332335
case <-connectionTimoutTimer.Chan():
333336
// cancel the connection context only if the timeout expires while connecting,
334337
// because cancelling the context after a successful connect would break the established connection
335-
cancel()
338+
cancel(xerrors.WithStackTrace(errConnectionTimeout))
336339
res = <-result
337340
case res = <-result:
338341
// pass
339342
}
340343

341344
if res.err == nil {
342-
return res.stream, nil
345+
return res.stream, cancel, nil
343346
}
344347

345-
return nil, res.err
348+
return nil, nil, res.err
346349
}
347350

348351
func (r *readerReconnector) WaitInit(ctx context.Context) error {

Diff for: internal/topic/topicreaderinternal/stream_reconnector_test.go

+14-5
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ func TestTopicReaderReconnectorReadMessageBatch(t *testing.T) {
3434
baseReader.EXPECT().ReadMessageBatch(gomock.Any(), opts).Return(batch, nil)
3535

3636
reader := &readerReconnector{
37-
streamVal: baseReader,
38-
tracer: &trace.Topic{},
37+
streamVal: baseReader,
38+
streamContextCancel: func(cause error) {},
39+
tracer: &trace.Topic{},
3940
}
4041
reader.initChannelsAndClock()
4142
res, err := reader.ReadMessageBatch(context.Background(), opts)
@@ -163,7 +164,11 @@ func TestTopicReaderReconnectorCommit(t *testing.T) {
163164
require.Equal(t, "v", ctx.Value(k{}))
164165
require.Equal(t, expectedCommitRange, offset)
165166
})
166-
reconnector := &readerReconnector{streamVal: stream, tracer: &trace.Topic{}}
167+
reconnector := &readerReconnector{
168+
streamVal: stream,
169+
streamContextCancel: func(cause error) {},
170+
tracer: &trace.Topic{},
171+
}
167172
reconnector.initChannelsAndClock()
168173
require.NoError(t, reconnector.Commit(ctx, expectedCommitRange))
169174
})
@@ -174,7 +179,11 @@ func TestTopicReaderReconnectorCommit(t *testing.T) {
174179
require.Equal(t, "v", ctx.Value(k{}))
175180
require.Equal(t, expectedCommitRange, offset)
176181
}).Return(testErr)
177-
reconnector := &readerReconnector{streamVal: stream, tracer: &trace.Topic{}}
182+
reconnector := &readerReconnector{
183+
streamVal: stream,
184+
streamContextCancel: func(cause error) {},
185+
tracer: &trace.Topic{},
186+
}
178187
reconnector.initChannelsAndClock()
179188
require.ErrorIs(t, reconnector.Commit(ctx, expectedCommitRange), testErr)
180189
})
@@ -209,7 +218,7 @@ func TestTopicReaderReconnectorConnectionLoop(t *testing.T) {
209218

210219
reconnector := &readerReconnector{
211220
connectTimeout: value.InfiniteDuration,
212-
background: *background.NewWorker(ctx),
221+
background: *background.NewWorker(ctx, "test-worker, "+t.Name()),
213222
tracer: &trace.Topic{},
214223
}
215224
reconnector.initChannelsAndClock()

Diff for: internal/topic/topicwriterinternal/writer_options.go

+9
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package topicwriterinternal
33
import (
44
"time"
55

6+
"github.com/jonboulle/clockwork"
7+
68
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
79
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
810
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
@@ -152,3 +154,10 @@ func WithTopic(topic string) PublicWriterOption {
152154
cfg.topic = topic
153155
}
154156
}
157+
158+
// WithClock is a private option for tests
159+
func WithClock(clock clockwork.Clock) PublicWriterOption {
160+
return func(cfg *WriterReconnectorConfig) {
161+
cfg.clock = clock
162+
}
163+
}

0 commit comments

Comments
 (0)