Skip to content

Commit d63fc22

Browse files
committed
add context to topic logs
1 parent 3d849ac commit d63fc22

26 files changed

+703
-266
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added new topic reader and writer options, `WithReaderLogContext` and `WithWriterLogContext`, which provide a `context.Context` instance to be used in topic event logs.
2+
13
## v3.99.13
24
* Added checking errors for conditionally delete item from pool
35

internal/grpcwrapper/rawtopic/client.go

+2
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S
104104
func (c *Client) StreamWrite(
105105
ctxStreamLifeTime context.Context,
106106
tracer *trace.Topic,
107+
logContext *context.Context,
107108
) (*rawtopicwriter.StreamWriter, error) {
108109
protoResp, err := c.service.StreamWrite(ctxStreamLifeTime)
109110
if err != nil {
@@ -118,6 +119,7 @@ func (c *Client) StreamWrite(
118119
Stream: protoResp,
119120
Tracer: tracer,
120121
InternalStreamID: uuid.New().String(),
122+
LogContext: logContext,
121123
}, nil
122124
}
123125

internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package rawtopicwriter
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"reflect"
@@ -36,6 +37,7 @@ type StreamWriter struct {
3637
readMessagesCount int
3738
writtenMessagesCount int
3839
sessionID string
40+
LogContext *context.Context
3941
}
4042

4143
//nolint:funlen
@@ -52,7 +54,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
5254
defer func() {
5355
// defer needs for set good session id on first init response before trace the message
5456
trace.TopicOnWriterReceiveGRPCMessage(
55-
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
57+
w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
5658
)
5759
}()
5860
if sendErr != nil {
@@ -141,7 +143,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {
141143

142144
err = w.Stream.Send(&protoMsg)
143145
w.writtenMessagesCount++
144-
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
146+
trace.TopicOnWriterSentGRPCMessage(
147+
w.Tracer,
148+
w.LogContext,
149+
w.InternalStreamID,
150+
w.sessionID,
151+
w.writtenMessagesCount,
152+
&protoMsg,
153+
err,
154+
)
145155
if err != nil {
146156
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
147157
}

internal/topic/topicclientinternal/client.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package topicclientinternal
33
import (
44
"context"
55
"errors"
6-
76
"github.com/ydb-platform/ydb-go-genproto/Ydb_Topic_V1"
87
"google.golang.org/grpc"
98

@@ -315,7 +314,8 @@ func (c *Client) StartReader(
315314
if err != nil {
316315
return nil, err
317316
}
318-
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)
317+
318+
internalReader.TopicOnReaderStart(consumer, err)
319319

320320
return topicreader.NewReader(internalReader), nil
321321
}
@@ -356,15 +356,16 @@ func (c *Client) createWriterConfig(
356356
topicPath string,
357357
opts []topicoptions.WriterOption,
358358
) topicwriterinternal.WriterReconnectorConfig {
359-
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
360-
topicwriterinternal.RawTopicWriterStream,
361-
error,
362-
) {
363-
return c.rawClient.StreamWrite(ctx, tracer)
364-
}
359+
//var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
360+
// topicwriterinternal.RawTopicWriterStream,
361+
// error,
362+
//) {
363+
// return c.rawClient.StreamWrite(ctx, tracer)
364+
//}
365365

366366
options := []topicoptions.WriterOption{
367-
topicwriterinternal.WithConnectFunc(connector),
367+
topicwriterinternal.WithRawClient(&c.rawClient),
368+
// topicwriterinternal.WithConnectFunc(connector),
368369
topicwriterinternal.WithTopic(topicPath),
369370
topicwriterinternal.WithCommonConfig(c.cfg.Common),
370371
topicwriterinternal.WithTrace(c.cfg.Trace),

internal/topic/topiclistenerinternal/stream_listener.go

-1
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ func (l *streamListener) initVars(sessionIDCounter *atomic.Int64) {
141141
}
142142
}
143143

144-
//nolint:funlen
145144
func (l *streamListener) initStream(ctx context.Context, client TopicClient) error {
146145
streamCtx, streamClose := context.WithCancelCause(xcontext.ValueOnly(ctx))
147146
l.streamClose = streamClose

internal/topic/topicreadercommon/committer.go

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) {
148148

149149
onDone := trace.TopicOnReaderSendCommitMessage(
150150
c.tracer,
151+
&ctx,
151152
&commits,
152153
)
153154
err := c.send(commits.ToRawMessage())

internal/topic/topicreaderinternal/batched_stream_reader_interface.go

+1
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@ type batchedStreamReader interface {
1515
Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
1616
CloseWithError(ctx context.Context, err error) error
1717
PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
18+
TopicOnReaderStart(consumer string, err error)
1819
}

internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go

+36
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/topic/topicreaderinternal/reader.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ type Reader struct {
3535
readerID int64
3636
}
3737

38+
func (r *Reader) TopicOnReaderStart(consumer string, err error) {
39+
r.reader.TopicOnReaderStart(consumer, err)
40+
}
41+
3842
type ReadMessageBatchOptions struct {
3943
batcherGetOptions
4044
}
@@ -89,14 +93,17 @@ func NewReader(
8993
return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig)
9094
}
9195

96+
reader := newReaderReconnector(
97+
readerID,
98+
readerConnector,
99+
cfg.OperationTimeout(),
100+
cfg.LogContext,
101+
cfg.RetrySettings,
102+
cfg.Trace,
103+
)
104+
92105
res := Reader{
93-
reader: newReaderReconnector(
94-
readerID,
95-
readerConnector,
96-
cfg.OperationTimeout(),
97-
cfg.RetrySettings,
98-
cfg.Trace,
99-
),
106+
reader: reader,
100107
defaultBatchConfig: cfg.DefaultBatchConfig,
101108
tracer: cfg.Trace,
102109
readerID: readerID,

0 commit comments

Comments
 (0)