Commit 06ab765

[FLINK-38142] Upgrading the Paimon version to 1.2.0 (#4066)
1 parent 224a075 commit 06ab765

File tree

8 files changed: 28 additions, 47 deletions

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml

Lines changed: 2 additions & 2 deletions

@@ -29,7 +29,7 @@ limitations under the License.
     <artifactId>flink-cdc-pipeline-connector-paimon</artifactId>

     <properties>
-        <paimon.version>1.0.1</paimon.version>
+        <paimon.version>1.2.0</paimon.version>
         <hadoop.version>2.8.5</hadoop.version>
         <hive.version>2.3.9</hive.version>
         <mockito.version>3.4.6</mockito.version>
@@ -301,4 +301,4 @@ limitations under the License.
         </plugins>
     </build>

-</project>
+</project>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/SchemaChangeProvider.java

Lines changed: 6 additions & 28 deletions

@@ -22,7 +22,6 @@
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.utils.DataTypeUtils;

-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.schema.SchemaChange;

@@ -36,7 +35,6 @@
  * represent different types of schema modifications.
  */
 public class SchemaChangeProvider {
-
     /**
      * Creates a SchemaChange object for adding a column without specifying its position.
      *
@@ -60,13 +58,9 @@ public static List<SchemaChange> add(AddColumnEvent.ColumnWithPosition columnWit
         Optional.ofNullable(column.getDefaultValueExpression())
                 .ifPresent(
                         value -> {
-                            String key =
-                                    String.format(
-                                            "%s.%s.%s",
-                                            CoreOptions.FIELDS_PREFIX,
-                                            column.getName(),
-                                            CoreOptions.DEFAULT_VALUE_SUFFIX);
-                            result.add(SchemaChangeProvider.setOption(key, value));
+                            result.add(
+                                    SchemaChange.updateColumnDefaultValue(
+                                            new String[] {column.getName()}, value));
                         });
         return result;
     }
@@ -98,13 +92,9 @@ public static List<SchemaChange> add(
         Optional.ofNullable(column.getDefaultValueExpression())
                 .ifPresent(
                         value -> {
-                            String key =
-                                    String.format(
-                                            "%s.%s.%s",
-                                            CoreOptions.FIELDS_PREFIX,
-                                            column.getName(),
-                                            CoreOptions.DEFAULT_VALUE_SUFFIX);
-                            result.add(SchemaChangeProvider.setOption(key, value));
+                            result.add(
+                                    SchemaChange.updateColumnDefaultValue(
+                                            new String[] {column.getName()}, value));
                         });
         return result;
     }
@@ -134,11 +124,6 @@ public static List<SchemaChange> rename(
             String oldColumnName, String newColumnName, Map<String, String> options) {
         List<SchemaChange> result = new ArrayList<>();
         result.add(SchemaChange.renameColumn(oldColumnName, newColumnName));
-        String defaultValue = options.get(defaultValueOptionKey(oldColumnName));
-        if (defaultValue != null) {
-            result.add(SchemaChange.removeOption(defaultValueOptionKey(oldColumnName)));
-            result.add(SchemaChange.setOption(defaultValueOptionKey(newColumnName), defaultValue));
-        }
         return result;
     }

@@ -151,16 +136,9 @@ public static List<SchemaChange> rename(
     public static List<SchemaChange> drop(String columnName) {
         List<SchemaChange> result = new ArrayList<>();
         result.add(SchemaChange.dropColumn(columnName));
-        result.add(SchemaChange.removeOption(defaultValueOptionKey(columnName)));
         return result;
     }

-    public static String defaultValueOptionKey(String columnName) {
-        return String.format(
-                "%s.%s.%s",
-                CoreOptions.FIELDS_PREFIX, columnName, CoreOptions.DEFAULT_VALUE_SUFFIX);
-    }
-
     /**
      * Creates a SchemaChange object for setting an option.
      *
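Note on these hunks: a column's default value is no longer encoded as a table option ("fields.<column>.default-value" built from CoreOptions constants) but as a dedicated SchemaChange.updateColumnDefaultValue, which is why the rename and drop paths no longer need to move or remove options. A minimal sketch of the resulting change list for "add a column with a default", assuming a hypothetical Paimon Catalog named catalog and a table Identifier named tableId (neither appears in this commit):

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.schema.SchemaChange;
    import org.apache.paimon.types.DataTypes;

    import java.util.Arrays;
    import java.util.List;

    // Hedged sketch, not part of the commit: roughly what SchemaChangeProvider.add(...)
    // now emits for a column that carries a default value expression.
    static void addColumnWithDefault(Catalog catalog, Identifier tableId) throws Exception {
        List<SchemaChange> changes =
                Arrays.asList(
                        SchemaChange.addColumn("col3", DataTypes.STRING()),
                        // the default travels as its own schema change in Paimon 1.2,
                        // not as a "fields.col3.default-value" option
                        SchemaChange.updateColumnDefaultValue(
                                new String[] {"col3"}, "col3DefValue"));
        catalog.alterTable(tableId, changes, false);
    }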

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java

Lines changed: 2 additions & 2 deletions

@@ -46,7 +46,7 @@ public PaimonCommitter(Options catalogOptions, String commitUser) {
                 new StoreMultiCommitter(
                         () -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions),
                         org.apache.paimon.flink.sink.Committer.createContext(
-                                commitUser, null, true, false, null));
+                                commitUser, null, true, false, null, 1, 1));
     }

     @Override
@@ -66,7 +66,7 @@ public void commit(Collection<CommitRequest<MultiTableCommittable>> commitReques
         storeMultiCommitter.combine(checkpointId, 1L, committables);
         try {
             storeMultiCommitter.filterAndCommit(
-                    Collections.singletonList(wrappedManifestCommittable));
+                    Collections.singletonList(wrappedManifestCommittable), true, false);
             commitRequests.forEach(CommitRequest::signalAlreadyCommitted);
             LOGGER.info(
                     "Commit succeeded for {} with {} committable",

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java

Lines changed: 3 additions & 1 deletion

@@ -85,7 +85,9 @@ public void initializeState(StateInitializationContext context) throws Exception
                         getMetricGroup(),
                         true,
                         context.isRestored(),
-                        context.getOperatorStateStore()));
+                        context.getOperatorStateStore(),
+                        getRuntimeContext().getNumberOfParallelSubtasks(),
+                        getRuntimeContext().getIndexOfThisSubtask()));
     }
 }

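Note: this is the counterpart of the PaimonCommitter change above. During operator state initialization the actual parallelism and subtask index are read from the Flink runtime context and forwarded into the committer context. Both accessors are standard Flink operator API:

    // Values forwarded into Committer.createContext in the diff above.
    int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();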

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java

Lines changed: 1 addition & 4 deletions

@@ -118,7 +118,7 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
                 "memoryPool and memoryPoolFactory cannot be set at the same time.");

         TableWriteImpl<?> tableWrite =
-                table.newWrite(commitUser, (part, bucket) -> true)
+                table.newWrite(commitUser, (part, bucket) -> true, null)
                         .withIOManager(paimonIOManager)
                         .withIgnorePreviousFiles(ignorePreviousFiles);

@@ -142,9 +142,6 @@ public void withCompactExecutor(ExecutorService compactExecutor) {
         write.withCompactExecutor(compactExecutor);
     }

-    @Override
-    public void withInsertOnly(boolean b) {}
-
     @Override
     public SinkRecord write(InternalRow internalRow) throws Exception {
         return write.writeAndReturn(internalRow);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java

Lines changed: 4 additions & 1 deletion

@@ -216,6 +216,8 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
             }
             long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
             Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
+            Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets();
+
             return new Tuple4<>(
                     table.bucketMode(),
                     table.createRowKeyExtractor(),
@@ -226,7 +228,8 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
                             totalTasksNumber,
                             MathUtils.min(numAssigners, totalTasksNumber),
                             currentTaskNumber,
-                            targetRowNum),
+                            targetRowNum,
+                            maxBucketsNum),
                     new RowPartitionKeyExtractor(table.schema()));
         }
     }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java

Lines changed: 8 additions & 7 deletions

@@ -136,13 +136,11 @@ void testApplySchemaChange(String metastore)
                         Arrays.asList(
                                 new DataField(0, "col1", DataTypes.STRING().notNull()),
                                 new DataField(1, "col2", DataTypes.INT()),
-                                new DataField(2, "col3", DataTypes.STRING())));
+                                new DataField(
+                                        2, "col3", DataTypes.STRING(), null, "col3DefValue")));
         Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);

-        Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).options())
-                .containsEntry("fields.col3.default-value", "col3DefValue");
-
         Map<String, String> nameMapping = new HashMap<>();
         nameMapping.put("col2", "newcol2");
         nameMapping.put("col3", "newcol3");
@@ -154,7 +152,8 @@ void testApplySchemaChange(String metastore)
                         Arrays.asList(
                                 new DataField(0, "col1", DataTypes.STRING().notNull()),
                                 new DataField(1, "newcol2", DataTypes.INT()),
-                                new DataField(2, "newcol3", DataTypes.STRING())));
+                                new DataField(
+                                        2, "newcol3", DataTypes.STRING(), null, "col3DefValue")));
         Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);

@@ -168,7 +167,8 @@ void testApplySchemaChange(String metastore)
                         Arrays.asList(
                                 new DataField(0, "col1", DataTypes.STRING().notNull()),
                                 new DataField(1, "newcol2", DataTypes.STRING()),
-                                new DataField(2, "newcol3", DataTypes.STRING())));
+                                new DataField(
+                                        2, "newcol3", DataTypes.STRING(), null, "col3DefValue")));
         Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);

@@ -181,7 +181,8 @@ void testApplySchemaChange(String metastore)
                 new RowType(
                         Arrays.asList(
                                 new DataField(0, "col1", DataTypes.STRING().notNull()),
-                                new DataField(2, "newcol3", DataTypes.STRING())));
+                                new DataField(
+                                        2, "newcol3", DataTypes.STRING(), null, "col3DefValue")));
         Assertions.assertThat(catalog.getTable(Identifier.fromString("test.table1")).rowType())
                 .isEqualTo(tableSchema);

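Note on these hunks: the test now asserts the default value on the DataField itself, so the separate options() assertion on "fields.col3.default-value" is dropped, mirroring the SchemaChangeProvider change above. A hedged reading of the five-argument DataField constructor used in the new assertions (the fourth argument is assumed to be the field description):

    // Constructor call copied from the test above; the positional readings in the
    // comments are assumptions, not taken from the Paimon javadoc.
    new DataField(
            2,                   // field id
            "col3",              // field name
            DataTypes.STRING(),  // field type
            null,                // assumed: field description
            "col3DefValue");     // default value, formerly a table option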

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml

Lines changed: 2 additions & 2 deletions

@@ -35,7 +35,7 @@ limitations under the License.
        <mysql.driver.version>8.0.27</mysql.driver.version>
        <!-- TODO: Update this, when StarRocks releases a 1.20 compatible connector. -->
        <starrocks.connector.version>1.2.10_flink-${flink-major-1.19}</starrocks.connector.version>
-        <paimon.version>1.0.1</paimon.version>
+        <paimon.version>1.2.0</paimon.version>
        <flink.release.download.skip>false</flink.release.download.skip>
        <flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name>
        <flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror>
@@ -682,4 +682,4 @@ limitations under the License.
            </plugin>
        </plugins>
    </build>
-</project>
+</project>
