From 94eb9ca89eaf1bd742823b753f728497c25051c2 Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Tue, 18 Mar 2025 15:48:00 +0100 Subject: [PATCH] Add context to topic logs --- CHANGELOG.md | 2 + go.mod | 5 +- go.sum | 9 +- internal/grpcwrapper/rawtopic/client.go | 1 + .../rawtopic/rawtopicwriter/streamwriter.go | 14 +- internal/topic/topicclientinternal/client.go | 12 +- internal/topic/topicreadercommon/committer.go | 1 + .../batched_stream_reader_interface.go | 1 + .../batched_stream_reader_mock_test.go | 36 +++ internal/topic/topicreaderinternal/reader.go | 21 +- .../topicreaderinternal/stream_reader_impl.go | 204 +++++++------ .../topicreaderinternal/stream_reconnector.go | 50 +++- .../topic/topicwriterinternal/encoders.go | 8 + .../topicwriterinternal/encoders_test.go | 25 +- .../topicwriterinternal/writer_config.go | 4 + .../topicwriterinternal/writer_options.go | 9 + .../topicwriterinternal/writer_reconnector.go | 26 +- .../writer_single_stream.go | 42 ++- .../topicwriterinternal/writer_transaction.go | 2 +- internal/xcontext/merge_contexts.go | 40 +++ log/context.go | 4 + log/topic.go | 53 ++-- tests/integration/topic_cdc_reader_test.go | 11 + tests/integration/topic_log_test.go | 275 ++++++++++++++++++ tests/slo/go.sum | 2 + topic/topicoptions/topicoptions_reader.go | 9 + topic/topicoptions/topicoptions_writer.go | 9 + trace/topic.go | 32 +- trace/topic_gtrace.go | 81 ++++-- 29 files changed, 785 insertions(+), 203 deletions(-) create mode 100644 internal/xcontext/merge_contexts.go create mode 100644 tests/integration/topic_log_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 92f9f5b5f..69a6297c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added `WithReaderLogContext`, `WithWriterLogContext` options to topic reader/writer to supply log entries with user context fields + ## v3.105.2 * Improved the `ydb.WithSessionPoolSessionUsageLimit` option for allow `time.Duration` as argument type for limit max session time to live since create time diff --git a/go.mod b/go.mod index 5948b495a..8b4ac67c1 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/jonboulle/clockwork v0.3.0 github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 go.uber.org/goleak v1.3.0 + go.uber.org/zap v1.27.0 golang.org/x/net v0.33.0 golang.org/x/sync v0.10.0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 @@ -18,15 +19,15 @@ require ( // requires for tests only require ( github.com/rekby/fixenv v0.6.1 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.8.1 go.uber.org/mock v0.4.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect diff --git a/go.sum b/go.sum index d99a653f3..37610ce6b 100644 --- a/go.sum +++ b/go.sum @@ -11,7 +11,6 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -68,11 +67,13 @@ github.com/rekby/fixenv v0.6.1/go.mod h1:/b5LRc06BYJtslRtHKxsPWFT/ySpHV+rWvzTg+X github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 h1:LY6cI8cP4B9rrpTleZk95+08kl2gF4rixG7+V/dwL6Q= github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= @@ -80,6 +81,10 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= diff --git a/internal/grpcwrapper/rawtopic/client.go b/internal/grpcwrapper/rawtopic/client.go index 986abea39..c06963d07 100644 --- a/internal/grpcwrapper/rawtopic/client.go +++ b/internal/grpcwrapper/rawtopic/client.go @@ -126,6 +126,7 @@ func (c *Client) StreamWrite( Stream: protoResp, Tracer: tracer, InternalStreamID: uuid.New().String(), + LogContext: &ctxStreamLifeTime, }, nil } diff --git a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go index c714a2d4a..f578eb55b 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go +++ b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go @@ -1,6 +1,7 @@ package rawtopicwriter import ( + "context" "errors" "fmt" "reflect" @@ -34,6 +35,7 @@ type StreamWriter struct { readMessagesCount int writtenMessagesCount int sessionID string + LogContext *context.Context } //nolint:funlen @@ -50,7 +52,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) { defer func() { // defer needs for set good session id on first init response before trace the message trace.TopicOnWriterReceiveGRPCMessage( - w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr, + w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr, ) }() if sendErr != nil { @@ -139,7 +141,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) { err = w.Stream.Send(&protoMsg) w.writtenMessagesCount++ - trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err) + trace.TopicOnWriterSentGRPCMessage( + w.Tracer, + w.LogContext, + w.InternalStreamID, + w.sessionID, + w.writtenMessagesCount, + &protoMsg, + err, + ) if err != nil { return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err))) } diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index ee973e71d..f6aab498a 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -322,7 +322,8 @@ func (c *Client) StartReader( if err != nil { return nil, err } - trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err) + + internalReader.TopicOnReaderStart(consumer, err) return topicreader.NewReader(internalReader), nil } @@ -365,15 +366,8 @@ func (c *Client) createWriterConfig( topicPath string, opts []topicoptions.WriterOption, ) topicwriterinternal.WriterReconnectorConfig { - var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) ( - topicwriterinternal.RawTopicWriterStream, - error, - ) { - return c.rawClient.StreamWrite(ctx, tracer) - } - options := []topicoptions.WriterOption{ - topicwriterinternal.WithConnectFunc(connector), + topicwriterinternal.WithRawClient(&c.rawClient), topicwriterinternal.WithTopic(topicPath), topicwriterinternal.WithCommonConfig(c.cfg.Common), topicwriterinternal.WithTrace(c.cfg.Trace), diff --git a/internal/topic/topicreadercommon/committer.go b/internal/topic/topicreadercommon/committer.go index e27ab6d6e..1ae0aca22 100644 --- a/internal/topic/topicreadercommon/committer.go +++ b/internal/topic/topicreadercommon/committer.go @@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) { onDone := trace.TopicOnReaderSendCommitMessage( c.tracer, + &ctx, &commits, ) err := c.send(commits.ToRawMessage()) diff --git a/internal/topic/topicreaderinternal/batched_stream_reader_interface.go b/internal/topic/topicreaderinternal/batched_stream_reader_interface.go index b65a4e8ae..8eb5a81a4 100644 --- a/internal/topic/topicreaderinternal/batched_stream_reader_interface.go +++ b/internal/topic/topicreaderinternal/batched_stream_reader_interface.go @@ -15,4 +15,5 @@ type batchedStreamReader interface { Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error CloseWithError(ctx context.Context, err error) error PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll + TopicOnReaderStart(consumer string, err error) } diff --git a/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go b/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go index 85194d433..10e918867 100644 --- a/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go +++ b/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go @@ -192,6 +192,42 @@ func (c *MockbatchedStreamReaderReadMessageBatchCall) DoAndReturn(f func(context return c } +// TopicOnReaderStart mocks base method. +func (m *MockbatchedStreamReader) TopicOnReaderStart(consumer string, err error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "TopicOnReaderStart", consumer, err) +} + +// TopicOnReaderStart indicates an expected call of TopicOnReaderStart. +func (mr *MockbatchedStreamReaderMockRecorder) TopicOnReaderStart(consumer, err any) *MockbatchedStreamReaderTopicOnReaderStartCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TopicOnReaderStart", reflect.TypeOf((*MockbatchedStreamReader)(nil).TopicOnReaderStart), consumer, err) + return &MockbatchedStreamReaderTopicOnReaderStartCall{Call: call} +} + +// MockbatchedStreamReaderTopicOnReaderStartCall wrap *gomock.Call +type MockbatchedStreamReaderTopicOnReaderStartCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockbatchedStreamReaderTopicOnReaderStartCall) Return() *MockbatchedStreamReaderTopicOnReaderStartCall { + c.Call = c.Call.Return() + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockbatchedStreamReaderTopicOnReaderStartCall) Do(f func(string, error)) *MockbatchedStreamReaderTopicOnReaderStartCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockbatchedStreamReaderTopicOnReaderStartCall) DoAndReturn(f func(string, error)) *MockbatchedStreamReaderTopicOnReaderStartCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // WaitInit mocks base method. func (m *MockbatchedStreamReader) WaitInit(ctx context.Context) error { m.ctrl.T.Helper() diff --git a/internal/topic/topicreaderinternal/reader.go b/internal/topic/topicreaderinternal/reader.go index 1330b0da7..258b29a32 100644 --- a/internal/topic/topicreaderinternal/reader.go +++ b/internal/topic/topicreaderinternal/reader.go @@ -39,6 +39,10 @@ type Reader struct { readerID int64 } +func (r *Reader) TopicOnReaderStart(consumer string, err error) { + r.reader.TopicOnReaderStart(consumer, err) +} + type ReadMessageBatchOptions struct { batcherGetOptions } @@ -93,14 +97,17 @@ func NewReader( return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig) } + reader := newReaderReconnector( + cfg.BaseContext, + readerID, + readerConnector, + cfg.OperationTimeout(), + cfg.RetrySettings, + cfg.Trace, + ) + res := Reader{ - reader: newReaderReconnector( - readerID, - readerConnector, - cfg.OperationTimeout(), - cfg.RetrySettings, - cfg.Trace, - ), + reader: reader, defaultBatchConfig: cfg.DefaultBatchConfig, tracer: cfg.Trace, readerID: readerID, diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index 69b3a7973..91e2f2642 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -185,23 +185,26 @@ func (r *topicStreamReaderImpl) WaitInit(_ context.Context) error { return nil } +func (r *topicStreamReaderImpl) TopicOnReaderStart(consumer string, err error) { + logCtx := r.cfg.BaseContext + trace.TopicOnReaderStart(r.cfg.Trace, &logCtx, r.readerID, consumer, err) +} + func (r *topicStreamReaderImpl) PopMessagesBatchTx( ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions, ) (_ *topicreadercommon.PublicBatch, resErr error) { - traceCtx := ctx - onDone := trace.TopicOnReaderStreamPopBatchTx( - r.cfg.Trace, - &traceCtx, - r.readerID, - r.readConnectionID, - tx.SessionID(), - tx, - ) - ctx = traceCtx defer func() { - onDone(resErr) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderStreamPopBatchTx( + r.cfg.Trace, + &logCtx, + r.readerID, + r.readConnectionID, + tx.SessionID(), + tx, + )(resErr) }() batch, err := r.ReadMessageBatch(ctx, opts) @@ -226,51 +229,47 @@ func (r *topicStreamReaderImpl) commitWithTransaction( } req := r.createUpdateOffsetRequest(ctx, batch, tx) - updateOffesetInTransactionErr := retry.Retry(ctx, func(ctx context.Context) (err error) { - traceCtx := ctx - onDone := trace.TopicOnReaderUpdateOffsetsInTransaction( - r.cfg.Trace, - &traceCtx, - r.readerID, - r.readConnectionID, - tx.SessionID(), - tx, - ) + updateOffsetInTransactionErr := retry.Retry(ctx, func(ctx context.Context) (err error) { defer func() { - onDone(err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderUpdateOffsetsInTransaction( + r.cfg.Trace, + &logCtx, + r.readerID, + r.readConnectionID, + tx.SessionID(), + tx, + ) }() - ctx = traceCtx err = r.topicClient.UpdateOffsetsInTransaction(ctx, req) return err }) - if updateOffesetInTransactionErr == nil { - r.addOnTransactionCompletedHandler(ctx, tx, batch, updateOffesetInTransactionErr) + if updateOffsetInTransactionErr == nil { + r.addOnTransactionCompletedHandler(ctx, tx, batch, updateOffsetInTransactionErr) } else { _ = retry.Retry(ctx, func(ctx context.Context) (err error) { - traceCtx := ctx - onDone := trace.TopicOnReaderTransactionRollback( - r.cfg.Trace, - &traceCtx, - r.readerID, - r.readConnectionID, - tx.SessionID(), - tx, - ) - ctx = traceCtx defer func() { - onDone(err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderTransactionRollback( + r.cfg.Trace, + &logCtx, + r.readerID, + r.readConnectionID, + tx.SessionID(), + tx, + )(err) }() return tx.Rollback(ctx) }) _ = r.CloseWithError(xcontext.ValueOnly(ctx), xerrors.WithStackTrace(xerrors.Retryable( - fmt.Errorf("ydb: failed add topic offsets in transaction: %w", updateOffesetInTransactionErr), + fmt.Errorf("ydb: failed add topic offsets in transaction: %w", updateOffsetInTransactionErr), ))) - return updateOffesetInTransactionErr + return updateOffsetInTransactionErr } return nil @@ -284,19 +283,19 @@ func (r *topicStreamReaderImpl) addOnTransactionCompletedHandler( ) { commitRange := topicreadercommon.GetCommitRange(batch) tx.OnCompleted(func(transactionResult error) { - traceCtx := ctx - onDone := trace.TopicOnReaderTransactionCompleted( - r.cfg.Trace, - &traceCtx, - r.readerID, - r.readConnectionID, - tx.SessionID(), - tx, - transactionResult, - ) - defer onDone() + defer func() { + logCtx := r.cfg.BaseContext + trace.TopicOnReaderTransactionCompleted( + r.cfg.Trace, + &logCtx, + r.readerID, + r.readConnectionID, + tx.SessionID(), + tx, + transactionResult, + ) + }() - ctx = traceCtx if transactionResult == nil { topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd) } else { @@ -344,19 +343,31 @@ func (r *topicStreamReaderImpl) ReadMessageBatch( ctx context.Context, opts ReadMessageBatchOptions, ) (batch *topicreadercommon.PublicBatch, err error) { - onDone := trace.TopicOnReaderReadMessages( - r.cfg.Trace, - &ctx, - opts.MinCount, - opts.MaxCount, - r.getRestBufferBytes(), - ) defer func() { + traceFunc := func( + messagesCount int, + topic string, + partitionID int64, + partitionSessionID int64, + offsetStart int64, + offsetEnd int64, + freeBufferCapacity int, + ) { + mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext) + onDone := trace.TopicOnReaderReadMessages( + r.cfg.Trace, + &mergeCtx, + opts.MinCount, + opts.MaxCount, + r.getRestBufferBytes(), + ) + onDone(messagesCount, topic, partitionID, partitionSessionID, offsetStart, offsetEnd, freeBufferCapacity, nil) + } if batch == nil { - onDone(0, "", -1, -1, -1, -1, r.getRestBufferBytes(), err) + traceFunc(0, "", -1, -1, -1, -1, r.getRestBufferBytes()) } else { commitRange := topicreadercommon.GetCommitRange(batch) - onDone( + traceFunc( len(batch.Messages), batch.Topic(), batch.PartitionID(), @@ -364,7 +375,6 @@ func (r *topicStreamReaderImpl) ReadMessageBatch( commitRange.CommitOffsetStart.ToInt64(), commitRange.CommitOffsetEnd.ToInt64(), r.getRestBufferBytes(), - err, ) } }() @@ -462,7 +472,6 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer( if err != nil { return err } - onDone := trace.TopicOnReaderPartitionReadStopResponse( r.cfg.Trace, r.readConnectionID, @@ -525,18 +534,18 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange topicrea return xerrors.WithStackTrace(errCommitWithNilPartitionSession) } - session := commitRange.PartitionSession - onDone := trace.TopicOnReaderCommit( - r.cfg.Trace, - &ctx, - session.Topic, - session.PartitionID, - session.StreamPartitionSessionID.ToInt64(), - commitRange.CommitOffsetStart.ToInt64(), - commitRange.CommitOffsetEnd.ToInt64(), - ) defer func() { - onDone(err) + session := commitRange.PartitionSession + mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext) + trace.TopicOnReaderCommit( + r.cfg.Trace, + &mergeCtx, + session.Topic, + session.PartitionID, + session.StreamPartitionSessionID.ToInt64(), + commitRange.CommitOffsetStart.ToInt64(), + commitRange.CommitOffsetEnd.ToInt64(), + ) }() if err = r.checkCommitRange(commitRange); err != nil { @@ -574,7 +583,9 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange topicreadercommon.C func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error { err := r.stream.Send(msg) if err != nil { - trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderError(r.cfg.Trace, &logCtx, r.readConnectionID, err) + _ = r.CloseWithError(r.ctx, err) } @@ -613,8 +624,9 @@ func (r *topicStreamReaderImpl) setStarted() error { func (r *topicStreamReaderImpl) initSession() (err error) { initMessage := topicreadercommon.CreateInitMessage(r.cfg.Consumer, r.cfg.ReadSelectors) - onDone := trace.TopicOnReaderInit(r.cfg.Trace, r.readConnectionID, initMessage) defer func() { + logCtx := r.cfg.BaseContext + onDone := trace.TopicOnReaderInit(r.cfg.Trace, &logCtx, r.readConnectionID, initMessage) onDone(r.readConnectionID, err) }() @@ -664,9 +676,10 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) { for { serverMessage, err := r.stream.Recv() if err != nil { - trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderError(r.cfg.Trace, &logCtx, r.readConnectionID, err) if errors.Is(err, rawtopicreader.ErrUnexpectedMessageType) { - trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, r.readConnectionID, err) + trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, &logCtx, r.readConnectionID, err) // new messages can be added to protocol, it must be backward compatible to old programs // and skip message is safe continue @@ -716,14 +729,18 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) { case *rawtopicreader.UpdateTokenResponse: r.onUpdateTokenResponse(m) default: - trace.TopicOnReaderUnknownGrpcMessage( - r.cfg.Trace, - r.readConnectionID, - xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( - "ydb: unexpected message type in stream reader: %v", - reflect.TypeOf(serverMessage), - ))), - ) + { + logCtx := r.cfg.BaseContext + trace.TopicOnReaderUnknownGrpcMessage( + r.cfg.Trace, + &logCtx, + r.readConnectionID, + xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( + "ydb: unexpected message type in stream reader: %v", + reflect.TypeOf(serverMessage), + ))), + ) + } } } } @@ -757,7 +774,8 @@ func (r *topicStreamReaderImpl) dataRequestLoop(ctx context.Context) { } resCapacity := r.addRestBufferBytes(sum) - trace.TopicOnReaderSentDataRequest(r.cfg.Trace, r.readConnectionID, sum, resCapacity) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderSentDataRequest(r.cfg.Trace, &logCtx, r.readConnectionID, sum, resCapacity) if err := r.sendDataRequest(sum); err != nil { return } @@ -797,9 +815,9 @@ func (r *topicStreamReaderImpl) updateTokenLoop(ctx context.Context) { func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) (err error) { resCapacity := r.addRestBufferBytes(-msg.BytesSize) - onDone := trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, r.readConnectionID, resCapacity, msg) defer func() { - onDone(err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, &logCtx, r.readConnectionID, resCapacity, msg) }() batches, err2 := topicreadercommon.ReadRawBatchesToPublicBatches(msg, &r.sessionController, r.cfg.Decoders) @@ -817,8 +835,10 @@ func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) } func (r *topicStreamReaderImpl) CloseWithError(ctx context.Context, reason error) (closeErr error) { - onDone := trace.TopicOnReaderClose(r.cfg.Trace, r.readConnectionID, reason) - defer onDone(closeErr) + defer func() { + logCtx := r.cfg.BaseContext + trace.TopicOnReaderClose(r.cfg.Trace, &logCtx, r.readConnectionID, reason) + }() isFirstClose := false r.m.WithLock(func() { @@ -866,8 +886,10 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse } partition.SetCommittedOffsetForward(commit.CommittedOffset) + logCtx := r.cfg.BaseContext trace.TopicOnReaderCommittedNotify( r.cfg.Trace, + &logCtx, r.readConnectionID, partition.Topic, partition.PartitionID, @@ -882,12 +904,14 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse } func (r *topicStreamReaderImpl) updateToken(ctx context.Context) { + logCtx := r.cfg.BaseContext onUpdateToken := trace.TopicOnReaderUpdateToken( r.cfg.Trace, + &logCtx, r.readConnectionID, ) token, err := r.cfg.Cred.Token(ctx) - onSent := onUpdateToken(len(token), err) + onSent := onUpdateToken(&ctx, len(token), err) if err != nil { return } diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index 1f41cb39b..10133f794 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -34,6 +34,7 @@ type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error) type readerReconnector struct { background background.Worker clock clockwork.Clock + logContext context.Context //nolint:containedctx retrySettings topic.RetrySettings streamVal batchedStreamReader streamContextCancel context.CancelCauseFunc @@ -52,6 +53,7 @@ type readerReconnector struct { } func newReaderReconnector( + logContext context.Context, readerID int64, connector readerConnectFunc, connectTimeout time.Duration, @@ -64,6 +66,7 @@ func newReaderReconnector( readerConnect: connector, streamErr: errUnconnected, connectTimeout: connectTimeout, + logContext: logContext, tracer: tracer, retrySettings: retrySettings, } @@ -78,6 +81,15 @@ func newReaderReconnector( return res } +func (r *readerReconnector) TopicOnReaderStart(consumer string, err error) { + logCtx := r.logContext + trace.TopicOnReaderStart(r.tracer, &logCtx, r.readerID, consumer, err) +} + +func (r *readerReconnector) SetLogContext(ctx context.Context) { + r.logContext = ctx +} + func (r *readerReconnector) PopMessagesBatchTx( ctx context.Context, tx tx.Transaction, @@ -263,8 +275,6 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) { } } - onReconnectionDone := trace.TopicOnReaderReconnect(r.tracer, request.reason) - if request.reason != nil { retryBackoff, stopRetryReason := r.checkErrRetryMode( request.reason, @@ -286,21 +296,25 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) { } } else { _ = r.CloseWithError(ctx, stopRetryReason) - onReconnectionDone(stopRetryReason) + logCtx := r.logContext + trace.TopicOnReaderReconnect(r.tracer, &logCtx, request.reason)(stopRetryReason) return } } err := r.reconnect(ctx, request.reason, request.oldReader) - onReconnectionDone(err) + logCtx := r.logContext + trace.TopicOnReaderReconnect(r.tracer, &logCtx, request.reason)(err) } } //nolint:funlen func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldReader batchedStreamReader) (err error) { - onDone := trace.TopicOnReaderReconnect(r.tracer, reason) - defer func() { onDone(err) }() + defer func() { + logCtx := r.logContext + trace.TopicOnReaderReconnect(r.tracer, &logCtx, reason)(err) + }() if err = ctx.Err(); err != nil { return err @@ -339,9 +353,15 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead r.background.Start("ydb topic reader send reconnect message", func(ctx context.Context) { select { case r.reconnectFromBadStream <- newReconnectRequest(oldReader, sendReason): - trace.TopicOnReaderReconnectRequest(r.tracer, err, true) + { + logCtx := r.logContext + trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, true) + } case <-ctx.Done(): - trace.TopicOnReaderReconnectRequest(r.tracer, ctx.Err(), false) + { + logCtx := r.logContext + trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, false) + } } }) default: @@ -440,11 +460,17 @@ func (r *readerReconnector) fireReconnectOnRetryableError(stream batchedStreamRe select { case r.reconnectFromBadStream <- newReconnectRequest(stream, err): - // send signal - trace.TopicOnReaderReconnectRequest(r.tracer, err, true) + { + // send signal + logCtx := r.logContext + trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, true) + } default: - // previous reconnect signal in process, no need sent signal more - trace.TopicOnReaderReconnectRequest(r.tracer, err, false) + { + // previous reconnect signal in process, no need sent signal more + logCtx := r.logContext + trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, false) + } } } diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index 8bd5ea17b..06fcafc25 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -3,6 +3,7 @@ package topicwriterinternal import ( "bytes" "compress/gzip" + "context" "fmt" "io" "sync" @@ -164,9 +165,11 @@ type EncoderSelector struct { parallelCompressors int batchCounter int measureIntervalBatches int + logContext context.Context //nolint:containedctx } func NewEncoderSelector( + logContext context.Context, m *MultiEncoder, allowedCodecs rawtopiccommon.SupportedCodecs, parallelCompressors int, @@ -184,6 +187,7 @@ func NewEncoderSelector( tracer: tracer, writerReconnectorID: writerReconnectorID, sessionID: sessionID, + logContext: logContext, } res.ResetAllowedCodecs(allowedCodecs) @@ -193,8 +197,10 @@ func NewEncoderSelector( func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (rawtopiccommon.Codec, error) { codec, err := s.selectCodec(messages) if err == nil { + logCtx := s.logContext onCompressDone := trace.TopicOnWriterCompressMessages( s.tracer, + &logCtx, s.writerReconnectorID, s.sessionID, codec.ToInt32(), @@ -263,8 +269,10 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt if len(messages) > 0 { firstSeqNo = messages[0].SeqNo } + logCtx := s.logContext onCompressDone := trace.TopicOnWriterCompressMessages( s.tracer, + &logCtx, s.writerReconnectorID, s.sessionID, codec.ToInt32(), diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index b2bf7bd97..ac0672e4a 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -3,6 +3,7 @@ package topicwriterinternal import ( "bytes" "compress/gzip" + "context" "fmt" "io" "strings" @@ -15,14 +16,32 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) +func NewTestEncoderSelector( + m *MultiEncoder, + allowedCodecs rawtopiccommon.SupportedCodecs, + parallelCompressors int, + tracer *trace.Topic, + writerReconnectorID, sessionID string, +) EncoderSelector { + return NewEncoderSelector( + context.Background(), + m, + allowedCodecs, + parallelCompressors, + tracer, + writerReconnectorID, + sessionID, + ) +} + func TestEncoderSelector_CodecMeasure(t *testing.T) { t.Run("Empty", func(t *testing.T) { - s := NewEncoderSelector(testCommonEncoders, nil, 1, &trace.Topic{}, "", "") + s := NewTestEncoderSelector(testCommonEncoders, nil, 1, &trace.Topic{}, "", "") _, err := s.measureCodecs(nil) require.Error(t, err) }) t.Run("One", func(t *testing.T) { - s := NewEncoderSelector( + s := NewTestEncoderSelector( NewMultiEncoder(), rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw}, 1, @@ -44,7 +63,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { ) testSelectCodec := func(t testing.TB, targetCodec rawtopiccommon.Codec, smallCount, largeCount int) { - s := NewEncoderSelector(testCommonEncoders, rawtopiccommon.SupportedCodecs{ + s := NewTestEncoderSelector(testCommonEncoders, rawtopiccommon.SupportedCodecs{ rawtopiccommon.CodecRaw, rawtopiccommon.CodecGzip, }, 4, diff --git a/internal/topic/topicwriterinternal/writer_config.go b/internal/topic/topicwriterinternal/writer_config.go index 461b71906..52e296c3b 100644 --- a/internal/topic/topicwriterinternal/writer_config.go +++ b/internal/topic/topicwriterinternal/writer_config.go @@ -1,11 +1,13 @@ package topicwriterinternal import ( + "context" "time" "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-sdk/v3/credentials" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -19,7 +21,9 @@ type WritersCommonConfig struct { compressorCount int maxBytesPerMessage int + LogContext context.Context //nolint:containedctx Tracer *trace.Topic + rawTopicClient *rawtopic.Client cred credentials.Credentials credUpdateInterval time.Duration clock clockwork.Clock diff --git a/internal/topic/topicwriterinternal/writer_options.go b/internal/topic/topicwriterinternal/writer_options.go index 4afa72f13..534629d8a 100644 --- a/internal/topic/topicwriterinternal/writer_options.go +++ b/internal/topic/topicwriterinternal/writer_options.go @@ -7,6 +7,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/credentials" "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -77,6 +78,14 @@ func WithCredentials(cred credentials.Credentials) PublicWriterOption { } } +// WithRawClient for internal usage only +// no proxy to public interface +func WithRawClient(rawClient *rawtopic.Client) PublicWriterOption { + return func(cfg *WriterReconnectorConfig) { + cfg.rawTopicClient = rawClient + } +} + func WithCodec(codec rawtopiccommon.Codec) PublicWriterOption { return func(cfg *WriterReconnectorConfig) { cfg.forceCodec = codec diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 83aafaabc..974449441 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -113,6 +113,22 @@ func NewWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnector WithProducerID(uuid.NewString())(&cfg) } + if cfg.Connect == nil { + logContext := context.Background() + if cfg.LogContext != nil { + logContext = cfg.LogContext + } + + var connector ConnectFunc = func(ctx context.Context, tracer *trace.Topic) ( + RawTopicWriterStream, + error, + ) { + return cfg.rawTopicClient.StreamWrite(xcontext.MergeContexts(ctx, logContext), tracer) + } + + cfg.Connect = connector + } + return cfg } @@ -324,8 +340,10 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage) w.m.WithRLock(func() { sessionID = w.sessionID }) + logCtx := w.cfg.LogContext onCompressDone := trace.TopicOnWriterCompressMessages( w.cfg.Tracer, + &logCtx, w.writerInstanceID, sessionID, w.cfg.forceCodec.ToInt32(), @@ -340,6 +358,7 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage) } err := cacheMessages(res, targetCodec, w.cfg.compressorCount) onCompressDone(err) + if err != nil { return nil, err } @@ -366,9 +385,9 @@ func (w *WriterReconnector) Close(ctx context.Context) error { } func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr error) { - onDone := trace.TopicOnWriterClose(w.cfg.Tracer, w.writerInstanceID, reason) defer func() { - onDone(resErr) + logCtx := w.cfg.LogContext + trace.TopicOnWriterClose(w.cfg.Tracer, &logCtx, w.writerInstanceID, reason) }() // stop background work and single stream writer @@ -387,7 +406,6 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err func (w *WriterReconnector) connectionLoop(ctx context.Context) { attempt := 0 - createStreamContext := func() (context.Context, context.CancelFunc) { // need suppress parent context cancelation for flush buffer while close writer return xcontext.WithCancel(xcontext.ValueOnly(ctx)) @@ -425,8 +443,10 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) { } } + logCtx := w.cfg.LogContext onWriterStarted := trace.TopicOnWriterReconnect( w.cfg.Tracer, + &logCtx, w.writerInstanceID, w.cfg.topic, w.cfg.producerID, diff --git a/internal/topic/topicwriterinternal/writer_single_stream.go b/internal/topic/topicwriterinternal/writer_single_stream.go index d1869a7e9..0f1fe8c7c 100644 --- a/internal/topic/topicwriterinternal/writer_single_stream.go +++ b/internal/topic/topicwriterinternal/writer_single_stream.go @@ -125,8 +125,17 @@ func (w *SingleStreamWriter) start() { } func (w *SingleStreamWriter) initStream() (err error) { - traceOnDone := trace.TopicOnWriterInitStream(w.cfg.Tracer, w.cfg.reconnectorInstanceID, w.cfg.topic, w.cfg.producerID) - defer func() { traceOnDone(w.SessionID, err) }() + defer func() { + logCtx := w.cfg.LogContext + traceOnDone := trace.TopicOnWriterInitStream( + w.cfg.Tracer, + &logCtx, + w.cfg.reconnectorInstanceID, + w.cfg.topic, + w.cfg.producerID, + ) + traceOnDone(w.SessionID, err) + }() req := w.createInitRequest() if err = w.cfg.stream.Send(&req); err != nil { @@ -149,6 +158,7 @@ func (w *SingleStreamWriter) initStream() (err error) { } w.Encoder = NewEncoderSelector( + w.cfg.LogContext, w.cfg.encodersMap, w.allowedCodecs, w.cfg.compressorCount, @@ -203,15 +213,19 @@ func (w *SingleStreamWriter) receiveMessagesLoop(ctx context.Context) { case *rawtopicwriter.UpdateTokenResponse: // pass default: - trace.TopicOnWriterReadUnknownGrpcMessage( - w.cfg.Tracer, - w.cfg.reconnectorInstanceID, - w.SessionID, - xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( - "ydb: unexpected message type in stream reader: %v", - reflect.TypeOf(m), - ))), - ) + { + logCtx := w.cfg.LogContext + trace.TopicOnWriterReadUnknownGrpcMessage( + w.cfg.Tracer, + &logCtx, + w.cfg.reconnectorInstanceID, + w.SessionID, + xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( + "ydb: unexpected message type in stream reader: %v", + reflect.TypeOf(m), + ))), + ) + } } } } @@ -232,16 +246,20 @@ func (w *SingleStreamWriter) sendMessagesFromQueueToStreamLoop(ctx context.Conte return } + err = sendMessagesToStream(w.cfg.stream, w.cfg.maxBytesPerMessage, targetCodec, messages) + + logCtx := w.cfg.LogContext onSentComplete := trace.TopicOnWriterSendMessages( w.cfg.Tracer, + &logCtx, w.cfg.reconnectorInstanceID, w.SessionID, targetCodec.ToInt32(), messages[0].SeqNo, len(messages), ) - err = sendMessagesToStream(w.cfg.stream, w.cfg.maxBytesPerMessage, targetCodec, messages) onSentComplete(err) + if err != nil { err = xerrors.WithStackTrace(fmt.Errorf("ydb: error send message to topic stream: %w", err)) _ = w.close(ctx, err) diff --git a/internal/topic/topicwriterinternal/writer_transaction.go b/internal/topic/topicwriterinternal/writer_transaction.go index ae1a108d2..414841eeb 100644 --- a/internal/topic/topicwriterinternal/writer_transaction.go +++ b/internal/topic/topicwriterinternal/writer_transaction.go @@ -39,10 +39,10 @@ func (w *WriterWithTransaction) onBeforeCommitTransaction(ctx context.Context) ( w.streamWriter.GetSessionID(), w.tx.ID(), ) - ctx = traceCtx defer func() { onDone(err, w.streamWriter.GetSessionID()) + ctx = traceCtx }() // wait message flushing diff --git a/internal/xcontext/merge_contexts.go b/internal/xcontext/merge_contexts.go new file mode 100644 index 000000000..fd4c006b8 --- /dev/null +++ b/internal/xcontext/merge_contexts.go @@ -0,0 +1,40 @@ +package xcontext + +import ( + "context" + "time" +) + +var _ context.Context = (*MergedContexts)(nil) + +type MergedContexts struct { + additionalValues context.Context //nolint:containedctx + deadlineContext context.Context //nolint:containedctx +} + +func (ctx *MergedContexts) Deadline() (deadline time.Time, ok bool) { + return ctx.deadlineContext.Deadline() +} + +func (ctx *MergedContexts) Done() <-chan struct{} { + return ctx.deadlineContext.Done() +} + +func (ctx *MergedContexts) Err() error { + return ctx.deadlineContext.Err() +} + +func (ctx *MergedContexts) Value(key interface{}) interface{} { + if ctx.deadlineContext.Value(key) != nil { + return ctx.deadlineContext.Value(key) + } + + return ctx.additionalValues.Value(key) +} + +func MergeContexts(req context.Context, log context.Context) context.Context { + return &MergedContexts{ + additionalValues: log, + deadlineContext: req, + } +} diff --git a/log/context.go b/log/context.go index b724e5ec3..abf168a7e 100644 --- a/log/context.go +++ b/log/context.go @@ -37,5 +37,9 @@ func NamesFromContext(ctx context.Context) []string { } func with(ctx context.Context, lvl Level, names ...string) context.Context { + if ctx == nil { + ctx = context.Background() + } + return WithLevel(WithNames(ctx, names...), lvl) } diff --git a/log/topic.go b/log/topic.go index 06837c9b3..ad2d453a1 100644 --- a/log/topic.go +++ b/log/topic.go @@ -1,7 +1,6 @@ package log import ( - "context" "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" @@ -25,7 +24,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "reconnect") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "reconnect") start := time.Now() l.Log(ctx, "topic reader reconnect starting...") @@ -40,7 +39,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "reconnect", "request") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "reconnect", "request") l.Log(ctx, "topic reader reconnect request", kv.NamedError("reason", info.Reason), kv.Bool("was_sent", info.WasSent), @@ -52,7 +51,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderPartitionEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "partition", "read", "start", "response") + ctx := with(*info.PartitionContext, TRACE, "ydb", "topic", "reader", "partition", "read", "start", "response") start := time.Now() l.Log(ctx, "topic reader start partition read response starting...", kv.String("topic", info.Topic), @@ -97,7 +96,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderPartitionEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "partition", "read", "stop", "response") + ctx := with(info.PartitionContext, TRACE, "ydb", "topic", "reader", "partition", "read", "stop", "response") start := time.Now() l.Log(ctx, "topic reader stop partition read response starting...", kv.String("reader_connection_id", info.ReaderConnectionID), @@ -133,7 +132,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(*info.RequestContext, TRACE, "ydb", "topic", "reader", "commit") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "commit") start := time.Now() l.Log(ctx, "topic reader commit starting...", kv.String("topic", info.Topic), @@ -170,7 +169,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "send", "commit", "message") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "send", "commit", "message") start := time.Now() commitInfo := info.CommitsInfo.GetCommitsInfo() @@ -211,7 +210,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "committed", "notify") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "committed", "notify") l.Log(ctx, "topic reader received commit ack", kv.String("reader_connection_id", info.ReaderConnectionID), kv.String("topic", info.Topic), @@ -224,7 +223,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "close") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "close") start := time.Now() l.Log(ctx, "topic reader close starting...", kv.String("reader_connection_id", info.ReaderConnectionID), @@ -253,7 +252,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "init") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "init") start := time.Now() l.Log(ctx, "topic reader init starting...", kv.String("pre_init_reader_connection_id", info.PreInitReaderConnectionID), @@ -284,7 +283,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "error") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "error") l.Log(WithLevel(ctx, INFO), "topic reader has grpc stream error", kv.Error(info.Error), kv.String("reader_connection_id", info.ReaderConnectionID), @@ -299,7 +298,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "update", "token") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "update", "token") start := time.Now() l.Log(ctx, "topic reader token update starting...", kv.String("reader_connection_id", info.ReaderConnectionID), @@ -347,7 +346,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "sent", "data", "request") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "sent", "data", "request") l.Log(ctx, "topic reader sent data request", kv.String("reader_connection_id", info.ReaderConnectionID), kv.Int("request_bytes", info.RequestBytes), @@ -360,7 +359,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "receive", "data", "response") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "receive", "data", "response") start := time.Now() partitionsCount, batchesCount, messagesCount := info.DataResponse.GetPartitionBatchMessagesCounts() l.Log(ctx, "topic reader data response received, process starting...", @@ -404,7 +403,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return nil } - ctx := with(*info.RequestContext, TRACE, "ydb", "topic", "reader", "read", "messages") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "read", "messages") start := time.Now() l.Log(ctx, "topic read messages, waiting...", kv.Int("min_count", info.MinCount), @@ -436,7 +435,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "unknown", "grpc", "message") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "unknown", "grpc", "message") l.Log(WithLevel(ctx, INFO), "topic reader received unknown grpc message", kv.Error(info.Error), kv.String("reader_connection_id", info.ReaderConnectionID), @@ -648,7 +647,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "reconnect") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "reconnect") start := time.Now() l.Log(ctx, "connect to topic writer stream starting...", kv.String("topic", info.Topic), @@ -694,7 +693,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "stream", "init") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "stream", "init") start := time.Now() l.Log(ctx, "topic writer init stream starting...", kv.String("topic", info.Topic), @@ -734,7 +733,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { start := time.Now() return func(doneInfo trace.TopicOnWriterBeforeCommitTransactionDoneInfo) { - ctx := with(*info.Ctx, TRACE, "ydb", "topic", "writer", "beforecommit") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "beforecommit") l.Log(ctx, "topic writer wait of flush messages before commit transaction", kv.String("kqp_session_id", info.KqpSessionID), kv.String("topic_session_id_start", info.TopicSessionID), @@ -750,7 +749,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { start := time.Now() return func(doneInfo trace.TopicOnWriterAfterFinishTransactionDoneInfo) { - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "beforecommit") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "beforecommit") l.Log(ctx, "topic writer close writer after transaction finished", kv.String("kqp_session_id", info.SessionID), kv.String("tx_id", info.TransactionID), @@ -762,7 +761,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "close") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "close") start := time.Now() l.Log(ctx, "topic writer close starting...", kv.String("writer_instance_id", info.WriterInstanceID), @@ -793,7 +792,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "compress", "messages") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "compress", "messages") start := time.Now() l.Log(ctx, "topic writer compress messages starting...", kv.String("writer_instance_id", info.WriterInstanceID), @@ -836,7 +835,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "send", "messages") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "send", "messages") start := time.Now() l.Log(ctx, "topic writer send messages starting...", kv.String("writer_instance_id", info.WriterInstanceID), @@ -874,7 +873,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return } acks := info.Acks.GetAcks() - ctx := with(context.Background(), DEBUG, "ydb", "topic", "writer", "receive", "result") + ctx := with(*info.Context, DEBUG, "ydb", "topic", "writer", "receive", "result") l.Log(ctx, "topic writer received result from server", kv.String("writer_instance_id", info.WriterInstanceID), kv.String("session_id", info.SessionID), @@ -895,7 +894,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "grpc") l.Log( ctx, "topic writer sent grpc message (message body and metadata are removed)", kv.String("topic_stream_internal_id", info.TopicStreamInternalID), @@ -911,7 +910,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "grpc") l.Log( ctx, "topic writer received grpc message (message body and metadata are removed)", kv.String("topic_stream_internal_id", info.TopicStreamInternalID), @@ -926,7 +925,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return } - ctx := with(context.Background(), DEBUG, "ydb", "topic", "writer", "read", "unknown", "grpc", "message") + ctx := with(*info.Context, DEBUG, "ydb", "topic", "writer", "read", "unknown", "grpc", "message") l.Log(ctx, "topic writer receive unknown grpc message from server", kv.Error(info.Error), kv.String("writer_instance_id", info.WriterInstanceID), diff --git a/tests/integration/topic_cdc_reader_test.go b/tests/integration/topic_cdc_reader_test.go index 81cd5e51d..d41adfdb5 100644 --- a/tests/integration/topic_cdc_reader_test.go +++ b/tests/integration/topic_cdc_reader_test.go @@ -189,6 +189,17 @@ func createFeedAndReader( return db, reader } +func createFeedAndReaderForDB( + ctx context.Context, + t *testing.T, + db *ydb.Driver, + opts ...topicoptions.ReaderOption, +) (*ydb.Driver, *topicreader.Reader) { + createCDCFeed(ctx, t, db) + reader := createFeedReader(t, db, opts...) + return db, reader +} + var sendCDCCounter int64 func sendCDCMessage(ctx context.Context, t *testing.T, db *ydb.Driver) { diff --git a/tests/integration/topic_log_test.go b/tests/integration/topic_log_test.go new file mode 100644 index 000000000..325932804 --- /dev/null +++ b/tests/integration/topic_log_test.go @@ -0,0 +1,275 @@ +//go:build integration +// +build integration + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/log" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +var internalLogger atomic.Value + +func SetInternalLogger(logger *zap.Logger) { + internalLogger.Store(logger.WithOptions(zap.AddCallerSkip(2))) +} + +func Error(ctx context.Context, msg string, fields ...zap.Field) { + logInternal(ctx, zapcore.ErrorLevel, msg, fields) +} + +func logInternal(ctx context.Context, lvl zapcore.Level, msg string, fields []zap.Field) { + lh := from(ctx) + if lvl == zapcore.DebugLevel && lh.riseDebug { + lvl = zapcore.InfoLevel + } + ce := lh.Check(lvl, msg) + if ce == nil { + return + } + ce.Write(fields...) +} + +type loggerKey struct{} + +// GetInternal returns context logger. +// Should be used only in integration code and with great care +// because logger is configured with zap.AddCallerSkip. +func GetInternal(ctx context.Context) *zap.Logger { + return from(ctx).Logger +} + +type loggerHolder struct { + *zap.Logger + riseDebug bool +} + +func from(ctx context.Context) loggerHolder { + if l, ok := ctx.Value(loggerKey{}).(loggerHolder); ok { + return l + } + if l, ok := internalLogger.Load().(*zap.Logger); ok { + return loggerHolder{Logger: l} + } + // Fallback, so we don't need to manually init logger in every test. + SetInternalLogger(zap.Must(zap.NewDevelopmentConfig().Build())) + return from(ctx) +} + +func With(ctx context.Context, fields ...zap.Field) context.Context { + lh := from(ctx) + lh.Logger = lh.Logger.With(fields...) + return context.WithValue(ctx, loggerKey{}, lh) +} + +type SafeBuffer struct { + buf bytes.Buffer + mu sync.Mutex +} + +func (s *SafeBuffer) Write(p []byte) (n int, err error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.Write(p) +} + +func (s *SafeBuffer) String() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.String() +} + +func SetupZapLogger() (*zap.Logger, *SafeBuffer) { + buf := SafeBuffer{} + syncer := zapcore.AddSync(&buf) + ws := &zapcore.BufferedWriteSyncer{ + WS: syncer, + Size: 512 * 1024, // 512 kB + FlushInterval: time.Millisecond, + } + enc := zapcore.NewJSONEncoder(zapcore.EncoderConfig{ + MessageKey: "M", + LevelKey: "L", + TimeKey: "T", + NameKey: "N", + CallerKey: "C", + FunctionKey: "F", + StacktraceKey: "S", + EncodeLevel: zapcore.LowercaseLevelEncoder, + EncodeTime: zapcore.ISO8601TimeEncoder, + EncodeDuration: zapcore.SecondsDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + }) + + core := zapcore.NewCore(enc, ws, zapcore.DebugLevel) + + l := zap.New(core) + SetInternalLogger(l) + return l, &buf +} + +var _ log.Logger = adapter{} + +type adapter struct { + minLevel zapcore.Level +} + +func (a adapter) Log(ctx context.Context, msg string, fields ...log.Field) { + level := Level(ctx) + if !a.minLevel.Enabled(level) { + return + } + l := GetInternal(ctx) + for _, name := range log.NamesFromContext(ctx) { + l = l.Named(name) + } + l.WithOptions(zap.AddCallerSkip(1)).Log(level, msg, Fields(fields)...) +} + +//nolint:exhaustive // good enough. +func fieldToField(field log.Field) zap.Field { + switch field.Type() { + case log.IntType: + return zap.Int(field.Key(), field.IntValue()) + case log.Int64Type: + return zap.Int64(field.Key(), field.Int64Value()) + case log.StringType: + return zap.String(field.Key(), field.StringValue()) + case log.BoolType: + return zap.Bool(field.Key(), field.BoolValue()) + case log.DurationType: + return zap.Duration(field.Key(), field.DurationValue()) + case log.StringsType: + return zap.Strings(field.Key(), field.StringsValue()) + case log.ErrorType: + return zap.Error(field.ErrorValue()) + case log.StringerType: + return zap.Stringer(field.Key(), field.Stringer()) + default: + return zap.Any(field.Key(), field.AnyValue()) + } +} + +func Fields(fields []log.Field) []zap.Field { + ff := make([]zap.Field, len(fields)) + for i, f := range fields { + ff[i] = fieldToField(f) + } + return ff +} + +//nolint:exhaustive // good enough. +func Level(ctx context.Context) zapcore.Level { + switch log.LevelFromContext(ctx) { + case log.TRACE, log.DEBUG: + return zapcore.DebugLevel + case log.INFO: + return zapcore.InfoLevel + case log.WARN: + return zapcore.WarnLevel + case log.ERROR: + return zapcore.ErrorLevel + case log.FATAL: + return zapcore.FatalLevel + default: + return zapcore.InvalidLevel + } +} + +func WithTraces(minLevel zapcore.Level, d trace.Detailer, opts ...log.Option) ydb.Option { + a := adapter{minLevel: minLevel} + return ydb.MergeOptions( + ydb.WithTraceDriver(log.Driver(a, d, opts...)), + ydb.WithTraceTable(log.Table(a, d, opts...)), + ydb.WithTraceScripting(log.Scripting(a, d, opts...)), + ydb.WithTraceScheme(log.Scheme(a, d, opts...)), + ydb.WithTraceCoordination(log.Coordination(a, d, opts...)), + ydb.WithTraceRatelimiter(log.Ratelimiter(a, d, opts...)), + ydb.WithTraceDiscovery(log.Discovery(a, d, opts...)), + ydb.WithTraceTopic(log.Topic(a, d, opts...)), + ydb.WithTraceDatabaseSQL(log.DatabaseSQL(a, d, opts...)), + ) +} + +type LogEntry struct { + Level string `json:"L"` + Timestamp string `json:"T"` + Namespace string `json:"N"` + Message string `json:"M"` + ContextName string `json:"context_name"` + Entity string `json:"entity"` + Endpoint string `json:"endpoint"` + Method string `json:"method"` +} + +func (l *LogEntry) FormatMessageShort() string { + return fmt.Sprintf(`"N":"%s","M":"%s"`, l.Namespace, l.Message) +} + +func (l *LogEntry) FormatMessage() string { + return fmt.Sprintf(`"N":"%s","M":"%s","context_name":"%s","entity":"%s"`, l.Namespace, l.Message, l.ContextName, l.Entity) +} + +func TestTopicReadMessagesLog(t *testing.T) { + _, buf := SetupZapLogger() + + ctx := xtest.Context(t) + + ctx = With(ctx, zap.String("context_name", "test_context")) + ctx = With(ctx, zap.String("entity", "reader")) + + driver := connect(t, WithTraces(zapcore.DebugLevel, trace.DetailsAll)) + + db, reader := createFeedAndReaderForDB(ctx, t, driver, topicoptions.WithReaderLogContext(ctx)) + + sendCDCMessage(ctx, t, db) + msg, err := reader.ReadMessage(ctx) + require.NoError(t, err) + require.NotEmpty(t, msg.CreatedAt) + t.Logf("msg: %#v", msg) + + require.NoError(t, err) + err = topicsugar.ReadMessageDataWithCallback(msg, func(data []byte) error { + t.Log("Content:", string(data)) + return nil + }) + require.NoError(t, err) + + sendCDCMessage(ctx, t, db) + batch, err := reader.ReadMessagesBatch(ctx) + require.NoError(t, err) + require.NotEmpty(t, batch.Messages) + + strLog := buf.String() + strLog = strings.Replace(strLog, "\n", ",\n", -1) + strLog = "[" + strLog[:len(strLog)-2] + "]" + + var logs []LogEntry + err = json.Unmarshal([]byte(strLog), &logs) + require.NoError(t, err) + + for _, logString := range logs { + if strings.HasPrefix(logString.Namespace, "ydb.topic.reader") { + require.Equal(t, logString.ContextName, "test_context") + require.Equal(t, logString.Entity, "reader") + } + } +} diff --git a/tests/slo/go.sum b/tests/slo/go.sum index 3d478bc4a..e3c6fc006 100644 --- a/tests/slo/go.sum +++ b/tests/slo/go.sum @@ -1876,11 +1876,13 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= diff --git a/topic/topicoptions/topicoptions_reader.go b/topic/topicoptions/topicoptions_reader.go index 6569db055..cf2bee0bf 100644 --- a/topic/topicoptions/topicoptions_reader.go +++ b/topic/topicoptions/topicoptions_reader.go @@ -1,6 +1,7 @@ package topicoptions import ( + "context" "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" @@ -281,3 +282,11 @@ func WithReaderWithoutConsumer(saveStateOnReconnection bool) ReaderOption { cfg.CommitMode = CommitModeNone } } + +// WithReaderLogContext allows providing a context.Context instance which will be used +// in log/topic events. +func WithReaderLogContext(ctx context.Context) ReaderOption { + return func(cfg *topicreaderinternal.ReaderConfig) { + cfg.BaseContext = ctx + } +} diff --git a/topic/topicoptions/topicoptions_writer.go b/topic/topicoptions/topicoptions_writer.go index d74befa4e..f4271e312 100644 --- a/topic/topicoptions/topicoptions_writer.go +++ b/topic/topicoptions/topicoptions_writer.go @@ -1,6 +1,7 @@ package topicoptions import ( + "context" "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" @@ -214,3 +215,11 @@ func WithWriterTrace(t trace.Topic) WriterOption { //nolint:gocritic func WithWriterUpdateTokenInterval(interval time.Duration) WriterOption { return topicwriterinternal.WithTokenUpdateInterval(interval) } + +// WithWriterLogContext allows providing a context.Context instance which will be used +// in log/topic events. +func WithWriterLogContext(ctx context.Context) WriterOption { + return func(cfg *topicwriterinternal.WriterReconnectorConfig) { + cfg.LogContext = ctx + } +} diff --git a/trace/topic.go b/trace/topic.go index b4829e108..74916d5bb 100644 --- a/trace/topic.go +++ b/trace/topic.go @@ -161,6 +161,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderStartInfo struct { + Context *context.Context ReaderID int64 Consumer string Error error @@ -201,11 +202,13 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderSendCommitMessageStartInfo struct { + Context *context.Context CommitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderStreamCommitInfo struct { + Context *context.Context Topic string PartitionID int64 PartitionSessionID int64 @@ -225,6 +228,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCommittedNotifyInfo struct { + Context *context.Context ReaderConnectionID string Topic string PartitionID int64 @@ -234,12 +238,14 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderErrorInfo struct { + Context *context.Context ReaderConnectionID string Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderSentDataRequestInfo struct { + Context *context.Context ReaderConnectionID string RequestBytes int LocalBufferSizeAfterSent int @@ -247,6 +253,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReceiveDataResponseStartInfo struct { + Context *context.Context ReaderConnectionID string LocalBufferSizeAfterReceive int DataResponse TopicReaderDataResponseInfo @@ -265,7 +272,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReadMessagesStartInfo struct { - RequestContext *context.Context + Context *context.Context MinCount int MaxCount int FreeBufferCapacity int @@ -285,13 +292,15 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnReadUnknownGrpcMessageInfo struct { + Context *context.Context ReaderConnectionID string Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReconnectStartInfo struct { - Reason error + Context *context.Context + Reason error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -301,13 +310,14 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReconnectRequestInfo struct { + Context *context.Context Reason error WasSent bool } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCommitStartInfo struct { - RequestContext *context.Context + Context *context.Context Topic string PartitionID int64 PartitionSessionID int64 @@ -322,6 +332,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCloseStartInfo struct { + Context *context.Context ReaderConnectionID string CloseReason error } @@ -333,6 +344,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderInitStartInfo struct { + Context *context.Context PreInitReaderConnectionID string InitRequestInfo TopicReadStreamInitRequestInfo } @@ -351,11 +363,13 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnReadUpdateTokenStartInfo struct { + Context *context.Context ReaderConnectionID string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnReadUpdateTokenMiddleTokenReceivedInfo struct { + Context *context.Context TokenLen int Error error } @@ -460,6 +474,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterReconnectStartInfo struct { + Context *context.Context WriterInstanceID string Topic string ProducerID string @@ -477,6 +492,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterInitStreamStartInfo struct { + Context *context.Context WriterInstanceID string Topic string ProducerID string @@ -490,6 +506,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterCloseStartInfo struct { + Context *context.Context WriterInstanceID string Reason error } @@ -501,6 +518,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterCompressMessagesStartInfo struct { + Context *context.Context WriterInstanceID string SessionID string Codec int32 @@ -516,6 +534,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterSendMessagesStartInfo struct { + Context *context.Context WriterInstanceID string SessionID string Codec int32 @@ -530,6 +549,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterResultMessagesInfo struct { + Context *context.Context WriterInstanceID string SessionID string PartitionID int64 @@ -552,7 +572,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterBeforeCommitTransactionStartInfo struct { - Ctx *context.Context + Context *context.Context KqpSessionID string TopicSessionID string TransactionID string @@ -566,6 +586,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterAfterFinishTransactionStartInfo struct { + Context *context.Context Error error SessionID string TransactionID string @@ -578,6 +599,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterSentGRPCMessageInfo struct { + Context *context.Context TopicStreamInternalID string SessionID string MessageNumber int @@ -587,6 +609,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterReceiveGRPCMessageInfo struct { + Context *context.Context TopicStreamInternalID string SessionID string MessageNumber int @@ -596,6 +619,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterReadUnknownGrpcMessageInfo struct { + Context *context.Context WriterInstanceID string SessionID string Error error diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index 00345b761..ae23d289e 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -1535,16 +1535,18 @@ func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMe fn(t1) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { +func TopicOnReaderStart(t *Topic, c *context.Context, readerID int64, consumer string, e error) { var p TopicReaderStartInfo + p.Context = c p.ReaderID = readerID p.Consumer = consumer p.Error = e t.onReaderStart(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReconnect(t *Topic, reason error) func(error) { +func TopicOnReaderReconnect(t *Topic, c *context.Context, reason error) func(error) { var p TopicReaderReconnectStartInfo + p.Context = c p.Reason = reason res := t.onReaderReconnect(p) return func(e error) { @@ -1554,8 +1556,9 @@ func TopicOnReaderReconnect(t *Topic, reason error) func(error) { } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { +func TopicOnReaderReconnectRequest(t *Topic, c *context.Context, reason error, wasSent bool) { var p TopicReaderReconnectRequestInfo + p.Context = c p.Reason = reason p.WasSent = wasSent t.onReaderReconnectRequest(p) @@ -1607,9 +1610,9 @@ func TopicOnReaderEndPartitionSession(t *Topic, readerConnectionID string, parti t.onReaderEndPartitionSession(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { +func TopicOnReaderCommit(t *Topic, c *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo - p.RequestContext = requestContext + p.Context = c p.Topic = topic p.PartitionID = partitionID p.PartitionSessionID = partitionSessionID @@ -1623,8 +1626,9 @@ func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { +func TopicOnReaderSendCommitMessage(t *Topic, c *context.Context, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { var p TopicReaderSendCommitMessageStartInfo + p.Context = c p.CommitsInfo = commitsInfo res := t.onReaderSendCommitMessage(p) return func(e error) { @@ -1634,8 +1638,9 @@ func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendC } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { +func TopicOnReaderCommittedNotify(t *Topic, c *context.Context, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { var p TopicReaderCommittedNotifyInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.Topic = topic p.PartitionID = partitionID @@ -1644,8 +1649,9 @@ func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic str t.onReaderCommittedNotify(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) func(closeError error) { +func TopicOnReaderClose(t *Topic, c *context.Context, readerConnectionID string, closeReason error) func(closeError error) { var p TopicReaderCloseStartInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.CloseReason = closeReason res := t.onReaderClose(p) @@ -1656,8 +1662,9 @@ func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { +func TopicOnReaderInit(t *Topic, c *context.Context, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { var p TopicReaderInitStartInfo + p.Context = c p.PreInitReaderConnectionID = preInitReaderConnectionID p.InitRequestInfo = initRequestInfo res := t.onReaderInit(p) @@ -1669,19 +1676,22 @@ func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestIn } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { +func TopicOnReaderError(t *Topic, c *context.Context, readerConnectionID string, e error) { var p TopicReaderErrorInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderError(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen int, _ error) func(error) { +func TopicOnReaderUpdateToken(t *Topic, c *context.Context, readerConnectionID string) func(_ *context.Context, tokenLen int, _ error) func(error) { var p OnReadUpdateTokenStartInfo + p.Context = c p.ReaderConnectionID = readerConnectionID res := t.onReaderUpdateToken(p) - return func(tokenLen int, e error) func(error) { + return func(c *context.Context, tokenLen int, e error) func(error) { var p OnReadUpdateTokenMiddleTokenReceivedInfo + p.Context = c p.TokenLen = tokenLen p.Error = e res := res(p) @@ -1790,16 +1800,18 @@ func TopicOnReaderReceiveGRPCMessage(t *Topic, readerID int64, sessionID string, t.onReaderReceiveGRPCMessage(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { +func TopicOnReaderSentDataRequest(t *Topic, c *context.Context, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { var p TopicReaderSentDataRequestInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.RequestBytes = requestBytes p.LocalBufferSizeAfterSent = localBufferSizeAfterSent t.onReaderSentDataRequest(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { +func TopicOnReaderReceiveDataResponse(t *Topic, c *context.Context, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { var p TopicReaderReceiveDataResponseStartInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.LocalBufferSizeAfterReceive = localBufferSizeAfterReceive p.DataResponse = dataResponse @@ -1811,9 +1823,9 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { +func TopicOnReaderReadMessages(t *Topic, c *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo - p.RequestContext = requestContext + p.Context = c p.MinCount = minCount p.MaxCount = maxCount p.FreeBufferCapacity = freeBufferCapacity @@ -1832,15 +1844,17 @@ func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCou } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e error) { +func TopicOnReaderUnknownGrpcMessage(t *Topic, c *context.Context, readerConnectionID string, e error) { var p OnReadUnknownGrpcMessageInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderUnknownGrpcMessage(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, producerID string, attempt int) func(connectionResult error) func(error) { +func TopicOnWriterReconnect(t *Topic, c *context.Context, writerInstanceID string, topic string, producerID string, attempt int) func(connectionResult error) func(error) { var p TopicWriterReconnectStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.Topic = topic p.ProducerID = producerID @@ -1858,8 +1872,9 @@ func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, pro } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { +func TopicOnWriterInitStream(t *Topic, c *context.Context, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { var p TopicWriterInitStreamStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.Topic = topic p.ProducerID = producerID @@ -1872,8 +1887,9 @@ func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, pr } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(error) { +func TopicOnWriterClose(t *Topic, c *context.Context, writerInstanceID string, reason error) func(error) { var p TopicWriterCloseStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.Reason = reason res := t.onWriterClose(p) @@ -1884,9 +1900,9 @@ func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(er } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSessionID string, topicSessionID string, transactionID string) func(_ error, topicSessionID string) { +func TopicOnWriterBeforeCommitTransaction(t *Topic, c *context.Context, kqpSessionID string, topicSessionID string, transactionID string) func(_ error, topicSessionID string) { var p TopicOnWriterBeforeCommitTransactionStartInfo - p.Ctx = ctx + p.Context = c p.KqpSessionID = kqpSessionID p.TopicSessionID = topicSessionID p.TransactionID = transactionID @@ -1899,8 +1915,9 @@ func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSes } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, transactionID string) func(closeError error) { +func TopicOnWriterAfterFinishTransaction(t *Topic, c *context.Context, e error, sessionID string, transactionID string) func(closeError error) { var p TopicOnWriterAfterFinishTransactionStartInfo + p.Context = c p.Error = e p.SessionID = sessionID p.TransactionID = transactionID @@ -1912,8 +1929,9 @@ func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, tr } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { +func TopicOnWriterCompressMessages(t *Topic, c *context.Context, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { var p TopicWriterCompressMessagesStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.Codec = codec @@ -1928,8 +1946,9 @@ func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { +func TopicOnWriterSendMessages(t *Topic, c *context.Context, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { var p TopicWriterSendMessagesStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.Codec = codec @@ -1943,8 +1962,9 @@ func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID stri } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID string, partitionID int64, acks TopicWriterResultMessagesInfoAcks) { +func TopicOnWriterReceiveResult(t *Topic, c *context.Context, writerInstanceID string, sessionID string, partitionID int64, acks TopicWriterResultMessagesInfoAcks) { var p TopicWriterResultMessagesInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.PartitionID = partitionID @@ -1952,8 +1972,9 @@ func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID str t.onWriterReceiveResult(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromClient, e error) { +func TopicOnWriterSentGRPCMessage(t *Topic, c *context.Context, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromClient, e error) { var p TopicWriterSentGRPCMessageInfo + p.Context = c p.TopicStreamInternalID = topicStreamInternalID p.SessionID = sessionID p.MessageNumber = messageNumber @@ -1962,8 +1983,9 @@ func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessio t.onWriterSentGRPCMessage(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromServer, e error) { +func TopicOnWriterReceiveGRPCMessage(t *Topic, c *context.Context, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromServer, e error) { var p TopicWriterReceiveGRPCMessageInfo + p.Context = c p.TopicStreamInternalID = topicStreamInternalID p.SessionID = sessionID p.MessageNumber = messageNumber @@ -1972,8 +1994,9 @@ func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, ses t.onWriterReceiveGRPCMessage(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { +func TopicOnWriterReadUnknownGrpcMessage(t *Topic, c *context.Context, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.Error = e