-
Notifications
You must be signed in to change notification settings - Fork 92
/
Copy pathdecoders.go
126 lines (100 loc) · 2.53 KB
/
decoders.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package topicreadercommon
import (
"compress/gzip"
"errors"
"fmt"
"io"
"sync"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
)
type ReadResetter interface {
io.Reader
Reset(r io.Reader) error
}
type decoderPool struct {
pool sync.Pool
}
func (p *decoderPool) Get() ReadResetter {
dec, _ := p.pool.Get().(ReadResetter)
return dec
}
func (p *decoderPool) Put(dec ReadResetter) {
p.pool.Put(dec)
}
func newDecoderPool() *decoderPool {
return &decoderPool{
pool: sync.Pool{},
}
}
type DecoderMap struct {
m map[rawtopiccommon.Codec]PublicCreateDecoderFunc
dp map[rawtopiccommon.Codec]*decoderPool
}
func NewDecoderMap() DecoderMap {
dm := DecoderMap{
m: make(map[rawtopiccommon.Codec]PublicCreateDecoderFunc),
dp: make(map[rawtopiccommon.Codec]*decoderPool),
}
dm.AddDecoder(rawtopiccommon.CodecRaw, func(input io.Reader) (ReadResetter, error) {
return &nopResetter{Reader: input}, nil
})
dm.AddDecoder(rawtopiccommon.CodecGzip, func(input io.Reader) (ReadResetter, error) {
gz, err := gzip.NewReader(input)
if err != nil {
return nil, err
}
return gz, nil
})
return dm
}
type pooledDecoder struct {
ReadResetter
pool *decoderPool
}
func (p *pooledDecoder) Close() error {
if closer, ok := p.ReadResetter.(io.Closer); ok {
closer.Close()
}
p.pool.Put(p.ReadResetter)
return nil
}
func (m *DecoderMap) Decode(codec rawtopiccommon.Codec, input io.Reader) (io.Reader, error) {
createFunc, ok := m.m[codec]
if !ok {
return nil, xerrors.WithStackTrace(xerrors.Wrap(
fmt.Errorf("ydb: failed decompress message with codec %v: %w", codec, ErrPublicUnexpectedCodec),
))
}
pool := m.dp[codec]
decoder := pool.Get()
if decoder == nil {
var err error
decoder, err = createFunc(input)
if err != nil {
return nil, err
}
} else {
if err := decoder.Reset(input); err != nil {
return nil, err
}
}
return &pooledDecoder{
ReadResetter: decoder,
pool: pool,
}, nil
}
func (m *DecoderMap) AddDecoder(codec rawtopiccommon.Codec, createFunc func(io.Reader) (ReadResetter, error)) {
m.m[codec] = createFunc
m.dp[codec] = newDecoderPool()
}
type nopResetter struct {
io.Reader
}
func (n *nopResetter) Reset(r io.Reader) error {
n.Reader = r
return nil
}
type PublicCreateDecoderFunc func(input io.Reader) (ReadResetter, error)
// ErrPublicUnexpectedCodec return when try to read message content with unknown codec
var ErrPublicUnexpectedCodec = xerrors.Wrap(errors.New("ydb: unexpected codec"))