Skip to content

Commit 5268914

Browse files
betterlmyshuwenwei
authored andcommitted
fix: fix EOF error when decoding columns with empty string or zero po… (#155)
* fix: fix EOF error when decoding columns with empty string or zero positionCount * Update column_decoder_test.go 增加license header * fix: return error when resp is nil after reconnect * fix: GetCurrentRowTime returns time.Time to avoid precision ambiguity
1 parent 35e9dc4 commit 5268914

File tree

4 files changed

+275
-17
lines changed

4 files changed

+275
-17
lines changed

client/column_decoder.go

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,18 @@ func (decoder *Int32ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataTyp
100100
// +---------------+-----------------+-------------+
101101
// | byte | list[byte] | list[int32] |
102102
// +---------------+-----------------+-------------+
103+
104+
if positionCount == 0 {
105+
switch dataType {
106+
case INT32, DATE:
107+
return NewIntColumn(0, 0, nil, []int32{})
108+
case FLOAT:
109+
return NewFloatColumn(0, 0, nil, []float32{})
110+
default:
111+
return nil, fmt.Errorf("invalid data type: %v", dataType)
112+
}
113+
}
114+
103115
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
104116
if err != nil {
105117
return nil, err
@@ -166,6 +178,18 @@ func (decoder *Int64ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataTyp
166178
// +---------------+-----------------+-------------+
167179
// | byte | list[byte] | list[int64] |
168180
// +---------------+-----------------+-------------+
181+
182+
if positionCount == 0 {
183+
switch dataType {
184+
case INT64, TIMESTAMP:
185+
return NewLongColumn(0, 0, nil, []int64{})
186+
case DOUBLE:
187+
return NewDoubleColumn(0, 0, nil, []float64{})
188+
default:
189+
return nil, fmt.Errorf("invalid data type: %v", dataType)
190+
}
191+
}
192+
169193
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
170194
if err != nil {
171195
return nil, err
@@ -212,6 +236,11 @@ func (decoder *ByteArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType
212236
if dataType != BOOLEAN {
213237
return nil, fmt.Errorf("invalid data type: %v", dataType)
214238
}
239+
240+
if positionCount == 0 {
241+
return NewBooleanColumn(0, 0, nil, []bool{})
242+
}
243+
215244
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
216245
if err != nil {
217246
return nil, err
@@ -245,6 +274,11 @@ func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataTy
245274
if TEXT != dataType {
246275
return nil, fmt.Errorf("invalid data type: %v", dataType)
247276
}
277+
278+
if positionCount == 0 {
279+
return NewBinaryColumn(0, 0, nil, []*Binary{})
280+
}
281+
248282
nullIndicators, err := deserializeNullIndicators(reader, positionCount)
249283
if err != nil {
250284
return nil, err
@@ -259,12 +293,17 @@ func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataTy
259293
if err != nil {
260294
return nil, err
261295
}
262-
value := make([]byte, length)
263-
_, err = reader.Read(value)
264-
if err != nil {
265-
return nil, err
296+
297+
if length == 0 {
298+
values[i] = NewBinary([]byte{})
299+
} else {
300+
value := make([]byte, length)
301+
_, err = reader.Read(value)
302+
if err != nil {
303+
return nil, err
304+
}
305+
values[i] = NewBinary(value)
266306
}
267-
values[i] = NewBinary(value)
268307
}
269308
return NewBinaryColumn(0, positionCount, nullIndicators, values)
270309
}

client/column_decoder_test.go

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package client
21+
22+
import (
23+
"bytes"
24+
"encoding/binary"
25+
"testing"
26+
)
27+
28+
func buildNullIndicatorBytes(nulls []bool) []byte {
29+
var buf bytes.Buffer
30+
hasNull := false
31+
for _, n := range nulls {
32+
if n {
33+
hasNull = true
34+
break
35+
}
36+
}
37+
if !hasNull {
38+
buf.WriteByte(0)
39+
return buf.Bytes()
40+
}
41+
buf.WriteByte(1)
42+
packed := make([]byte, (len(nulls)+7)/8)
43+
for i, n := range nulls {
44+
if n {
45+
packed[i/8] |= 0b10000000 >> (uint(i) % 8)
46+
}
47+
}
48+
buf.Write(packed)
49+
return buf.Bytes()
50+
}
51+
52+
func TestBinaryArrayColumnDecoder_EmptyString(t *testing.T) {
53+
var buf bytes.Buffer
54+
buf.Write(buildNullIndicatorBytes([]bool{false}))
55+
_ = binary.Write(&buf, binary.BigEndian, int32(0))
56+
57+
col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 1)
58+
if err != nil {
59+
t.Fatalf("ReadColumn failed: %v", err)
60+
}
61+
if col.GetPositionCount() != 1 {
62+
t.Fatalf("expected positionCount=1, got %d", col.GetPositionCount())
63+
}
64+
if col.IsNull(0) {
65+
t.Fatal("row 0 should not be null")
66+
}
67+
val, err := col.GetBinary(0)
68+
if err != nil {
69+
t.Fatalf("GetBinary(0) failed: %v", err)
70+
}
71+
if len(val.values) != 0 {
72+
t.Fatalf("expected empty string, got %q", string(val.values))
73+
}
74+
}
75+
76+
func TestBinaryArrayColumnDecoder_NullThenEmptyString(t *testing.T) {
77+
var buf bytes.Buffer
78+
buf.Write(buildNullIndicatorBytes([]bool{true, false}))
79+
_ = binary.Write(&buf, binary.BigEndian, int32(0))
80+
81+
col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 2)
82+
if err != nil {
83+
t.Fatalf("ReadColumn failed: %v", err)
84+
}
85+
if !col.IsNull(0) {
86+
t.Error("row 0 should be null")
87+
}
88+
if col.IsNull(1) {
89+
t.Error("row 1 should not be null")
90+
}
91+
val, err := col.GetBinary(1)
92+
if err != nil {
93+
t.Fatalf("GetBinary(1) failed: %v", err)
94+
}
95+
if len(val.values) != 0 {
96+
t.Fatalf("expected empty string, got %q", string(val.values))
97+
}
98+
}
99+
100+
func TestBinaryArrayColumnDecoder_WithNull(t *testing.T) {
101+
var buf bytes.Buffer
102+
buf.Write(buildNullIndicatorBytes([]bool{false, true, false}))
103+
writeText := func(s string) {
104+
_ = binary.Write(&buf, binary.BigEndian, int32(len(s)))
105+
buf.WriteString(s)
106+
}
107+
writeText("hello")
108+
writeText("world")
109+
110+
col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), TEXT, 3)
111+
if err != nil {
112+
t.Fatalf("ReadColumn failed: %v", err)
113+
}
114+
if col.IsNull(0) {
115+
t.Error("row 0 should not be null")
116+
}
117+
if v, _ := col.GetBinary(0); string(v.values) != "hello" {
118+
t.Errorf("row 0: expected \"hello\", got %q", string(v.values))
119+
}
120+
if !col.IsNull(1) {
121+
t.Error("row 1 should be null")
122+
}
123+
if col.IsNull(2) {
124+
t.Error("row 2 should not be null")
125+
}
126+
if v, _ := col.GetBinary(2); string(v.values) != "world" {
127+
t.Errorf("row 2: expected \"world\", got %q", string(v.values))
128+
}
129+
}
130+
131+
func TestInt64ArrayColumnDecoder_WithNull(t *testing.T) {
132+
var buf bytes.Buffer
133+
buf.Write(buildNullIndicatorBytes([]bool{false, true, false}))
134+
_ = binary.Write(&buf, binary.BigEndian, int64(100))
135+
_ = binary.Write(&buf, binary.BigEndian, int64(200))
136+
137+
col, err := (&Int64ArrayColumnDecoder{}).ReadColumn(bytes.NewReader(buf.Bytes()), INT64, 3)
138+
if err != nil {
139+
t.Fatalf("ReadColumn failed: %v", err)
140+
}
141+
if col.IsNull(0) {
142+
t.Error("row 0 should not be null")
143+
}
144+
if v, _ := col.GetLong(0); v != 100 {
145+
t.Errorf("row 0: expected 100, got %d", v)
146+
}
147+
if !col.IsNull(1) {
148+
t.Error("row 1 should be null")
149+
}
150+
if col.IsNull(2) {
151+
t.Error("row 2 should not be null")
152+
}
153+
if v, _ := col.GetLong(2); v != 200 {
154+
t.Errorf("row 2: expected 200, got %d", v)
155+
}
156+
}
157+
158+
func TestColumnDecoder_ZeroPositionCount(t *testing.T) {
159+
empty := func() *bytes.Reader { return bytes.NewReader([]byte{}) }
160+
161+
t.Run("Int32ArrayColumnDecoder", func(t *testing.T) {
162+
col, err := (&Int32ArrayColumnDecoder{}).ReadColumn(empty(), INT32, 0)
163+
if err != nil {
164+
t.Fatalf("ReadColumn failed: %v", err)
165+
}
166+
if col.GetPositionCount() != 0 {
167+
t.Errorf("expected positionCount=0, got %d", col.GetPositionCount())
168+
}
169+
})
170+
171+
t.Run("Int64ArrayColumnDecoder", func(t *testing.T) {
172+
col, err := (&Int64ArrayColumnDecoder{}).ReadColumn(empty(), INT64, 0)
173+
if err != nil {
174+
t.Fatalf("ReadColumn failed: %v", err)
175+
}
176+
if col.GetPositionCount() != 0 {
177+
t.Errorf("expected positionCount=0, got %d", col.GetPositionCount())
178+
}
179+
})
180+
181+
t.Run("ByteArrayColumnDecoder", func(t *testing.T) {
182+
col, err := (&ByteArrayColumnDecoder{}).ReadColumn(empty(), BOOLEAN, 0)
183+
if err != nil {
184+
t.Fatalf("ReadColumn failed: %v", err)
185+
}
186+
if col.GetPositionCount() != 0 {
187+
t.Errorf("expected positionCount=0, got %d", col.GetPositionCount())
188+
}
189+
})
190+
191+
t.Run("BinaryArrayColumnDecoder", func(t *testing.T) {
192+
col, err := (&BinaryArrayColumnDecoder{}).ReadColumn(empty(), TEXT, 0)
193+
if err != nil {
194+
t.Fatalf("ReadColumn failed: %v", err)
195+
}
196+
if col.GetPositionCount() != 0 {
197+
t.Errorf("expected positionCount=0, got %d", col.GetPositionCount())
198+
}
199+
})
200+
}

client/session.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -519,10 +519,15 @@ func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64) (*SessionD
519519
request.SessionId = s.sessionId
520520
request.StatementId = s.requestStatementId
521521
resp, err = s.client.ExecuteQueryStatementV2(context.Background(), &request)
522-
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
523-
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize)
524-
} else {
525-
return nil, statusErr
522+
if err == nil {
523+
if resp == nil {
524+
return nil, fmt.Errorf("received nil response after reconnect")
525+
}
526+
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
527+
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize)
528+
} else {
529+
return nil, statusErr
530+
}
526531
}
527532
}
528533
return nil, err
@@ -545,10 +550,15 @@ func (s *Session) ExecuteAggregationQuery(paths []string, aggregations []common.
545550
if s.reconnect() {
546551
request.SessionId = s.sessionId
547552
resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request)
548-
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
549-
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize)
550-
} else {
551-
return nil, statusErr
553+
if err == nil {
554+
if resp == nil {
555+
return nil, fmt.Errorf("received nil response after reconnect")
556+
}
557+
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
558+
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize)
559+
} else {
560+
return nil, statusErr
561+
}
552562
}
553563
}
554564
return nil, err
@@ -572,10 +582,15 @@ func (s *Session) ExecuteAggregationQueryWithLegalNodes(paths []string, aggregat
572582
if s.reconnect() {
573583
request.SessionId = s.sessionId
574584
resp, err = s.client.ExecuteAggregationQueryV2(context.Background(), &request)
575-
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
576-
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize)
577-
} else {
578-
return nil, statusErr
585+
if err == nil {
586+
if resp == nil {
587+
return nil, fmt.Errorf("received nil response after reconnect")
588+
}
589+
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
590+
return NewSessionDataSet("", resp.Columns, resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.requestStatementId, s.client, s.sessionId, resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs, *resp.MoreData, s.config.FetchSize)
591+
} else {
592+
return nil, statusErr
593+
}
579594
}
580595
}
581596
return nil, err

client/sessiondataset.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,7 @@ func (s *SessionDataSet) GetColumnNames() []string {
124124
func (s *SessionDataSet) GetColumnTypes() []string {
125125
return s.ioTDBRpcDataSet.columnTypeList
126126
}
127+
128+
func (s *SessionDataSet) GetCurrentRowTime() time.Time {
129+
return convertToTimestamp(s.ioTDBRpcDataSet.GetCurrentRowTime(), s.ioTDBRpcDataSet.timeFactor)
130+
}

0 commit comments

Comments
 (0)