Skip to content

Commit 6e5ee7d

Browse files
committed
Merge branch 'v1.4.1'
2 parents 054b728 + 7a0d7c2 commit 6e5ee7d

File tree

3 files changed

+15
-11
lines changed

3 files changed

+15
-11
lines changed

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3737
import org.apache.flink.configuration.Configuration;
3838
import org.apache.flink.streaming.api.functions.async.ResultFuture;
39+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3940
import org.apache.flink.types.Row;
4041
import org.hbase.async.HBaseClient;
4142
import org.slf4j.Logger;
@@ -159,7 +160,10 @@ protected Row fillData(Row input, Object sideInput){
159160
Row row = new Row(sideInfo.getOutFieldInfoList().size());
160161
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
161162
Object obj = input.getField(entry.getValue());
162-
if(obj instanceof Timestamp){
163+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
164+
165+
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
166+
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
163167
obj = ((Timestamp)obj).getTime();
164168
}
165169
row.setField(entry.getKey(), obj);

mysql/mysql-side/mysql-all-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAllReqRow.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,15 @@
33
import com.dtstack.flink.sql.side.AllReqRow;
44
import com.dtstack.flink.sql.side.FieldInfo;
55
import com.dtstack.flink.sql.side.JoinInfo;
6-
import com.dtstack.flink.sql.side.SideInfo;
76
import com.dtstack.flink.sql.side.SideTableInfo;
87
import com.dtstack.flink.sql.side.mysql.table.MysqlSideTableInfo;
9-
import com.dtstack.flink.sql.threadFactory.DTThreadFactory;
108
import com.dtstack.flink.sql.util.DtStringUtil;
119
import org.apache.calcite.sql.JoinType;
1210
import org.apache.commons.collections.CollectionUtils;
1311
import org.apache.flink.api.java.typeutils.RowTypeInfo;
1412
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
1513
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
16-
import org.apache.flink.configuration.Configuration;
14+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
1715
import org.apache.flink.types.Row;
1816
import org.apache.flink.util.Collector;
1917
import org.slf4j.Logger;
@@ -28,10 +26,6 @@
2826
import java.util.Calendar;
2927
import java.util.List;
3028
import java.util.Map;
31-
import java.util.concurrent.ExecutorService;
32-
import java.util.concurrent.Executors;
33-
import java.util.concurrent.ScheduledExecutorService;
34-
import java.util.concurrent.TimeUnit;
3529
import java.util.concurrent.atomic.AtomicReference;
3630

3731
/**
@@ -59,14 +53,16 @@ public MysqlAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
5953
super(new MysqlAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6054
}
6155

62-
6356
@Override
6457
protected Row fillData(Row input, Object sideInput) {
6558
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
6659
Row row = new Row(sideInfo.getOutFieldInfoList().size());
6760
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
6861
Object obj = input.getField(entry.getValue());
69-
if(obj instanceof Timestamp){
62+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
63+
64+
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
65+
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
7066
obj = ((Timestamp)obj).getTime();
7167
}
7268
row.setField(entry.getKey(), obj);

mysql/mysql-side/mysql-async-side/src/main/java/com/dtstack/flink/sql/side/mysql/MysqlAsyncReqRow.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
4040
import org.apache.flink.configuration.Configuration;
4141
import org.apache.flink.streaming.api.functions.async.ResultFuture;
42+
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
4243
import org.apache.flink.types.Row;
4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
@@ -185,7 +186,10 @@ public Row fillData(Row input, Object line){
185186
Row row = new Row(sideInfo.getOutFieldInfoList().size());
186187
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){
187188
Object obj = input.getField(entry.getValue());
188-
if(obj instanceof Timestamp){
189+
boolean isTimeIndicatorTypeInfo = TimeIndicatorTypeInfo.class.isAssignableFrom(sideInfo.getRowTypeInfo().getTypeAt(entry.getValue()).getClass());
190+
191+
//Type information for indicating event or processing time. However, it behaves like a regular SQL timestamp but is serialized as Long.
192+
if(obj instanceof Timestamp && isTimeIndicatorTypeInfo){
189193
obj = ((Timestamp)obj).getTime();
190194
}
191195
row.setField(entry.getKey(), obj);

0 commit comments

Comments
 (0)