Skip to content

Commit 48b45a4

Browse files
committed
fix
1 parent 6443063 commit 48b45a4

File tree

2 files changed

+46
-17
lines changed

2 files changed

+46
-17
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/io/debezium/relational/ddl/DdlChanges.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@
1010
import java.util.List;
1111
import java.util.Set;
1212
import java.util.function.Predicate;
13+
import java.util.regex.Pattern;
1314

1415
import io.debezium.annotation.NotThreadSafe;
1516
import io.debezium.relational.RelationalTableFilters;
1617
import io.debezium.relational.TableId;
1718
import io.debezium.relational.Tables.TableFilter;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
1821

1922
/**
2023
* Copied from Debezium project.
@@ -25,9 +28,11 @@
2528
@NotThreadSafe
2629
public class DdlChanges implements DdlParserListener {
2730

31+
private final static Logger LOGGER = LoggerFactory.getLogger(DdlChanges.class);
2832
private final String terminator;
2933
private final List<Event> events = new ArrayList<>();
3034
private final Set<String> databaseNames = new HashSet<>();
35+
private static final Pattern GHOST_TABLE_PATTERN = Pattern.compile("^_(.*)_(del|gho|ghc)$");
3136

3237
/**
3338
* Create a new changes object with ';' as the terminator token.
@@ -253,8 +258,31 @@ public boolean anyMatch(RelationalTableFilters filters) {
253258
|| databaseFilter.test(((SetVariableEvent) event).databaseName().get())));
254259
}
255260

256-
public boolean anyMatch(Predicate<Event> predicate) {
257-
return events.stream().anyMatch(predicate);
261+
public boolean anyMatch(RelationalTableFilters filters,boolean parserOnlineDDL) {
262+
Predicate<String> databaseFilter = filters.databaseFilter();
263+
TableFilter tableFilter = filters.dataCollectionFilter();
264+
return events.stream().anyMatch(event -> {
265+
if (event instanceof DatabaseEvent) {
266+
DatabaseEvent dbEvent = (DatabaseEvent) event;
267+
return databaseFilter.test(dbEvent.databaseName());
268+
}
269+
if (event instanceof TableEvent) {
270+
TableEvent tableEvent = (TableEvent) event;
271+
TableId tableId = tableEvent.tableId();
272+
boolean isIncludedByFilter = tableFilter.isIncluded(tableId);
273+
boolean isGhostTable = parserOnlineDDL
274+
&& tableId != null
275+
&& GHOST_TABLE_PATTERN.matcher(tableId.table()).matches();
276+
LOGGER.info("isIncludedByFilter:{},isGhostTable:{},result:{}", isIncludedByFilter, isGhostTable,isIncludedByFilter || isGhostTable);
277+
return isIncludedByFilter || isGhostTable;
278+
}
279+
if (event instanceof SetVariableEvent) {
280+
SetVariableEvent varEvent = (SetVariableEvent) event;
281+
return !varEvent.databaseName().isPresent()
282+
|| (varEvent.databaseName().isPresent()
283+
&& databaseFilter.test(varEvent.databaseName().get()));
284+
}
285+
return false;
286+
});
258287
}
259-
260288
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,9 @@ public MySqlDatabaseSchema(MySqlConnectorConfig connectorConfig, MySqlValueConve
112112
valueConverter,
113113
getTableFilter());
114114
this.ddlChanges = this.ddlParser.getDdlChanges();
115+
this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
115116
this.connectorConfig = connectorConfig;
116117
filters = connectorConfig.getTableFilters();
117-
this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
118118
}
119119

120120
/**
@@ -158,11 +158,13 @@ private boolean isGhostTable(TableId tableId) {
158158
* table to schema change history.
159159
*/
160160
private boolean isTableIncluded(String MethodName,TableId tableId) {
161-
// 先应用原有过滤规则
162161
boolean isIncludedByOriginalFilter = filters.dataCollectionFilter().isIncluded(tableId);
162+
LOGGER.info("MethodName: {},Including isIncludedByOriginalFilter table that would otherwise be filtered: {}," +
163+
"isIncludedByOriginalFilter:{}",
164+
MethodName, tableId,isIncludedByOriginalFilter);
163165
if (parseOnLineSchemaChanges) {
164166
if (!isIncludedByOriginalFilter && isGhostTable(tableId)) {
165-
LOGGER.info("MethodName{},Including gh-ost table that would otherwise be filtered: {}",MethodName, tableId);
167+
LOGGER.info("MethodName: {},Including gh-ost table that would otherwise be filtered: {}",MethodName, tableId);
166168
return true;
167169
}
168170
}
@@ -226,7 +228,10 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
226228
// - or DDLs for monitored objects
227229
if (!databaseHistory.storeOnlyCapturedTables() || isGlobalSetVariableStatement(schemaChange.getDdl(), schemaChange.getDatabase())
228230
|| schemaChange.getTables().stream().map(Table::id).anyMatch(x->isTableIncluded("applySchemaChange",x))) {
229-
LOGGER.info("Recorded DDL statements for database '{}': {}", schemaChange.getDatabase(), schemaChange.getDdl());
231+
boolean applySchemaChange = schemaChange.getTables().stream().map(Table::id).anyMatch(x -> isTableIncluded("applySchemaChange", x));
232+
LOGGER.info("Recorded DDL statements for database '{}': {},{},{},{}", schemaChange.getDatabase(),
233+
schemaChange.getDdl()
234+
,applySchemaChange,!databaseHistory.storeOnlyCapturedTables(),isGlobalSetVariableStatement(schemaChange.getDdl(), schemaChange.getDatabase()));
230235
record(schemaChange, schemaChange.getTableChanges());
231236
}
232237
}
@@ -239,7 +244,7 @@ public List<SchemaChangeEvent> parseSnapshotDdl(MySqlPartition partition, String
239244

240245
public List<SchemaChangeEvent> parseStreamingDdl(MySqlPartition partition, String ddlStatements, String databaseName,
241246
MySqlOffsetContext offset, Instant sourceTime) {
242-
LOGGER.debug("Processing streaming DDL '{}' for database '{}'", ddlStatements, databaseName);
247+
LOGGER.info("Processing streaming DDL '{}' for database '{}'", ddlStatements, databaseName);
243248
return parseDdl(partition, ddlStatements, databaseName, offset, sourceTime, false);
244249
}
245250

@@ -265,10 +270,7 @@ private List<SchemaChangeEvent> parseDdl(MySqlPartition partition, String ddlSta
265270
}
266271
}
267272
// No need to send schema events or store DDL if no table has changed
268-
if (!databaseHistory.storeOnlyCapturedTables() || isGlobalSetVariableStatement(ddlStatements, databaseName) || ddlChanges.anyMatch(event -> {
269-
TableId tableId = this.getTableId(event);
270-
return tableId != null && isTableIncluded("parseDdl",tableId);
271-
})) {
273+
if (!databaseHistory.storeOnlyCapturedTables() || isGlobalSetVariableStatement(ddlStatements, databaseName) || ddlChanges.anyMatch(filters,parseOnLineSchemaChanges)){
272274

273275
// We are supposed to _also_ record the schema changes as SourceRecords, but these need to be filtered
274276
// by database. Unfortunately, the databaseName on the event might not be the same database as that
@@ -407,13 +409,12 @@ public boolean storeOnlyCapturedTables() {
407409
* connector's configuration
408410
*/
409411
public boolean assignTableNumber(long tableNumber, TableId id) {
410-
412+
if (!isTableIncluded("assignTableNumber",id)) {
413+
excludeTableIdsByTableNumber.put(tableNumber, id);
414+
return false;
415+
}
411416
final TableSchema tableSchema = schemaFor(id);
412417
if (tableSchema == null) {
413-
if (isTableIncluded("assignTableNumber",id)) {
414-
tableIdsByTableNumber.put(tableNumber, id);
415-
return true;
416-
}
417418
excludeTableIdsByTableNumber.put(tableNumber, id);
418419
return false;
419420
}

0 commit comments

Comments
 (0)