diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b6bfbe92..73085e7c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added `WithReaderLogContext`, `WithWriterLogContext` options to topic reader/writer to supply log entries with user context fields + ## v3.104.4 * Fixed bug with session query latency metric collector diff --git a/go.mod b/go.mod index 4ef90b399..cefcf47b4 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/google/uuid v1.6.0 github.com/jonboulle/clockwork v0.3.0 github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 + go.uber.org/zap v1.27.0 golang.org/x/net v0.33.0 golang.org/x/sync v0.10.0 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 @@ -17,18 +18,21 @@ require ( // requires for tests only require ( github.com/rekby/fixenv v0.6.1 - github.com/stretchr/testify v1.7.1 + github.com/stretchr/testify v1.10.0 go.uber.org/mock v0.4.0 ) require ( - github.com/davecgh/go-spew v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/kr/pretty v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/sys v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect - gopkg.in/yaml.v3 v3.0.0 // indirect + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) retract v3.67.1 // decimal broken https://github.com/ydb-platform/ydb-go-sdk/issues/1234 diff --git a/go.sum b/go.sum index e63c7c224..48e522b1b 100644 --- a/go.sum +++ b/go.sum @@ -11,8 +11,9 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -54,6 +55,12 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/jonboulle/clockwork v0.3.0 h1:9BSCMi8C+0qdApAp4auwX0RkLGUjs956h0EkuQymUhg= github.com/jonboulle/clockwork v0.3.0/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -63,13 +70,19 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77 h1:LY6cI8cP4B9rrpTleZk95+08kl2gF4rixG7+V/dwL6Q= github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -146,12 +159,13 @@ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA= -gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/grpcwrapper/rawtopic/client.go b/internal/grpcwrapper/rawtopic/client.go index 203622e85..c2a8308a4 100644 --- a/internal/grpcwrapper/rawtopic/client.go +++ b/internal/grpcwrapper/rawtopic/client.go @@ -104,6 +104,7 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S func (c *Client) StreamWrite( ctxStreamLifeTime context.Context, tracer *trace.Topic, + logContext *context.Context, ) (*rawtopicwriter.StreamWriter, error) { protoResp, err := c.service.StreamWrite(ctxStreamLifeTime) if err != nil { @@ -118,6 +119,7 @@ func (c *Client) StreamWrite( Stream: protoResp, Tracer: tracer, InternalStreamID: uuid.New().String(), + LogContext: logContext, }, nil } diff --git a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go index c714a2d4a..f578eb55b 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go +++ b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go @@ -1,6 +1,7 @@ package rawtopicwriter import ( + "context" "errors" "fmt" "reflect" @@ -34,6 +35,7 @@ type StreamWriter struct { readMessagesCount int writtenMessagesCount int sessionID string + LogContext *context.Context } //nolint:funlen @@ -50,7 +52,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) { defer func() { // defer needs for set good session id on first init response before trace the message trace.TopicOnWriterReceiveGRPCMessage( - w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr, + w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr, ) }() if sendErr != nil { @@ -139,7 +141,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) { err = w.Stream.Send(&protoMsg) w.writtenMessagesCount++ - trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err) + trace.TopicOnWriterSentGRPCMessage( + w.Tracer, + w.LogContext, + w.InternalStreamID, + w.sessionID, + w.writtenMessagesCount, + &protoMsg, + err, + ) if err != nil { return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err))) } diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index 05ce27fa6..5bebbf047 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -318,7 +318,8 @@ func (c *Client) StartReader( if err != nil { return nil, err } - trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err) + + internalReader.TopicOnReaderStart(consumer, err) return topicreader.NewReader(internalReader), nil } @@ -361,15 +362,8 @@ func (c *Client) createWriterConfig( topicPath string, opts []topicoptions.WriterOption, ) topicwriterinternal.WriterReconnectorConfig { - var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) ( - topicwriterinternal.RawTopicWriterStream, - error, - ) { - return c.rawClient.StreamWrite(ctx, tracer) - } - options := []topicoptions.WriterOption{ - topicwriterinternal.WithConnectFunc(connector), + topicwriterinternal.WithRawClient(&c.rawClient), topicwriterinternal.WithTopic(topicPath), topicwriterinternal.WithCommonConfig(c.cfg.Common), topicwriterinternal.WithTrace(c.cfg.Trace), diff --git a/internal/topic/topicreadercommon/committer.go b/internal/topic/topicreadercommon/committer.go index e27ab6d6e..1ae0aca22 100644 --- a/internal/topic/topicreadercommon/committer.go +++ b/internal/topic/topicreadercommon/committer.go @@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) { onDone := trace.TopicOnReaderSendCommitMessage( c.tracer, + &ctx, &commits, ) err := c.send(commits.ToRawMessage()) diff --git a/internal/topic/topicreaderinternal/batched_stream_reader_interface.go b/internal/topic/topicreaderinternal/batched_stream_reader_interface.go index b65a4e8ae..8eb5a81a4 100644 --- a/internal/topic/topicreaderinternal/batched_stream_reader_interface.go +++ b/internal/topic/topicreaderinternal/batched_stream_reader_interface.go @@ -15,4 +15,5 @@ type batchedStreamReader interface { Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error CloseWithError(ctx context.Context, err error) error PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll + TopicOnReaderStart(consumer string, err error) } diff --git a/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go b/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go index 85194d433..10e918867 100644 --- a/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go +++ b/internal/topic/topicreaderinternal/batched_stream_reader_mock_test.go @@ -192,6 +192,42 @@ func (c *MockbatchedStreamReaderReadMessageBatchCall) DoAndReturn(f func(context return c } +// TopicOnReaderStart mocks base method. +func (m *MockbatchedStreamReader) TopicOnReaderStart(consumer string, err error) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "TopicOnReaderStart", consumer, err) +} + +// TopicOnReaderStart indicates an expected call of TopicOnReaderStart. +func (mr *MockbatchedStreamReaderMockRecorder) TopicOnReaderStart(consumer, err any) *MockbatchedStreamReaderTopicOnReaderStartCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TopicOnReaderStart", reflect.TypeOf((*MockbatchedStreamReader)(nil).TopicOnReaderStart), consumer, err) + return &MockbatchedStreamReaderTopicOnReaderStartCall{Call: call} +} + +// MockbatchedStreamReaderTopicOnReaderStartCall wrap *gomock.Call +type MockbatchedStreamReaderTopicOnReaderStartCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockbatchedStreamReaderTopicOnReaderStartCall) Return() *MockbatchedStreamReaderTopicOnReaderStartCall { + c.Call = c.Call.Return() + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockbatchedStreamReaderTopicOnReaderStartCall) Do(f func(string, error)) *MockbatchedStreamReaderTopicOnReaderStartCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockbatchedStreamReaderTopicOnReaderStartCall) DoAndReturn(f func(string, error)) *MockbatchedStreamReaderTopicOnReaderStartCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // WaitInit mocks base method. func (m *MockbatchedStreamReader) WaitInit(ctx context.Context) error { m.ctrl.T.Helper() diff --git a/internal/topic/topicreaderinternal/reader.go b/internal/topic/topicreaderinternal/reader.go index 1b7e995c9..c403d0870 100644 --- a/internal/topic/topicreaderinternal/reader.go +++ b/internal/topic/topicreaderinternal/reader.go @@ -35,6 +35,10 @@ type Reader struct { readerID int64 } +func (r *Reader) TopicOnReaderStart(consumer string, err error) { + r.reader.TopicOnReaderStart(consumer, err) +} + type ReadMessageBatchOptions struct { batcherGetOptions } @@ -89,14 +93,17 @@ func NewReader( return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig) } + reader := newReaderReconnector( + cfg.BaseContext, + readerID, + readerConnector, + cfg.OperationTimeout(), + cfg.RetrySettings, + cfg.Trace, + ) + res := Reader{ - reader: newReaderReconnector( - readerID, - readerConnector, - cfg.OperationTimeout(), - cfg.RetrySettings, - cfg.Trace, - ), + reader: reader, defaultBatchConfig: cfg.DefaultBatchConfig, tracer: cfg.Trace, readerID: readerID, diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index 64f8716c7..0519b8753 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -185,23 +185,26 @@ func (r *topicStreamReaderImpl) WaitInit(_ context.Context) error { return nil } +func (r *topicStreamReaderImpl) TopicOnReaderStart(consumer string, err error) { + logCtx := r.cfg.BaseContext + trace.TopicOnReaderStart(r.cfg.Trace, &logCtx, r.readerID, consumer, err) +} + func (r *topicStreamReaderImpl) PopMessagesBatchTx( ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions, ) (_ *topicreadercommon.PublicBatch, resErr error) { - traceCtx := ctx - onDone := trace.TopicOnReaderStreamPopBatchTx( - r.cfg.Trace, - &traceCtx, - r.readerID, - r.readConnectionID, - tx.SessionID(), - tx, - ) - ctx = traceCtx defer func() { - onDone(resErr) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderStreamPopBatchTx( + r.cfg.Trace, + &logCtx, + r.readerID, + r.readConnectionID, + tx.SessionID(), + tx, + )(resErr) }() batch, err := r.ReadMessageBatch(ctx, opts) @@ -226,51 +229,47 @@ func (r *topicStreamReaderImpl) commitWithTransaction( } req := r.createUpdateOffsetRequest(ctx, batch, tx) - updateOffesetInTransactionErr := retry.Retry(ctx, func(ctx context.Context) (err error) { - traceCtx := ctx - onDone := trace.TopicOnReaderUpdateOffsetsInTransaction( - r.cfg.Trace, - &traceCtx, - r.readerID, - r.readConnectionID, - tx.SessionID(), - tx, - ) + updateOffsetInTransactionErr := retry.Retry(ctx, func(ctx context.Context) (err error) { defer func() { - onDone(err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderUpdateOffsetsInTransaction( + r.cfg.Trace, + &logCtx, + r.readerID, + r.readConnectionID, + tx.SessionID(), + tx, + ) }() - ctx = traceCtx err = r.topicClient.UpdateOffsetsInTransaction(ctx, req) return err }) - if updateOffesetInTransactionErr == nil { - r.addOnTransactionCompletedHandler(ctx, tx, batch, updateOffesetInTransactionErr) + if updateOffsetInTransactionErr == nil { + r.addOnTransactionCompletedHandler(ctx, tx, batch, updateOffsetInTransactionErr) } else { _ = retry.Retry(ctx, func(ctx context.Context) (err error) { - traceCtx := ctx - onDone := trace.TopicOnReaderTransactionRollback( - r.cfg.Trace, - &traceCtx, - r.readerID, - r.readConnectionID, - tx.SessionID(), - tx, - ) - ctx = traceCtx defer func() { - onDone(err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderTransactionRollback( + r.cfg.Trace, + &logCtx, + r.readerID, + r.readConnectionID, + tx.SessionID(), + tx, + )(err) }() return tx.Rollback(ctx) }) _ = r.CloseWithError(xcontext.ValueOnly(ctx), xerrors.WithStackTrace(xerrors.Retryable( - fmt.Errorf("ydb: failed add topic offsets in transaction: %w", updateOffesetInTransactionErr), + fmt.Errorf("ydb: failed add topic offsets in transaction: %w", updateOffsetInTransactionErr), ))) - return updateOffesetInTransactionErr + return updateOffsetInTransactionErr } return nil @@ -284,19 +283,19 @@ func (r *topicStreamReaderImpl) addOnTransactionCompletedHandler( ) { commitRange := topicreadercommon.GetCommitRange(batch) tx.OnCompleted(func(transactionResult error) { - traceCtx := ctx - onDone := trace.TopicOnReaderTransactionCompleted( - r.cfg.Trace, - &traceCtx, - r.readerID, - r.readConnectionID, - tx.SessionID(), - tx, - transactionResult, - ) - defer onDone() + defer func() { + logCtx := r.cfg.BaseContext + trace.TopicOnReaderTransactionCompleted( + r.cfg.Trace, + &logCtx, + r.readerID, + r.readConnectionID, + tx.SessionID(), + tx, + transactionResult, + ) + }() - ctx = traceCtx if transactionResult == nil { topicreadercommon.BatchGetPartitionSession(batch).SetCommittedOffsetForward(commitRange.CommitOffsetEnd) } else { @@ -344,19 +343,32 @@ func (r *topicStreamReaderImpl) ReadMessageBatch( ctx context.Context, opts ReadMessageBatchOptions, ) (batch *topicreadercommon.PublicBatch, err error) { - onDone := trace.TopicOnReaderReadMessages( - r.cfg.Trace, - &ctx, - opts.MinCount, - opts.MaxCount, - r.getRestBufferBytes(), - ) defer func() { + traceFunc := func( + messagesCount int, + topic string, + partitionID int64, + partitionSessionID int64, + offsetStart int64, + offsetEnd int64, + freeBufferCapacity int, + ) { + mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext) + onDone := trace.TopicOnReaderReadMessages( + r.cfg.Trace, + mergeCtx.AsCtx(), + opts.MinCount, + opts.MaxCount, + r.getRestBufferBytes(), + ) + onDone(messagesCount, topic, partitionID, partitionSessionID, offsetStart, offsetEnd, freeBufferCapacity, nil) + ctx = mergeCtx.RequestContext + } if batch == nil { - onDone(0, "", -1, -1, -1, -1, r.getRestBufferBytes(), err) + traceFunc(0, "", -1, -1, -1, -1, r.getRestBufferBytes()) } else { commitRange := topicreadercommon.GetCommitRange(batch) - onDone( + traceFunc( len(batch.Messages), batch.Topic(), batch.PartitionID(), @@ -364,7 +376,6 @@ func (r *topicStreamReaderImpl) ReadMessageBatch( commitRange.CommitOffsetStart.ToInt64(), commitRange.CommitOffsetEnd.ToInt64(), r.getRestBufferBytes(), - err, ) } }() @@ -462,7 +473,6 @@ func (r *topicStreamReaderImpl) onStopPartitionSessionRequestFromBuffer( if err != nil { return err } - onDone := trace.TopicOnReaderPartitionReadStopResponse( r.cfg.Trace, r.readConnectionID, @@ -525,18 +535,19 @@ func (r *topicStreamReaderImpl) Commit(ctx context.Context, commitRange topicrea return xerrors.WithStackTrace(errCommitWithNilPartitionSession) } - session := commitRange.PartitionSession - onDone := trace.TopicOnReaderCommit( - r.cfg.Trace, - &ctx, - session.Topic, - session.PartitionID, - session.StreamPartitionSessionID.ToInt64(), - commitRange.CommitOffsetStart.ToInt64(), - commitRange.CommitOffsetEnd.ToInt64(), - ) defer func() { - onDone(err) + session := commitRange.PartitionSession + mergeCtx := xcontext.MergeContexts(ctx, r.cfg.BaseContext) + trace.TopicOnReaderCommit( + r.cfg.Trace, + mergeCtx.AsCtx(), + session.Topic, + session.PartitionID, + session.StreamPartitionSessionID.ToInt64(), + commitRange.CommitOffsetStart.ToInt64(), + commitRange.CommitOffsetEnd.ToInt64(), + ) + ctx = mergeCtx.RequestContext }() if err = r.checkCommitRange(commitRange); err != nil { @@ -574,7 +585,9 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange topicreadercommon.C func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error { err := r.stream.Send(msg) if err != nil { - trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderError(r.cfg.Trace, &logCtx, r.readConnectionID, err) + _ = r.CloseWithError(r.ctx, err) } @@ -613,8 +626,9 @@ func (r *topicStreamReaderImpl) setStarted() error { func (r *topicStreamReaderImpl) initSession() (err error) { initMessage := topicreadercommon.CreateInitMessage(r.cfg.Consumer, r.cfg.ReadSelectors) - onDone := trace.TopicOnReaderInit(r.cfg.Trace, r.readConnectionID, initMessage) defer func() { + logCtx := r.cfg.BaseContext + onDone := trace.TopicOnReaderInit(r.cfg.Trace, &logCtx, r.readConnectionID, initMessage) onDone(r.readConnectionID, err) }() @@ -664,9 +678,10 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) { for { serverMessage, err := r.stream.Recv() if err != nil { - trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderError(r.cfg.Trace, &logCtx, r.readConnectionID, err) if errors.Is(err, rawtopicreader.ErrUnexpectedMessageType) { - trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, r.readConnectionID, err) + trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, &logCtx, r.readConnectionID, err) // new messages can be added to protocol, it must be backward compatible to old programs // and skip message is safe continue @@ -712,14 +727,18 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) { case *rawtopicreader.UpdateTokenResponse: r.onUpdateTokenResponse(m) default: - trace.TopicOnReaderUnknownGrpcMessage( - r.cfg.Trace, - r.readConnectionID, - xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( - "ydb: unexpected message type in stream reader: %v", - reflect.TypeOf(serverMessage), - ))), - ) + { + logCtx := r.cfg.BaseContext + trace.TopicOnReaderUnknownGrpcMessage( + r.cfg.Trace, + &logCtx, + r.readConnectionID, + xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( + "ydb: unexpected message type in stream reader: %v", + reflect.TypeOf(serverMessage), + ))), + ) + } } } } @@ -753,7 +772,8 @@ func (r *topicStreamReaderImpl) dataRequestLoop(ctx context.Context) { } resCapacity := r.addRestBufferBytes(sum) - trace.TopicOnReaderSentDataRequest(r.cfg.Trace, r.readConnectionID, sum, resCapacity) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderSentDataRequest(r.cfg.Trace, &logCtx, r.readConnectionID, sum, resCapacity) if err := r.sendDataRequest(sum); err != nil { return } @@ -793,9 +813,9 @@ func (r *topicStreamReaderImpl) updateTokenLoop(ctx context.Context) { func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) (err error) { resCapacity := r.addRestBufferBytes(-msg.BytesSize) - onDone := trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, r.readConnectionID, resCapacity, msg) defer func() { - onDone(err) + logCtx := r.cfg.BaseContext + trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, &logCtx, r.readConnectionID, resCapacity, msg) }() batches, err2 := topicreadercommon.ReadRawBatchesToPublicBatches(msg, &r.sessionController, r.cfg.Decoders) @@ -813,8 +833,10 @@ func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) } func (r *topicStreamReaderImpl) CloseWithError(ctx context.Context, reason error) (closeErr error) { - onDone := trace.TopicOnReaderClose(r.cfg.Trace, r.readConnectionID, reason) - defer onDone(closeErr) + defer func() { + logCtx := r.cfg.BaseContext + trace.TopicOnReaderClose(r.cfg.Trace, &logCtx, r.readConnectionID, reason) + }() isFirstClose := false r.m.WithLock(func() { @@ -862,8 +884,10 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse } partition.SetCommittedOffsetForward(commit.CommittedOffset) + logCtx := r.cfg.BaseContext trace.TopicOnReaderCommittedNotify( r.cfg.Trace, + &logCtx, r.readConnectionID, partition.Topic, partition.PartitionID, @@ -878,12 +902,14 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse } func (r *topicStreamReaderImpl) updateToken(ctx context.Context) { + logCtx := r.cfg.BaseContext onUpdateToken := trace.TopicOnReaderUpdateToken( r.cfg.Trace, + &logCtx, r.readConnectionID, ) token, err := r.cfg.Cred.Token(ctx) - onSent := onUpdateToken(len(token), err) + onSent := onUpdateToken(&ctx, len(token), err) if err != nil { return } diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index 1f41cb39b..10133f794 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -34,6 +34,7 @@ type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error) type readerReconnector struct { background background.Worker clock clockwork.Clock + logContext context.Context //nolint:containedctx retrySettings topic.RetrySettings streamVal batchedStreamReader streamContextCancel context.CancelCauseFunc @@ -52,6 +53,7 @@ type readerReconnector struct { } func newReaderReconnector( + logContext context.Context, readerID int64, connector readerConnectFunc, connectTimeout time.Duration, @@ -64,6 +66,7 @@ func newReaderReconnector( readerConnect: connector, streamErr: errUnconnected, connectTimeout: connectTimeout, + logContext: logContext, tracer: tracer, retrySettings: retrySettings, } @@ -78,6 +81,15 @@ func newReaderReconnector( return res } +func (r *readerReconnector) TopicOnReaderStart(consumer string, err error) { + logCtx := r.logContext + trace.TopicOnReaderStart(r.tracer, &logCtx, r.readerID, consumer, err) +} + +func (r *readerReconnector) SetLogContext(ctx context.Context) { + r.logContext = ctx +} + func (r *readerReconnector) PopMessagesBatchTx( ctx context.Context, tx tx.Transaction, @@ -263,8 +275,6 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) { } } - onReconnectionDone := trace.TopicOnReaderReconnect(r.tracer, request.reason) - if request.reason != nil { retryBackoff, stopRetryReason := r.checkErrRetryMode( request.reason, @@ -286,21 +296,25 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) { } } else { _ = r.CloseWithError(ctx, stopRetryReason) - onReconnectionDone(stopRetryReason) + logCtx := r.logContext + trace.TopicOnReaderReconnect(r.tracer, &logCtx, request.reason)(stopRetryReason) return } } err := r.reconnect(ctx, request.reason, request.oldReader) - onReconnectionDone(err) + logCtx := r.logContext + trace.TopicOnReaderReconnect(r.tracer, &logCtx, request.reason)(err) } } //nolint:funlen func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldReader batchedStreamReader) (err error) { - onDone := trace.TopicOnReaderReconnect(r.tracer, reason) - defer func() { onDone(err) }() + defer func() { + logCtx := r.logContext + trace.TopicOnReaderReconnect(r.tracer, &logCtx, reason)(err) + }() if err = ctx.Err(); err != nil { return err @@ -339,9 +353,15 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead r.background.Start("ydb topic reader send reconnect message", func(ctx context.Context) { select { case r.reconnectFromBadStream <- newReconnectRequest(oldReader, sendReason): - trace.TopicOnReaderReconnectRequest(r.tracer, err, true) + { + logCtx := r.logContext + trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, true) + } case <-ctx.Done(): - trace.TopicOnReaderReconnectRequest(r.tracer, ctx.Err(), false) + { + logCtx := r.logContext + trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, false) + } } }) default: @@ -440,11 +460,17 @@ func (r *readerReconnector) fireReconnectOnRetryableError(stream batchedStreamRe select { case r.reconnectFromBadStream <- newReconnectRequest(stream, err): - // send signal - trace.TopicOnReaderReconnectRequest(r.tracer, err, true) + { + // send signal + logCtx := r.logContext + trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, true) + } default: - // previous reconnect signal in process, no need sent signal more - trace.TopicOnReaderReconnectRequest(r.tracer, err, false) + { + // previous reconnect signal in process, no need sent signal more + logCtx := r.logContext + trace.TopicOnReaderReconnectRequest(r.tracer, &logCtx, err, false) + } } } diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index 8bd5ea17b..06fcafc25 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -3,6 +3,7 @@ package topicwriterinternal import ( "bytes" "compress/gzip" + "context" "fmt" "io" "sync" @@ -164,9 +165,11 @@ type EncoderSelector struct { parallelCompressors int batchCounter int measureIntervalBatches int + logContext context.Context //nolint:containedctx } func NewEncoderSelector( + logContext context.Context, m *MultiEncoder, allowedCodecs rawtopiccommon.SupportedCodecs, parallelCompressors int, @@ -184,6 +187,7 @@ func NewEncoderSelector( tracer: tracer, writerReconnectorID: writerReconnectorID, sessionID: sessionID, + logContext: logContext, } res.ResetAllowedCodecs(allowedCodecs) @@ -193,8 +197,10 @@ func NewEncoderSelector( func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (rawtopiccommon.Codec, error) { codec, err := s.selectCodec(messages) if err == nil { + logCtx := s.logContext onCompressDone := trace.TopicOnWriterCompressMessages( s.tracer, + &logCtx, s.writerReconnectorID, s.sessionID, codec.ToInt32(), @@ -263,8 +269,10 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt if len(messages) > 0 { firstSeqNo = messages[0].SeqNo } + logCtx := s.logContext onCompressDone := trace.TopicOnWriterCompressMessages( s.tracer, + &logCtx, s.writerReconnectorID, s.sessionID, codec.ToInt32(), diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index b2bf7bd97..ac0672e4a 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -3,6 +3,7 @@ package topicwriterinternal import ( "bytes" "compress/gzip" + "context" "fmt" "io" "strings" @@ -15,14 +16,32 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) +func NewTestEncoderSelector( + m *MultiEncoder, + allowedCodecs rawtopiccommon.SupportedCodecs, + parallelCompressors int, + tracer *trace.Topic, + writerReconnectorID, sessionID string, +) EncoderSelector { + return NewEncoderSelector( + context.Background(), + m, + allowedCodecs, + parallelCompressors, + tracer, + writerReconnectorID, + sessionID, + ) +} + func TestEncoderSelector_CodecMeasure(t *testing.T) { t.Run("Empty", func(t *testing.T) { - s := NewEncoderSelector(testCommonEncoders, nil, 1, &trace.Topic{}, "", "") + s := NewTestEncoderSelector(testCommonEncoders, nil, 1, &trace.Topic{}, "", "") _, err := s.measureCodecs(nil) require.Error(t, err) }) t.Run("One", func(t *testing.T) { - s := NewEncoderSelector( + s := NewTestEncoderSelector( NewMultiEncoder(), rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw}, 1, @@ -44,7 +63,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { ) testSelectCodec := func(t testing.TB, targetCodec rawtopiccommon.Codec, smallCount, largeCount int) { - s := NewEncoderSelector(testCommonEncoders, rawtopiccommon.SupportedCodecs{ + s := NewTestEncoderSelector(testCommonEncoders, rawtopiccommon.SupportedCodecs{ rawtopiccommon.CodecRaw, rawtopiccommon.CodecGzip, }, 4, diff --git a/internal/topic/topicwriterinternal/writer_config.go b/internal/topic/topicwriterinternal/writer_config.go index 461b71906..52e296c3b 100644 --- a/internal/topic/topicwriterinternal/writer_config.go +++ b/internal/topic/topicwriterinternal/writer_config.go @@ -1,11 +1,13 @@ package topicwriterinternal import ( + "context" "time" "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-sdk/v3/credentials" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -19,7 +21,9 @@ type WritersCommonConfig struct { compressorCount int maxBytesPerMessage int + LogContext context.Context //nolint:containedctx Tracer *trace.Topic + rawTopicClient *rawtopic.Client cred credentials.Credentials credUpdateInterval time.Duration clock clockwork.Clock diff --git a/internal/topic/topicwriterinternal/writer_options.go b/internal/topic/topicwriterinternal/writer_options.go index 4afa72f13..534629d8a 100644 --- a/internal/topic/topicwriterinternal/writer_options.go +++ b/internal/topic/topicwriterinternal/writer_options.go @@ -7,6 +7,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/credentials" "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -77,6 +78,14 @@ func WithCredentials(cred credentials.Credentials) PublicWriterOption { } } +// WithRawClient for internal usage only +// no proxy to public interface +func WithRawClient(rawClient *rawtopic.Client) PublicWriterOption { + return func(cfg *WriterReconnectorConfig) { + cfg.rawTopicClient = rawClient + } +} + func WithCodec(codec rawtopiccommon.Codec) PublicWriterOption { return func(cfg *WriterReconnectorConfig) { cfg.forceCodec = codec diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 83aafaabc..0ea5463e2 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -113,6 +113,22 @@ func NewWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnector WithProducerID(uuid.NewString())(&cfg) } + if cfg.Connect == nil { + logContext := context.Background() + if cfg.LogContext != nil { + logContext = cfg.LogContext + } + + var connector ConnectFunc = func(ctx context.Context, tracer *trace.Topic) ( + RawTopicWriterStream, + error, + ) { + return cfg.rawTopicClient.StreamWrite(ctx, tracer, &logContext) + } + + cfg.Connect = connector + } + return cfg } @@ -324,8 +340,10 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage) w.m.WithRLock(func() { sessionID = w.sessionID }) + logCtx := w.cfg.LogContext onCompressDone := trace.TopicOnWriterCompressMessages( w.cfg.Tracer, + &logCtx, w.writerInstanceID, sessionID, w.cfg.forceCodec.ToInt32(), @@ -340,6 +358,7 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage) } err := cacheMessages(res, targetCodec, w.cfg.compressorCount) onCompressDone(err) + if err != nil { return nil, err } @@ -366,9 +385,9 @@ func (w *WriterReconnector) Close(ctx context.Context) error { } func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr error) { - onDone := trace.TopicOnWriterClose(w.cfg.Tracer, w.writerInstanceID, reason) defer func() { - onDone(resErr) + logCtx := w.cfg.LogContext + trace.TopicOnWriterClose(w.cfg.Tracer, &logCtx, w.writerInstanceID, reason) }() // stop background work and single stream writer @@ -387,7 +406,6 @@ func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr err func (w *WriterReconnector) connectionLoop(ctx context.Context) { attempt := 0 - createStreamContext := func() (context.Context, context.CancelFunc) { // need suppress parent context cancelation for flush buffer while close writer return xcontext.WithCancel(xcontext.ValueOnly(ctx)) @@ -425,8 +443,10 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) { } } + logCtx := w.cfg.LogContext onWriterStarted := trace.TopicOnWriterReconnect( w.cfg.Tracer, + &logCtx, w.writerInstanceID, w.cfg.topic, w.cfg.producerID, diff --git a/internal/topic/topicwriterinternal/writer_single_stream.go b/internal/topic/topicwriterinternal/writer_single_stream.go index d1869a7e9..0f1fe8c7c 100644 --- a/internal/topic/topicwriterinternal/writer_single_stream.go +++ b/internal/topic/topicwriterinternal/writer_single_stream.go @@ -125,8 +125,17 @@ func (w *SingleStreamWriter) start() { } func (w *SingleStreamWriter) initStream() (err error) { - traceOnDone := trace.TopicOnWriterInitStream(w.cfg.Tracer, w.cfg.reconnectorInstanceID, w.cfg.topic, w.cfg.producerID) - defer func() { traceOnDone(w.SessionID, err) }() + defer func() { + logCtx := w.cfg.LogContext + traceOnDone := trace.TopicOnWriterInitStream( + w.cfg.Tracer, + &logCtx, + w.cfg.reconnectorInstanceID, + w.cfg.topic, + w.cfg.producerID, + ) + traceOnDone(w.SessionID, err) + }() req := w.createInitRequest() if err = w.cfg.stream.Send(&req); err != nil { @@ -149,6 +158,7 @@ func (w *SingleStreamWriter) initStream() (err error) { } w.Encoder = NewEncoderSelector( + w.cfg.LogContext, w.cfg.encodersMap, w.allowedCodecs, w.cfg.compressorCount, @@ -203,15 +213,19 @@ func (w *SingleStreamWriter) receiveMessagesLoop(ctx context.Context) { case *rawtopicwriter.UpdateTokenResponse: // pass default: - trace.TopicOnWriterReadUnknownGrpcMessage( - w.cfg.Tracer, - w.cfg.reconnectorInstanceID, - w.SessionID, - xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( - "ydb: unexpected message type in stream reader: %v", - reflect.TypeOf(m), - ))), - ) + { + logCtx := w.cfg.LogContext + trace.TopicOnWriterReadUnknownGrpcMessage( + w.cfg.Tracer, + &logCtx, + w.cfg.reconnectorInstanceID, + w.SessionID, + xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( + "ydb: unexpected message type in stream reader: %v", + reflect.TypeOf(m), + ))), + ) + } } } } @@ -232,16 +246,20 @@ func (w *SingleStreamWriter) sendMessagesFromQueueToStreamLoop(ctx context.Conte return } + err = sendMessagesToStream(w.cfg.stream, w.cfg.maxBytesPerMessage, targetCodec, messages) + + logCtx := w.cfg.LogContext onSentComplete := trace.TopicOnWriterSendMessages( w.cfg.Tracer, + &logCtx, w.cfg.reconnectorInstanceID, w.SessionID, targetCodec.ToInt32(), messages[0].SeqNo, len(messages), ) - err = sendMessagesToStream(w.cfg.stream, w.cfg.maxBytesPerMessage, targetCodec, messages) onSentComplete(err) + if err != nil { err = xerrors.WithStackTrace(fmt.Errorf("ydb: error send message to topic stream: %w", err)) _ = w.close(ctx, err) diff --git a/internal/topic/topicwriterinternal/writer_transaction.go b/internal/topic/topicwriterinternal/writer_transaction.go index ae1a108d2..414841eeb 100644 --- a/internal/topic/topicwriterinternal/writer_transaction.go +++ b/internal/topic/topicwriterinternal/writer_transaction.go @@ -39,10 +39,10 @@ func (w *WriterWithTransaction) onBeforeCommitTransaction(ctx context.Context) ( w.streamWriter.GetSessionID(), w.tx.ID(), ) - ctx = traceCtx defer func() { onDone(err, w.streamWriter.GetSessionID()) + ctx = traceCtx }() // wait message flushing diff --git a/internal/xcontext/merge_contexts.go b/internal/xcontext/merge_contexts.go new file mode 100644 index 000000000..a4888db05 --- /dev/null +++ b/internal/xcontext/merge_contexts.go @@ -0,0 +1,45 @@ +package xcontext + +import ( + "context" + "time" +) + +type MergedContexts struct { + context.Context //nolint:containedctx + BaseContext context.Context //nolint:containedctx + RequestContext context.Context //nolint:containedctx +} + +func (ctx MergedContexts) Deadline() (deadline time.Time, ok bool) { + return ctx.RequestContext.Deadline() +} + +func (ctx MergedContexts) Done() <-chan struct{} { + return ctx.RequestContext.Done() +} + +func (ctx MergedContexts) Err() error { + return ctx.RequestContext.Err() +} + +func (ctx MergedContexts) Value(key interface{}) interface{} { + if ctx.RequestContext.Value(key) != nil { + return ctx.RequestContext.Value(key) + } + + return ctx.BaseContext.Value(key) +} + +func (ctx MergedContexts) AsCtx() *context.Context { + var ret context.Context = &ctx + + return &ret +} + +func MergeContexts(req context.Context, log context.Context) MergedContexts { + return MergedContexts{ + BaseContext: log, + RequestContext: req, + } +} diff --git a/log/context.go b/log/context.go index b724e5ec3..abf168a7e 100644 --- a/log/context.go +++ b/log/context.go @@ -37,5 +37,9 @@ func NamesFromContext(ctx context.Context) []string { } func with(ctx context.Context, lvl Level, names ...string) context.Context { + if ctx == nil { + ctx = context.Background() + } + return WithLevel(WithNames(ctx, names...), lvl) } diff --git a/log/topic.go b/log/topic.go index 06837c9b3..ad2d453a1 100644 --- a/log/topic.go +++ b/log/topic.go @@ -1,7 +1,6 @@ package log import ( - "context" "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" @@ -25,7 +24,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "reconnect") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "reconnect") start := time.Now() l.Log(ctx, "topic reader reconnect starting...") @@ -40,7 +39,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "reconnect", "request") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "reconnect", "request") l.Log(ctx, "topic reader reconnect request", kv.NamedError("reason", info.Reason), kv.Bool("was_sent", info.WasSent), @@ -52,7 +51,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderPartitionEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "partition", "read", "start", "response") + ctx := with(*info.PartitionContext, TRACE, "ydb", "topic", "reader", "partition", "read", "start", "response") start := time.Now() l.Log(ctx, "topic reader start partition read response starting...", kv.String("topic", info.Topic), @@ -97,7 +96,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderPartitionEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "partition", "read", "stop", "response") + ctx := with(info.PartitionContext, TRACE, "ydb", "topic", "reader", "partition", "read", "stop", "response") start := time.Now() l.Log(ctx, "topic reader stop partition read response starting...", kv.String("reader_connection_id", info.ReaderConnectionID), @@ -133,7 +132,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(*info.RequestContext, TRACE, "ydb", "topic", "reader", "commit") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "commit") start := time.Now() l.Log(ctx, "topic reader commit starting...", kv.String("topic", info.Topic), @@ -170,7 +169,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "send", "commit", "message") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "send", "commit", "message") start := time.Now() commitInfo := info.CommitsInfo.GetCommitsInfo() @@ -211,7 +210,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "committed", "notify") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "committed", "notify") l.Log(ctx, "topic reader received commit ack", kv.String("reader_connection_id", info.ReaderConnectionID), kv.String("topic", info.Topic), @@ -224,7 +223,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "close") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "close") start := time.Now() l.Log(ctx, "topic reader close starting...", kv.String("reader_connection_id", info.ReaderConnectionID), @@ -253,7 +252,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "init") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "init") start := time.Now() l.Log(ctx, "topic reader init starting...", kv.String("pre_init_reader_connection_id", info.PreInitReaderConnectionID), @@ -284,7 +283,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "error") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "error") l.Log(WithLevel(ctx, INFO), "topic reader has grpc stream error", kv.Error(info.Error), kv.String("reader_connection_id", info.ReaderConnectionID), @@ -299,7 +298,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "update", "token") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "update", "token") start := time.Now() l.Log(ctx, "topic reader token update starting...", kv.String("reader_connection_id", info.ReaderConnectionID), @@ -347,7 +346,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "sent", "data", "request") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "sent", "data", "request") l.Log(ctx, "topic reader sent data request", kv.String("reader_connection_id", info.ReaderConnectionID), kv.Int("request_bytes", info.RequestBytes), @@ -360,7 +359,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "receive", "data", "response") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "receive", "data", "response") start := time.Now() partitionsCount, batchesCount, messagesCount := info.DataResponse.GetPartitionBatchMessagesCounts() l.Log(ctx, "topic reader data response received, process starting...", @@ -404,7 +403,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return nil } - ctx := with(*info.RequestContext, TRACE, "ydb", "topic", "reader", "read", "messages") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "read", "messages") start := time.Now() l.Log(ctx, "topic read messages, waiting...", kv.Int("min_count", info.MinCount), @@ -436,7 +435,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "unknown", "grpc", "message") + ctx := with(*info.Context, TRACE, "ydb", "topic", "reader", "unknown", "grpc", "message") l.Log(WithLevel(ctx, INFO), "topic reader received unknown grpc message", kv.Error(info.Error), kv.String("reader_connection_id", info.ReaderConnectionID), @@ -648,7 +647,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "reconnect") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "reconnect") start := time.Now() l.Log(ctx, "connect to topic writer stream starting...", kv.String("topic", info.Topic), @@ -694,7 +693,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "stream", "init") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "stream", "init") start := time.Now() l.Log(ctx, "topic writer init stream starting...", kv.String("topic", info.Topic), @@ -734,7 +733,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { start := time.Now() return func(doneInfo trace.TopicOnWriterBeforeCommitTransactionDoneInfo) { - ctx := with(*info.Ctx, TRACE, "ydb", "topic", "writer", "beforecommit") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "beforecommit") l.Log(ctx, "topic writer wait of flush messages before commit transaction", kv.String("kqp_session_id", info.KqpSessionID), kv.String("topic_session_id_start", info.TopicSessionID), @@ -750,7 +749,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { start := time.Now() return func(doneInfo trace.TopicOnWriterAfterFinishTransactionDoneInfo) { - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "beforecommit") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "beforecommit") l.Log(ctx, "topic writer close writer after transaction finished", kv.String("kqp_session_id", info.SessionID), kv.String("tx_id", info.TransactionID), @@ -762,7 +761,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "close") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "close") start := time.Now() l.Log(ctx, "topic writer close starting...", kv.String("writer_instance_id", info.WriterInstanceID), @@ -793,7 +792,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "compress", "messages") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "compress", "messages") start := time.Now() l.Log(ctx, "topic writer compress messages starting...", kv.String("writer_instance_id", info.WriterInstanceID), @@ -836,7 +835,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "send", "messages") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "send", "messages") start := time.Now() l.Log(ctx, "topic writer send messages starting...", kv.String("writer_instance_id", info.WriterInstanceID), @@ -874,7 +873,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return } acks := info.Acks.GetAcks() - ctx := with(context.Background(), DEBUG, "ydb", "topic", "writer", "receive", "result") + ctx := with(*info.Context, DEBUG, "ydb", "topic", "writer", "receive", "result") l.Log(ctx, "topic writer received result from server", kv.String("writer_instance_id", info.WriterInstanceID), kv.String("session_id", info.SessionID), @@ -895,7 +894,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "grpc") l.Log( ctx, "topic writer sent grpc message (message body and metadata are removed)", kv.String("topic_stream_internal_id", info.TopicStreamInternalID), @@ -911,7 +910,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + ctx := with(*info.Context, TRACE, "ydb", "topic", "writer", "grpc") l.Log( ctx, "topic writer received grpc message (message body and metadata are removed)", kv.String("topic_stream_internal_id", info.TopicStreamInternalID), @@ -926,7 +925,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return } - ctx := with(context.Background(), DEBUG, "ydb", "topic", "writer", "read", "unknown", "grpc", "message") + ctx := with(*info.Context, DEBUG, "ydb", "topic", "writer", "read", "unknown", "grpc", "message") l.Log(ctx, "topic writer receive unknown grpc message from server", kv.Error(info.Error), kv.String("writer_instance_id", info.WriterInstanceID), diff --git a/tests/integration/topic_cdc_reader_test.go b/tests/integration/topic_cdc_reader_test.go index 81cd5e51d..d41adfdb5 100644 --- a/tests/integration/topic_cdc_reader_test.go +++ b/tests/integration/topic_cdc_reader_test.go @@ -189,6 +189,17 @@ func createFeedAndReader( return db, reader } +func createFeedAndReaderForDB( + ctx context.Context, + t *testing.T, + db *ydb.Driver, + opts ...topicoptions.ReaderOption, +) (*ydb.Driver, *topicreader.Reader) { + createCDCFeed(ctx, t, db) + reader := createFeedReader(t, db, opts...) + return db, reader +} + var sendCDCCounter int64 func sendCDCMessage(ctx context.Context, t *testing.T, db *ydb.Driver) { diff --git a/tests/integration/topic_log.json b/tests/integration/topic_log.json new file mode 100644 index 000000000..f9bfe3206 --- /dev/null +++ b/tests/integration/topic_log.json @@ -0,0 +1,29 @@ +[ +{"L":"debug","T":"2025-02-26T18:21:27.231+0100","N":"ydb.topic.reader.reconnect.request","M":"topic reader reconnect request","context_name":"test_context","entity":"reader","error":"retryable/CUSTOM (code = -1, source error = \"ydb: first connection attempt not finished\")","was_sent":true}, + +{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.init","M":"topic reader init starting...","context_name":"test_context","entity":"reader","pre_init_reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","consumer":"test-consumer","topics":["/local/test/feed"]}, + +{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.init","M":"topic reader init done","context_name":"test_context","entity":"reader","pre_init_reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","consumer":"test-consumer","topics":["/local/test/feed"],"latency":0.000012292}, + +{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.reconnect","M":"topic reader reconnect starting...","context_name":"test_context","entity":"reader"}, + +{"L":"info","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.reconnect","M":"topic reader reconnect done","context_name":"test_context","entity":"reader","latency":0.000006292}, + +{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.reconnect","M":"topic reader reconnect starting...","context_name":"test_context","entity":"reader"}, + +{"L":"info","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.reconnect","M":"topic reader reconnect done","context_name":"test_context","entity":"reader","latency":0.00000325}, + +{"L":"debug","T":"2025-02-26T18:21:27.235+0100","N":"ydb.topic.reader.sent.data.request","M":"topic reader sent data request","context_name":"test_context","entity":"reader","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","request_bytes":1048576,"local_capacity":1048576}, + +{"L":"debug","T":"2025-02-26T18:21:27.262+0100","N":"ydb.topic.reader.partition.read.start.response","M":"topic reader start partition read response starting...","context_name":"test_context","entity":"reader","topic":"/local/test/feed","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","partition_id":0,"partition_session_id":1}, + +{"L":"info","T":"2025-02-26T18:21:27.262+0100","N":"ydb.topic.reader.partition.read.start.response","M":"topic reader start partition read response done","context_name":"test_context","entity":"reader","topic":"/local/test/feed","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","partition_id":0,"partition_session_id":1,"latency":0.000129625}, + +{"L":"debug","T":"2025-02-26T18:21:27.263+0100","N":"ydb.topic.reader.receive.data.response","M":"topic reader data response received, process starting...","context_name":"test_context","entity":"reader","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","received_bytes":118,"local_capacity":1048458,"partitions_count":1,"batches_count":1,"messages_count":1}, + +{"L":"debug","T":"2025-02-26T18:21:27.263+0100","N":"ydb.topic.reader.read.messages","M":"topic read messages, waiting...","context_name":"test_context","entity":"reader","min_count":1,"max_count":1,"local_capacity_before":1048576}, + +{"L":"debug","T":"2025-02-26T18:21:27.263+0100","N":"ydb.topic.reader.read.messages","M":"topic read messages done","context_name":"test_context","entity":"reader","min_count":1,"max_count":1,"local_capacity_before":1048576,"latency":0.000005666}, + +{"L":"debug","T":"2025-02-26T18:21:27.263+0100","N":"ydb.topic.reader.sent.data.request","M":"topic reader sent data request","context_name":"test_context","entity":"reader","reader_connection_id":"test-consumer_1_196_7260002587652326787_v1","request_bytes":118,"local_capacity":1048576} +] \ No newline at end of file diff --git a/tests/integration/topic_log_test.go b/tests/integration/topic_log_test.go new file mode 100644 index 000000000..a00557ce0 --- /dev/null +++ b/tests/integration/topic_log_test.go @@ -0,0 +1,267 @@ +//go:build integration +// +build integration + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/ydb-platform/ydb-go-sdk/v3" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest" + "github.com/ydb-platform/ydb-go-sdk/v3/log" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions" + "github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar" + "github.com/ydb-platform/ydb-go-sdk/v3/trace" +) + +var internalLogger atomic.Value + +func SetInternalLogger(logger *zap.Logger) { + internalLogger.Store(logger.WithOptions(zap.AddCallerSkip(2))) +} + +func Error(ctx context.Context, msg string, fields ...zap.Field) { + logInternal(ctx, zapcore.ErrorLevel, msg, fields) +} + +func logInternal(ctx context.Context, lvl zapcore.Level, msg string, fields []zap.Field) { + lh := from(ctx) + if lvl == zapcore.DebugLevel && lh.riseDebug { + lvl = zapcore.InfoLevel + } + ce := lh.Check(lvl, msg) + if ce == nil { + return + } + ce.Write(fields...) +} + +type loggerKey struct{} + +// GetInternal returns context logger. +// Should be used only in integration code and with great care +// because logger is configured with zap.AddCallerSkip. +func GetInternal(ctx context.Context) *zap.Logger { + return from(ctx).Logger +} + +type loggerHolder struct { + *zap.Logger + riseDebug bool +} + +func from(ctx context.Context) loggerHolder { + if l, ok := ctx.Value(loggerKey{}).(loggerHolder); ok { + return l + } + if l, ok := internalLogger.Load().(*zap.Logger); ok { + return loggerHolder{Logger: l} + } + // Fallback, so we don't need to manually init logger in every test. + SetInternalLogger(zap.Must(zap.NewDevelopmentConfig().Build())) + return from(ctx) +} + +func With(ctx context.Context, fields ...zap.Field) context.Context { + lh := from(ctx) + lh.Logger = lh.Logger.With(fields...) + return context.WithValue(ctx, loggerKey{}, lh) +} + +type SafeBuffer struct { + buf bytes.Buffer + mu sync.Mutex +} + +func (s *SafeBuffer) Write(p []byte) (n int, err error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.Write(p) +} + +func (s *SafeBuffer) String() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.String() +} + +func SetupZapLogger() (*zap.Logger, *SafeBuffer) { + buf := SafeBuffer{} + syncer := zapcore.AddSync(&buf) + ws := &zapcore.BufferedWriteSyncer{ + WS: syncer, + Size: 512 * 1024, // 512 kB + FlushInterval: time.Millisecond, + } + enc := zapcore.NewJSONEncoder(zapcore.EncoderConfig{ + MessageKey: "M", + LevelKey: "L", + TimeKey: "T", + NameKey: "N", + CallerKey: "C", + FunctionKey: "F", + StacktraceKey: "S", + EncodeLevel: zapcore.LowercaseLevelEncoder, + EncodeTime: zapcore.ISO8601TimeEncoder, + EncodeDuration: zapcore.SecondsDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + }) + + core := zapcore.NewCore(enc, ws, zapcore.DebugLevel) + + l := zap.New(core) + SetInternalLogger(l) + return l, &buf +} + +var _ log.Logger = adapter{} + +type adapter struct { + minLevel zapcore.Level +} + +func (a adapter) Log(ctx context.Context, msg string, fields ...log.Field) { + level := Level(ctx) + if !a.minLevel.Enabled(level) { + return + } + l := GetInternal(ctx) + for _, name := range log.NamesFromContext(ctx) { + l = l.Named(name) + } + l.WithOptions(zap.AddCallerSkip(1)).Log(level, msg, Fields(fields)...) +} + +//nolint:exhaustive // good enough. +func fieldToField(field log.Field) zap.Field { + switch field.Type() { + case log.IntType: + return zap.Int(field.Key(), field.IntValue()) + case log.Int64Type: + return zap.Int64(field.Key(), field.Int64Value()) + case log.StringType: + return zap.String(field.Key(), field.StringValue()) + case log.BoolType: + return zap.Bool(field.Key(), field.BoolValue()) + case log.DurationType: + return zap.Duration(field.Key(), field.DurationValue()) + case log.StringsType: + return zap.Strings(field.Key(), field.StringsValue()) + case log.ErrorType: + return zap.Error(field.ErrorValue()) + case log.StringerType: + return zap.Stringer(field.Key(), field.Stringer()) + default: + return zap.Any(field.Key(), field.AnyValue()) + } +} + +func Fields(fields []log.Field) []zap.Field { + ff := make([]zap.Field, len(fields)) + for i, f := range fields { + ff[i] = fieldToField(f) + } + return ff +} + +//nolint:exhaustive // good enough. +func Level(ctx context.Context) zapcore.Level { + switch log.LevelFromContext(ctx) { + case log.TRACE, log.DEBUG: + return zapcore.DebugLevel + case log.INFO: + return zapcore.InfoLevel + case log.WARN: + return zapcore.WarnLevel + case log.ERROR: + return zapcore.ErrorLevel + case log.FATAL: + return zapcore.FatalLevel + default: + return zapcore.InvalidLevel + } +} + +func WithTraces(minLevel zapcore.Level, d trace.Detailer, opts ...log.Option) ydb.Option { + a := adapter{minLevel: minLevel} + return ydb.MergeOptions( + ydb.WithTraceDriver(log.Driver(a, d, opts...)), + ydb.WithTraceTable(log.Table(a, d, opts...)), + ydb.WithTraceScripting(log.Scripting(a, d, opts...)), + ydb.WithTraceScheme(log.Scheme(a, d, opts...)), + ydb.WithTraceCoordination(log.Coordination(a, d, opts...)), + ydb.WithTraceRatelimiter(log.Ratelimiter(a, d, opts...)), + ydb.WithTraceDiscovery(log.Discovery(a, d, opts...)), + ydb.WithTraceTopic(log.Topic(a, d, opts...)), + ydb.WithTraceDatabaseSQL(log.DatabaseSQL(a, d, opts...)), + ) +} + +type LogEntry struct { + Level string `json:"L"` + Timestamp string `json:"T"` + Namespace string `json:"N"` + Message string `json:"M"` + ContextName string `json:"context_name"` + Entity string `json:"entity"` + Endpoint string `json:"endpoint"` + Method string `json:"method"` +} + +func (l *LogEntry) FormatMessage() string { + return fmt.Sprintf(`"N":"%s","M":"%s","context_name":"%s","entity":"%s"`, l.Namespace, l.Message, l.ContextName, l.Entity) +} + +func TestTopicReadMessagesLog(t *testing.T) { + _, buf := SetupZapLogger() + + ctx := xtest.Context(t) + + ctx = With(ctx, zap.String("context_name", "test_context")) + ctx = With(ctx, zap.String("entity", "reader")) + + driver := connect(t, WithTraces(zapcore.DebugLevel, trace.DetailsAll)) + + db, reader := createFeedAndReaderForDB(ctx, t, driver, topicoptions.WithReaderLogContext(ctx)) + + sendCDCMessage(ctx, t, db) + msg, err := reader.ReadMessage(ctx) + require.NoError(t, err) + require.NotEmpty(t, msg.CreatedAt) + t.Logf("msg: %#v", msg) + + require.NoError(t, err) + err = topicsugar.ReadMessageDataWithCallback(msg, func(data []byte) error { + t.Log("Content:", string(data)) + return nil + }) + require.NoError(t, err) + + sendCDCMessage(ctx, t, db) + batch, err := reader.ReadMessagesBatch(ctx) + require.NoError(t, err) + require.NotEmpty(t, batch.Messages) + + fileData, err := os.ReadFile("topic_log.json") + var logs []LogEntry + err = json.Unmarshal(fileData, &logs) + require.NoError(t, err) + + strLog := buf.String() + for _, logString := range logs { + assert.Contains(t, strLog, logString.FormatMessage()) + } +} diff --git a/topic/topicoptions/topicoptions_reader.go b/topic/topicoptions/topicoptions_reader.go index 6569db055..cf2bee0bf 100644 --- a/topic/topicoptions/topicoptions_reader.go +++ b/topic/topicoptions/topicoptions_reader.go @@ -1,6 +1,7 @@ package topicoptions import ( + "context" "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" @@ -281,3 +282,11 @@ func WithReaderWithoutConsumer(saveStateOnReconnection bool) ReaderOption { cfg.CommitMode = CommitModeNone } } + +// WithReaderLogContext allows providing a context.Context instance which will be used +// in log/topic events. +func WithReaderLogContext(ctx context.Context) ReaderOption { + return func(cfg *topicreaderinternal.ReaderConfig) { + cfg.BaseContext = ctx + } +} diff --git a/topic/topicoptions/topicoptions_writer.go b/topic/topicoptions/topicoptions_writer.go index d74befa4e..f4271e312 100644 --- a/topic/topicoptions/topicoptions_writer.go +++ b/topic/topicoptions/topicoptions_writer.go @@ -1,6 +1,7 @@ package topicoptions import ( + "context" "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" @@ -214,3 +215,11 @@ func WithWriterTrace(t trace.Topic) WriterOption { //nolint:gocritic func WithWriterUpdateTokenInterval(interval time.Duration) WriterOption { return topicwriterinternal.WithTokenUpdateInterval(interval) } + +// WithWriterLogContext allows providing a context.Context instance which will be used +// in log/topic events. +func WithWriterLogContext(ctx context.Context) WriterOption { + return func(cfg *topicwriterinternal.WriterReconnectorConfig) { + cfg.LogContext = ctx + } +} diff --git a/trace/topic.go b/trace/topic.go index 00b41b81a..8445726ab 100644 --- a/trace/topic.go +++ b/trace/topic.go @@ -153,6 +153,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderStartInfo struct { + Context *context.Context ReaderID int64 Consumer string Error error @@ -183,11 +184,13 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderSendCommitMessageStartInfo struct { + Context *context.Context CommitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderStreamCommitInfo struct { + Context *context.Context Topic string PartitionID int64 PartitionSessionID int64 @@ -207,6 +210,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCommittedNotifyInfo struct { + Context *context.Context ReaderConnectionID string Topic string PartitionID int64 @@ -216,12 +220,14 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderErrorInfo struct { + Context *context.Context ReaderConnectionID string Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderSentDataRequestInfo struct { + Context *context.Context ReaderConnectionID string RequestBytes int LocalBufferSizeAfterSent int @@ -229,6 +235,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReceiveDataResponseStartInfo struct { + Context *context.Context ReaderConnectionID string LocalBufferSizeAfterReceive int DataResponse TopicReaderDataResponseInfo @@ -247,7 +254,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReadMessagesStartInfo struct { - RequestContext *context.Context + Context *context.Context MinCount int MaxCount int FreeBufferCapacity int @@ -267,13 +274,15 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnReadUnknownGrpcMessageInfo struct { + Context *context.Context ReaderConnectionID string Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReconnectStartInfo struct { - Reason error + Context *context.Context + Reason error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -283,13 +292,14 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReconnectRequestInfo struct { + Context *context.Context Reason error WasSent bool } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCommitStartInfo struct { - RequestContext *context.Context + Context *context.Context Topic string PartitionID int64 PartitionSessionID int64 @@ -304,6 +314,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCloseStartInfo struct { + Context *context.Context ReaderConnectionID string CloseReason error } @@ -315,6 +326,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderInitStartInfo struct { + Context *context.Context PreInitReaderConnectionID string InitRequestInfo TopicReadStreamInitRequestInfo } @@ -333,11 +345,13 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnReadUpdateTokenStartInfo struct { + Context *context.Context ReaderConnectionID string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnReadUpdateTokenMiddleTokenReceivedInfo struct { + Context *context.Context TokenLen int Error error } @@ -424,6 +438,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterReconnectStartInfo struct { + Context *context.Context WriterInstanceID string Topic string ProducerID string @@ -441,6 +456,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterInitStreamStartInfo struct { + Context *context.Context WriterInstanceID string Topic string ProducerID string @@ -454,6 +470,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterCloseStartInfo struct { + Context *context.Context WriterInstanceID string Reason error } @@ -465,6 +482,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterCompressMessagesStartInfo struct { + Context *context.Context WriterInstanceID string SessionID string Codec int32 @@ -480,6 +498,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterSendMessagesStartInfo struct { + Context *context.Context WriterInstanceID string SessionID string Codec int32 @@ -494,6 +513,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterResultMessagesInfo struct { + Context *context.Context WriterInstanceID string SessionID string PartitionID int64 @@ -516,7 +536,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterBeforeCommitTransactionStartInfo struct { - Ctx *context.Context + Context *context.Context KqpSessionID string TopicSessionID string TransactionID string @@ -530,6 +550,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterAfterFinishTransactionStartInfo struct { + Context *context.Context Error error SessionID string TransactionID string @@ -542,6 +563,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterSentGRPCMessageInfo struct { + Context *context.Context TopicStreamInternalID string SessionID string MessageNumber int @@ -551,6 +573,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterReceiveGRPCMessageInfo struct { + Context *context.Context TopicStreamInternalID string SessionID string MessageNumber int @@ -560,6 +583,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterReadUnknownGrpcMessageInfo struct { + Context *context.Context WriterInstanceID string SessionID string Error error diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index f9c98db5b..e7e9f7b08 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -1457,16 +1457,18 @@ func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMe fn(t1) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { +func TopicOnReaderStart(t *Topic, c *context.Context, readerID int64, consumer string, e error) { var p TopicReaderStartInfo + p.Context = c p.ReaderID = readerID p.Consumer = consumer p.Error = e t.onReaderStart(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReconnect(t *Topic, reason error) func(error) { +func TopicOnReaderReconnect(t *Topic, c *context.Context, reason error) func(error) { var p TopicReaderReconnectStartInfo + p.Context = c p.Reason = reason res := t.onReaderReconnect(p) return func(e error) { @@ -1476,8 +1478,9 @@ func TopicOnReaderReconnect(t *Topic, reason error) func(error) { } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { +func TopicOnReaderReconnectRequest(t *Topic, c *context.Context, reason error, wasSent bool) { var p TopicReaderReconnectRequestInfo + p.Context = c p.Reason = reason p.WasSent = wasSent t.onReaderReconnectRequest(p) @@ -1517,9 +1520,9 @@ func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { +func TopicOnReaderCommit(t *Topic, c *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo - p.RequestContext = requestContext + p.Context = c p.Topic = topic p.PartitionID = partitionID p.PartitionSessionID = partitionSessionID @@ -1533,8 +1536,9 @@ func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { +func TopicOnReaderSendCommitMessage(t *Topic, c *context.Context, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { var p TopicReaderSendCommitMessageStartInfo + p.Context = c p.CommitsInfo = commitsInfo res := t.onReaderSendCommitMessage(p) return func(e error) { @@ -1544,8 +1548,9 @@ func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendC } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { +func TopicOnReaderCommittedNotify(t *Topic, c *context.Context, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { var p TopicReaderCommittedNotifyInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.Topic = topic p.PartitionID = partitionID @@ -1554,8 +1559,9 @@ func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic str t.onReaderCommittedNotify(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) func(closeError error) { +func TopicOnReaderClose(t *Topic, c *context.Context, readerConnectionID string, closeReason error) func(closeError error) { var p TopicReaderCloseStartInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.CloseReason = closeReason res := t.onReaderClose(p) @@ -1566,8 +1572,9 @@ func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { +func TopicOnReaderInit(t *Topic, c *context.Context, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { var p TopicReaderInitStartInfo + p.Context = c p.PreInitReaderConnectionID = preInitReaderConnectionID p.InitRequestInfo = initRequestInfo res := t.onReaderInit(p) @@ -1579,19 +1586,22 @@ func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestIn } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { +func TopicOnReaderError(t *Topic, c *context.Context, readerConnectionID string, e error) { var p TopicReaderErrorInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderError(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen int, _ error) func(error) { +func TopicOnReaderUpdateToken(t *Topic, c *context.Context, readerConnectionID string) func(_ *context.Context, tokenLen int, _ error) func(error) { var p OnReadUpdateTokenStartInfo + p.Context = c p.ReaderConnectionID = readerConnectionID res := t.onReaderUpdateToken(p) - return func(tokenLen int, e error) func(error) { + return func(c *context.Context, tokenLen int, e error) func(error) { var p OnReadUpdateTokenMiddleTokenReceivedInfo + p.Context = c p.TokenLen = tokenLen p.Error = e res := res(p) @@ -1680,16 +1690,18 @@ func TopicOnReaderTransactionRollback(t *Topic, c *context.Context, readerID int } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { +func TopicOnReaderSentDataRequest(t *Topic, c *context.Context, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { var p TopicReaderSentDataRequestInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.RequestBytes = requestBytes p.LocalBufferSizeAfterSent = localBufferSizeAfterSent t.onReaderSentDataRequest(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { +func TopicOnReaderReceiveDataResponse(t *Topic, c *context.Context, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { var p TopicReaderReceiveDataResponseStartInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.LocalBufferSizeAfterReceive = localBufferSizeAfterReceive p.DataResponse = dataResponse @@ -1701,9 +1713,9 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { +func TopicOnReaderReadMessages(t *Topic, c *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo - p.RequestContext = requestContext + p.Context = c p.MinCount = minCount p.MaxCount = maxCount p.FreeBufferCapacity = freeBufferCapacity @@ -1722,15 +1734,17 @@ func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCou } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e error) { +func TopicOnReaderUnknownGrpcMessage(t *Topic, c *context.Context, readerConnectionID string, e error) { var p OnReadUnknownGrpcMessageInfo + p.Context = c p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderUnknownGrpcMessage(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, producerID string, attempt int) func(connectionResult error) func(error) { +func TopicOnWriterReconnect(t *Topic, c *context.Context, writerInstanceID string, topic string, producerID string, attempt int) func(connectionResult error) func(error) { var p TopicWriterReconnectStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.Topic = topic p.ProducerID = producerID @@ -1748,8 +1762,9 @@ func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, pro } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { +func TopicOnWriterInitStream(t *Topic, c *context.Context, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { var p TopicWriterInitStreamStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.Topic = topic p.ProducerID = producerID @@ -1762,8 +1777,9 @@ func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, pr } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(error) { +func TopicOnWriterClose(t *Topic, c *context.Context, writerInstanceID string, reason error) func(error) { var p TopicWriterCloseStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.Reason = reason res := t.onWriterClose(p) @@ -1774,9 +1790,9 @@ func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(er } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSessionID string, topicSessionID string, transactionID string) func(_ error, topicSessionID string) { +func TopicOnWriterBeforeCommitTransaction(t *Topic, c *context.Context, kqpSessionID string, topicSessionID string, transactionID string) func(_ error, topicSessionID string) { var p TopicOnWriterBeforeCommitTransactionStartInfo - p.Ctx = ctx + p.Context = c p.KqpSessionID = kqpSessionID p.TopicSessionID = topicSessionID p.TransactionID = transactionID @@ -1789,8 +1805,9 @@ func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSes } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, transactionID string) func(closeError error) { +func TopicOnWriterAfterFinishTransaction(t *Topic, c *context.Context, e error, sessionID string, transactionID string) func(closeError error) { var p TopicOnWriterAfterFinishTransactionStartInfo + p.Context = c p.Error = e p.SessionID = sessionID p.TransactionID = transactionID @@ -1802,8 +1819,9 @@ func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, tr } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { +func TopicOnWriterCompressMessages(t *Topic, c *context.Context, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { var p TopicWriterCompressMessagesStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.Codec = codec @@ -1818,8 +1836,9 @@ func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { +func TopicOnWriterSendMessages(t *Topic, c *context.Context, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { var p TopicWriterSendMessagesStartInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.Codec = codec @@ -1833,8 +1852,9 @@ func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID stri } } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID string, partitionID int64, acks TopicWriterResultMessagesInfoAcks) { +func TopicOnWriterReceiveResult(t *Topic, c *context.Context, writerInstanceID string, sessionID string, partitionID int64, acks TopicWriterResultMessagesInfoAcks) { var p TopicWriterResultMessagesInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.PartitionID = partitionID @@ -1842,8 +1862,9 @@ func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID str t.onWriterReceiveResult(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromClient, e error) { +func TopicOnWriterSentGRPCMessage(t *Topic, c *context.Context, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromClient, e error) { var p TopicWriterSentGRPCMessageInfo + p.Context = c p.TopicStreamInternalID = topicStreamInternalID p.SessionID = sessionID p.MessageNumber = messageNumber @@ -1852,8 +1873,9 @@ func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessio t.onWriterSentGRPCMessage(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromServer, e error) { +func TopicOnWriterReceiveGRPCMessage(t *Topic, c *context.Context, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromServer, e error) { var p TopicWriterReceiveGRPCMessageInfo + p.Context = c p.TopicStreamInternalID = topicStreamInternalID p.SessionID = sessionID p.MessageNumber = messageNumber @@ -1862,8 +1884,9 @@ func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, ses t.onWriterReceiveGRPCMessage(p) } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { +func TopicOnWriterReadUnknownGrpcMessage(t *Topic, c *context.Context, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo + p.Context = c p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.Error = e