Skip to content

Commit 0230c29

Browse files
[FLINK-38742][cdc/postgres] Add support for TIMESTAMP WITH TIME ZONE in Debezium event deserialization
1 parent 25fa95e commit 0230c29

File tree

2 files changed

+59
-0
lines changed

2 files changed

+59
-0
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSchemaDataTypeInference.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,46 @@
2525
import io.debezium.data.geometry.Geography;
2626
import io.debezium.data.geometry.Geometry;
2727
import io.debezium.data.geometry.Point;
28+
import io.debezium.time.ZonedTimestamp;
2829
import org.apache.kafka.connect.data.Schema;
2930

31+
import java.time.Instant;
32+
import java.util.Optional;
33+
3034
/** {@link DataType} inference for PostgresSQL debezium {@link Schema}. */
3135
@Internal
3236
public class PostgresSchemaDataTypeInference extends DebeziumSchemaDataTypeInference {
3337

3438
private static final long serialVersionUID = 1L;
3539

40+
protected DataType inferString(Object value, Schema schema) {
41+
// PostgreSQL TIMESTAMPTZ is encoded as ZonedTimestamp in Debezium
42+
// We need to return TIMESTAMP_TZ (ZonedTimestampType) instead of TIMESTAMP_LTZ
43+
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
44+
int nano =
45+
Optional.ofNullable((String) value)
46+
.map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
47+
.map(Instant::getNano)
48+
.orElse(0);
49+
50+
int precision;
51+
if (nano == 0) {
52+
precision = 0;
53+
} else if (nano % 1000 > 0) {
54+
precision = 9;
55+
} else if (nano % 1000_000 > 0) {
56+
precision = 6;
57+
} else if (nano % 1000_000_000 > 0) {
58+
precision = 3;
59+
} else {
60+
precision = 0;
61+
}
62+
// Return TIMESTAMP_TZ (ZonedTimestampType) for PostgreSQL TIMESTAMPTZ
63+
return DataTypes.TIMESTAMP_TZ(precision);
64+
}
65+
return super.inferString(value, schema);
66+
}
67+
3668
protected DataType inferStruct(Object value, Schema schema) {
3769
// the Geometry datatype in PostgresSQL will be converted to
3870
// a String with Json format

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.cdc.common.data.RecordData;
2828
import org.apache.flink.cdc.common.data.TimeData;
2929
import org.apache.flink.cdc.common.data.TimestampData;
30+
import org.apache.flink.cdc.common.data.ZonedTimestampData;
3031
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
3132
import org.apache.flink.cdc.common.event.ChangeEvent;
3233
import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -54,6 +55,7 @@
5455
import io.debezium.time.NanoTime;
5556
import io.debezium.time.NanoTimestamp;
5657
import io.debezium.time.Timestamp;
58+
import io.debezium.time.ZonedTimestamp;
5759
import org.apache.kafka.connect.data.Decimal;
5860
import org.apache.kafka.connect.data.Field;
5961
import org.apache.kafka.connect.data.Schema;
@@ -65,6 +67,7 @@
6567
import java.math.BigDecimal;
6668
import java.nio.ByteBuffer;
6769
import java.time.Instant;
70+
import java.time.OffsetDateTime;
6871
import java.util.Collections;
6972
import java.util.Date;
7073
import java.util.HashMap;
@@ -196,6 +199,8 @@ protected DeserializationRuntimeConverter createNotNullConverter(DataType type)
196199
return this::convertToTime;
197200
case TIMESTAMP_WITHOUT_TIME_ZONE:
198201
return this::convertToTimestamp;
202+
case TIMESTAMP_WITH_TIME_ZONE:
203+
return this::convertToZonedTimestamp;
199204
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
200205
return this::convertToLocalTimeZoneTimestamp;
201206
case FLOAT:
@@ -367,6 +372,28 @@ protected Object convertToTimestamp(Object dbzObj, Schema schema) {
367372
+ dbzObj.getClass().getName());
368373
}
369374

375+
protected Object convertToZonedTimestamp(Object dbzObj, Schema schema) {
376+
if (dbzObj instanceof String) {
377+
String str = (String) dbzObj;
378+
// ZonedTimestamp type is encoded in string type with timezone offset
379+
// Format: ISO-8601 with timezone offset (e.g., "2020-07-17T18:00:22+00:00")
380+
if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
381+
// Parse using Debezium's ZonedTimestamp formatter
382+
OffsetDateTime offsetDateTime = OffsetDateTime.parse(str, ZonedTimestamp.FORMATTER);
383+
return ZonedTimestampData.fromOffsetDateTime(offsetDateTime);
384+
} else {
385+
// Fallback to standard ISO-8601 parsing
386+
OffsetDateTime offsetDateTime = OffsetDateTime.parse(str);
387+
return ZonedTimestampData.fromOffsetDateTime(offsetDateTime);
388+
}
389+
}
390+
throw new IllegalArgumentException(
391+
"Unable to convert to TIMESTAMP WITH TIME ZONE from unexpected value '"
392+
+ dbzObj
393+
+ "' of type "
394+
+ dbzObj.getClass().getName());
395+
}
396+
370397
protected Object convertToLocalTimeZoneTimestamp(Object dbzObj, Schema schema) {
371398
if (dbzObj instanceof String) {
372399
String str = (String) dbzObj;

0 commit comments

Comments
 (0)