@@ -557,7 +557,7 @@ func (r *RedisStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messa
557
557
seq := streamBuffer .Head ().LeftBoundary ()
558
558
ts , ok := streamBuffer .FindTimestampBySeq (seq )
559
559
decoder := NewBinaryDecoder (head )
560
- redisMessage , err := ParseMessage (decoder , ts , seq )
560
+ parsedMessage , err := ParseMessage (decoder , ts , seq )
561
561
result := ParseResult {}
562
562
if err != nil {
563
563
if errors .Is (err , NotFound ) || errors .Is (err , ResourceNotAvailble ) {
@@ -572,10 +572,20 @@ func (r *RedisStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messa
572
572
result .ParseState = Success
573
573
}
574
574
575
- // redisMessage.(*RedisMessage).isReq = messageType == Request
576
- redisMessage .(* RedisMessage ).seq = seq
577
- result .ReadBytes = redisMessage .ByteSize ()
578
- result .ParsedMessages = []ParsedMessage {redisMessage }
575
+ redisMessage := parsedMessage .(* RedisMessage )
576
+ if messageType != Unknown {
577
+ redisMessage .isReq = messageType == Request
578
+ } else if redisMessage .isReq {
579
+ // first byte is `kArrayMarker`, but not sure is a request, guess it only
580
+ if redisMessage .command == "" {
581
+ redisMessage .isReq = false
582
+ } else {
583
+ redisMessage .isReq = true
584
+ }
585
+ }
586
+ redisMessage .seq = seq
587
+ result .ReadBytes = parsedMessage .ByteSize ()
588
+ result .ParsedMessages = []ParsedMessage {parsedMessage }
579
589
}
580
590
581
591
return result
0 commit comments