[FLINK-36165][source-connector/mysql] Support capturing snapshot data with conditions #4104
… with conditions
[FLINK-36165][source-connector/mysql] add docs
[FLINK-36165][source-connector/mysql] Implement snapshot filter for MySQL table source
[FLINK-36165][source-connector/mysql] Escape dot
[FLINK-36165 ] fixed supported escape like 'city != 'China:beijing''
d0738af to c3005b6
fixed checkstyle
fixed test
fixed MySqlTableSourceFactoryTest test error.
<tr>
<td>scan.snapshot.filters</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). <br>
By default, no filter is applied, meaning the entire table will be synchronized. <br>
A colon (:) separates the table name from the filter expression, while a semicolon (;) separates multiple filters,
e.g. `db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`.
</td>
</tr>
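To make the `table:filter;table:filter` format described above concrete, here is a minimal sketch of how such a string could be split and matched. It is not the PR's implementation: the class `SnapshotFilterSketch` and its methods are hypothetical, and plain `java.util.regex` matching stands in for the connector's `Selectors`. It assumes each entry is split at its first colon (so a filter such as `city != 'China:beijing'` keeps its own colon) and that the first matching pattern wins.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink CDC.
class SnapshotFilterSketch {

    /** Parses "pattern:filter;pattern:filter" into an insertion-ordered map. */
    static Map<String, String> parse(String spec) {
        Map<String, String> filters = new LinkedHashMap<>();
        // Naive split: a semicolon inside a SQL string literal would break this.
        for (String entry : spec.split(";")) {
            if (entry.trim().isEmpty()) {
                continue;
            }
            // Split at the FIRST colon only, so the filter may contain colons.
            int colon = entry.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Missing ':' in filter entry: " + entry);
            }
            filters.put(entry.substring(0, colon).trim(), entry.substring(colon + 1).trim());
        }
        return filters;
    }

    /** Returns the filter of the first pattern matching the table, or null. */
    static String filterFor(Map<String, String> filters, String tableId) {
        for (Map.Entry<String, String> e : filters.entrySet()) {
            if (Pattern.matches(e.getKey(), tableId)) {
                return e.getValue();
            }
        }
        return null; // no filter configured: the whole table is read
    }

    public static void main(String[] args) {
        Map<String, String> filters =
                parse("db1.user_table_[0-9]+:id > 100;db2.orders:city != 'China:beijing';");
        System.out.println(filterFor(filters, "db1.user_table_7"));
        System.out.println(filterFor(filters, "db2.orders"));
    }
}
```

Returning `null` for an unmatched table mirrors the documented default of applying no filter.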
How about explicitly stating that the filter conditions are combined using AND
and that it has nothing to do with the binlog step?
Map<Selectors, String> snapshotFilters = toSelector(filters);

String filter = null;
for (Selectors selector : snapshotFilters.keySet()) {
    if (selector.isMatch(
            org.apache.flink.cdc.common.event.TableId.tableId(
                    tableId.catalog(), tableId.table()))) {
        filter = snapshotFilters.get(selector);
        break;
    }
Can we fail fast on unknown/nonexistent columns in filters?
public static Long queryRowCnt(
        JdbcConnection jdbc, TableId tableId, String columnName, @Nullable String filter)
        throws SQLException {

    if (filter == null) {
        return queryApproximateRowCnt(jdbc, tableId);
    }

    final String cntQuery =
            String.format(
                    "SELECT COUNT(%s) FROM %s WHERE %s",
                    quote(columnName), quote(tableId), filter);
    return jdbc.queryAndMap(
            cntQuery,
            rs -> {
                if (!rs.next()) {
                    // this should never happen
                    throw new SQLException(
                            String.format(
                                    "No result returned after running query [%s]", cntQuery));
                }
                return rs.getLong(1);
            });
}
Because the filter is applied during split planning, the distribution factor may fall outside the configured bounds (0.05 to 1,000), which would affect snapshot performance. I’d appreciate your thoughts on this.
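A small worked example may help illustrate the concern. The sketch below assumes the distribution factor is computed as `(max - min + 1) / rowCount` over the chunk key, and that the key range stays the same while the filtered row count shrinks. Both are assumptions for illustration, not something this diff confirms, and the class `DistributionFactorSketch` is hypothetical.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper, not part of Flink CDC.
class DistributionFactorSketch {

    /** Assumed formula: (max - min + 1) / rowCount, rounded up to 4 decimals. */
    static double distributionFactor(long min, long max, long rowCount) {
        if (rowCount == 0) {
            return Double.MAX_VALUE; // nothing to divide by: treat as maximally sparse
        }
        BigDecimal span =
                BigDecimal.valueOf(max).subtract(BigDecimal.valueOf(min)).add(BigDecimal.ONE);
        return span.divide(BigDecimal.valueOf(rowCount), 4, RoundingMode.CEILING).doubleValue();
    }

    /** True when the factor lies inside the configured [lower, upper] bounds. */
    static boolean withinBounds(double factor, double lower, double upper) {
        return factor >= lower && factor <= upper;
    }

    public static void main(String[] args) {
        // Unfiltered: ids 1..1,000,000 with 1,000,000 rows.
        double unfiltered = distributionFactor(1, 1_000_000, 1_000_000);
        // Filtered: same id range, but only 500 rows pass the WHERE clause.
        double filtered = distributionFactor(1, 1_000_000, 500);
        System.out.println(unfiltered + " within bounds: " + withinBounds(unfiltered, 0.05, 1000.0));
        System.out.println(filtered + " within bounds: " + withinBounds(filtered, 0.05, 1000.0));
    }
}
```

Under these assumptions a selective filter pushes the factor above the upper bound, so the table would no longer be treated as evenly distributed during chunk splitting.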
Thanks for the PR. I've left a few minor comments.
It seems the author of #3776 is actively working on their PR, and there's some duplicated work... It would be nice if we could discuss and implement this feature in one place.
issue link: https://issues.apache.org/jira/browse/FLINK-36165
fixed #3776