Skip to content

Commit 272d99e

Browse files
proletarianswuzexianyuxiqian
authored
[FLINK-38079] Add Pipeline support for DateType and TimeType (#4060)
Signed-off-by: yuxiqian <[email protected]> Co-authored-by: wuzexian <[email protected]> Co-authored-by: yuxiqian <[email protected]>
1 parent 381193f commit 272d99e

File tree

50 files changed

+1045
-290
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1045
-290
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.data;
19+
20+
import java.time.LocalDate;
21+
import java.util.Objects;
22+
23+
/**
24+
* An internal data structure representing data of {@link
25+
* org.apache.flink.cdc.common.types.DateType}.
26+
*/
27+
public class DateData implements Comparable<DateData> {
28+
29+
private final int epochDay;
30+
31+
private DateData(int epochDay) {
32+
this.epochDay = epochDay;
33+
}
34+
35+
public static DateData fromEpochDay(int epochOfDay) {
36+
return new DateData(epochOfDay);
37+
}
38+
39+
public static DateData fromLocalDate(LocalDate date) {
40+
return fromEpochDay((int) date.toEpochDay());
41+
}
42+
43+
public static DateData fromIsoLocalDateString(String dateString) {
44+
return fromLocalDate(LocalDate.parse(dateString));
45+
}
46+
47+
public int toEpochDay() {
48+
return epochDay;
49+
}
50+
51+
public LocalDate toLocalDate() {
52+
return LocalDate.ofEpochDay(epochDay);
53+
}
54+
55+
public String toString() {
56+
return toLocalDate().toString();
57+
}
58+
59+
@Override
60+
public final boolean equals(Object o) {
61+
if (!(o instanceof DateData)) {
62+
return false;
63+
}
64+
65+
DateData dateData = (DateData) o;
66+
return epochDay == dateData.epochDay;
67+
}
68+
69+
@Override
70+
public int compareTo(DateData other) {
71+
return Long.compare(epochDay, other.epochDay);
72+
}
73+
74+
@Override
75+
public int hashCode() {
76+
return Objects.hash(epochDay);
77+
}
78+
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/RecordData.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,12 @@ public interface RecordData {
163163
*/
164164
RecordData getRow(int pos, int numFields);
165165

166+
/** Returns the Date data at the given position. */
167+
DateData getDate(int pos);
168+
169+
/** Returns the Time data at the given position. */
170+
TimeData getTime(int pos);
171+
166172
/**
167173
* Creates an accessor for getting elements in an internal RecordData structure at the given
168174
* position.
@@ -197,9 +203,13 @@ static RecordData.FieldGetter createFieldGetter(DataType fieldType, int fieldPos
197203
fieldGetter = record -> record.getShort(fieldPos);
198204
break;
199205
case INTEGER:
206+
fieldGetter = record -> record.getInt(fieldPos);
207+
break;
200208
case DATE:
209+
fieldGetter = record -> record.getDate(fieldPos);
210+
break;
201211
case TIME_WITHOUT_TIME_ZONE:
202-
fieldGetter = record -> record.getInt(fieldPos);
212+
fieldGetter = record -> record.getTime(fieldPos);
203213
break;
204214
case BIGINT:
205215
fieldGetter = record -> record.getLong(fieldPos);
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.data;
19+
20+
import java.time.LocalTime;
21+
import java.util.Objects;
22+
23+
/**
24+
* An internal data structure representing data of {@link
25+
* org.apache.flink.cdc.common.types.TimeType}.
26+
*/
27+
public class TimeData implements Comparable<TimeData> {
28+
29+
private static final int SECONDS_TO_MILLIS = 1000;
30+
private static final int MILLIS_TO_MICRO = 1000;
31+
private static final int MILLIS_TO_NANO = 1_000_000;
32+
33+
private final int millisOfDay;
34+
35+
private TimeData(int millisOfDay) {
36+
this.millisOfDay = millisOfDay;
37+
}
38+
39+
public static TimeData fromSecondOfDay(int secondOfDay) {
40+
return new TimeData(secondOfDay * SECONDS_TO_MILLIS);
41+
}
42+
43+
public static TimeData fromMillisOfDay(int millisOfDay) {
44+
return new TimeData(millisOfDay);
45+
}
46+
47+
public static TimeData fromMicroOfDay(long microOfDay) {
48+
return new TimeData((int) (microOfDay / MILLIS_TO_MICRO));
49+
}
50+
51+
public static TimeData fromNanoOfDay(long nanoOfDay) {
52+
// millisOfDay should not exceed 86400000, which is safe to fit into INT.
53+
return new TimeData((int) (nanoOfDay / MILLIS_TO_NANO));
54+
}
55+
56+
public static TimeData fromLocalTime(LocalTime localTime) {
57+
return fromNanoOfDay(localTime.toNanoOfDay());
58+
}
59+
60+
public static TimeData fromIsoLocalTimeString(String timeString) {
61+
return fromLocalTime(LocalTime.parse(timeString));
62+
}
63+
64+
public int toMillisOfDay() {
65+
return millisOfDay;
66+
}
67+
68+
public LocalTime toLocalTime() {
69+
return LocalTime.ofNanoOfDay((long) millisOfDay * MILLIS_TO_NANO);
70+
}
71+
72+
public String toString() {
73+
return toLocalTime().toString();
74+
}
75+
76+
@Override
77+
public final boolean equals(Object o) {
78+
if (!(o instanceof TimeData)) {
79+
return false;
80+
}
81+
82+
TimeData timeData = (TimeData) o;
83+
return millisOfDay == timeData.millisOfDay;
84+
}
85+
86+
@Override
87+
public int compareTo(TimeData other) {
88+
return Long.compare(millisOfDay, other.millisOfDay);
89+
}
90+
91+
@Override
92+
public int hashCode() {
93+
return Objects.hash(millisOfDay);
94+
}
95+
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/data/binary/BinaryRecordData.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
import org.apache.flink.cdc.common.annotation.Internal;
2121
import org.apache.flink.cdc.common.data.ArrayData;
22+
import org.apache.flink.cdc.common.data.DateData;
2223
import org.apache.flink.cdc.common.data.DecimalData;
2324
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
2425
import org.apache.flink.cdc.common.data.MapData;
2526
import org.apache.flink.cdc.common.data.RecordData;
2627
import org.apache.flink.cdc.common.data.StringData;
28+
import org.apache.flink.cdc.common.data.TimeData;
2729
import org.apache.flink.cdc.common.data.TimestampData;
2830
import org.apache.flink.cdc.common.data.ZonedTimestampData;
2931
import org.apache.flink.cdc.common.utils.Preconditions;
@@ -216,6 +218,18 @@ public RecordData getRow(int pos, int numFields) {
216218
return BinarySegmentUtils.readRecordData(segments, numFields, offset, getLong(pos));
217219
}
218220

221+
@Override
222+
public DateData getDate(int pos) {
223+
assertIndexIsValid(pos);
224+
return DateData.fromEpochDay(getInt(pos));
225+
}
226+
227+
@Override
228+
public TimeData getTime(int pos) {
229+
assertIndexIsValid(pos);
230+
return TimeData.fromMillisOfDay(getInt(pos));
231+
}
232+
219233
/** The bit is 1 when the field is null. Default is 0. */
220234
@Override
221235
public boolean anyNull() {

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
package org.apache.flink.cdc.common.types.utils;
1919

2020
import org.apache.flink.cdc.common.data.ArrayData;
21+
import org.apache.flink.cdc.common.data.DateData;
2122
import org.apache.flink.cdc.common.data.DecimalData;
2223
import org.apache.flink.cdc.common.data.MapData;
2324
import org.apache.flink.cdc.common.data.RecordData;
2425
import org.apache.flink.cdc.common.data.StringData;
26+
import org.apache.flink.cdc.common.data.TimeData;
2527
import org.apache.flink.cdc.common.data.TimestampData;
2628
import org.apache.flink.cdc.common.data.ZonedTimestampData;
2729
import org.apache.flink.cdc.common.types.DataField;
@@ -58,9 +60,11 @@ public static Class<?> toInternalConversionClass(DataType type) {
5860
case SMALLINT:
5961
return Short.class;
6062
case INTEGER:
63+
return Integer.class;
6164
case DATE:
65+
return DateData.class;
6266
case TIME_WITHOUT_TIME_ZONE:
63-
return Integer.class;
67+
return TimeData.class;
6468
case BIGINT:
6569
return Long.class;
6670
case FLOAT:

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/DateTimeUtils.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.flink.cdc.common.utils;
1919

20+
import org.apache.flink.cdc.common.data.DateData;
21+
import org.apache.flink.cdc.common.data.TimeData;
22+
2023
import org.slf4j.Logger;
2124
import org.slf4j.LoggerFactory;
2225

@@ -71,12 +74,12 @@ public class DateTimeUtils {
7174
* @param ts the timestamp in milliseconds.
7275
* @return the date in days.
7376
*/
74-
public static int timestampMillisToDate(long ts) {
75-
int days = (int) (ts / MILLIS_PER_DAY);
77+
public static DateData timestampMillisToDate(long ts) {
78+
long days = ts / MILLIS_PER_DAY;
7679
if (days < 0) {
7780
days = days - 1;
7881
}
79-
return days;
82+
return DateData.fromEpochDay((int) days);
8083
}
8184

8285
/**
@@ -85,8 +88,8 @@ public static int timestampMillisToDate(long ts) {
8588
* @param ts the timestamp in milliseconds.
8689
* @return the time in milliseconds.
8790
*/
88-
public static int timestampMillisToTime(long ts) {
89-
return (int) (ts % MILLIS_PER_DAY);
91+
public static TimeData timestampMillisToTime(long ts) {
92+
return TimeData.fromMillisOfDay((int) (ts % MILLIS_PER_DAY));
9093
}
9194

9295
// --------------------------------------------------------------------------------------------
@@ -103,12 +106,12 @@ public static int parseDate(String dateStr, String fromFormat) {
103106
return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(), zdt.getDayOfMonth());
104107
}
105108

106-
public static int parseDate(String dateStr, String fromFormat, String timezone) {
109+
public static DateData parseDate(String dateStr, String fromFormat, String timezone) {
107110
long ts = internalParseTimestampMillis(dateStr, fromFormat, TimeZone.getTimeZone(timezone));
108111
ZoneId zoneId = ZoneId.of(timezone);
109112
Instant instant = Instant.ofEpochMilli(ts);
110113
ZonedDateTime zdt = ZonedDateTime.ofInstant(instant, zoneId);
111-
return ymdToUnixDate(zdt.getYear(), zdt.getMonthValue(), zdt.getDayOfMonth());
114+
return DateData.fromLocalDate(zdt.toLocalDate());
112115
}
113116

114117
private static long internalParseTimestampMillis(String dateStr, String format, TimeZone tz) {

0 commit comments

Comments
 (0)