Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,13 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
<td>Integer</td>
<td>The max retry times that the connector should retry to build MySQL database server connection.</td>
</tr>
<tr>
<td>binlog.fail-on-reconnection-error</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to fail the job when binlog reader encounters reconnection errors. When enabled, the job will fail after exhausting all retry attempts configured by 'connect.max-retries' and 'connect.timeout'. When disabled (default), the job will keep retrying indefinitely.</td>
</tr>
<tr>
<td>connection.pool.size</td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package io.debezium.connector.mysql;

import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
Expand Down Expand Up @@ -1240,8 +1242,97 @@ public void execute(
e);
}
}
// Extract retry configuration from connectorConfig using standard approach
Configuration configuration = connectorConfig.getConfig();
boolean failOnReconnectionError =
configuration.getBoolean(
MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key(),
MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.defaultValue());
int maxRetries =
configuration.getInteger(
MySqlSourceOptions.CONNECT_MAX_RETRIES.key(),
MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue());
long timeoutMs =
configuration.getLong(
MySqlSourceOptions.CONNECT_TIMEOUT.key(),
MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue().toMillis());

int reconnectionAttempts = 0;
long reconnectionStartTime = 0;

while (context.isRunning()) {
Thread.sleep(100);
// Check if client is connected
if (!client.isConnected()) {
LOGGER.warn("Binlog client disconnected. Attempting to reconnect...");

if (failOnReconnectionError && reconnectionStartTime == 0) {
// Start tracking reconnection attempts
reconnectionStartTime = clock.currentTimeInMillis();
reconnectionAttempts = 0;
}

try {
if (failOnReconnectionError) {
long elapsedTime = clock.currentTimeInMillis() - reconnectionStartTime;
if (reconnectionAttempts >= maxRetries || elapsedTime >= timeoutMs) {
throw new DebeziumException(
String.format(
"Failed to reconnect to MySQL binlog after %d attempts and %d ms. "
+ "Maximum retries: %d, timeout: %d ms",
reconnectionAttempts,
elapsedTime,
maxRetries,
timeoutMs));
}
reconnectionAttempts++;
LOGGER.info(
"Reconnection attempt {} of {} (elapsed time: {} ms of {} ms)",
reconnectionAttempts,
maxRetries,
elapsedTime,
timeoutMs);
}

client.connect(connectorConfig.getConnectionTimeout().toMillis());
LOGGER.info("Successfully reconnected to MySQL binlog");

// Reset retry tracking on successful connection
if (failOnReconnectionError) {
reconnectionStartTime = 0;
reconnectionAttempts = 0;
}
} catch (Exception e) {
if (failOnReconnectionError) {
long elapsedTime = clock.currentTimeInMillis() - reconnectionStartTime;
if (reconnectionAttempts >= maxRetries || elapsedTime >= timeoutMs) {
if (e instanceof AuthenticationException) {
throw new DebeziumException(
"Authentication failure detected during reconnection to the MySQL database at "
+ connectorConfig.hostname()
+ ":"
+ connectorConfig.port()
+ " with user '"
+ connectorConfig.username()
+ "'",
e);
} else {
throw new DebeziumException(
String.format(
"Failed to reconnect to MySQL binlog after %d attempts and %d ms. "
+ "Last error: %s",
reconnectionAttempts,
elapsedTime,
e.getMessage()),
e);
}
}
}
LOGGER.error("Reconnection failed: {}", e.getMessage());
Thread.sleep(1000);
}
} else {
Thread.sleep(100);
}
}
} finally {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean parseOnLineSchemaChanges;
public static boolean useLegacyJsonFormat = true;
private final boolean assignUnboundedChunkFirst;
private final boolean binlogFailOnReconnectionError;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
Expand Down Expand Up @@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable {
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean,
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst) {
boolean assignUnboundedChunkFirst,
boolean binlogFailOnReconnectionError) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
Expand Down Expand Up @@ -152,6 +154,7 @@ public class MySqlSourceConfig implements Serializable {
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.binlogFailOnReconnectionError = binlogFailOnReconnectionError;
}

public String getHostname() {
Expand Down Expand Up @@ -285,4 +288,8 @@ public boolean isSkipSnapshotBackfill() {
public boolean isTreatTinyInt1AsBoolean() {
return treatTinyInt1AsBoolean;
}

public boolean isBinlogFailOnReconnectionError() {
return binlogFailOnReconnectionError;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public class MySqlSourceConfigFactory implements Serializable {
private boolean treatTinyInt1AsBoolean = true;
private boolean useLegacyJsonFormat = true;
private boolean assignUnboundedChunkFirst = false;
private boolean binlogFailOnReconnectionError =
MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.defaultValue();

public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
Expand Down Expand Up @@ -324,6 +326,17 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde
return this;
}

/**
* Whether to fail the job when binlog reader encounters reconnection errors. When enabled, the
* job will fail after exhausting all retry attempts. When disabled (default), the job will keep
* retrying indefinitely.
*/
public MySqlSourceConfigFactory binlogFailOnReconnectionError(
boolean binlogFailOnReconnectionError) {
this.binlogFailOnReconnectionError = binlogFailOnReconnectionError;
return this;
}

/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
public MySqlSourceConfig createConfig(int subtaskId) {
// hard code server name, because we don't need to distinguish it, docs:
Expand Down Expand Up @@ -392,6 +405,13 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
jdbcProperties = new Properties();
}

// Add the new configs to debezium properties only if not already provided by user
if (!props.containsKey("binlog.fail-on-reconnection-error")) {
props.setProperty(
"binlog.fail-on-reconnection-error",
String.valueOf(binlogFailOnReconnectionError));
}

return new MySqlSourceConfig(
hostname,
port,
Expand Down Expand Up @@ -421,6 +441,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean,
useLegacyJsonFormat,
assignUnboundedChunkFirst);
assignUnboundedChunkFirst,
binlogFailOnReconnectionError);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,4 +292,13 @@ public class MySqlSourceOptions {
.defaultValue(true)
.withDescription(
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.");

public static final ConfigOption<Boolean> BINLOG_FAIL_ON_RECONNECTION_ERROR =
ConfigOptions.key("binlog.fail-on-reconnection-error")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to fail the job when binlog reader encounters reconnection errors. "
+ "When enabled, the job will fail after exhausting all retry attempts. "
+ "When disabled (default), the job will keep retrying indefinitely.");
}
Loading