Skip to content

Commit 6464f7d

Browse files
authored
Merge pull request #1634 Prevented create decoder instance until start read a message from topics
2 parents e45a77b + 273a3f9 commit 6464f7d

File tree

5 files changed

+36
-13
lines changed

5 files changed

+36
-13
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Prevented create decoder instance until start read a message from topics
2+
13
## v3.99.4
24
* Fixed bug with wrong context on session closing
35
* Fixed goroutine leak on closing `database/sql` driver

internal/topic/topicreadercommon/message.go

+10-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"io"
89
"time"
910

1011
"github.com/ydb-platform/ydb-go-sdk/v3/internal/empty"
@@ -84,14 +85,18 @@ type PublicMessageContentUnmarshaler interface {
8485
}
8586

8687
func createReader(decoders DecoderMap, codec rawtopiccommon.Codec, rawBytes []byte) oneTimeReader {
87-
reader, err := decoders.Decode(codec, bytes.NewReader(rawBytes))
88-
if err != nil {
89-
reader = errorReader{
90-
err: fmt.Errorf("failed to decode message with codec '%v': %w", codec, err),
88+
var maker readerMaker = func() io.Reader {
89+
reader, err := decoders.Decode(codec, bytes.NewReader(rawBytes))
90+
if err != nil {
91+
reader = errorReader{
92+
err: fmt.Errorf("failed to decode message with codec '%v': %w", codec, err),
93+
}
9194
}
95+
96+
return reader
9297
}
9398

94-
return newOneTimeReader(reader)
99+
return newOneTimeReader(maker)
95100
}
96101

97102
type errorReader struct {

internal/topic/topicreadercommon/message_content_pool_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@ func BenchmarkConsumeContent(b *testing.B) {
1616
b.ReportAllocs()
1717
content := []byte("asd")
1818
reader := bytes.NewReader(content)
19-
msg := PublicMessage{data: newOneTimeReader(reader)}
19+
msg := PublicMessage{data: newOneTimeReaderFromReader(reader)}
2020
for i := 0; i < b.N; i++ {
2121
reader.Reset(content)
2222
msg.dataConsumed = false
23-
msg.data = newOneTimeReader(reader)
23+
msg.data = newOneTimeReaderFromReader(reader)
2424
err := msg.UnmarshalTo(emptyConsumer{})
2525
if err != nil {
2626
b.Fatal()

internal/topic/topicreadercommon/one_time_reader.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,35 @@ import (
77
type oneTimeReader struct {
88
err error
99
reader io.Reader
10+
11+
// prevent early create decoder, because it can consume a lot of memory
12+
// https://github.com/ydb-platform/ydb-go-sdk/issues/1341
13+
readerMaker readerMaker
1014
}
1115

12-
func newOneTimeReader(reader io.Reader) oneTimeReader {
16+
type readerMaker func() io.Reader
17+
18+
func newOneTimeReader(readerMaker readerMaker) oneTimeReader {
1319
return oneTimeReader{
14-
reader: reader,
20+
readerMaker: readerMaker,
1521
}
1622
}
1723

24+
func newOneTimeReaderFromReader(reader io.Reader) oneTimeReader {
25+
maker := func() io.Reader { return reader }
26+
27+
return newOneTimeReader(maker)
28+
}
29+
1830
func (s *oneTimeReader) Read(p []byte) (n int, err error) {
1931
if s.err != nil {
2032
return 0, s.err
2133
}
2234

35+
if s.reader == nil {
36+
s.reader = s.readerMaker()
37+
}
38+
2339
n, err = s.reader.Read(p)
2440
if err != nil {
2541
s.err = err

internal/topic/topicreadercommon/one_time_reader_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
func TestOneTimeReader(t *testing.T) {
1414
t.Run("FullRead", func(t *testing.T) {
15-
r := newOneTimeReader(bytes.NewReader([]byte{1, 2, 3}))
15+
r := newOneTimeReaderFromReader(bytes.NewReader([]byte{1, 2, 3}))
1616
dstBuf := make([]byte, 3)
1717
n, err := r.Read(dstBuf)
1818
require.NoError(t, err)
@@ -24,7 +24,7 @@ func TestOneTimeReader(t *testing.T) {
2424
require.Equal(t, io.EOF, r.err)
2525
})
2626
t.Run("DstMoreThenContent", func(t *testing.T) {
27-
r := newOneTimeReader(bytes.NewReader([]byte{1, 2, 3}))
27+
r := newOneTimeReaderFromReader(bytes.NewReader([]byte{1, 2, 3}))
2828
dstBuf := make([]byte, 4)
2929
n, err := r.Read(dstBuf)
3030
require.NoError(t, err)
@@ -37,7 +37,7 @@ func TestOneTimeReader(t *testing.T) {
3737
require.Equal(t, io.EOF, r.err)
3838
})
3939
t.Run("ReadLess", func(t *testing.T) {
40-
r := newOneTimeReader(bytes.NewReader([]byte{1, 2, 3}))
40+
r := newOneTimeReaderFromReader(bytes.NewReader([]byte{1, 2, 3}))
4141
dstBuf := make([]byte, 2)
4242
n, err := r.Read(dstBuf)
4343
require.NoError(t, err)
@@ -55,7 +55,7 @@ func TestOneTimeReader(t *testing.T) {
5555
require.Equal(t, 0, n)
5656
})
5757
t.Run("InnerErr", func(t *testing.T) {
58-
r := newOneTimeReader(nil)
58+
r := newOneTimeReaderFromReader(nil)
5959

6060
bufSize := 2
6161
preparedData := make([]byte, 2*bufSize)

0 commit comments

Comments
 (0)