diff --git a/CHANGELOG.md b/CHANGELOG.md index 994697977..ad5467241 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ + +* Supported pool of decoders + ## v3.104.5 * Added query client session pool metrics: create_in_progress, in_use, waiters_queue * Added pool item closing for not-alived item @@ -6,7 +9,7 @@ * Fixed bug with session query latency metric collector ## v3.104.3 -* Changed argument types in `table.Client.ReadRows` to public types for compatibility with mock-generation +* Changed argument types in `table.Client.ReadRows` to public types for compatibility with mock-generation ## v3.104.2 * Added bindings options into `ydb.ParamsFromMap` for bind wide time types @@ -25,10 +28,10 @@ * Supported wide `Date32`, `Datetime64` and `Timestamp64` types ## v3.101.4 -* Switched internal type of result `ydb.Driver.Query()` from `*internal/query.Client` to `query.Client` interface +* Switched internal type of result `ydb.Driver.Query()` from `*internal/query.Client` to `query.Client` interface ## v3.101.3 -* Added `query.TransactionActor` type alias to `query.TxActor` for compatibility with `table.Client` API's +* Added `query.TransactionActor` type alias to `query.TxActor` for compatibility with `table.Client` API's * Removed comment `experimental` from `ydb.ParamsBuilder` and `ydb.ParamsFromMap` * Fixed panic on closing `internal/query/sessionCore.done` channel twice * Fixed hangup when try to send batch of messages with size more, then grpc limits from topic writer internals diff --git a/internal/topic/topicreadercommon/decoders.go b/internal/topic/topicreadercommon/decoders.go index a0625aed5..ae02e21e6 100644 --- a/internal/topic/topicreadercommon/decoders.go +++ b/internal/topic/topicreadercommon/decoders.go @@ -5,43 +5,122 @@ import ( "errors" "fmt" "io" + "sync" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" ) +type ReadResetter interface { + io.Reader + Reset(r io.Reader) error +} + +type decoderPool struct { + pool sync.Pool +} + +func (p *decoderPool) Get() ReadResetter { + dec, _ := p.pool.Get().(ReadResetter) + + return dec +} + +func (p *decoderPool) Put(dec ReadResetter) { + p.pool.Put(dec) +} + +func newDecoderPool() *decoderPool { + return &decoderPool{ + pool: sync.Pool{}, + } +} + type DecoderMap struct { - m map[rawtopiccommon.Codec]PublicCreateDecoderFunc + m map[rawtopiccommon.Codec]PublicCreateDecoderFunc + dp map[rawtopiccommon.Codec]*decoderPool } func NewDecoderMap() DecoderMap { - return DecoderMap{ - m: map[rawtopiccommon.Codec]PublicCreateDecoderFunc{ - rawtopiccommon.CodecRaw: func(input io.Reader) (io.Reader, error) { - return input, nil - }, - rawtopiccommon.CodecGzip: func(input io.Reader) (io.Reader, error) { - return gzip.NewReader(input) - }, - }, + dm := DecoderMap{ + m: make(map[rawtopiccommon.Codec]PublicCreateDecoderFunc), + dp: make(map[rawtopiccommon.Codec]*decoderPool), } + + dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (ReadResetter, error) { + return &nopResetter{Reader: input}, nil + }) + + dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (ReadResetter, error) { + gz, err := gzip.NewReader(input) + if err != nil { + return nil, err + } + + return gz, nil + }) + + return dm } -func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc PublicCreateDecoderFunc) { - m.m[codec] = createFunc +type pooledDecoder struct { + ReadResetter + pool *decoderPool +} + +func (p *pooledDecoder) Close() error { + if closer, ok := p.ReadResetter.(io.Closer); ok { + closer.Close() + } + p.pool.Put(p.ReadResetter) + + return nil } func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Reader, error) { - if f := m.m[codec]; f != nil { - return f(input) + createFunc, ok := m.m[codec] + if !ok { + return nil, xerrors.WithStackTrace(xerrors.Wrap( + fmt.Errorf("ydb: failed decompress message with codec %v: %w", codec, ErrPublicUnexpectedCodec), + )) + } + + pool := m.dp[codec] + decoder := pool.Get() + if decoder == nil { + var err error + decoder, err = createFunc(input) + if err != nil { + return nil, err + } + } else { + if err := decoder.Reset(input); err != nil { + return nil, err + } } - return nil, xerrors.WithStackTrace(xerrors.Wrap( - fmt.Errorf("ydb: failed decompress message with codec %v: %w", codec, ErrPublicUnexpectedCodec), - )) + return &pooledDecoder{ + ReadResetter: decoder, + pool: pool, + }, nil +} + +func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc func(io.Reader) (ReadResetter, error)) { + m.m[codec] = createFunc + m.dp[codec] = newDecoderPool() +} + +type nopResetter struct { + io.Reader +} + +func (n *nopResetter) Reset(r io.Reader) error { + n.Reader = r + + return nil } -type PublicCreateDecoderFunc func(input io.Reader) (io.Reader, error) +type PublicCreateDecoderFunc func(input io.Reader) (ReadResetter, error) // ErrPublicUnexpectedCodec return when try to read message content with unknown codec var ErrPublicUnexpectedCodec = xerrors.Wrap(errors.New("ydb: unexpected codec")) diff --git a/internal/topic/topicreadercommon/decoders_test.go b/internal/topic/topicreadercommon/decoders_test.go new file mode 100644 index 000000000..bb2fa6779 --- /dev/null +++ b/internal/topic/topicreadercommon/decoders_test.go @@ -0,0 +1,168 @@ +package topicreadercommon + +import ( + "bytes" + "compress/gzip" + "errors" + "io" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" +) + +func TestDecoderMap(t *testing.T) { + decoderMap := NewDecoderMap() + + t.Run("DecodeRaw", func(t *testing.T) { + data := []byte("test data") + reader := bytes.NewReader(data) + + decodedReader, err := decoderMap.Decode(rawtopiccommon.CodecRaw, reader) + require.NoError(t, err) + + result, err := io.ReadAll(decodedReader) + require.NoError(t, err) + require.Equal(t, data, result) + }) + + t.Run("DecodeGzip", func(t *testing.T) { + data := []byte("test data") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decodedReader, err := decoderMap.Decode(rawtopiccommon.CodecGzip, &buf) + require.NoError(t, err) + + result, err := io.ReadAll(decodedReader) + require.NoError(t, err) + require.Equal(t, data, result) + }) + + t.Run("DecodeUnknownCodec", func(t *testing.T) { + _, err := decoderMap.Decode(rawtopiccommon.Codec(999), bytes.NewReader([]byte{})) + require.Error(t, err) + require.True(t, errors.Is(err, ErrPublicUnexpectedCodec)) + }) + + t.Run("DecodeCustomCodec", func(t *testing.T) { + dm := NewDecoderMap() + customCodec := rawtopiccommon.Codec(1001) + dm.AddDecoder(customCodec, func(input io.Reader) (ReadResetter, error) { + return gzip.NewReader(input) + }) + require.Len(t, dm.dp, 3) + + data := []byte("custom test data") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decodedReader, err := dm.Decode(customCodec, &buf) + require.NoError(t, err) + defer decodedReader.(io.Closer).Close() + + result, err := io.ReadAll(decodedReader) + require.NoError(t, err) + require.Equal(t, string(data), string(result)) + + data2 := []byte("second test data") + var buf2 bytes.Buffer + gzipWriter2 := gzip.NewWriter(&buf2) + _, err = gzipWriter2.Write(data2) + require.NoError(t, err) + require.NoError(t, gzipWriter2.Close()) + + decodedReader2, err := dm.Decode(customCodec, &buf2) + require.NoError(t, err) + defer decodedReader2.(io.Closer).Close() + + result2, err := io.ReadAll(decodedReader2) + require.NoError(t, err) + require.Equal(t, string(data2), string(result2)) + }) + + t.Run("DecodeCustomCodec", func(t *testing.T) { + dm := NewDecoderMap() + customCodec := rawtopiccommon.Codec(1001) + dm.AddDecoder(customCodec, func(input io.Reader) (ReadResetter, error) { + return gzip.NewReader(input) + }) + require.Len(t, dm.dp, 3) + + data := []byte("custom test data") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decodedReader, err := dm.Decode(customCodec, &buf) + require.NoError(t, err) + result, err := io.ReadAll(decodedReader) + require.NoError(t, err) + require.Equal(t, string(data), string(result)) + require.NoError(t, decodedReader.(io.Closer).Close()) + + data2 := []byte("second test data") + var buf2 bytes.Buffer + gzipWriter2 := gzip.NewWriter(&buf2) + _, err = gzipWriter2.Write(data2) + require.NoError(t, err) + require.NoError(t, gzipWriter2.Close()) + + decodedReader2, err := dm.Decode(customCodec, &buf2) + require.NoError(t, err) + result2, err := io.ReadAll(decodedReader2) + require.NoError(t, err) + require.Equal(t, string(data2), string(result2)) + require.NoError(t, decodedReader2.(io.Closer).Close()) + }) + + t.Run("PoolReuse", func(t *testing.T) { + dm := NewDecoderMap() + customCodec := rawtopiccommon.Codec(1002) + + dm.AddDecoder(customCodec, func(input io.Reader) (ReadResetter, error) { + return gzip.NewReader(input) + }) + + data1 := []byte("hello") + var buf1 bytes.Buffer + gzipWriter1 := gzip.NewWriter(&buf1) + _, err := gzipWriter1.Write(data1) + require.NoError(t, err) + require.NoError(t, gzipWriter1.Close()) + + reader1, err := dm.Decode(customCodec, &buf1) + require.NoError(t, err, "first decoding should succeed") + + result1, err := io.ReadAll(reader1) + require.NoError(t, err, "reading first message should succeed") + require.Equal(t, string(data1), string(result1), "data should match") + + require.NoError(t, reader1.(io.Closer).Close(), "closing first reader should succeed") + + data2 := []byte("world") + var buf2 bytes.Buffer + gzipWriter2 := gzip.NewWriter(&buf2) + _, err = gzipWriter2.Write(data2) + require.NoError(t, err) + require.NoError(t, gzipWriter2.Close()) + + reader2, err := dm.Decode(customCodec, &buf2) + require.NoError(t, err, "second decoding should succeed") + + result2, err := io.ReadAll(reader2) + require.NoError(t, err, "reading second message should succeed") + require.Equal(t, string(data2), string(result2), "data of second message should match") + + require.NoError(t, reader2.(io.Closer).Close(), "closing second reader should succeed") + }) +} diff --git a/internal/topic/topicreadercommon/message.go b/internal/topic/topicreadercommon/message.go index 752b38b78..439327748 100644 --- a/internal/topic/topicreadercommon/message.go +++ b/internal/topic/topicreadercommon/message.go @@ -58,10 +58,11 @@ func (m *PublicMessage) UnmarshalTo(dst PublicMessageContentUnmarshaler) error { if m.dataConsumed { return xerrors.WithStackTrace(errMessageWasReadEarly) } - m.dataConsumed = true + err := callbackOnReaderContent(globalReadMessagePool, m, m.UncompressedSize, dst) + m.data.Close() - return callbackOnReaderContent(globalReadMessagePool, m, m.UncompressedSize, dst) + return err } // Read implements io.Reader @@ -73,7 +74,16 @@ func (m *PublicMessage) UnmarshalTo(dst PublicMessageContentUnmarshaler) error { func (m *PublicMessage) Read(p []byte) (n int, err error) { m.dataConsumed = true - return m.data.Read(p) + n, err = m.data.Read(p) + if err != nil { + m.data.Close() + } + + return n, err +} + +func (m *PublicMessage) Close() error { + return m.data.Close() } // PublicMessageContentUnmarshaler is interface for unmarshal message content diff --git a/internal/topic/topicreadercommon/message_test.go b/internal/topic/topicreadercommon/message_test.go new file mode 100644 index 000000000..fec52fb73 --- /dev/null +++ b/internal/topic/topicreadercommon/message_test.go @@ -0,0 +1,136 @@ +package topicreadercommon + +import ( + "bytes" + "compress/gzip" + "io" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" +) + +func TestPublicMessage(t *testing.T) { + t.Run("DecoderClosesAfterFullRead", func(t *testing.T) { + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write([]byte("test")) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + msg := &PublicMessage{data: newOneTimeReaderFromReader(gzipReader)} + + _, err = io.ReadAll(msg) + require.NoError(t, err, "ReadAll() should complete without errors") + + _, err = gzipReader.Read([]byte{0}) + require.Error(t, err, "gzip.Reader should be closed after full read") + }) + + t.Run("DecoderNotClosedBeforeEOF", func(t *testing.T) { + data := []byte("test") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + msg := &PublicMessage{data: newOneTimeReaderFromReader(gzipReader)} + + readBuf := make([]byte, 10) + n, err := msg.Read(readBuf) + require.Equal(t, io.EOF, err, "gzip.Reader returns EOF immediately after last byte") + require.Equal(t, len(data), n, "should read all data at once") + + require.NoError(t, msg.Close(), "explicit Close after EOF should succeed") + }) + + t.Run("ReadAfterCloseReturnsEOF", func(t *testing.T) { + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write([]byte("test")) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + msg := &PublicMessage{data: newOneTimeReaderFromReader(gzipReader)} + + require.NoError(t, msg.Close(), "Close() should execute without errors") + + readBuf := make([]byte, 2) + n, err := msg.Read(readBuf) + require.Equal(t, 0, n, "After Close(), Read() should return 0 bytes") + require.Equal(t, io.EOF, err, "After Close(), Read() should return EOF") + }) + + t.Run("DecoderClosesAfterReadToEOF", func(t *testing.T) { + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + _, err := gw.Write([]byte("test")) + require.NoError(t, err) + require.NoError(t, gw.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + msg := &PublicMessage{data: newOneTimeReaderFromReader(gzipReader)} + + _, err = io.ReadAll(msg) + require.NoError(t, err) + + _, err = gzipReader.Read([]byte{0}) + require.Error(t, err, "gzip.Reader should be closed after full read to EOF") + }) + + t.Run("DecoderReuseFromPool", func(t *testing.T) { + dm := NewDecoderMap() + customCodec := rawtopiccommon.Codec(1006) + + dm.AddDecoder(customCodec, func(input io.Reader) (ReadResetter, error) { + return gzip.NewReader(input) + }) + + var buf1 bytes.Buffer + gw1 := gzip.NewWriter(&buf1) + _, err := gw1.Write([]byte("message")) + require.NoError(t, err) + require.NoError(t, gw1.Close()) + + reader1, err := dm.Decode(customCodec, &buf1) + require.NoError(t, err) + + msg1 := &PublicMessage{data: newOneTimeReaderFromReader(reader1)} + + data1, err := io.ReadAll(msg1) + require.NoError(t, err) + require.Equal(t, "message", string(data1)) + + require.NoError(t, msg1.Close()) + + var buf2 bytes.Buffer + gw2 := gzip.NewWriter(&buf2) + _, err = gw2.Write([]byte("message2")) + require.NoError(t, err) + require.NoError(t, gw2.Close()) + + reader2, err := dm.Decode(customCodec, &buf2) + require.NoError(t, err) + + msg2 := &PublicMessage{data: newOneTimeReaderFromReader(reader2)} + + data2, err := io.ReadAll(msg2) + require.NoError(t, err) + require.Equal(t, "message2", string(data2)) + + require.NoError(t, msg2.Close()) + }) +} diff --git a/internal/topic/topicreadercommon/one_time_reader.go b/internal/topic/topicreadercommon/one_time_reader.go index ee9a05789..794f56874 100644 --- a/internal/topic/topicreadercommon/one_time_reader.go +++ b/internal/topic/topicreadercommon/one_time_reader.go @@ -1,6 +1,7 @@ package topicreadercommon import ( + "errors" "io" ) @@ -44,3 +45,25 @@ func (s *oneTimeReader) Read(p []byte) (n int, err error) { return n, err } + +func (s *oneTimeReader) Close() error { + if s.err != nil && !errors.Is(s.err, io.EOF) { + return s.err + } + if s.reader == nil { + s.reader = s.readerMaker() + } + if closer, ok := s.reader.(io.Closer); ok { + err := closer.Close() + if err != nil { + s.err = err + s.reader = nil + + return err + } + } + s.reader = nil + s.err = io.EOF + + return nil +} diff --git a/internal/topic/topicreadercommon/one_time_reader_test.go b/internal/topic/topicreadercommon/one_time_reader_test.go index bc8510279..bd1c6e80c 100644 --- a/internal/topic/topicreadercommon/one_time_reader_test.go +++ b/internal/topic/topicreadercommon/one_time_reader_test.go @@ -2,12 +2,15 @@ package topicreadercommon import ( "bytes" + "compress/gzip" "errors" "io" "testing" "testing/iotest" "github.com/stretchr/testify/require" + + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" ) func TestOneTimeReader(t *testing.T) { @@ -68,7 +71,6 @@ func TestOneTimeReader(t *testing.T) { } r.reader = iotest.TimeoutReader(bytes.NewReader(preparedData)) - // first read is ok firstBuf := make([]byte, bufSize) n, err := r.Read(firstBuf) require.NoError(t, err) @@ -76,16 +78,111 @@ func TestOneTimeReader(t *testing.T) { require.Equal(t, preparedData[:bufSize], firstBuf) require.NoError(t, err) - // iotest.TimeoutReader return timeout for second read secondBuf := make([]byte, bufSize) n, err = r.Read(secondBuf) require.Equal(t, err, iotest.ErrTimeout) require.Equal(t, 0, n) require.Equal(t, make([]byte, bufSize), secondBuf) - // Next read again n, err = r.Read(secondBuf) require.Equal(t, err, iotest.ErrTimeout) require.Equal(t, 0, n) }) + + t.Run("CloseWithoutRead", func(t *testing.T) { + reader := newOneTimeReaderFromReader(bytes.NewReader([]byte("test"))) + err := reader.Close() + require.NoError(t, err) + }) + + t.Run("CloseTwice", func(t *testing.T) { + reader := newOneTimeReaderFromReader(bytes.NewReader([]byte("test"))) + require.NoError(t, reader.Close()) + require.NoError(t, reader.Close()) + }) + + t.Run("CloseReleasesResourcesWithGzipDecoder", func(t *testing.T) { + data := []byte("test data for gzip") + var buf bytes.Buffer + gzWriter := gzip.NewWriter(&buf) + _, err := gzWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzWriter.Close()) + + gzReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + reader := newOneTimeReaderFromReader(gzReader) + + tmpBuf := make([]byte, 4) + _, err = reader.Read(tmpBuf) + require.NoError(t, err) + + err = reader.Close() + require.NoError(t, err, "Close() should not return error for gzip.Reader") + + n, err := reader.Read(tmpBuf) + require.Equal(t, 0, n, "After Close(), read should return 0 bytes") + require.ErrorIs(t, err, io.EOF, "After Close(), read should return EOF") + }) + + t.Run("ReadAfterCloseReturnsEOFWithGzip", func(t *testing.T) { + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write([]byte("gzip data")) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + gzipReader, err := gzip.NewReader(&buf) + require.NoError(t, err) + + r := newOneTimeReaderFromReader(gzipReader) + require.NoError(t, r.Close(), "Close() should succeed") + + readBuf := make([]byte, 2) + n, err := r.Read(readBuf) + require.Equal(t, 0, n, "Read() after Close() should return 0 bytes") + require.Equal(t, io.EOF, err, "Read() after Close() should return EOF") + }) + + t.Run("GzipDecoderReturnedToPoolAfterClose", func(t *testing.T) { + dm := NewDecoderMap() + codec := rawtopiccommon.CodecGzip + + dm.AddDecoder(codec, func(input io.Reader) (ReadResetter, error) { + return gzip.NewReader(input) + }) + + data := []byte("pool reuse test") + var buf bytes.Buffer + gzipWriter := gzip.NewWriter(&buf) + _, err := gzipWriter.Write(data) + require.NoError(t, err) + require.NoError(t, gzipWriter.Close()) + + decoder, err := dm.Decode(codec, &buf) + require.NoError(t, err) + + reader := newOneTimeReaderFromReader(decoder) + result, err := io.ReadAll(&reader) + require.NoError(t, err) + require.Equal(t, "pool reuse test", string(result)) + + require.NoError(t, reader.Close(), "Close() should not return error") + + var buf2 bytes.Buffer + gzipWriter2 := gzip.NewWriter(&buf2) + _, err = gzipWriter2.Write([]byte("next message")) + require.NoError(t, err) + require.NoError(t, gzipWriter2.Close()) + + reader2, err := dm.Decode(codec, &buf2) + require.NoError(t, err) + + result2, err := io.ReadAll(reader2) + require.NoError(t, err) + require.Equal(t, "next message", string(result2)) + + require.NoError(t, reader2.(io.Closer).Close()) + }) }