@@ -53,11 +53,13 @@ func newStreamListener(
5353 res .initVars (sessionIDCounter )
5454 if err := res .initStream (connectionCtx , client ); err != nil {
5555 res .closeWithTimeout (connectionCtx , err )
56+
5657 return nil , err
5758 }
5859
5960 res .startBackground ()
6061 res .sendDataRequest (config .BufferSize )
62+
6163 return res , nil
6264}
6365
@@ -114,6 +116,7 @@ func (l *streamListener) initVars(sessionIDCounter *atomic.Int64) {
114116 }
115117}
116118
119+ //nolint:funlen
117120func (l * streamListener ) initStream (ctx context.Context , client TopicClient ) error {
118121 streamCtx , streamClose := context .WithCancelCause (xcontext .ValueOnly (ctx ))
119122 l .streamClose = streamClose
@@ -150,20 +153,33 @@ func (l *streamListener) initStream(ctx context.Context, client TopicClient) err
150153
151154 resp , err := l .stream .Recv ()
152155 if err != nil {
153- return xerrors .WithStackTrace (fmt .Errorf ("ydb: failed to receive init response for read stream in the listener: %w" , err ))
156+ return xerrors .WithStackTrace (fmt .Errorf (
157+ "ydb: failed to receive init response for read stream in the listener: %w" ,
158+ err ,
159+ ))
154160 }
155161
156162 if status := resp .StatusData (); ! status .Status .IsSuccess () {
157- // TODO: better handler status error
158- return xerrors .WithStackTrace (fmt .Errorf ("ydb: received bad status on init the topic stream listener: %v (%v)" , status .Status , status .Issues ))
163+ // wrap initialization error as operation status error - for handle with retrier
164+ // https://github.com/ydb-platform/ydb-go-sdk/issues/1361
165+ return xerrors .WithStackTrace (fmt .Errorf (
166+ "ydb: received bad status on init the topic stream listener: %v (%v)" ,
167+ status .Status ,
168+ status .Issues ,
169+ ))
159170 }
160171
161172 initResp , ok := resp .(* rawtopicreader.InitResponse )
162173 if ! ok {
163- return xerrors .WithStackTrace (fmt .Errorf ("bad message type on session init: %v (%v)" , resp , reflect .TypeOf (resp )))
174+ return xerrors .WithStackTrace (fmt .Errorf (
175+ "bad message type on session init: %v (%v)" ,
176+ resp ,
177+ reflect .TypeOf (resp ),
178+ ))
164179 }
165180
166181 l .sessionID = initResp .SessionID
182+
167183 return nil
168184}
169185
@@ -178,13 +194,17 @@ func (l *streamListener) sendMessagesLoop(ctx context.Context) {
178194 l .m .WithLock (func () {
179195 messages = l .messagesToSend
180196 if len (messages ) > 0 {
181- l .messagesToSend = make ([]rawtopicreader.ClientMessage , 0 , len (messages )* 2 )
197+ l .messagesToSend = make ([]rawtopicreader.ClientMessage , 0 , cap (messages ))
182198 }
183199 })
184200
185201 for _ , m := range messages {
186202 if err := l .stream .Send (m ); err != nil {
187- l .closeWithTimeout (ctx , xerrors .WithStackTrace (xerrors .Wrap (fmt .Errorf ("ydb: failed send message by grpc to topic reader stream from listener: %w" , err ))))
203+ l .closeWithTimeout (ctx , xerrors .WithStackTrace (xerrors .Wrap (fmt .Errorf (
204+ "ydb: failed send message by grpc to topic reader stream from listener: %w" ,
205+ err ,
206+ ))))
207+
188208 return
189209 }
190210 }
@@ -203,6 +223,7 @@ func (l *streamListener) receiveMessagesLoop(ctx context.Context) {
203223 l .closeWithTimeout (ctx , xerrors .WithStackTrace (
204224 fmt .Errorf ("ydb: failed read message from the stream in the topic reader listener: %w" , err ),
205225 ))
226+
206227 return
207228 }
208229
@@ -218,14 +239,17 @@ func (l *streamListener) onReceiveServerMessage(ctx context.Context, mess rawtop
218239 case * rawtopicreader.StopPartitionSessionRequest :
219240 err = l .onStopPartitionRequest (ctx , m )
220241 case * rawtopicreader.ReadResponse :
221- err = l .onReadResponse (ctx , m )
242+ err = l .onReadResponse (m )
222243 }
223244 if err != nil {
224245 l .closeWithTimeout (ctx , err )
225246 }
226247}
227248
228- func (l * streamListener ) onStartPartitionRequest (ctx context.Context , m * rawtopicreader.StartPartitionSessionRequest ) error {
249+ func (l * streamListener ) onStartPartitionRequest (
250+ ctx context.Context ,
251+ m * rawtopicreader.StartPartitionSessionRequest ,
252+ ) error {
229253 session := topicreadercommon .NewPartitionSession (
230254 ctx ,
231255 m .PartitionSession .Path ,
@@ -276,10 +300,14 @@ func (l *streamListener) onStartPartitionRequest(ctx context.Context, m *rawtopi
276300 }
277301
278302 l .sendMessage (resp )
303+
279304 return nil
280305}
281306
282- func (l * streamListener ) onStopPartitionRequest (ctx context.Context , m * rawtopicreader.StopPartitionSessionRequest ) error {
307+ func (l * streamListener ) onStopPartitionRequest (
308+ ctx context.Context ,
309+ m * rawtopicreader.StopPartitionSessionRequest ,
310+ ) error {
283311 session , err := l .sessions .Get (m .PartitionSessionID )
284312 if ! m .Graceful && session == nil {
285313 // stop partition may be received twice: graceful and force
@@ -331,10 +359,11 @@ func (l *streamListener) onStopPartitionRequest(ctx context.Context, m *rawtopic
331359 if m .Graceful {
332360 l .sendMessage (& rawtopicreader.StopPartitionSessionResponse {PartitionSessionID : session .StreamPartitionSessionID })
333361 }
362+
334363 return nil
335364}
336365
337- func (l * streamListener ) onReadResponse (ctx context. Context , m * rawtopicreader.ReadResponse ) error {
366+ func (l * streamListener ) onReadResponse (m * rawtopicreader.ReadResponse ) error {
338367 batches , err := topicreadercommon .ReadRawBatchesToPublicBatches (m , l .sessions , l .cfg .Decoders )
339368 if err != nil {
340369 return err
@@ -349,6 +378,7 @@ func (l *streamListener) onReadResponse(ctx context.Context, m *rawtopicreader.R
349378 }
350379 }
351380 l .sendDataRequest (m .BytesSize )
381+
352382 return nil
353383}
354384
0 commit comments