Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 058af6a

Browse files
committedMar 18, 2025·
add context to topic logs
1 parent f6a74d6 commit 058af6a

29 files changed

+829
-208
lines changed
 

‎CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `WithReaderLogContext`, `WithWriterLogContext` options to topic reader/writer to supply log entries with user context fields
2+
13
## v3.104.4
24
* Fixed bug with session query latency metric collector
35

‎go.mod

+7-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/google/uuid v1.6.0
88
github.com/jonboulle/clockwork v0.3.0
99
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77
10+
go.uber.org/zap v1.27.0
1011
golang.org/x/net v0.33.0
1112
golang.org/x/sync v0.10.0
1213
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
@@ -17,18 +18,21 @@ require (
1718
// requires for tests only
1819
require (
1920
github.com/rekby/fixenv v0.6.1
20-
github.com/stretchr/testify v1.7.1
21+
github.com/stretchr/testify v1.10.0
2122
go.uber.org/mock v0.4.0
2223
)
2324

2425
require (
25-
github.com/davecgh/go-spew v1.1.0 // indirect
26+
github.com/davecgh/go-spew v1.1.1 // indirect
2627
github.com/golang/protobuf v1.5.3 // indirect
28+
github.com/kr/pretty v0.1.0 // indirect
2729
github.com/pmezard/go-difflib v1.0.0 // indirect
30+
go.uber.org/multierr v1.11.0 // indirect
2831
golang.org/x/sys v0.28.0 // indirect
2932
golang.org/x/text v0.21.0 // indirect
3033
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
31-
gopkg.in/yaml.v3 v3.0.0 // indirect
34+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
35+
gopkg.in/yaml.v3 v3.0.1 // indirect
3236
)
3337

3438
retract v3.67.1 // decimal broken https://github.com/ydb-platform/ydb-go-sdk/issues/1234

‎go.sum

+20-6
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP
1111
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1212
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1313
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
14-
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
1514
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
15+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
16+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1617
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
1718
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
1819
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -54,6 +55,12 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
5455
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
5556
github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg=
5657
github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
58+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
59+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
60+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
61+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
62+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
63+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
5764
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
5865
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5966
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -63,13 +70,19 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
6370
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
6471
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
6572
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
66-
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
67-
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
73+
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
74+
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
6875
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 h1:LY6cI8cP4B9rrpTleZk95+08kl2gF4rixG7+V/dwL6Q=
6976
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
7077
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
78+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
79+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
7180
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
7281
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
82+
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
83+
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
84+
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
85+
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
7386
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
7487
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
7588
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -146,12 +159,13 @@ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
146159
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
147160
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
148161
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
149-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
150162
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
163+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
164+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
151165
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
152166
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
153167
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
154-
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
155-
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
168+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
169+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
156170
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
157171
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

‎internal/grpcwrapper/rawtopic/client.go

+2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S
104104
func (c *Client) StreamWrite(
105105
ctxStreamLifeTime context.Context,
106106
tracer *trace.Topic,
107+
logContext *context.Context,
107108
) (*rawtopicwriter.StreamWriter, error) {
108109
protoResp, err := c.service.StreamWrite(ctxStreamLifeTime)
109110
if err != nil {
@@ -118,6 +119,7 @@ func (c *Client) StreamWrite(
118119
Stream: protoResp,
119120
Tracer: tracer,
120121
InternalStreamID: uuid.New().String(),
122+
LogContext: logContext,
121123
}, nil
122124
}
123125

‎internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rawtopicwriter
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"reflect"
@@ -34,6 +35,7 @@ type StreamWriter struct {
3435
readMessagesCount int
3536
writtenMessagesCount int
3637
sessionID string
38+
LogContext *context.Context
3739
}
3840

3941
//nolint:funlen
@@ -50,7 +52,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
5052
defer func() {
5153
// defer needs for set good session id on first init response before trace the message
5254
trace.TopicOnWriterReceiveGRPCMessage(
53-
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
55+
w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
5456
)
5557
}()
5658
if sendErr != nil {
@@ -139,7 +141,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
139141

140142
err = w.Stream.Send(&protoMsg)
141143
w.writtenMessagesCount++
142-
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
144+
trace.TopicOnWriterSentGRPCMessage(
145+
w.Tracer,
146+
w.LogContext,
147+
w.InternalStreamID,
148+
w.sessionID,
149+
w.writtenMessagesCount,
150+
&protoMsg,
151+
err,
152+
)
143153
if err != nil {
144154
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
145155
}

‎internal/topic/topicclientinternal/client.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,8 @@ func (c *Client) StartReader(
318318
if err != nil {
319319
return nil, err
320320
}
321-
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)
321+
322+
internalReader.TopicOnReaderStart(consumer, err)
322323

323324
return topicreader.NewReader(internalReader), nil
324325
}
@@ -361,15 +362,8 @@ func (c *Client) createWriterConfig(
361362
topicPath string,
362363
opts []topicoptions.WriterOption,
363364
) topicwriterinternal.WriterReconnectorConfig {
364-
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
365-
topicwriterinternal.RawTopicWriterStream,
366-
error,
367-
) {
368-
return c.rawClient.StreamWrite(ctx, tracer)
369-
}
370-
371365
options := []topicoptions.WriterOption{
372-
topicwriterinternal.WithConnectFunc(connector),
366+
topicwriterinternal.WithRawClient(&c.rawClient),
373367
topicwriterinternal.WithTopic(topicPath),
374368
topicwriterinternal.WithCommonConfig(c.cfg.Common),
375369
topicwriterinternal.WithTrace(c.cfg.Trace),

‎internal/topic/topicreadercommon/committer.go

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) {
148148

149149
onDone := trace.TopicOnReaderSendCommitMessage(
150150
c.tracer,
151+
&ctx,
151152
&commits,
152153
)
153154
err := c.send(commits.ToRawMessage())

‎internal/topic/topicreaderinternal/batched_stream_reader_interface.go

+1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ type batchedStreamReader interface {
1515
Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
1616
CloseWithError(ctx context.Context, err error) error
1717
PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
18+
TopicOnReaderStart(consumer string, err error)
1819
}

‎internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go

+36
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎internal/topic/topicreaderinternal/reader.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type Reader struct {
3535
readerID int64
3636
}
3737

38+
func (r *Reader) TopicOnReaderStart(consumer string, err error) {
39+
r.reader.TopicOnReaderStart(consumer, err)
40+
}
41+
3842
type ReadMessageBatchOptions struct {
3943
batcherGetOptions
4044
}
@@ -89,14 +93,17 @@ func NewReader(
8993
return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig)
9094
}
9195

96+
reader := newReaderReconnector(
97+
cfg.BaseContext,
98+
readerID,
99+
readerConnector,
100+
cfg.OperationTimeout(),
101+
cfg.RetrySettings,
102+
cfg.Trace,
103+
)
104+
92105
res := Reader{
93-
reader: newReaderReconnector(
94-
readerID,
95-
readerConnector,
96-
cfg.OperationTimeout(),
97-
cfg.RetrySettings,
98-
cfg.Trace,
99-
),
106+
reader: reader,
100107
defaultBatchConfig: cfg.DefaultBatchConfig,
101108
tracer: cfg.Trace,
102109
readerID: readerID,

‎internal/topic/topicreaderinternal/stream_reader_impl.go

+116-90
Original file line numberDiff line numberDiff line change
@@ -185,23 +185,26 @@ func (r *topicStreamReaderImpl) WaitInit(_ context.Context) error {
185185
return nil
186186
}
187187

188+
func (r *topicStreamReaderImpl) TopicOnReaderStart(consumer string, err error) {
189+
logCtx := r.cfg.BaseContext
190+
trace.TopicOnReaderStart(r.cfg.Trace, &logCtx, r.readerID, consumer, err)
191+
}
192+
188193
func (r *topicStreamReaderImpl) PopMessagesBatchTx(
189194
ctx context.Context,
190195
tx tx.Transaction,
191196
opts ReadMessageBatchOptions,
192197
) (_ *topicreadercommon.PublicBatch, resErr error) {
193-
traceCtx := ctx
194-
onDone := trace.TopicOnReaderStreamPopBatchTx(
195-
r.cfg.Trace,
196-
&traceCtx,
197-
r.readerID,
198-
r.readConnectionID,
199-
tx.SessionID(),
200-
tx,
201-
)
202-
ctx = traceCtx
203198
defer func() {
204-
onDone(resErr)
199+
logCtx := r.cfg.BaseContext
200+
trace.TopicOnReaderStreamPopBatchTx(
201+
r.cfg.Trace,
202+
&logCtx,
203+
r.readerID,
204+
r.readConnectionID,
205+
tx.SessionID(),
206+
tx,
207+
)(resErr)
205208
}()
206209

207210
batch, err := r.ReadMessageBatch(ctx, opts)
@@ -226,51 +229,47 @@ func (r *topicStreamReaderImpl) commitWithTransaction(
226229
}
227230

228231
req := r.createUpdateOffsetRequest(ctx, batch, tx)
229-
updateOffesetInTransactionErr := retry.Retry(ctx, func(ctx context.Context) (err error) {
230-
traceCtx := ctx
231-
onDone := trace.TopicOnReaderUpdateOffsetsInTransaction(
232-
r.cfg.Trace,
233-
&traceCtx,
234-
r.readerID,
235-
r.readConnectionID,
236-
tx.SessionID(),
237-
tx,
238-
)
232+
updateOffsetInTransactionErr := retry.Retry(ctx, func(ctx context.Context) (err error) {
239233
defer func() {
240-
onDone(err)
234+
logCtx := r.cfg.BaseContext
235+
trace.TopicOnReaderUpdateOffsetsInTransaction(
236+
r.cfg.Trace,
237+
&logCtx,
238+
r.readerID,
239+
r.readConnectionID,
240+
tx.SessionID(),
241+
tx,
242+
)
241243
}()
242244

243-
ctx = traceCtx
244245
err = r.topicClient.UpdateOffsetsInTransaction(ctx, req)
245246

246247
return err
247248
})
248-
if updateOffesetInTransactionErr == nil {
249-
r.addOnTransactionCompletedHandler(ctx, tx, batch, updateOffesetInTransactionErr)
249+
if updateOffsetInTransactionErr == nil {
250+
r.addOnTransactionCompletedHandler(ctx, tx, batch, updateOffsetInTransactionErr)
250251
} else {
251252
_ = retry.Retry(ctx, func(ctx context.Context) (err error) {
252-
traceCtx := ctx
253-
onDone := trace.TopicOnReaderTransactionRollback(
254-
r.cfg.Trace,
255-
&traceCtx,
256-
r.readerID,
257-
r.readConnectionID,
258-
tx.SessionID(),
259-
tx,
260-
)
261-
ctx = traceCtx
262253
defer func() {
263-
onDone(err)
254+
logCtx := r.cfg.BaseContext
255+
trace.TopicOnReaderTransactionRollback(
256+
r.cfg.Trace,
257+
&logCtx,
258+
r.readerID,
259+
r.readConnectionID,
260+
tx.SessionID(),
261+
tx,
262+
)(err)
264263
}()
265264

266265
return tx.Rollback(ctx)
267266
})
268267

269268
_ = r.CloseWithError(xcontext.ValueOnly(ctx), xerrors.WithStackTrace(xerrors.Retryable(
270-
fmt.Errorf("ydb: failed add topic offsets in transaction: %w", updateOffesetInTransactionErr),
269+
fmt.Errorf("ydb: failed add topic offsets in transaction: %w", updateOffsetInTransactionErr),
271270
)))
272271

273-
return updateOffesetInTransactionErr
272+
return updateOffsetInTransactionErr
274273
}
275274

276275
return nil
@@ -284,19 +283,19 @@ func (r *topicStreamReaderImpl) addOnTransactionCompletedHandler(
284283
) {
285284
commitRange := topicreadercommon.GetCommitRange(batch)
286285
tx.OnCompleted(func(transactionResult error) {
287-
traceCtx := ctx
288-
onDone := trace.TopicOnReaderTransactionCompleted(
289-
r.cfg.Trace,
290-
&traceCtx,
291-
r.readerID,
292-
r.readConnectionID,
293-
tx.SessionID(),
294-
tx,
295-
transactionResult,
296-
)
297-
defer onDone()
286+
defer func() {
287+
logCtx := r.cfg.BaseContext
288+
trace.TopicOnReaderTransactionCompleted(
289+
r.cfg.Trace,
290+
&logCtx,
291+
r.readerID,
292+
r.readConnectionID,
293+
tx.SessionID(),
294+
tx,
295+
transactionResult,
296+
)
297+
}()
298298

299-
ctx = traceCtx
300299
if transactionResult == nil {
301300
topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd)
302301
} else {
@@ -344,27 +343,39 @@ func (r *topicStreamReaderImpl) ReadMessageBatch(
344343
ctx context.Context,
345344
opts ReadMessageBatchOptions,
346345
) (batch *topicreadercommon.PublicBatch, err error) {
347-
onDone := trace.TopicOnReaderReadMessages(
348-
r.cfg.Trace,
349-
&ctx,
350-
opts.MinCount,
351-
opts.MaxCount,
352-
r.getRestBufferBytes(),
353-
)
354346
defer func() {
347+
traceFunc := func(
348+
messagesCount int,
349+
topic string,
350+
partitionID int64,
351+
partitionSessionID int64,
352+
offsetStart int64,
353+
offsetEnd int64,
354+
freeBufferCapacity int,
355+
) {
356+
mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext)
357+
onDone := trace.TopicOnReaderReadMessages(
358+
r.cfg.Trace,
359+
mergeCtx.AsCtx(),
360+
opts.MinCount,
361+
opts.MaxCount,
362+
r.getRestBufferBytes(),
363+
)
364+
onDone(messagesCount, topic, partitionID, partitionSessionID, offsetStart, offsetEnd, freeBufferCapacity, nil)
365+
ctx = mergeCtx.RequestContext
366+
}
355367
if batch == nil {
356-
onDone(0, "", -1, -1, -1, -1, r.getRestBufferBytes(), err)
368+
traceFunc(0, "", -1, -1, -1, -1, r.getRestBufferBytes())
357369
} else {
358370
commitRange := topicreadercommon.GetCommitRange(batch)
359-
onDone(
371+
traceFunc(
360372
len(batch.Messages),
361373
batch.Topic(),
362374
batch.PartitionID(),
363375
topicreadercommon.BatchGetPartitionSession(batch).StreamPartitionSessionID.ToInt64(),
364376
commitRange.CommitOffsetStart.ToInt64(),
365377
commitRange.CommitOffsetEnd.ToInt64(),
366378
r.getRestBufferBytes(),
367-
err,
368379
)
369380
}
370381
}()
@@ -462,7 +473,6 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer(
462473
if err != nil {
463474
return err
464475
}
465-
466476
onDone := trace.TopicOnReaderPartitionReadStopResponse(
467477
r.cfg.Trace,
468478
r.readConnectionID,
@@ -525,18 +535,19 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange topicrea
525535
return xerrors.WithStackTrace(errCommitWithNilPartitionSession)
526536
}
527537

528-
session := commitRange.PartitionSession
529-
onDone := trace.TopicOnReaderCommit(
530-
r.cfg.Trace,
531-
&ctx,
532-
session.Topic,
533-
session.PartitionID,
534-
session.StreamPartitionSessionID.ToInt64(),
535-
commitRange.CommitOffsetStart.ToInt64(),
536-
commitRange.CommitOffsetEnd.ToInt64(),
537-
)
538538
defer func() {
539-
onDone(err)
539+
session := commitRange.PartitionSession
540+
mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext)
541+
trace.TopicOnReaderCommit(
542+
r.cfg.Trace,
543+
mergeCtx.AsCtx(),
544+
session.Topic,
545+
session.PartitionID,
546+
session.StreamPartitionSessionID.ToInt64(),
547+
commitRange.CommitOffsetStart.ToInt64(),
548+
commitRange.CommitOffsetEnd.ToInt64(),
549+
)
550+
ctx = mergeCtx.RequestContext
540551
}()
541552

542553
if err = r.checkCommitRange(commitRange); err != nil {
@@ -574,7 +585,9 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange topicreadercommon.C
574585
func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error {
575586
err := r.stream.Send(msg)
576587
if err != nil {
577-
trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err)
588+
logCtx := r.cfg.BaseContext
589+
trace.TopicOnReaderError(r.cfg.Trace, &logCtx, r.readConnectionID, err)
590+
578591
_ = r.CloseWithError(r.ctx, err)
579592
}
580593

@@ -613,8 +626,9 @@ func (r *topicStreamReaderImpl) setStarted() error {
613626
func (r *topicStreamReaderImpl) initSession() (err error) {
614627
initMessage := topicreadercommon.CreateInitMessage(r.cfg.Consumer, r.cfg.ReadSelectors)
615628

616-
onDone := trace.TopicOnReaderInit(r.cfg.Trace, r.readConnectionID, initMessage)
617629
defer func() {
630+
logCtx := r.cfg.BaseContext
631+
onDone := trace.TopicOnReaderInit(r.cfg.Trace, &logCtx, r.readConnectionID, initMessage)
618632
onDone(r.readConnectionID, err)
619633
}()
620634

@@ -664,9 +678,10 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) {
664678
for {
665679
serverMessage, err := r.stream.Recv()
666680
if err != nil {
667-
trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err)
681+
logCtx := r.cfg.BaseContext
682+
trace.TopicOnReaderError(r.cfg.Trace, &logCtx, r.readConnectionID, err)
668683
if errors.Is(err, rawtopicreader.ErrUnexpectedMessageType) {
669-
trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, r.readConnectionID, err)
684+
trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, &logCtx, r.readConnectionID, err)
670685
// new messages can be added to protocol, it must be backward compatible to old programs
671686
// and skip message is safe
672687
continue
@@ -712,14 +727,18 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) {
712727
case *rawtopicreader.UpdateTokenResponse:
713728
r.onUpdateTokenResponse(m)
714729
default:
715-
trace.TopicOnReaderUnknownGrpcMessage(
716-
r.cfg.Trace,
717-
r.readConnectionID,
718-
xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
719-
"ydb: unexpected message type in stream reader: %v",
720-
reflect.TypeOf(serverMessage),
721-
))),
722-
)
730+
{
731+
logCtx := r.cfg.BaseContext
732+
trace.TopicOnReaderUnknownGrpcMessage(
733+
r.cfg.Trace,
734+
&logCtx,
735+
r.readConnectionID,
736+
xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
737+
"ydb: unexpected message type in stream reader: %v",
738+
reflect.TypeOf(serverMessage),
739+
))),
740+
)
741+
}
723742
}
724743
}
725744
}
@@ -753,7 +772,8 @@ func (r *topicStreamReaderImpl) dataRequestLoop(ctx context.Context) {
753772
}
754773

755774
resCapacity := r.addRestBufferBytes(sum)
756-
trace.TopicOnReaderSentDataRequest(r.cfg.Trace, r.readConnectionID, sum, resCapacity)
775+
logCtx := r.cfg.BaseContext
776+
trace.TopicOnReaderSentDataRequest(r.cfg.Trace, &logCtx, r.readConnectionID, sum, resCapacity)
757777
if err := r.sendDataRequest(sum); err != nil {
758778
return
759779
}
@@ -793,9 +813,9 @@ func (r *topicStreamReaderImpl) updateTokenLoop(ctx context.Context) {
793813

794814
func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) (err error) {
795815
resCapacity := r.addRestBufferBytes(-msg.BytesSize)
796-
onDone := trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, r.readConnectionID, resCapacity, msg)
797816
defer func() {
798-
onDone(err)
817+
logCtx := r.cfg.BaseContext
818+
trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, &logCtx, r.readConnectionID, resCapacity, msg)
799819
}()
800820

801821
batches, err2 := topicreadercommon.ReadRawBatchesToPublicBatches(msg, &r.sessionController, r.cfg.Decoders)
@@ -813,8 +833,10 @@ func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse)
813833
}
814834

815835
func (r *topicStreamReaderImpl) CloseWithError(ctx context.Context, reason error) (closeErr error) {
816-
onDone := trace.TopicOnReaderClose(r.cfg.Trace, r.readConnectionID, reason)
817-
defer onDone(closeErr)
836+
defer func() {
837+
logCtx := r.cfg.BaseContext
838+
trace.TopicOnReaderClose(r.cfg.Trace, &logCtx, r.readConnectionID, reason)
839+
}()
818840

819841
isFirstClose := false
820842
r.m.WithLock(func() {
@@ -862,8 +884,10 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse
862884
}
863885
partition.SetCommittedOffsetForward(commit.CommittedOffset)
864886

887+
logCtx := r.cfg.BaseContext
865888
trace.TopicOnReaderCommittedNotify(
866889
r.cfg.Trace,
890+
&logCtx,
867891
r.readConnectionID,
868892
partition.Topic,
869893
partition.PartitionID,
@@ -878,12 +902,14 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse
878902
}
879903

880904
func (r *topicStreamReaderImpl) updateToken(ctx context.Context) {
905+
logCtx := r.cfg.BaseContext
881906
onUpdateToken := trace.TopicOnReaderUpdateToken(
882907
r.cfg.Trace,
908+
&logCtx,
883909
r.readConnectionID,
884910
)
885911
token, err := r.cfg.Cred.Token(ctx)
886-
onSent := onUpdateToken(len(token), err)
912+
onSent := onUpdateToken(&ctx, len(token), err)
887913
if err != nil {
888914
return
889915
}

‎internal/topic/topicreaderinternal/stream_reconnector.go

+38-12
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error)
3434
type readerReconnector struct {
3535
background background.Worker
3636
clock clockwork.Clock
37+
logContext context.Context //nolint:containedctx
3738
retrySettings topic.RetrySettings
3839
streamVal batchedStreamReader
3940
streamContextCancel context.CancelCauseFunc
@@ -52,6 +53,7 @@ type readerReconnector struct {
5253
}
5354

5455
func newReaderReconnector(
56+
logContext context.Context,
5557
readerID int64,
5658
connector readerConnectFunc,
5759
connectTimeout time.Duration,
@@ -64,6 +66,7 @@ func newReaderReconnector(
6466
readerConnect: connector,
6567
streamErr: errUnconnected,
6668
connectTimeout: connectTimeout,
69+
logContext: logContext,
6770
tracer: tracer,
6871
retrySettings: retrySettings,
6972
}
@@ -78,6 +81,15 @@ func newReaderReconnector(
7881
return res
7982
}
8083

84+
func (r *readerReconnector) TopicOnReaderStart(consumer string, err error) {
85+
logCtx := r.logContext
86+
trace.TopicOnReaderStart(r.tracer, &logCtx, r.readerID, consumer, err)
87+
}
88+
89+
func (r *readerReconnector) SetLogContext(ctx context.Context) {
90+
r.logContext = ctx
91+
}
92+
8193
func (r *readerReconnector) PopMessagesBatchTx(
8294
ctx context.Context,
8395
tx tx.Transaction,
@@ -263,8 +275,6 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) {
263275
}
264276
}
265277

266-
onReconnectionDone := trace.TopicOnReaderReconnect(r.tracer, request.reason)
267-
268278
if request.reason != nil {
269279
retryBackoff, stopRetryReason := r.checkErrRetryMode(
270280
request.reason,
@@ -286,21 +296,25 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) {
286296
}
287297
} else {
288298
_ = r.CloseWithError(ctx, stopRetryReason)
289-
onReconnectionDone(stopRetryReason)
299+
logCtx := r.logContext
300+
trace.TopicOnReaderReconnect(r.tracer, &logCtx, request.reason)(stopRetryReason)
290301

291302
return
292303
}
293304
}
294305

295306
err := r.reconnect(ctx, request.reason, request.oldReader)
296-
onReconnectionDone(err)
307+
logCtx := r.logContext
308+
trace.TopicOnReaderReconnect(r.tracer, &logCtx, request.reason)(err)
297309
}
298310
}
299311

300312
//nolint:funlen
301313
func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldReader batchedStreamReader) (err error) {
302-
onDone := trace.TopicOnReaderReconnect(r.tracer, reason)
303-
defer func() { onDone(err) }()
314+
defer func() {
315+
logCtx := r.logContext
316+
trace.TopicOnReaderReconnect(r.tracer, &logCtx, reason)(err)
317+
}()
304318

305319
if err = ctx.Err(); err != nil {
306320
return err
@@ -339,9 +353,15 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
339353
r.background.Start("ydb topic reader send reconnect message", func(ctx context.Context) {
340354
select {
341355
case r.reconnectFromBadStream <- newReconnectRequest(oldReader, sendReason):
342-
trace.TopicOnReaderReconnectRequest(r.tracer, err, true)
356+
{
357+
logCtx := r.logContext
358+
trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, true)
359+
}
343360
case <-ctx.Done():
344-
trace.TopicOnReaderReconnectRequest(r.tracer, ctx.Err(), false)
361+
{
362+
logCtx := r.logContext
363+
trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, false)
364+
}
345365
}
346366
})
347367
default:
@@ -440,11 +460,17 @@ func (r *readerReconnector) fireReconnectOnRetryableError(stream batchedStreamRe
440460

441461
select {
442462
case r.reconnectFromBadStream <- newReconnectRequest(stream, err):
443-
// send signal
444-
trace.TopicOnReaderReconnectRequest(r.tracer, err, true)
463+
{
464+
// send signal
465+
logCtx := r.logContext
466+
trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, true)
467+
}
445468
default:
446-
// previous reconnect signal in process, no need sent signal more
447-
trace.TopicOnReaderReconnectRequest(r.tracer, err, false)
469+
{
470+
// previous reconnect signal in process, no need sent signal more
471+
logCtx := r.logContext
472+
trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, false)
473+
}
448474
}
449475
}
450476

