Skip to content

Commit 846f1e4

Browse files
authored
[BugFix] Fix exception that java.lang.String is not a valid external type for DATE/TIMESTAMP type (#111)
Signed-off-by: PengFei Li <[email protected]>
1 parent 2485a7d commit 846f1e4

File tree

3 files changed: 66 additions and 9 deletions.

src/main/java/com/starrocks/connector/spark/rest/RestService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ public static List<PartitionDefinition> findPartitions(Settings cfg, Logger logg
289289
if (!StringUtils.isEmpty(cfg.getProperty(STARROCKS_FILTER_QUERY))) {
290290
sql += " where " + cfg.getProperty(STARROCKS_FILTER_QUERY);
291291
}
292-
logger.debug("Query SQL Sending to StarRocks FE is: '{}'.", sql);
292+
logger.info("Query SQL Sending to StarRocks FE is: '{}'.", sql);
293293

294294
HttpPost httpPost = new HttpPost(getUriStr(cfg, logger) + QUERY_PLAN);
295295
String entity = "{\"sql\": \"" + sql + "\"}";

src/main/java/com/starrocks/connector/spark/serialization/RowBatch.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,15 +168,16 @@ public void convertArrowToRowBatch() throws Exception {
168168
for (int col = 0; col < fieldVectors.size(); col++) {
169169
FieldVector curFieldVector = fieldVectors.get(col);
170170
Types.MinorType mt = curFieldVector.getMinorType();
171-
Field field = fieldMap.get(curFieldVector.getName());
172-
173-
String currentType;
174-
175-
if (field != null && field.getType().isPresent()) {
176-
currentType = field.getType().get();
177-
} else {
178-
currentType = DataTypeUtils.map(mt);
171+
String vectorName = curFieldVector.getName();
172+
Field field = schema.get(col);
173+
Preconditions.checkNotNull(field,
174+
"Can't find schema for arrow vector [%s] at index [%s]", vectorName, col);
175+
if (!vectorName.isEmpty()) {
176+
Preconditions.checkState(vectorName.equals(field.getName()),
177+
"The column at [%s] has inconsistent column names between schema [%s] " +
178+
"and arrow vector [%s]", col, field.getName(), vectorName);
179179
}
180+
String currentType = field.getType().orElseGet(() -> DataTypeUtils.map(mt));
180181

181182
switch (currentType) {
182183
case "NULL_TYPE":

src/test/java/com/starrocks/connector/spark/sql/ReadWriteITTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1107,4 +1107,60 @@ public void testJsonLz4Compression() throws Exception {
11071107

11081108
spark.stop();
11091109
}
1110+
1111+
@Test
1112+
public void testArrowVectorNameIsEmptyForTimestampType() throws Exception {
1113+
String tableName = "testArrowVectorNameIsEmptyForTimestampType_" + genRandomUuid();
1114+
prepareDateAndDateTimeTable(tableName);
1115+
executeSrSQL(
1116+
String.format("INSERT INTO `%s`.`%s` VALUES (1, '1', '2024-04-20', '2024-04-20 19:00:00')",
1117+
DB_NAME, tableName));
1118+
1119+
SparkSession spark = SparkSession
1120+
.builder()
1121+
.master("local[1]")
1122+
.appName("testArrowVectorNameIsEmptyForTimestampType")
1123+
.getOrCreate();
1124+
1125+
String ddl = String.format("CREATE TABLE src \n" +
1126+
"USING starrocks\n" +
1127+
"OPTIONS(\n" +
1128+
" \"starrocks.table.identifier\"=\"%s\",\n" +
1129+
" \"starrocks.fe.http.url\"=\"%s\",\n" +
1130+
" \"starrocks.fe.jdbc.url\"=\"%s\",\n" +
1131+
" \"starrocks.user\"=\"%s\",\n" +
1132+
" \"starrocks.password\"=\"%s\"\n" +
1133+
")", String.join(".", DB_NAME, tableName), FE_HTTP, FE_JDBC, USER, PASSWORD);
1134+
spark.sql(ddl);
1135+
List<Row> readRows = spark.sql("SELECT c1, c2, c3 FROM src WHERE c0 = 1;").collectAsList();
1136+
1137+
List<List<Object>> expectedData = Collections.singletonList(
1138+
Arrays.asList(
1139+
"1",
1140+
Date.valueOf("2024-04-20"),
1141+
Timestamp.valueOf("2024-04-20 19:00:00")
1142+
)
1143+
);
1144+
verifyRows(expectedData, readRows);
1145+
1146+
spark.stop();
1147+
}
1148+
1149+
private void prepareDateAndDateTimeTable(String tableName) throws Exception {
1150+
String createStarRocksTable =
1151+
String.format("CREATE TABLE `%s`.`%s` (" +
1152+
"c0 INT," +
1153+
"c1 STRING," +
1154+
"c2 DATE," +
1155+
"c3 DATETIME" +
1156+
") ENGINE=OLAP " +
1157+
"PRIMARY KEY(`c0`) " +
1158+
"DISTRIBUTED BY HASH(`c0`) BUCKETS 2 " +
1159+
"PROPERTIES (" +
1160+
"\"replication_num\" = \"1\"" +
1161+
")",
1162+
DB_NAME, tableName);
1163+
executeSrSQL(createStarRocksTable);
1164+
}
1165+
11101166
}

0 commit comments

Comments
 (0)