Commit c85444d

[FLINK-38206][pipeline-connector][paimon] Support writing to existed table with inconsistent schema with upstream.

1 parent 06ab765 · commit c85444d

7 files changed, +765 -39 lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/DataField.java

Lines changed: 10 additions & 0 deletions
```diff
@@ -20,6 +20,7 @@
 import org.apache.flink.cdc.common.annotation.PublicEvolving;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
 import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.table.types.utils.LogicalTypeDataTypeConverter;
 
 import javax.annotation.Nullable;
 
@@ -125,4 +126,13 @@ public org.apache.flink.table.api.DataTypes.Field toFlinkDataTypeField() {
                 : org.apache.flink.table.api.DataTypes.FIELD(
                         name, DataTypeUtils.toFlinkDataType(type), description);
     }
+
+    public static DataField fromFlinkDataTypeField(
+            org.apache.flink.table.types.logical.RowType.RowField rowField) {
+        return DataTypes.FIELD(
+                rowField.getName(),
+                DataTypeUtils.fromFlinkDataType(
+                        LogicalTypeDataTypeConverter.toDataType(rowField.getType())),
+                rowField.getDescription().orElse(null));
+    }
 }
```
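The new method is the inverse of the existing `toFlinkDataTypeField`. A minimal usage sketch, assuming an illustrative field (`amount`, `DECIMAL(10, 2)`, with a description) that does not appear in the commit:

```java
import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.RowType;

public class DataFieldConversionSketch {
    public static void main(String[] args) {
        // Illustrative Flink row field; name, type, and description are assumptions.
        RowType.RowField flinkField =
                new RowType.RowField("amount", new DecimalType(10, 2), "order amount");
        // The resulting CDC DataField should carry the same name, type, and description.
        DataField cdcField = DataField.fromFlinkDataTypeField(flinkField);
        System.out.println(cdcField);
    }
}
```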

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java

Lines changed: 102 additions & 0 deletions
```diff
@@ -29,11 +29,16 @@
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.util.CollectionUtil;
 
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLength;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
+
 /** Utilities for handling {@link DataType}s. */
 public class DataTypeUtils {
     /**
@@ -197,4 +202,101 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ
                 throw new IllegalArgumentException("Illegal type: " + type);
         }
     }
+
+    /**
+     * Convert Flink's internal {@link org.apache.flink.table.types.DataType} to CDC's {@link
+     * DataType}.
+     */
+    public static DataType fromFlinkDataType(org.apache.flink.table.types.DataType flinkType) {
+        LogicalType logicalType = flinkType.getLogicalType();
+        List<org.apache.flink.table.types.DataType> children = flinkType.getChildren();
+        DataType dataType;
+        switch (logicalType.getTypeRoot()) {
+            case CHAR:
+                dataType = DataTypes.CHAR(getLength(logicalType));
+                break;
+            case VARCHAR:
+                dataType = DataTypes.VARCHAR(getLength(logicalType));
+                break;
+            case BOOLEAN:
+                dataType = DataTypes.BOOLEAN();
+                break;
+            case BINARY:
+                dataType = DataTypes.BINARY(getLength(logicalType));
+                break;
+            case VARBINARY:
+                dataType = DataTypes.VARBINARY(getLength(logicalType));
+                break;
+            case DECIMAL:
+                dataType = DataTypes.DECIMAL(getPrecision(logicalType), getScale(logicalType));
+                break;
+            case TINYINT:
+                dataType = DataTypes.TINYINT();
+                break;
+            case SMALLINT:
+                dataType = DataTypes.SMALLINT();
+                break;
+            case INTEGER:
+                dataType = DataTypes.INT();
+                break;
+            case BIGINT:
+                dataType = DataTypes.BIGINT();
+                break;
+            case FLOAT:
+                dataType = DataTypes.FLOAT();
+                break;
+            case DOUBLE:
+                dataType = DataTypes.DOUBLE();
+                break;
+            case DATE:
+                dataType = DataTypes.DATE();
+                break;
+            case TIME_WITHOUT_TIME_ZONE:
+                dataType = DataTypes.TIME(getPrecision(logicalType));
+                break;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                dataType = DataTypes.TIMESTAMP(getPrecision(logicalType));
+                break;
+            case TIMESTAMP_WITH_TIME_ZONE:
+                dataType = DataTypes.TIMESTAMP_TZ(getPrecision(logicalType));
+                break;
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                dataType = DataTypes.TIMESTAMP_LTZ(getPrecision(logicalType));
+                break;
+            case ARRAY:
+                Preconditions.checkState(children != null && !children.isEmpty());
+                dataType = DataTypes.ARRAY(fromFlinkDataType(children.get(0)));
+                break;
+            case MAP:
+                Preconditions.checkState(children != null && children.size() > 1);
+                dataType =
+                        DataTypes.MAP(
+                                fromFlinkDataType(children.get(0)),
+                                fromFlinkDataType(children.get(1)));
+                break;
+            case ROW:
+                Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
+                org.apache.flink.table.types.logical.RowType rowType =
+                        (org.apache.flink.table.types.logical.RowType) flinkType.getLogicalType();
+                DataField[] fields =
+                        rowType.getFields().stream()
+                                .map(DataField::fromFlinkDataTypeField)
+                                .toArray(DataField[]::new);
+                dataType = DataTypes.ROW(fields);
+                break;
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+            case NULL:
+            case MULTISET:
+            case DISTINCT_TYPE:
+            case STRUCTURED_TYPE:
+            case RAW:
+            case SYMBOL:
+            case UNRESOLVED:
+                throw new IllegalArgumentException("Unsupported type: " + flinkType);
+            default:
+                throw new IllegalArgumentException("Illegal type: " + flinkType);
+        }
+        return logicalType.isNullable() ? dataType : dataType.notNull();
+    }
 }
```
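A minimal usage sketch of `fromFlinkDataType`, assuming an illustrative `ARRAY<VARCHAR(255) NOT NULL>` built with Flink's Table API; it exercises the two properties the switch above guarantees: children are converted recursively, and nullability is re-applied at the end:

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.table.api.DataTypes;

public class FromFlinkDataTypeSketch {
    public static void main(String[] args) {
        // Flink-side type: ARRAY<VARCHAR(255) NOT NULL> (illustrative choice).
        org.apache.flink.table.types.DataType flinkType =
                DataTypes.ARRAY(DataTypes.VARCHAR(255).notNull());
        // CDC-side type: the element's NOT NULL constraint survives because
        // fromFlinkDataType recurses into children and re-applies nullability.
        DataType cdcType = DataTypeUtils.fromFlinkDataType(flinkType);
        System.out.println(cdcType);
    }
}
```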

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 3 additions & 3 deletions
```diff
@@ -294,9 +294,9 @@ private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException
             event.getDroppedColumnNames()
                     .forEach((column) -> tableChangeList.addAll(SchemaChangeProvider.drop(column)));
             catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true);
-        } catch (Catalog.TableNotExistException
-                | Catalog.ColumnAlreadyExistException
-                | Catalog.ColumnNotExistException e) {
+        } catch (Catalog.TableNotExistException | Catalog.ColumnNotExistException e) {
+            LOG.warn("Failed to apply DropColumnEvent, skip it.", e);
+        } catch (Catalog.ColumnAlreadyExistException e) {
             throw new SchemaEvolveException(event, e.getMessage(), e);
         }
     }
```
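The reshuffled catch blocks make `DropColumnEvent` effectively idempotent: a missing table or column is now logged and skipped, while `ColumnAlreadyExistException` still fails schema evolution. A self-contained sketch of that control flow, using stand-in exception classes rather than Paimon's real `Catalog` exceptions:

```java
public class DropColumnSemanticsSketch {
    // Stand-ins for Paimon's Catalog.TableNotExistException and friends.
    static class TableNotExistException extends Exception {}
    static class ColumnNotExistException extends Exception {}
    static class ColumnAlreadyExistException extends Exception {}

    // Simulates dropping a column that was already removed from the table.
    static void dropColumn(String column)
            throws TableNotExistException, ColumnNotExistException,
                    ColumnAlreadyExistException {
        throw new ColumnNotExistException();
    }

    public static void main(String[] args) throws ColumnAlreadyExistException {
        try {
            dropColumn("legacy_col");
        } catch (TableNotExistException | ColumnNotExistException e) {
            // New behavior: warn and skip, keeping the pipeline running.
            System.out.println("Failed to apply DropColumnEvent, skip it.");
        } catch (ColumnAlreadyExistException e) {
            // Still treated as a hard schema-evolution failure.
            throw e;
        }
    }
}
```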

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterHelper.java

Lines changed: 70 additions & 1 deletion
```diff
@@ -25,30 +25,42 @@
 import org.apache.flink.cdc.common.data.binary.BinaryMapData;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeChecks;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.types.utils.TypeConversions;
 
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.memory.MemorySegmentUtils;
+import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 
 import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.cdc.common.types.DataTypeChecks.getFieldCount;
 
-/** A helper class for {@link PaimonWriter} to create FieldGetter and GenericRow. */
+/**
+ * A helper class to deduce Schema of paimon table for {@link BucketAssignOperator}, and create
+ * FieldGetter and GenericRow for {@link PaimonWriter}.
+ */
 public class PaimonWriterHelper {
 
     /** create a list of {@link RecordData.FieldGetter} for {@link PaimonWriter}. */
@@ -61,6 +73,33 @@ public static List<RecordData.FieldGetter> createFieldGetters(Schema schema, Zon
         return fieldGetters;
     }
 
+    /**
+     * Check if the columns of upstream schema is the same as the physical schema.
+     *
+     * <p>Note: Default value of column was ignored as it has no influence in {@link
+     * #createFieldGetter(DataType, int, ZoneId)}.
+     */
+    public static Boolean sameColumnsIgnoreCommentAndDefaultValue(
+            Schema upstreamSchema, Schema physicalSchema) {
+        List<Column> upstreamColumns = upstreamSchema.getColumns();
+        List<Column> physicalColumns = physicalSchema.getColumns();
+        if (upstreamColumns.size() != physicalColumns.size()) {
+            return false;
+        }
+        for (int i = 0; i < physicalColumns.size(); i++) {
+            Column upstreamColumn = upstreamColumns.get(i);
+            Column physicalColumn = physicalColumns.get(i);
+            // Case sensitive.
+            if (!upstreamColumn.getName().equals(physicalColumn.getName())) {
+                return false;
+            }
+            if (!upstreamColumn.getType().equals(physicalColumn.getType())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     private static RecordData.FieldGetter createFieldGetter(
             DataType fieldType, int fieldPos, ZoneId zoneId) {
         final RecordData.FieldGetter fieldGetter;
@@ -215,6 +254,32 @@ public static List<GenericRow> convertEventToFullGenericRows(
         return fullGenericRows;
     }
 
+    /**
+     * Deduce {@link Schema} for a {@link Table}.
+     *
+     * <p>Note: default value was not included in the result.
+     */
+    public static Schema deduceSchemaForPaimonTable(Table table) {
+        RowType rowType = table.rowType();
+        Schema.Builder builder = Schema.newBuilder();
+        builder.setColumns(
+                rowType.getFields().stream()
+                        .map(
+                                column ->
+                                        Column.physicalColumn(
+                                                column.name(),
+                                                DataTypeUtils.fromFlinkDataType(
+                                                        TypeConversions.fromLogicalToDataType(
+                                                                LogicalTypeConversion.toLogicalType(
+                                                                        column.type()))),
+                                                column.description()))
+                        .collect(Collectors.toList()));
+        builder.primaryKey(table.primaryKeys());
+        table.comment().ifPresent(builder::comment);
+        builder.options(table.options());
+        return builder.build();
+    }
+
     private static GenericRow convertRecordDataToGenericRow(
             RecordData recordData, List<RecordData.FieldGetter> fieldGetters, RowKind rowKind) {
         GenericRow genericRow = new GenericRow(rowKind, recordData.getArity());
@@ -340,4 +405,8 @@ public InternalRow readRowData(
             return row;
         }
     }
+
+    public static Identifier identifierFromTableId(TableId tableId) {
+        return Identifier.fromString(tableId.identifier());
+    }
 }
```
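Together, `deduceSchemaForPaimonTable` and `sameColumnsIgnoreCommentAndDefaultValue` let the sink decide whether upstream events can be written as-is into a pre-existing table. The sketch below shows one plausible wiring, not the connector's actual operator logic; `table`, `upstreamSchema`, and the fallback policy are assumptions:

```java
import java.time.ZoneId;
import java.util.List;

import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
import org.apache.paimon.table.Table;

public class SchemaCheckSketch {

    /**
     * Pick the schema to build field getters from: reuse the upstream schema
     * only when its columns match the table's physical layout (names and
     * types, comments and defaults ignored); otherwise fall back to the
     * schema deduced from the Paimon table itself.
     */
    static List<RecordData.FieldGetter> fieldGettersFor(
            Table table, Schema upstreamSchema, ZoneId zoneId) {
        // Reverse-engineer a CDC Schema from the table's physical row type.
        Schema physicalSchema = PaimonWriterHelper.deduceSchemaForPaimonTable(table);
        Schema effectiveSchema =
                PaimonWriterHelper.sameColumnsIgnoreCommentAndDefaultValue(
                                upstreamSchema, physicalSchema)
                        ? upstreamSchema
                        : physicalSchema;
        return PaimonWriterHelper.createFieldGetters(effectiveSchema, zoneId);
    }
}
```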
