Skip to content

Commit ef2bd72

Browse files
authored
feature:【OSCP】新增dataproxy对于达梦数据库支持(#739) (#81)
1 parent 1aa54be commit ef2bd72

File tree

29 files changed

+5687
-227
lines changed

29 files changed

+5687
-227
lines changed

dataproxy-common/src/main/java/org/secretflow/dataproxy/common/utils/ArrowUtil.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,19 @@
2424
/**
2525
* @author yuexie
2626
* @date 2024/11/8 15:51
27+
* Arrow type utility for parsing Kuscia column types to Arrow types
2728
**/
2829
public class ArrowUtil {
2930

31+
/**
32+
* Parse Kuscia column type to Arrow type
33+
* @param type Column type string (e.g., "int32", "interval_year_month", "large_string")
34+
* @return ArrowType
35+
*/
3036
public static ArrowType parseKusciaColumnType(String type) {
31-
// string integer float datetime timestamp
32-
return switch (type) {
37+
String typeLower = type.toLowerCase();
38+
return switch (typeLower) {
39+
// Integer types
3340
case "int8" -> Types.MinorType.TINYINT.getType();
3441
case "int16" -> Types.MinorType.SMALLINT.getType();
3542
case "int32" -> Types.MinorType.INT.getType();
@@ -38,13 +45,49 @@ public static ArrowType parseKusciaColumnType(String type) {
3845
case "uint16" -> Types.MinorType.UINT2.getType();
3946
case "uint32" -> Types.MinorType.UINT4.getType();
4047
case "uint64" -> Types.MinorType.UINT8.getType();
48+
49+
// Floating point types
4150
case "float32" -> Types.MinorType.FLOAT4.getType();
4251
case "float64", "float" -> Types.MinorType.FLOAT8.getType();
52+
53+
// Date types
4354
case "date32" -> Types.MinorType.DATEDAY.getType();
4455
case "date64" -> Types.MinorType.DATEMILLI.getType();
56+
57+
// Time types
58+
case "time32" -> Types.MinorType.TIMEMILLI.getType();
59+
case "time64" -> Types.MinorType.TIMEMICRO.getType();
60+
61+
// Timestamp types
62+
case "timestamp" -> Types.MinorType.TIMESTAMPMICRO.getType();
63+
case "timestamp_us" -> Types.MinorType.TIMESTAMPMICRO.getType();
64+
case "timestamp_ms" -> Types.MinorType.TIMESTAMPMILLI.getType();
65+
case "timestamp_ns" -> Types.MinorType.TIMESTAMPNANO.getType();
66+
case "timestamp_tz" -> Types.MinorType.TIMESTAMPMICROTZ.getType();
67+
68+
// Boolean types
4569
case "bool" -> Types.MinorType.BIT.getType();
70+
71+
// String types
4672
case "string", "str" -> Types.MinorType.VARCHAR.getType();
73+
case "large_string", "large_utf8", "utf8_large" -> Types.MinorType.LARGEVARCHAR.getType();
74+
75+
// Binary types
4776
case "binary" -> Types.MinorType.VARBINARY.getType();
77+
case "large_binary", "large_varbinary", "varbinary_large" -> Types.MinorType.LARGEVARBINARY.getType();
78+
79+
// Decimal types
80+
// Note: Types.MinorType.DECIMAL.getType() throws UnsupportedOperationException
81+
// Decimal requires precision/scale, must use new ArrowType.Decimal(precision, scale, bitWidth)
82+
case "decimal" -> new ArrowType.Decimal(38, 10, 128);
83+
84+
// Interval types
85+
case "interval_year_month", "interval_ym" ->
86+
Types.MinorType.INTERVALYEAR.getType();
87+
case "interval_day_time", "interval_dt" ->
88+
Types.MinorType.INTERVALDAY.getType();
89+
case "interval" -> Types.MinorType.INTERVALYEAR.getType();
90+
4891
default -> throw DataproxyException.of(DataproxyErrorCode.PARAMS_UNRELIABLE, "Unsupported field types: " + type);
4992
};
5093
}

dataproxy-core/src/main/java/org/secretflow/dataproxy/core/visitor/ByteValueVisitor.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616

1717
package org.secretflow.dataproxy.core.visitor;
1818

19+
import lombok.extern.slf4j.Slf4j;
20+
1921
import javax.annotation.Nonnull;
22+
import java.math.BigDecimal;
2023

2124
/**
2225
* @author yuexie
2326
* @date 2024/11/1 20:25
2427
**/
28+
@Slf4j
2529
public class ByteValueVisitor implements ValueVisitor<Byte>{
2630

2731
@Override
@@ -54,4 +58,40 @@ public Byte visit(@Nonnull Double value) {
5458
return value.byteValue();
5559
}
5660

61+
@Override
62+
public Byte visit(@Nonnull String value) {
63+
try {
64+
return Byte.valueOf(value);
65+
} catch (NumberFormatException e) {
66+
log.warn("Failed to parse string '{}' as Byte, using 0", value);
67+
return (byte) 0;
68+
}
69+
}
70+
71+
@Override
72+
public Byte visit(@Nonnull BigDecimal value) {
73+
return value.byteValue();
74+
}
75+
76+
@Override
77+
public Byte visit(@Nonnull Object value) {
78+
// Directly Byte type, return directly
79+
if (value instanceof Byte byteValue) {
80+
return byteValue;
81+
}
82+
83+
// Number type (including BigDecimal, Integer, Long, Short, Float, Double, etc.)
84+
if (value instanceof Number number) {
85+
return number.byteValue();
86+
}
87+
88+
// String type, call dedicated visit(String) method
89+
if (value instanceof String stringValue) {
90+
return visit(stringValue);
91+
}
92+
93+
// Other types: try to convert to string then parse
94+
return visit(value.toString());
95+
}
96+
5797
}

dataproxy-core/src/main/java/org/secretflow/dataproxy/core/visitor/IntegerValueVisitor.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import lombok.extern.slf4j.Slf4j;
2020

2121
import javax.annotation.Nonnull;
22+
import java.sql.Time;
2223
import java.time.LocalDate;
2324
import java.time.LocalDateTime;
2425
import java.time.ZonedDateTime;
@@ -87,7 +88,21 @@ public Integer visit(@Nonnull Integer value) {
8788

8889
@Override
8990
public Integer visit(@Nonnull Date value) {
90-
return (int) value.getTime();
91+
// Handle java.sql.Time: Time32Vector needs milliseconds since midnight
92+
if (value instanceof Time sqlTime) {
93+
// java.sql.Time.getTime() returns milliseconds since Unix epoch
94+
// But Time32Vector needs milliseconds since midnight of the day
95+
// Convert to LocalTime then calculate milliseconds
96+
return (int) (sqlTime.toLocalTime().toNanoOfDay() / 1_000_000);
97+
}
98+
99+
// Handle java.sql.Date: DateDayVector needs days since 1970-01-01
100+
if (value instanceof java.sql.Date sqlDate) {
101+
return (int) sqlDate.toLocalDate().toEpochDay();
102+
}
103+
104+
// For java.util.Date, assume it's a date type, convert milliseconds to days
105+
return (int) (value.getTime() / (24 * 60 * 60 * 1000L));
91106
}
92107

93108
@Override

dataproxy-core/src/main/java/org/secretflow/dataproxy/core/visitor/LongValueVisitor.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,8 @@
1919
import lombok.extern.slf4j.Slf4j;
2020

2121
import javax.annotation.Nonnull;
22-
import java.time.Instant;
23-
import java.time.LocalDate;
24-
import java.time.LocalDateTime;
25-
import java.time.ZoneId;
26-
import java.time.ZoneOffset;
27-
import java.time.ZonedDateTime;
22+
import java.sql.Time;
23+
import java.time.*;
2824
import java.util.Date;
2925

3026
/**
@@ -50,6 +46,8 @@ public Long visit(@Nonnull Object value) {
5046

5147
if (value instanceof Long longValue) {
5248
return visit(longValue);
49+
} else if (value instanceof Time sqlTime) {
50+
return this.visit(sqlTime);
5351
} else if (value instanceof Date dateValue) {
5452
return this.visit(dateValue);
5553
} else if (value instanceof LocalDateTime localDateTime) {
@@ -102,7 +100,10 @@ public Long visit(@Nonnull ZonedDateTime value) {
102100

103101
@Override
104102
public Long visit(@Nonnull LocalDateTime value) {
105-
return value.toInstant(ZoneOffset.of(ZoneId.systemDefault().getId())).toEpochMilli();
103+
// LocalDateTime has no timezone information, treat it as local time in system default timezone.
104+
// Use atZone() instead of toInstant(ZoneOffset.of(...)) because ZoneOffset.of() requires
105+
// an offset (e.g., "+08:00"), cannot directly use zone ID
106+
return value.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
106107
}
107108

108109
@Override
@@ -115,4 +116,12 @@ public Long visit(@Nonnull Instant value) {
115116
log.debug("visit instant: {}", value.toEpochMilli());
116117
return value.toEpochMilli();
117118
}
119+
120+
public Long visit(@Nonnull Time value) {
121+
long nanosSinceMidnight = value.toLocalTime().toNanoOfDay();
122+
long microsSinceMidnight = nanosSinceMidnight / 1_000;
123+
log.debug("Converting java.sql.Time {} (toLocalTime: {}) to microseconds since midnight: {} (nanos: {})",
124+
value, value.toLocalTime(), microsSinceMidnight, nanosSinceMidnight);
125+
return microsSinceMidnight;
126+
}
118127
}

dataproxy-core/src/main/java/org/secretflow/dataproxy/core/visitor/ValueVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.secretflow.dataproxy.core.visitor;
1818

1919
import javax.annotation.Nonnull;
20+
import java.math.BigDecimal;
2021
import java.time.Instant;
2122
import java.time.LocalDate;
2223
import java.time.LocalDateTime;
@@ -65,6 +66,10 @@ default T visit(@Nonnull byte[] value) {
6566
throw new UnsupportedOperationException("byte[] not supported");
6667
}
6768

69+
default T visit(@Nonnull BigDecimal value) {
70+
throw new UnsupportedOperationException("BigDecimal not supported");
71+
}
72+
6873
default T visit(@Nonnull Object value) {
6974
throw new UnsupportedOperationException("Object not supported");
7075
}

dataproxy-integration-tests/pom.xml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@
3232
<groupId>org.secretflow</groupId>
3333
<artifactId>dataproxy-plugin-odps</artifactId>
3434
</dependency>
35+
<dependency>
36+
<groupId>org.secretflow</groupId>
37+
<artifactId>dataproxy-plugin-dameng</artifactId>
38+
</dependency>
3539

3640
<dependency>
3741
<groupId>org.projectlombok</groupId>
@@ -67,6 +71,24 @@
6771
<artifactId>mockito-junit-jupiter</artifactId>
6872
<scope>test</scope>
6973
</dependency>
74+
75+
<!-- Testcontainers and the Dameng JDBC driver (test scope only) -->
76+
<dependency>
77+
<groupId>org.testcontainers</groupId>
78+
<artifactId>testcontainers</artifactId>
79+
<scope>test</scope>
80+
</dependency>
81+
<dependency>
82+
<groupId>org.testcontainers</groupId>
83+
<artifactId>junit-jupiter</artifactId>
84+
<scope>test</scope>
85+
</dependency>
86+
<dependency>
87+
<groupId>com.dameng</groupId>
88+
<artifactId>DmJdbcDriver18</artifactId>
89+
<version>8.1.3.62</version>
90+
<scope>test</scope>
91+
</dependency>
7092
</dependencies>
7193

7294
<build>

0 commit comments

Comments
 (0)