Skip to content

Commit 77289f5

Browse files
committed
Add context to topic logs
1 parent 26a84de commit 77289f5

28 files changed

+764
-200
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `WithReaderLogContext`, `WithWriterLogContext` options to topic reader/writer to supply log entries with user context fields
2+
13
## v3.108.3
24
* Fixed handling of zero values for DyNumber
35
* Fixed the decimal yql slice bounds out of range

go.sum

+7-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP
1111
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1212
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
1313
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
14-
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
1514
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1615
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
1716
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -72,11 +71,13 @@ github.com/rekby/fixenv v0.6.1/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+X
7271
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
7372
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
7473
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
74+
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
7575
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
7676
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
7777
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
78-
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
7978
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
79+
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
80+
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
8081
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 h1:LY6cI8cP4B9rrpTleZk95+08kl2gF4rixG7+V/dwL6Q=
8182
github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
8283
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
@@ -94,6 +95,10 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
9495
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
9596
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
9697
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
98+
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
99+
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
100+
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
101+
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
97102
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
98103
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
99104
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=

internal/grpcwrapper/rawtopic/client.go

+1
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ func (c *Client) StreamWrite(
126126
Stream: protoResp,
127127
Tracer: tracer,
128128
InternalStreamID: uuid.New().String(),
129+
LogContext: &ctxStreamLifeTime,
129130
}, nil
130131
}
131132

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rawtopicwriter
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"reflect"
@@ -34,6 +35,7 @@ type StreamWriter struct {
3435
readMessagesCount int
3536
writtenMessagesCount int
3637
sessionID string
38+
LogContext *context.Context
3739
}
3840

3941
//nolint:funlen
@@ -50,7 +52,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
5052
defer func() {
5153
// defer needs for set good session id on first init response before trace the message
5254
trace.TopicOnWriterReceiveGRPCMessage(
53-
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
55+
w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
5456
)
5557
}()
5658
if sendErr != nil {
@@ -139,7 +141,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
139141

140142
err = w.Stream.Send(&protoMsg)
141143
w.writtenMessagesCount++
142-
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
144+
trace.TopicOnWriterSentGRPCMessage(
145+
w.Tracer,
146+
w.LogContext,
147+
w.InternalStreamID,
148+
w.sessionID,
149+
w.writtenMessagesCount,
150+
&protoMsg,
151+
err,
152+
)
143153
if err != nil {
144154
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
145155
}

internal/topic/topicclientinternal/client.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,8 @@ func (c *Client) StartReader(
322322
if err != nil {
323323
return nil, err
324324
}
325-
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)
325+
326+
internalReader.TopicOnReaderStart(consumer, err)
326327

327328
return topicreader.NewReader(internalReader), nil
328329
}
@@ -365,15 +366,8 @@ func (c *Client) createWriterConfig(
365366
topicPath string,
366367
opts []topicoptions.WriterOption,
367368
) topicwriterinternal.WriterReconnectorConfig {
368-
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
369-
topicwriterinternal.RawTopicWriterStream,
370-
error,
371-
) {
372-
return c.rawClient.StreamWrite(ctx, tracer)
373-
}
374-
375369
options := []topicoptions.WriterOption{
376-
topicwriterinternal.WithConnectFunc(connector),
370+
topicwriterinternal.WithRawClient(&c.rawClient),
377371
topicwriterinternal.WithTopic(topicPath),
378372
topicwriterinternal.WithCommonConfig(c.cfg.Common),
379373
topicwriterinternal.WithTrace(c.cfg.Trace),

internal/topic/topicreadercommon/committer.go

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) {
148148

149149
onDone := trace.TopicOnReaderSendCommitMessage(
150150
c.tracer,
151+
&ctx,
151152
&commits,
152153
)
153154
err := c.send(commits.ToRawMessage())

internal/topic/topicreaderinternal/batched_stream_reader_interface.go

+1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ type batchedStreamReader interface {
1515
Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
1616
CloseWithError(ctx context.Context, err error) error
1717
PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
18+
TopicOnReaderStart(consumer string, err error)
1819
}

internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go

+36
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topicreaderinternal/reader.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ type Reader struct {
3939
readerID int64
4040
}
4141

42+
func (r *Reader) TopicOnReaderStart(consumer string, err error) {
43+
r.reader.TopicOnReaderStart(consumer, err)
44+
}
45+
4246
type ReadMessageBatchOptions struct {
4347
batcherGetOptions
4448
}
@@ -93,14 +97,17 @@ func NewReader(
9397
return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig)
9498
}
9599

100+
reader := newReaderReconnector(
101+
cfg.BaseContext,
102+
readerID,
103+
readerConnector,
104+
cfg.OperationTimeout(),
105+
cfg.RetrySettings,
106+
cfg.Trace,
107+
)
108+
96109
res := Reader{
97-
reader: newReaderReconnector(
98-
readerID,
99-
readerConnector,
100-
cfg.OperationTimeout(),
101-
cfg.RetrySettings,
102-
cfg.Trace,
103-
),
110+
reader: reader,
104111
defaultBatchConfig: cfg.DefaultBatchConfig,
105112
tracer: cfg.Trace,
106113
readerID: readerID,

0 commit comments

Comments
 (0)