‎internal/topic/topicwriterinternal/encoders.go

+8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package topicwriterinternal
33
import (
44
"bytes"
55
"compress/gzip"
6+
"context"
67
"fmt"
78
"io"
89
"sync"
@@ -164,9 +165,11 @@ type EncoderSelector struct {
164165
parallelCompressors int
165166
batchCounter int
166167
measureIntervalBatches int
168+
logContext context.Context //nolint:containedctx
167169
}
168170

169171
func NewEncoderSelector(
172+
logContext context.Context,
170173
m *MultiEncoder,
171174
allowedCodecs rawtopiccommon.SupportedCodecs,
172175
parallelCompressors int,
@@ -184,6 +187,7 @@ func NewEncoderSelector(
184187
tracer: tracer,
185188
writerReconnectorID: writerReconnectorID,
186189
sessionID: sessionID,
190+
logContext: logContext,
187191
}
188192
res.ResetAllowedCodecs(allowedCodecs)
189193

@@ -193,8 +197,10 @@ func NewEncoderSelector(
193197
func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (rawtopiccommon.Codec, error) {
194198
codec, err := s.selectCodec(messages)
195199
if err == nil {
200+
logCtx := s.logContext
196201
onCompressDone := trace.TopicOnWriterCompressMessages(
197202
s.tracer,
203+
&logCtx,
198204
s.writerReconnectorID,
199205
s.sessionID,
200206
codec.ToInt32(),
@@ -263,8 +269,10 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt
263269
if len(messages) > 0 {
264270
firstSeqNo = messages[0].SeqNo
265271
}
272+
logCtx := s.logContext
266273
onCompressDone := trace.TopicOnWriterCompressMessages(
267274
s.tracer,
275+
&logCtx,
268276
s.writerReconnectorID,
269277
s.sessionID,
270278
codec.ToInt32(),

‎internal/topic/topicwriterinternal/encoders_test.go

+22-3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package topicwriterinternal
33
import (
44
"bytes"
55
"compress/gzip"
6+
"context"
67
"fmt"
78
"io"
89
"strings"
@@ -15,14 +16,32 @@ import (
1516
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
1617
)
1718

19+
func NewTestEncoderSelector(
20+
m *MultiEncoder,
21+
allowedCodecs rawtopiccommon.SupportedCodecs,
22+
parallelCompressors int,
23+
tracer *trace.Topic,
24+
writerReconnectorID, sessionID string,
25+
) EncoderSelector {
26+
return NewEncoderSelector(
27+
context.Background(),
28+
m,
29+
allowedCodecs,
30+
parallelCompressors,
31+
tracer,
32+
writerReconnectorID,
33+
sessionID,
34+
)
35+
}
36+
1837
func TestEncoderSelector_CodecMeasure(t *testing.T) {
1938
t.Run("Empty", func(t *testing.T) {
20-
s := NewEncoderSelector(testCommonEncoders, nil, 1, &trace.Topic{}, "", "")
39+
s := NewTestEncoderSelector(testCommonEncoders, nil, 1, &trace.Topic{}, "", "")
2140
_, err := s.measureCodecs(nil)
2241
require.Error(t, err)
2342
})
2443
t.Run("One", func(t *testing.T) {
25-
s := NewEncoderSelector(
44+
s := NewTestEncoderSelector(
2645
NewMultiEncoder(),
2746
rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw},
2847
1,
@@ -44,7 +63,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) {
4463
)
4564

4665
testSelectCodec := func(t testing.TB, targetCodec rawtopiccommon.Codec, smallCount, largeCount int) {
47-
s := NewEncoderSelector(testCommonEncoders, rawtopiccommon.SupportedCodecs{
66+
s := NewTestEncoderSelector(testCommonEncoders, rawtopiccommon.SupportedCodecs{
4867
rawtopiccommon.CodecRaw,
4968
rawtopiccommon.CodecGzip,
5069
}, 4,

‎internal/topic/topicwriterinternal/writer_config.go

+4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package topicwriterinternal
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/jonboulle/clockwork"
78

89
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic"
911
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1012
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
1113
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -19,7 +21,9 @@ type WritersCommonConfig struct {
1921
compressorCount int
2022
maxBytesPerMessage int
2123

24+
LogContext context.Context //nolint:containedctx
2225
Tracer *trace.Topic
26+
rawTopicClient *rawtopic.Client
2327
cred credentials.Credentials
2428
credUpdateInterval time.Duration
2529
clock clockwork.Clock

‎internal/topic/topicwriterinternal/writer_options.go

+9
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/ydb-platform/ydb-go-sdk/v3/credentials"
99
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter"
1213
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
@@ -77,6 +78,14 @@ func WithCredentials(cred credentials.Credentials) PublicWriterOption {
7778
}
7879
}
7980

81+
// WithRawClient for internal usage only
82+
// no proxy to public interface
83+
func WithRawClient(rawClient *rawtopic.Client) PublicWriterOption {
84+
return func(cfg *WriterReconnectorConfig) {
85+
cfg.rawTopicClient = rawClient
86+
}
87+
}
88+
8089
func WithCodec(codec rawtopiccommon.Codec) PublicWriterOption {
8190
return func(cfg *WriterReconnectorConfig) {
8291
cfg.forceCodec = codec

‎internal/topic/topicwriterinternal/writer_reconnector.go

+23-3
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,22 @@ func NewWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnector
113113
WithProducerID(uuid.NewString())(&cfg)
114114
}
115115

116+
if cfg.Connect == nil {
117+
logContext := context.Background()
118+
if cfg.LogContext != nil {
119+
logContext = cfg.LogContext
120+
}
121+
122+
var connector ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
123+
RawTopicWriterStream,
124+
error,
125+
) {
126+
return cfg.rawTopicClient.StreamWrite(ctx, tracer, &logContext)
127+
}
128+
129+
cfg.Connect = connector
130+
}
131+
116132
return cfg
117133
}
118134

@@ -324,8 +340,10 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage)
324340
w.m.WithRLock(func() {
325341
sessionID = w.sessionID
326342
})
343+
logCtx := w.cfg.LogContext
327344
onCompressDone := trace.TopicOnWriterCompressMessages(
328345
w.cfg.Tracer,
346+
&logCtx,
329347
w.writerInstanceID,
330348
sessionID,
331349
w.cfg.forceCodec.ToInt32(),
@@ -340,6 +358,7 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage)
340358
}
341359
err := cacheMessages(res, targetCodec, w.cfg.compressorCount)
342360
onCompressDone(err)
361+
343362
if err != nil {
344363
return nil, err
345364
}
@@ -366,9 +385,9 @@ func (w *WriterReconnector) Close(ctx context.Context) error {
366385
}
367386

368387
func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr error) {
369-
onDone := trace.TopicOnWriterClose(w.cfg.Tracer, w.writerInstanceID, reason)
370388
defer func() {
371-
onDone(resErr)
389+
logCtx := w.cfg.LogContext
390+
trace.TopicOnWriterClose(w.cfg.Tracer, &logCtx, w.writerInstanceID, reason)
372391
}()
373392

374393
// stop background work and single stream writer
@@ -387,7 +406,6 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err
387406

388407
func (w *WriterReconnector) connectionLoop(ctx context.Context) {
389408
attempt := 0
390-
391409
createStreamContext := func() (context.Context, context.CancelFunc) {
392410
// need suppress parent context cancelation for flush buffer while close writer
393411
return xcontext.WithCancel(xcontext.ValueOnly(ctx))
@@ -425,8 +443,10 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
425443
}
426444
}
427445

446+
logCtx := w.cfg.LogContext
428447
onWriterStarted := trace.TopicOnWriterReconnect(
429448
w.cfg.Tracer,
449+
&logCtx,
430450
w.writerInstanceID,
431451
w.cfg.topic,
432452
w.cfg.producerID,

‎internal/topic/topicwriterinternal/writer_single_stream.go

+30-12
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,17 @@ func (w *SingleStreamWriter) start() {
125125
}
126126

127127
func (w *SingleStreamWriter) initStream() (err error) {
128-
traceOnDone := trace.TopicOnWriterInitStream(w.cfg.Tracer, w.cfg.reconnectorInstanceID, w.cfg.topic, w.cfg.producerID)
129-
defer func() { traceOnDone(w.SessionID, err) }()
128+
defer func() {
129+
logCtx := w.cfg.LogContext
130+
traceOnDone := trace.TopicOnWriterInitStream(
131+
w.cfg.Tracer,
132+
&logCtx,
133+
w.cfg.reconnectorInstanceID,
134+
w.cfg.topic,
135+
w.cfg.producerID,
136+
)
137+
traceOnDone(w.SessionID, err)
138+
}()
130139

131140
req := w.createInitRequest()
132141
if err = w.cfg.stream.Send(&req); err != nil {
@@ -149,6 +158,7 @@ func (w *SingleStreamWriter) initStream() (err error) {
149158
}
150159

151160
w.Encoder = NewEncoderSelector(
161+
w.cfg.LogContext,
152162
w.cfg.encodersMap,
153163
w.allowedCodecs,
154164
w.cfg.compressorCount,
@@ -203,15 +213,19 @@ func (w *SingleStreamWriter) receiveMessagesLoop(ctx context.Context) {
203213
case *rawtopicwriter.UpdateTokenResponse:
204214
// pass
205215
default:
206-
trace.TopicOnWriterReadUnknownGrpcMessage(
207-
w.cfg.Tracer,
208-
w.cfg.reconnectorInstanceID,
209-
w.SessionID,
210-
xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
211-
"ydb: unexpected message type in stream reader: %v",
212-
reflect.TypeOf(m),
213-
))),
214-
)
216+
{
217+
logCtx := w.cfg.LogContext
218+
trace.TopicOnWriterReadUnknownGrpcMessage(
219+
w.cfg.Tracer,
220+
&logCtx,
221+
w.cfg.reconnectorInstanceID,
222+
w.SessionID,
223+
xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
224+
"ydb: unexpected message type in stream reader: %v",
225+
reflect.TypeOf(m),
226+
))),
227+
)
228+
}
215229
}
216230
}
217231
}
@@ -232,16 +246,20 @@ func (w *SingleStreamWriter) sendMessagesFromQueueToStreamLoop(ctx context.Conte
232246
return
233247
}
234248

249+
err = sendMessagesToStream(w.cfg.stream, w.cfg.maxBytesPerMessage, targetCodec, messages)
250+
251+
logCtx := w.cfg.LogContext
235252
onSentComplete := trace.TopicOnWriterSendMessages(
236253
w.cfg.Tracer,
254+
&logCtx,
237255
w.cfg.reconnectorInstanceID,
238256
w.SessionID,
239257
targetCodec.ToInt32(),
240258
messages[0].SeqNo,
241259
len(messages),
242260
)
243-
err = sendMessagesToStream(w.cfg.stream, w.cfg.maxBytesPerMessage, targetCodec, messages)
244261
onSentComplete(err)
262+
245263
if err != nil {
246264
err = xerrors.WithStackTrace(fmt.Errorf("ydb: error send message to topic stream: %w", err))
247265
_ = w.close(ctx, err)

‎internal/topic/topicwriterinternal/writer_transaction.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ func (w *WriterWithTransaction) onBeforeCommitTransaction(ctx context.Context) (
3939
w.streamWriter.GetSessionID(),
4040
w.tx.ID(),
4141
)
42-
ctx = traceCtx
4342

4443
defer func() {
4544
onDone(err, w.streamWriter.GetSessionID())
45+
ctx = traceCtx
4646
}()
4747

4848
// wait message flushing

‎internal/xcontext/merge_contexts.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package xcontext
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
type MergedContexts struct {
9+
context.Context //nolint:containedctx
10+
BaseContext context.Context //nolint:containedctx
11+
RequestContext context.Context //nolint:containedctx
12+
}
13+
14+
func (ctx MergedContexts) Deadline() (deadline time.Time, ok bool) {
15+
return ctx.RequestContext.Deadline()
16+
}
17+
18+
func (ctx MergedContexts) Done() <-chan struct{} {
19+
return ctx.RequestContext.Done()
20+
}
21+
22+
func (ctx MergedContexts) Err() error {
23+
return ctx.RequestContext.Err()
24+
}
25+
26+
func (ctx MergedContexts) Value(key interface{}) interface{} {
27+
if ctx.RequestContext.Value(key) != nil {
28+
return ctx.RequestContext.Value(key)
29+
}
30+
31+
return ctx.BaseContext.Value(key)
32+
}
33+
34+
func (ctx MergedContexts) AsCtx() *context.Context {
35+
var ret context.Context = &ctx
36+
37+
return &ret
38+
}
39+
40+
func MergeContexts(req context.Context, log context.Context) MergedContexts {
41+
return MergedContexts{
42+
BaseContext: log,
43+
RequestContext: req,
44+
}
45+
}

‎log/context.go

+4
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,9 @@ func NamesFromContext(ctx context.Context) []string {
3737
}
3838

3939
func with(ctx context.Context, lvl Level, names ...string) context.Context {
40+
if ctx == nil {
41+
ctx = context.Background()
42+
}
43+
4044
return WithLevel(WithNames(ctx, names...), lvl)
4145
}

‎log/topic.go

+26-27
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package log
22

33
import (
4-
"context"
54
"time"
65

76
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"
@@ -25,7 +24,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
2524
if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 {
2625
return nil
2726
}
28-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "reconnect")
27+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "reconnect")
2928
start := time.Now()
3029
l.Log(ctx, "topic reader reconnect starting...")
3130

@@ -40,7 +39,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
4039
if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 {
4140
return
4241
}
43-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "reconnect", "request")
42+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "reconnect", "request")
4443
l.Log(ctx, "topic reader reconnect request",
4544
kv.NamedError("reason", info.Reason),
4645
kv.Bool("was_sent", info.WasSent),
@@ -52,7 +51,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
5251
if d.Details()&trace.TopicReaderPartitionEvents == 0 {
5352
return nil
5453
}
55-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "partition", "read", "start", "response")
54+
ctx := with(*info.PartitionContext, TRACE, "ydb", "topic", "reader", "partition", "read", "start", "response")
5655
start := time.Now()
5756
l.Log(ctx, "topic reader start partition read response starting...",
5857
kv.String("topic", info.Topic),
@@ -97,7 +96,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
9796
if d.Details()&trace.TopicReaderPartitionEvents == 0 {
9897
return nil
9998
}
100-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "partition", "read", "stop", "response")
99+
ctx := with(info.PartitionContext, TRACE, "ydb", "topic", "reader", "partition", "read", "stop", "response")
101100
start := time.Now()
102101
l.Log(ctx, "topic reader stop partition read response starting...",
103102
kv.String("reader_connection_id", info.ReaderConnectionID),
@@ -133,7 +132,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
133132
if d.Details()&trace.TopicReaderStreamEvents == 0 {
134133
return nil
135134
}
136-
ctx := with(*info.RequestContext, TRACE, "ydb", "topic", "reader", "commit")
135+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "commit")
137136
start := time.Now()
138137
l.Log(ctx, "topic reader commit starting...",
139138
kv.String("topic", info.Topic),
@@ -170,7 +169,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
170169
if d.Details()&trace.TopicReaderStreamEvents == 0 {
171170
return nil
172171
}
173-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "send", "commit", "message")
172+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "send", "commit", "message")
174173
start := time.Now()
175174

176175
commitInfo := info.CommitsInfo.GetCommitsInfo()
@@ -211,7 +210,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
211210
if d.Details()&trace.TopicReaderStreamEvents == 0 {
212211
return
213212
}
214-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "committed", "notify")
213+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "committed", "notify")
215214
l.Log(ctx, "topic reader received commit ack",
216215
kv.String("reader_connection_id", info.ReaderConnectionID),
217216
kv.String("topic", info.Topic),
@@ -224,7 +223,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
224223
if d.Details()&trace.TopicReaderStreamEvents == 0 {
225224
return nil
226225
}
227-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "close")
226+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "close")
228227
start := time.Now()
229228
l.Log(ctx, "topic reader close starting...",
230229
kv.String("reader_connection_id", info.ReaderConnectionID),
@@ -253,7 +252,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
253252
if d.Details()&trace.TopicReaderStreamEvents == 0 {
254253
return nil
255254
}
256-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "init")
255+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "init")
257256
start := time.Now()
258257
l.Log(ctx, "topic reader init starting...",
259258
kv.String("pre_init_reader_connection_id", info.PreInitReaderConnectionID),
@@ -284,7 +283,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
284283
if d.Details()&trace.TopicReaderStreamEvents == 0 {
285284
return
286285
}
287-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "error")
286+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "error")
288287
l.Log(WithLevel(ctx, INFO), "topic reader has grpc stream error",
289288
kv.Error(info.Error),
290289
kv.String("reader_connection_id", info.ReaderConnectionID),
@@ -299,7 +298,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
299298
if d.Details()&trace.TopicReaderStreamEvents == 0 {
300299
return nil
301300
}
302-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "update", "token")
301+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "update", "token")
303302
start := time.Now()
304303
l.Log(ctx, "topic reader token update starting...",
305304
kv.String("reader_connection_id", info.ReaderConnectionID),
@@ -347,7 +346,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
347346
if d.Details()&trace.TopicReaderMessageEvents == 0 {
348347
return
349348
}
350-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "sent", "data", "request")
349+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "sent", "data", "request")
351350
l.Log(ctx, "topic reader sent data request",
352351
kv.String("reader_connection_id", info.ReaderConnectionID),
353352
kv.Int("request_bytes", info.RequestBytes),
@@ -360,7 +359,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
360359
if d.Details()&trace.TopicReaderMessageEvents == 0 {
361360
return nil
362361
}
363-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "receive", "data", "response")
362+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "receive", "data", "response")
364363
start := time.Now()
365364
partitionsCount, batchesCount, messagesCount := info.DataResponse.GetPartitionBatchMessagesCounts()
366365
l.Log(ctx, "topic reader data response received, process starting...",
@@ -404,7 +403,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
404403
if d.Details()&trace.TopicReaderMessageEvents == 0 {
405404
return nil
406405
}
407-
ctx := with(*info.RequestContext, TRACE, "ydb", "topic", "reader", "read", "messages")
406+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "read", "messages")
408407
start := time.Now()
409408
l.Log(ctx, "topic read messages, waiting...",
410409
kv.Int("min_count", info.MinCount),
@@ -436,7 +435,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
436435
if d.Details()&trace.TopicReaderMessageEvents == 0 {
437436
return
438437
}
439-
ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "unknown", "grpc", "message")
438+
ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "unknown", "grpc", "message")
440439
l.Log(WithLevel(ctx, INFO), "topic reader received unknown grpc message",
441440
kv.Error(info.Error),
442441
kv.String("reader_connection_id", info.ReaderConnectionID),
@@ -648,7 +647,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
648647
if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 {
649648
return nil
650649
}
651-
ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "reconnect")
650+
ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "reconnect")
652651
start := time.Now()
653652
l.Log(ctx, "connect to topic writer stream starting...",
654653
kv.String("topic", info.Topic),
@@ -694,7 +693,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
694693
if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 {
695694
return nil
696695
}
697-
ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "stream", "init")
696+
ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "stream", "init")
698697
start := time.Now()
699698
l.Log(ctx, "topic writer init stream starting...",
700699
kv.String("topic", info.Topic),
@@ -734,7 +733,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
734733
start := time.Now()
735734

736735
return func(doneInfo trace.TopicOnWriterBeforeCommitTransactionDoneInfo) {
737-
ctx := with(*info.Ctx, TRACE, "ydb", "topic", "writer", "beforecommit")
736+
ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "beforecommit")
738737
l.Log(ctx, "topic writer wait of flush messages before commit transaction",
739738
kv.String("kqp_session_id", info.KqpSessionID),
740739
kv.String("topic_session_id_start", info.TopicSessionID),
@@ -750,7 +749,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
750749
start := time.Now()
751750

752751
return func(doneInfo trace.TopicOnWriterAfterFinishTransactionDoneInfo) {
753-
ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "beforecommit")
752+
ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "beforecommit")
754753
l.Log(ctx, "topic writer close writer after transaction finished",
755754
kv.String("kqp_session_id", info.SessionID),
756755
kv.String("tx_id", info.TransactionID),
@@ -762,7 +761,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
762761
if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 {
763762
return nil
764763
}
765-
ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "close")
764+
ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "close")
766765
start := time.Now()
767766
l.Log(ctx, "topic writer close starting...",
768767
kv.String("writer_instance_id", info.WriterInstanceID),
@@ -793,7 +792,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
793792
if d.Details()&trace.TopicWriterStreamEvents == 0 {
794793
return nil
795794
}
796-
ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "compress", "messages")
795+
ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "compress", "messages")
797796
start := time.Now()
798797
l.Log(ctx, "topic writer compress messages starting...",
799798
kv.String("writer_instance_id", info.WriterInstanceID),
@@ -836,7 +835,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
836835
if d.Details()&trace.TopicWriterStreamEvents == 0 {
837836
return nil
838837
}
839-
ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "send", "messages")
838+
ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "send", "messages")
840839
start := time.Now()
841840
l.Log(ctx, "topic writer send messages starting...",
842841
kv.String("writer_instance_id", info.WriterInstanceID),
@@ -874,7 +873,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
874873
return
875874
}
876875
acks := info.Acks.GetAcks()
877-
ctx := with(context.Background(), DEBUG, "ydb", "topic", "writer", "receive", "result")
876+
ctx := with(*info.Context, DEBUG, "ydb", "topic", "writer", "receive", "result")
878877
l.Log(ctx, "topic writer received result from server",
879878
kv.String("writer_instance_id", info.WriterInstanceID),
880879
kv.String("session_id", info.SessionID),
@@ -895,7 +894,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
895894
return
896895
}
897896

