Skip to content

Commit c6dcb00

Browse files
committed
add context to topic logs
1 parent f6a74d6 commit c6dcb00

29 files changed

+829
-208
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `WithReaderLogContext`, `WithWriterLogContext` options to topic reader/writer to supply log entries with user context fields
2+
13
## v3.104.4
24
* Fixed bug with session query latency metric collector
35

go.mod

+7-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ require (
77
github.com/google/uuid v1.6.0
88
github.com/jonboulle/clockwork v0.3.0
99
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77
10+
go.uber.org/zap v1.27.0
1011
golang.org/x/net v0.33.0
1112
golang.org/x/sync v0.10.0
1213
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
@@ -17,18 +18,21 @@ require (
1718
// requires for tests only
1819
require (
1920
github.com/rekby/fixenv v0.6.1
20-
github.com/stretchr/testify v1.7.1
21+
github.com/stretchr/testify v1.10.0
2122
go.uber.org/mock v0.4.0
2223
)
2324

2425
require (
25-
github.com/davecgh/go-spew v1.1.0 // indirect
26+
github.com/davecgh/go-spew v1.1.1 // indirect
2627
github.com/golang/protobuf v1.5.3 // indirect
28+
github.com/kr/pretty v0.1.0 // indirect
2729
github.com/pmezard/go-difflib v1.0.0 // indirect
30+
go.uber.org/multierr v1.11.0 // indirect
2831
golang.org/x/sys v0.28.0 // indirect
2932
golang.org/x/text v0.21.0 // indirect
3033
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
31-
gopkg.in/yaml.v3 v3.0.0 // indirect
34+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
35+
gopkg.in/yaml.v3 v3.0.1 // indirect
3236
)
3337

3438
retract v3.67.1 // decimal broken https://github.com/ydb-platform/ydb-go-sdk/issues/1234

go.sum

+20-6
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP
1111
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1212
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1313
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
14-
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
1514
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
15+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
16+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1617
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
1718
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
1819
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -54,6 +55,12 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
5455
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
5556
github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg=
5657
github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
58+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
59+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
60+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
61+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
62+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
63+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
5764
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
5865
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5966
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@@ -63,13 +70,19 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
6370
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
6471
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
6572
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
66-
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
67-
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
73+
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
74+
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
6875
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 h1:LY6cI8cP4B9rrpTleZk95+08kl2gF4rixG7+V/dwL6Q=
6976
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
7077
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
78+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
79+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
7180
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
7281
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
82+
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
83+
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
84+
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
85+
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
7386
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
7487
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
7588
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -146,12 +159,13 @@ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
146159
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
147160
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
148161
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
149-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
150162
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
163+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
164+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
151165
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
152166
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
153167
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
154-
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
155-
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
168+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
169+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
156170
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
157171
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

internal/grpcwrapper/rawtopic/client.go

+2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S
104104
func (c *Client) StreamWrite(
105105
ctxStreamLifeTime context.Context,
106106
tracer *trace.Topic,
107+
logContext *context.Context,
107108
) (*rawtopicwriter.StreamWriter, error) {
108109
protoResp, err := c.service.StreamWrite(ctxStreamLifeTime)
109110
if err != nil {
@@ -118,6 +119,7 @@ func (c *Client) StreamWrite(
118119
Stream: protoResp,
119120
Tracer: tracer,
120121
InternalStreamID: uuid.New().String(),
122+
LogContext: logContext,
121123
}, nil
122124
}
123125

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rawtopicwriter
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"reflect"
@@ -34,6 +35,7 @@ type StreamWriter struct {
3435
readMessagesCount int
3536
writtenMessagesCount int
3637
sessionID string
38+
LogContext *context.Context
3739
}
3840

3941
//nolint:funlen
@@ -50,7 +52,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
5052
defer func() {
5153
// defer needs for set good session id on first init response before trace the message
5254
trace.TopicOnWriterReceiveGRPCMessage(
53-
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
55+
w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
5456
)
5557
}()
5658
if sendErr != nil {
@@ -139,7 +141,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
139141

140142
err = w.Stream.Send(&protoMsg)
141143
w.writtenMessagesCount++
142-
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
144+
trace.TopicOnWriterSentGRPCMessage(
145+
w.Tracer,
146+
w.LogContext,
147+
w.InternalStreamID,
148+
w.sessionID,
149+
w.writtenMessagesCount,
150+
&protoMsg,
151+
err,
152+
)
143153
if err != nil {
144154
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
145155
}

internal/topic/topicclientinternal/client.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,8 @@ func (c *Client) StartReader(
318318
if err != nil {
319319
return nil, err
320320
}
321-
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)
321+
322+
internalReader.TopicOnReaderStart(consumer, err)
322323

323324
return topicreader.NewReader(internalReader), nil
324325
}
@@ -361,15 +362,8 @@ func (c *Client) createWriterConfig(
361362
topicPath string,
362363
opts []topicoptions.WriterOption,
363364
) topicwriterinternal.WriterReconnectorConfig {
364-
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
365-
topicwriterinternal.RawTopicWriterStream,
366-
error,
367-
) {
368-
return c.rawClient.StreamWrite(ctx, tracer)
369-
}
370-
371365
options := []topicoptions.WriterOption{
372-
topicwriterinternal.WithConnectFunc(connector),
366+
topicwriterinternal.WithRawClient(&c.rawClient),
373367
topicwriterinternal.WithTopic(topicPath),
374368
topicwriterinternal.WithCommonConfig(c.cfg.Common),
375369
topicwriterinternal.WithTrace(c.cfg.Trace),

internal/topic/topicreadercommon/committer.go

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) {
148148

149149
onDone := trace.TopicOnReaderSendCommitMessage(
150150
c.tracer,
151+
&ctx,
151152
&commits,
152153
)
153154
err := c.send(commits.ToRawMessage())

internal/topic/topicreaderinternal/batched_stream_reader_interface.go

+1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ type batchedStreamReader interface {
1515
Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
1616
CloseWithError(ctx context.Context, err error) error
1717
PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
18+
TopicOnReaderStart(consumer string, err error)
1819
}

internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go

+36
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topicreaderinternal/reader.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type Reader struct {
3535
readerID int64
3636
}
3737

38+
func (r *Reader) TopicOnReaderStart(consumer string, err error) {
39+
r.reader.TopicOnReaderStart(consumer, err)
40+
}
41+
3842
type ReadMessageBatchOptions struct {
3943
batcherGetOptions
4044
}
@@ -89,14 +93,17 @@ func NewReader(
8993
return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig)
9094
}
9195

96+
reader := newReaderReconnector(
97+
cfg.BaseContext,
98+
readerID,
99+
readerConnector,
100+
cfg.OperationTimeout(),
101+
cfg.RetrySettings,
102+
cfg.Trace,
103+
)
104+
92105
res := Reader{
93-
reader: newReaderReconnector(
94-
readerID,
95-
readerConnector,
96-
cfg.OperationTimeout(),
97-
cfg.RetrySettings,
98-
cfg.Trace,
99-
),
106+
reader: reader,
100107
defaultBatchConfig: cfg.DefaultBatchConfig,
101108
tracer: cfg.Trace,
102109
readerID: readerID,

0 commit comments

Comments
 (0)