898-
ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc")
897+
ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "grpc")
899898
l.Log(
900899
ctx, "topic writer sent grpc message (message body and metadata are removed)",
901900
kv.String("topic_stream_internal_id", info.TopicStreamInternalID),
@@ -911,7 +910,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
911910
return
912911
}
913912

914-
ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc")
913+
ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "grpc")
915914
l.Log(
916915
ctx, "topic writer received grpc message (message body and metadata are removed)",
917916
kv.String("topic_stream_internal_id", info.TopicStreamInternalID),
@@ -926,7 +925,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
926925
if d.Details()&trace.TopicWriterStreamEvents == 0 {
927926
return
928927
}
929-
ctx := with(context.Background(), DEBUG, "ydb", "topic", "writer", "read", "unknown", "grpc", "message")
928+
ctx := with(*info.Context, DEBUG, "ydb", "topic", "writer", "read", "unknown", "grpc", "message")
930929
l.Log(ctx, "topic writer receive unknown grpc message from server",
931930
kv.Error(info.Error),
932931
kv.String("writer_instance_id", info.WriterInstanceID),

‎tests/integration/topic_cdc_reader_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,17 @@ func createFeedAndReader(
189189
return db, reader
190190
}
191191

192+
func createFeedAndReaderForDB(
193+
ctx context.Context,
194+
t *testing.T,
195+
db *ydb.Driver,
196+
opts ...topicoptions.ReaderOption,
197+
) (*ydb.Driver, *topicreader.Reader) {
198+
createCDCFeed(ctx, t, db)
199+
reader := createFeedReader(t, db, opts...)
200+
return db, reader
201+
}
202+
192203
var sendCDCCounter int64
193204

194205
func sendCDCMessage(ctx context.Context, t *testing.T, db *ydb.Driver) {

‎tests/integration/topic_log.json

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
[
2+
{"L":"debug","T":"2025-02-26T18:21:27.231+0100","N":"ydb.topic.reader.reconnect.request","M":"topic reader reconnect request","context_name":"test_context","entity":"reader","error":"retryable/CUSTOM (code = -1, source error = \"ydb: first connection attempt not finished\")","was_sent":true},
3+
4+
{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.init","M":"topic reader init starting...","context_name":"test_context","entity":"reader","pre_init_reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","consumer":"test-consumer","topics":["/local/test/feed"]},
5+
6+
{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.init","M":"topic reader init done","context_name":"test_context","entity":"reader","pre_init_reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","consumer":"test-consumer","topics":["/local/test/feed"],"latency":0.000012292},
7+
8+
{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.reconnect","M":"topic reader reconnect starting...","context_name":"test_context","entity":"reader"},
9+
10+
{"L":"info","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.reconnect","M":"topic reader reconnect done","context_name":"test_context","entity":"reader","latency":0.000006292},
11+
12+
{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.reconnect","M":"topic reader reconnect starting...","context_name":"test_context","entity":"reader"},
13+
14+
{"L":"info","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.reconnect","M":"topic reader reconnect done","context_name":"test_context","entity":"reader","latency":0.00000325},
15+
16+
{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.sent.data.request","M":"topic reader sent data request","context_name":"test_context","entity":"reader","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","request_bytes":1048576,"local_capacity":1048576},
17+
18+
{"L":"debug","T":"2025-02-26T18:21:27.262+0100","N":"ydb.topic.reader.partition.read.start.response","M":"topic reader start partition read response starting...","context_name":"test_context","entity":"reader","topic":"/local/test/feed","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","partition_id":0,"partition_session_id":1},
19+
20+
{"L":"info","T":"2025-02-26T18:21:27.262+0100","N":"ydb.topic.reader.partition.read.start.response","M":"topic reader start partition read response done","context_name":"test_context","entity":"reader","topic":"/local/test/feed","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","partition_id":0,"partition_session_id":1,"latency":0.000129625},
21+
22+
{"L":"debug","T":"2025-02-26T18:21:27.263+0100","N":"ydb.topic.reader.receive.data.response","M":"topic reader data response received, process starting...","context_name":"test_context","entity":"reader","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","received_bytes":118,"local_capacity":1048458,"partitions_count":1,"batches_count":1,"messages_count":1},
23+
24+
{"L":"debug","T":"2025-02-26T18:21:27.263+0100","N":"ydb.topic.reader.read.messages","M":"topic read messages, waiting...","context_name":"test_context","entity":"reader","min_count":1,"max_count":1,"local_capacity_before":1048576},
25+
26+
{"L":"debug","T":"2025-02-26T18:21:27.263+0100","N":"ydb.topic.reader.read.messages","M":"topic read messages done","context_name":"test_context","entity":"reader","min_count":1,"max_count":1,"local_capacity_before":1048576,"latency":0.000005666},
27+
28+
{"L":"debug","T":"2025-02-26T18:21:27.263+0100","N":"ydb.topic.reader.sent.data.request","M":"topic reader sent data request","context_name":"test_context","entity":"reader","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","request_bytes":118,"local_capacity":1048576}
29+
]

‎tests/integration/topic_log_test.go

+267
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
//go:build integration
2+
// +build integration
3+
4+
package integration
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"encoding/json"
10+
"fmt"
11+
"os"
12+
"sync"
13+
"sync/atomic"
14+
"testing"
15+
"time"
16+
17+
"github.com/stretchr/testify/assert"
18+
"github.com/stretchr/testify/require"
19+
"go.uber.org/zap"
20+
"go.uber.org/zap/zapcore"
21+
22+
"github.com/ydb-platform/ydb-go-sdk/v3"
23+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
24+
"github.com/ydb-platform/ydb-go-sdk/v3/log"
25+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
26+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
27+
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
28+
)
29+
30+
var internalLogger atomic.Value
31+
32+
func SetInternalLogger(logger *zap.Logger) {
33+
internalLogger.Store(logger.WithOptions(zap.AddCallerSkip(2)))
34+
}
35+
36+
func Error(ctx context.Context, msg string, fields ...zap.Field) {
37+
logInternal(ctx, zapcore.ErrorLevel, msg, fields)
38+
}
39+
40+
func logInternal(ctx context.Context, lvl zapcore.Level, msg string, fields []zap.Field) {
41+
lh := from(ctx)
42+
if lvl == zapcore.DebugLevel && lh.riseDebug {
43+
lvl = zapcore.InfoLevel
44+
}
45+
ce := lh.Check(lvl, msg)
46+
if ce == nil {
47+
return
48+
}
49+
ce.Write(fields...)
50+
}
51+
52+
type loggerKey struct{}
53+
54+
// GetInternal returns context logger.
55+
// Should be used only in integration code and with great care
56+
// because logger is configured with zap.AddCallerSkip.
57+
func GetInternal(ctx context.Context) *zap.Logger {
58+
return from(ctx).Logger
59+
}
60+
61+
type loggerHolder struct {
62+
*zap.Logger
63+
riseDebug bool
64+
}
65+
66+
func from(ctx context.Context) loggerHolder {
67+
if l, ok := ctx.Value(loggerKey{}).(loggerHolder); ok {
68+
return l
69+
}
70+
if l, ok := internalLogger.Load().(*zap.Logger); ok {
71+
return loggerHolder{Logger: l}
72+
}
73+
// Fallback, so we don't need to manually init logger in every test.
74+
SetInternalLogger(zap.Must(zap.NewDevelopmentConfig().Build()))
75+
return from(ctx)
76+
}
77+
78+
func With(ctx context.Context, fields ...zap.Field) context.Context {
79+
lh := from(ctx)
80+
lh.Logger = lh.Logger.With(fields...)
81+
return context.WithValue(ctx, loggerKey{}, lh)
82+
}
83+
84+
type SafeBuffer struct {
85+
buf bytes.Buffer
86+
mu sync.Mutex
87+
}
88+
89+
func (s *SafeBuffer) Write(p []byte) (n int, err error) {
90+
s.mu.Lock()
91+
defer s.mu.Unlock()
92+
return s.buf.Write(p)
93+
}
94+
95+
func (s *SafeBuffer) String() string {
96+
s.mu.Lock()
97+
defer s.mu.Unlock()
98+
return s.buf.String()
99+
}
100+
101+
func SetupZapLogger() (*zap.Logger, *SafeBuffer) {
102+
buf := SafeBuffer{}
103+
syncer := zapcore.AddSync(&buf)
104+
ws := &zapcore.BufferedWriteSyncer{
105+
WS: syncer,
106+
Size: 512 * 1024, // 512 kB
107+
FlushInterval: time.Millisecond,
108+
}
109+
enc := zapcore.NewJSONEncoder(zapcore.EncoderConfig{
110+
MessageKey: "M",
111+
LevelKey: "L",
112+
TimeKey: "T",
113+
NameKey: "N",
114+
CallerKey: "C",
115+
FunctionKey: "F",
116+
StacktraceKey: "S",
117+
EncodeLevel: zapcore.LowercaseLevelEncoder,
118+
EncodeTime: zapcore.ISO8601TimeEncoder,
119+
EncodeDuration: zapcore.SecondsDurationEncoder,
120+
EncodeCaller: zapcore.ShortCallerEncoder,
121+
})
122+
123+
core := zapcore.NewCore(enc, ws, zapcore.DebugLevel)
124+
125+
l := zap.New(core)
126+
SetInternalLogger(l)
127+
return l, &buf
128+
}
129+
130+
var _ log.Logger = adapter{}
131+
132+
type adapter struct {
133+
minLevel zapcore.Level
134+
}
135+
136+
func (a adapter) Log(ctx context.Context, msg string, fields ...log.Field) {
137+
level := Level(ctx)
138+
if !a.minLevel.Enabled(level) {
139+
return
140+
}
141+
l := GetInternal(ctx)
142+
for _, name := range log.NamesFromContext(ctx) {
143+
l = l.Named(name)
144+
}
145+
l.WithOptions(zap.AddCallerSkip(1)).Log(level, msg, Fields(fields)...)
146+
}
147+
148+
//nolint:exhaustive // good enough.
149+
func fieldToField(field log.Field) zap.Field {
150+
switch field.Type() {
151+
case log.IntType:
152+
return zap.Int(field.Key(), field.IntValue())
153+
case log.Int64Type:
154+
return zap.Int64(field.Key(), field.Int64Value())
155+
case log.StringType:
156+
return zap.String(field.Key(), field.StringValue())
157+
case log.BoolType:
158+
return zap.Bool(field.Key(), field.BoolValue())
159+
case log.DurationType:
160+
return zap.Duration(field.Key(), field.DurationValue())
161+
case log.StringsType:
162+
return zap.Strings(field.Key(), field.StringsValue())
163+
case log.ErrorType:
164+
return zap.Error(field.ErrorValue())
165+
case log.StringerType:
166+
return zap.Stringer(field.Key(), field.Stringer())
167+
default:
168+
return zap.Any(field.Key(), field.AnyValue())
169+
}
170+
}
171+
172+
func Fields(fields []log.Field) []zap.Field {
173+
ff := make([]zap.Field, len(fields))
174+
for i, f := range fields {
175+
ff[i] = fieldToField(f)
176+
}
177+
return ff
178+
}
179+
180+
//nolint:exhaustive // good enough.
181+
func Level(ctx context.Context) zapcore.Level {
182+
switch log.LevelFromContext(ctx) {
183+
case log.TRACE, log.DEBUG:
184+
return zapcore.DebugLevel
185+
case log.INFO:
186+
return zapcore.InfoLevel
187+
case log.WARN:
188+
return zapcore.WarnLevel
189+
case log.ERROR:
190+
return zapcore.ErrorLevel
191+
case log.FATAL:
192+
return zapcore.FatalLevel
193+
default:
194+
return zapcore.InvalidLevel
195+
}
196+
}
197+
198+
func WithTraces(minLevel zapcore.Level, d trace.Detailer, opts ...log.Option) ydb.Option {
199+
a := adapter{minLevel: minLevel}
200+
return ydb.MergeOptions(
201+
ydb.WithTraceDriver(log.Driver(a, d, opts...)),
202+
ydb.WithTraceTable(log.Table(a, d, opts...)),
203+
ydb.WithTraceScripting(log.Scripting(a, d, opts...)),
204+
ydb.WithTraceScheme(log.Scheme(a, d, opts...)),
205+
ydb.WithTraceCoordination(log.Coordination(a, d, opts...)),
206+
ydb.WithTraceRatelimiter(log.Ratelimiter(a, d, opts...)),
207+
ydb.WithTraceDiscovery(log.Discovery(a, d, opts...)),
208+
ydb.WithTraceTopic(log.Topic(a, d, opts...)),
209+
ydb.WithTraceDatabaseSQL(log.DatabaseSQL(a, d, opts...)),
210+
)
211+
}
212+
213+
type LogEntry struct {
214+
Level string `json:"L"`
215+
Timestamp string `json:"T"`
216+
Namespace string `json:"N"`
217+
Message string `json:"M"`
218+
ContextName string `json:"context_name"`
219+
Entity string `json:"entity"`
220+
Endpoint string `json:"endpoint"`
221+
Method string `json:"method"`
222+
}
223+
224+
func (l *LogEntry) FormatMessage() string {
225+
return fmt.Sprintf(`"N":"%s","M":"%s","context_name":"%s","entity":"%s"`, l.Namespace, l.Message, l.ContextName, l.Entity)
226+
}
227+
228+
func TestTopicReadMessagesLog(t *testing.T) {
229+
_, buf := SetupZapLogger()
230+
231+
ctx := xtest.Context(t)
232+
233+
ctx = With(ctx, zap.String("context_name", "test_context"))
234+
ctx = With(ctx, zap.String("entity", "reader"))
235+
236+
driver := connect(t, WithTraces(zapcore.DebugLevel, trace.DetailsAll))
237+
238+
db, reader := createFeedAndReaderForDB(ctx, t, driver, topicoptions.WithReaderLogContext(ctx))
239+
240+
sendCDCMessage(ctx, t, db)
241+
msg, err := reader.ReadMessage(ctx)
242+
require.NoError(t, err)
243+
require.NotEmpty(t, msg.CreatedAt)
244+
t.Logf("msg: %#v", msg)
245+
246+
require.NoError(t, err)
247+
err = topicsugar.ReadMessageDataWithCallback(msg, func(data []byte) error {
248+
t.Log("Content:", string(data))
249+
return nil
250+
})
251+
require.NoError(t, err)
252+
253+
sendCDCMessage(ctx, t, db)
254+
batch, err := reader.ReadMessagesBatch(ctx)
255+
require.NoError(t, err)
256+
require.NotEmpty(t, batch.Messages)
257+
258+
fileData, err := os.ReadFile("topic_log.json")
259+
var logs []LogEntry
260+
err = json.Unmarshal(fileData, &logs)
261+
require.NoError(t, err)
262+
263+
strLog := buf.String()
264+
for _, logString := range logs {
265+
assert.Contains(t, strLog, logString.FormatMessage())
266+
}
267+
}

‎topic/topicoptions/topicoptions_reader.go

+9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package topicoptions
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/ydb-platform/ydb-go-sdk/v3/internal/config"
@@ -281,3 +282,11 @@ func WithReaderWithoutConsumer(saveStateOnReconnection bool) ReaderOption {
281282
cfg.CommitMode = CommitModeNone
282283
}
283284
}
285+
286+
// WithReaderLogContext allows providing a context.Context instance which will be used
287+
// in log/topic events.
288+
func WithReaderLogContext(ctx context.Context) ReaderOption {
289+
return func(cfg *topicreaderinternal.ReaderConfig) {
290+
cfg.BaseContext = ctx
291+
}
292+
}

‎topic/topicoptions/topicoptions_writer.go

+9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package topicoptions
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
@@ -214,3 +215,11 @@ func WithWriterTrace(t trace.Topic) WriterOption { //nolint:gocritic
214215
func WithWriterUpdateTokenInterval(interval time.Duration) WriterOption {
215216
return topicwriterinternal.WithTokenUpdateInterval(interval)
216217
}
218+
219+
// WithWriterLogContext allows providing a context.Context instance which will be used
220+
// in log/topic events.
221+
func WithWriterLogContext(ctx context.Context) WriterOption {
222+
return func(cfg *topicwriterinternal.WriterReconnectorConfig) {
223+
cfg.LogContext = ctx
224+
}
225+
}

‎trace/topic.go

+28-4
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ type (
153153

154154
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
155155
TopicReaderStartInfo struct {
156+
Context *context.Context
156157
ReaderID int64
157158
Consumer string
158159
Error error
@@ -183,11 +184,13 @@ type (
183184

184185
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
185186
TopicReaderSendCommitMessageStartInfo struct {
187+
Context *context.Context
186188
CommitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo
187189
}
188190

189191
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
190192
TopicReaderStreamCommitInfo struct {
193+
Context *context.Context
191194
Topic string
192195
PartitionID int64
193196
PartitionSessionID int64
@@ -207,6 +210,7 @@ type (
207210

208211
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
209212
TopicReaderCommittedNotifyInfo struct {
213+
Context *context.Context
210214
ReaderConnectionID string
211215
Topic string
212216
PartitionID int64
@@ -216,19 +220,22 @@ type (
216220

217221
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
218222
TopicReaderErrorInfo struct {
223+
Context *context.Context
219224
ReaderConnectionID string
220225
Error error
221226
}
222227

223228
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
224229
TopicReaderSentDataRequestInfo struct {
230+
Context *context.Context
225231
ReaderConnectionID string
226232
RequestBytes int
227233
LocalBufferSizeAfterSent int
228234
}
229235

230236
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
231237
TopicReaderReceiveDataResponseStartInfo struct {
238+
Context *context.Context
232239
ReaderConnectionID string
233240
LocalBufferSizeAfterReceive int
234241
DataResponse TopicReaderDataResponseInfo
@@ -247,7 +254,7 @@ type (
247254

248255
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
249256
TopicReaderReadMessagesStartInfo struct {
250-
RequestContext *context.Context
257+
Context *context.Context
251258
MinCount int
252259
MaxCount int
253260
FreeBufferCapacity int
@@ -267,13 +274,15 @@ type (
267274

268275
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
269276
OnReadUnknownGrpcMessageInfo struct {
277+
Context *context.Context
270278
ReaderConnectionID string
271279
Error error
272280
}
273281

274282
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
275283
TopicReaderReconnectStartInfo struct {
276-
Reason error
284+
Context *context.Context
285+
Reason error
277286
}
278287

279288
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
@@ -283,13 +292,14 @@ type (
283292

284293
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
285294
TopicReaderReconnectRequestInfo struct {
295+
Context *context.Context
286296
Reason error
287297
WasSent bool
288298
}
289299

290300
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
291301
TopicReaderCommitStartInfo struct {
292-
RequestContext *context.Context
302+
Context *context.Context
293303
Topic string
294304
PartitionID int64
295305
PartitionSessionID int64
@@ -304,6 +314,7 @@ type (
304314

305315
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
306316
TopicReaderCloseStartInfo struct {
317+
Context *context.Context
307318
ReaderConnectionID string
308319
CloseReason error
309320
}
@@ -315,6 +326,7 @@ type (
315326

316327
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
317328
TopicReaderInitStartInfo struct {
329+
Context *context.Context
318330
PreInitReaderConnectionID string
319331
InitRequestInfo TopicReadStreamInitRequestInfo
320332
}
@@ -333,11 +345,13 @@ type (
333345

334346
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
335347
OnReadUpdateTokenStartInfo struct {
348+
Context *context.Context
336349
ReaderConnectionID string
337350
}
338351

339352
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
340353
OnReadUpdateTokenMiddleTokenReceivedInfo struct {
354+
Context *context.Context
341355
TokenLen int
342356
Error error
343357
}
@@ -424,6 +438,7 @@ type (
424438

425439
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
426440
TopicWriterReconnectStartInfo struct {
441+
Context *context.Context
427442
WriterInstanceID string
428443
Topic string
429444
ProducerID string
@@ -441,6 +456,7 @@ type (
441456

442457
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
443458
TopicWriterInitStreamStartInfo struct {
459+
Context *context.Context
444460
WriterInstanceID string
445461
Topic string
446462
ProducerID string
@@ -454,6 +470,7 @@ type (
454470

455471
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
456472
TopicWriterCloseStartInfo struct {
473+
Context *context.Context
457474
WriterInstanceID string
458475
Reason error
459476
}
@@ -465,6 +482,7 @@ type (
465482

466483
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
467484
TopicWriterCompressMessagesStartInfo struct {
485+
Context *context.Context
468486
WriterInstanceID string
469487
SessionID string
470488
Codec int32
@@ -480,6 +498,7 @@ type (
480498

481499
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
482500
TopicWriterSendMessagesStartInfo struct {
501+
Context *context.Context
483502
WriterInstanceID string
484503
SessionID string
485504
Codec int32
@@ -494,6 +513,7 @@ type (
494513

495514
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
496515
TopicWriterResultMessagesInfo struct {
516+
Context *context.Context
497517
WriterInstanceID string
498518
SessionID string
499519
PartitionID int64
@@ -516,7 +536,7 @@ type (
516536

517537
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
518538
TopicOnWriterBeforeCommitTransactionStartInfo struct {
519-
Ctx *context.Context
539+
Context *context.Context
520540
KqpSessionID string
521541
TopicSessionID string
522542
TransactionID string
@@ -530,6 +550,7 @@ type (
530550

531551
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
532552
TopicOnWriterAfterFinishTransactionStartInfo struct {
553+
Context *context.Context
533554
Error error
534555
SessionID string
535556
TransactionID string
@@ -542,6 +563,7 @@ type (
542563

543564
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
544565
TopicWriterSentGRPCMessageInfo struct {
566+
Context *context.Context
545567
TopicStreamInternalID string
546568
SessionID string
547569
MessageNumber int
@@ -551,6 +573,7 @@ type (
551573

552574
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
553575
TopicWriterReceiveGRPCMessageInfo struct {
576+
Context *context.Context
554577
TopicStreamInternalID string
555578
SessionID string
556579
MessageNumber int
@@ -560,6 +583,7 @@ type (
560583

561584
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
562585
TopicOnWriterReadUnknownGrpcMessageInfo struct {
586+
Context *context.Context
563587
WriterInstanceID string
564588
SessionID string
565589
Error error

‎trace/topic_gtrace.go

+52-29
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)
Please sign in to comment.