diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
index 2752298c258..13dd8161389 100644
--- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -349,6 +349,13 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
Integer |
The max retry times that the connector should retry to build MySQL database server connection. |
+
+ | binlog.fail-on-reconnection-error |
+ optional |
+ false |
+ Boolean |
+ Whether to fail the job when binlog reader encounters reconnection errors. When enabled, the job will fail after exhausting all retry attempts configured by 'connect.max-retries' and 'connect.timeout'. When disabled (default), the job will keep retrying indefinitely. |
+
| connection.pool.size |
optional |
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
index 4f1c7089948..a5339875cb7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java
@@ -6,6 +6,8 @@
package io.debezium.connector.mysql;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
@@ -1240,8 +1242,97 @@ public void execute(
e);
}
}
+ // Extract retry configuration from connectorConfig using standard approach
+ Configuration configuration = connectorConfig.getConfig();
+ boolean failOnReconnectionError =
+ configuration.getBoolean(
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key(),
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.defaultValue());
+ int maxRetries =
+ configuration.getInteger(
+ MySqlSourceOptions.CONNECT_MAX_RETRIES.key(),
+ MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue());
+ long timeoutMs =
+ configuration.getLong(
+ MySqlSourceOptions.CONNECT_TIMEOUT.key(),
+ MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue().toMillis());
+
+ int reconnectionAttempts = 0;
+ long reconnectionStartTime = 0;
+
while (context.isRunning()) {
- Thread.sleep(100);
+ // Check if client is connected
+ if (!client.isConnected()) {
+ LOGGER.warn("Binlog client disconnected. Attempting to reconnect...");
+
+ if (failOnReconnectionError && reconnectionStartTime == 0) {
+ // Start tracking reconnection attempts
+ reconnectionStartTime = clock.currentTimeInMillis();
+ reconnectionAttempts = 0;
+ }
+
+ try {
+ if (failOnReconnectionError) {
+ long elapsedTime = clock.currentTimeInMillis() - reconnectionStartTime;
+ if (reconnectionAttempts >= maxRetries || elapsedTime >= timeoutMs) {
+ throw new DebeziumException(
+ String.format(
+ "Failed to reconnect to MySQL binlog after %d attempts and %d ms. "
+ + "Maximum retries: %d, timeout: %d ms",
+ reconnectionAttempts,
+ elapsedTime,
+ maxRetries,
+ timeoutMs));
+ }
+ reconnectionAttempts++;
+ LOGGER.info(
+ "Reconnection attempt {} of {} (elapsed time: {} ms of {} ms)",
+ reconnectionAttempts,
+ maxRetries,
+ elapsedTime,
+ timeoutMs);
+ }
+
+ client.connect(connectorConfig.getConnectionTimeout().toMillis());
+ LOGGER.info("Successfully reconnected to MySQL binlog");
+
+ // Reset retry tracking on successful connection
+ if (failOnReconnectionError) {
+ reconnectionStartTime = 0;
+ reconnectionAttempts = 0;
+ }
+ } catch (Exception e) {
+ if (failOnReconnectionError) {
+ long elapsedTime = clock.currentTimeInMillis() - reconnectionStartTime;
+ if (reconnectionAttempts >= maxRetries || elapsedTime >= timeoutMs) {
+ if (e instanceof AuthenticationException) {
+ throw new DebeziumException(
+ "Authentication failure detected during reconnection to the MySQL database at "
+ + connectorConfig.hostname()
+ + ":"
+ + connectorConfig.port()
+ + " with user '"
+ + connectorConfig.username()
+ + "'",
+ e);
+ } else {
+ throw new DebeziumException(
+ String.format(
+ "Failed to reconnect to MySQL binlog after %d attempts and %d ms. "
+ + "Last error: %s",
+ reconnectionAttempts,
+ elapsedTime,
+ e.getMessage()),
+ e);
+ }
+ }
+ }
+ LOGGER.error("Reconnection failed: {}", e.getMessage());
+ Thread.sleep(1000);
+ }
+ } else {
+ Thread.sleep(100);
+ }
}
} finally {
try {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 260a7cd2b5d..43b0d914773 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -70,6 +70,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean parseOnLineSchemaChanges;
public static boolean useLegacyJsonFormat = true;
private final boolean assignUnboundedChunkFirst;
+ private final boolean binlogFailOnReconnectionError;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable {
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean,
boolean useLegacyJsonFormat,
- boolean assignUnboundedChunkFirst) {
+ boolean assignUnboundedChunkFirst,
+ boolean binlogFailOnReconnectionError) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
@@ -152,6 +154,7 @@ public class MySqlSourceConfig implements Serializable {
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
+ this.binlogFailOnReconnectionError = binlogFailOnReconnectionError;
}
public String getHostname() {
@@ -285,4 +288,8 @@ public boolean isSkipSnapshotBackfill() {
public boolean isTreatTinyInt1AsBoolean() {
return treatTinyInt1AsBoolean;
}
+
+ public boolean isBinlogFailOnReconnectionError() {
+ return binlogFailOnReconnectionError;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 427115edea7..9c7291f235f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -74,6 +74,8 @@ public class MySqlSourceConfigFactory implements Serializable {
private boolean treatTinyInt1AsBoolean = true;
private boolean useLegacyJsonFormat = true;
private boolean assignUnboundedChunkFirst = false;
+ private boolean binlogFailOnReconnectionError =
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.defaultValue();
public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
@@ -324,6 +326,17 @@ public MySqlSourceConfigFactory assignUnboundedChunkFirst(boolean assignUnbounde
return this;
}
+ /**
+ * Whether to fail the job when binlog reader encounters reconnection errors. When enabled, the
+ * job will fail after exhausting all retry attempts. When disabled (default), the job will keep
+ * retrying indefinitely.
+ */
+ public MySqlSourceConfigFactory binlogFailOnReconnectionError(
+ boolean binlogFailOnReconnectionError) {
+ this.binlogFailOnReconnectionError = binlogFailOnReconnectionError;
+ return this;
+ }
+
/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
public MySqlSourceConfig createConfig(int subtaskId) {
// hard code server name, because we don't need to distinguish it, docs:
@@ -392,6 +405,13 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
jdbcProperties = new Properties();
}
+ // Add the new configs to debezium properties only if not already provided by user
+ if (!props.containsKey("binlog.fail-on-reconnection-error")) {
+ props.setProperty(
+ "binlog.fail-on-reconnection-error",
+ String.valueOf(binlogFailOnReconnectionError));
+ }
+
return new MySqlSourceConfig(
hostname,
port,
@@ -421,6 +441,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean,
useLegacyJsonFormat,
- assignUnboundedChunkFirst);
+ assignUnboundedChunkFirst,
+ binlogFailOnReconnectionError);
}
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
index a8e143f5fc5..917c830969f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
@@ -292,4 +292,13 @@ public class MySqlSourceOptions {
.defaultValue(true)
.withDescription(
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.");
+
+ public static final ConfigOption BINLOG_FAIL_ON_RECONNECTION_ERROR =
+ ConfigOptions.key("binlog.fail-on-reconnection-error")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to fail the job when binlog reader encounters reconnection errors. "
+ + "When enabled, the job will fail after exhausting all retry attempts. "
+ + "When disabled (default), the job will keep retrying indefinitely.");
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlReconnectionITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlReconnectionITCase.java
new file mode 100644
index 00000000000..4f144e1f13c
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlReconnectionITCase.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.StringDebeziumDeserializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration test for MySQL reconnection logic in {@link MySqlSource}. */
+class MySqlReconnectionITCase extends MySqlSourceTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MySqlReconnectionITCase.class);
+
+ @Test
+ @Timeout(value = 120, unit = TimeUnit.SECONDS)
+ void testBinlogReconnectionWithNetworkInterruption() throws Exception {
+ UniqueDatabase database =
+ new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
+ database.createAndInitialize();
+
+ // Create table for testing
+ try (Connection conn = database.getJdbcConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ "CREATE TABLE network_table ("
+ + "id INT PRIMARY KEY, "
+ + "data VARCHAR(255)"
+ + ")");
+ stmt.execute("INSERT INTO network_table VALUES (1, 'Initial data')");
+ }
+
+ // Create Debezium properties to test reconnection failure
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty(
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key(), "true");
+
+ // Create source with reconnection settings
+ MySqlSource mySqlSource =
+ MySqlSource.builder()
+ .hostname(MYSQL_CONTAINER.getHost())
+ .port(MYSQL_CONTAINER.getDatabasePort())
+ .databaseList(database.getDatabaseName())
+ .tableList(database.getDatabaseName() + ".network_table")
+ .username(database.getUsername())
+ .password(database.getPassword())
+ .serverId("5420-5424")
+ .serverTimeZone("UTC")
+ .startupOptions(StartupOptions.initial())
+ .deserializer(new StringDebeziumDeserializationSchema())
+ .connectTimeout(Duration.ofSeconds(10))
+ .connectMaxRetries(3)
+ .debeziumProperties(debeziumProps)
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(200);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
+ env.setParallelism(1);
+
+ DataStreamSource source =
+ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
+
+ CloseableIterator iterator = source.executeAndCollect();
+ List records = new ArrayList<>();
+
+ // Consume initial snapshot
+ if (iterator.hasNext()) {
+ String record = iterator.next();
+ records.add(record);
+ LOG.info("Received initial record: {}", record);
+ }
+
+ // Insert data during binlog phase
+ try (Connection conn = database.getJdbcConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("INSERT INTO network_table VALUES (2, 'Binlog data')");
+ }
+
+ // Wait for binlog event and simulate interruption
+ Thread.sleep(1000);
+
+ // Simulate network interruption
+ LOG.info("Simulating network interruption...");
+ MYSQL_CONTAINER.stop();
+ Thread.sleep(2000);
+ MYSQL_CONTAINER.start();
+ Thread.sleep(3000);
+
+ // Try to continue consuming - should fail due to binlogFailOnReconnectionError=true
+ boolean exceptionThrown = false;
+ try {
+ long startTime = System.currentTimeMillis();
+ while (records.size() < 2 && (System.currentTimeMillis() - startTime) < 30000) {
+ if (iterator.hasNext()) {
+ String record = iterator.next();
+ records.add(record);
+ LOG.info("Received post-reconnection record: {}", record);
+ } else {
+ Thread.sleep(100);
+ }
+ }
+ } catch (Exception e) {
+ LOG.info(
+ "Expected exception due to binlogFailOnReconnectionError=true: {}",
+ e.getMessage());
+ exceptionThrown = true;
+ }
+
+ // Verify the job failed due to network interruption with binlogFailOnReconnectionError=true
+ assertThat(exceptionThrown).isTrue();
+ iterator.close();
+ }
+
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testReconnectionWithCustomTimeoutAndRetries() throws Exception {
+ UniqueDatabase database =
+ new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
+ database.createAndInitialize();
+
+ // Create table for testing
+ try (Connection conn = database.getJdbcConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ "CREATE TABLE products ("
+ + "id INT PRIMARY KEY, "
+ + "name VARCHAR(255), "
+ + "description VARCHAR(512)"
+ + ")");
+ stmt.execute("INSERT INTO products VALUES (1, 'Product 1', 'Initial product')");
+ }
+
+ // Create Debezium properties to test reconnection failure after retries
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty(
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key(), "true");
+
+ // Create source with custom reconnection settings
+ MySqlSource mySqlSource =
+ MySqlSource.builder()
+ .hostname(MYSQL_CONTAINER.getHost())
+ .port(MYSQL_CONTAINER.getDatabasePort())
+ .databaseList(database.getDatabaseName())
+ .tableList(database.getDatabaseName() + ".products")
+ .username(database.getUsername())
+ .password(database.getPassword())
+ .serverId("5400-5404")
+ .serverTimeZone("UTC")
+ .startupOptions(StartupOptions.initial())
+ .deserializer(new StringDebeziumDeserializationSchema())
+ // Test custom reconnection settings
+ .connectTimeout(Duration.ofSeconds(5))
+ .connectMaxRetries(2)
+ .debeziumProperties(debeziumProps)
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(200);
+ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000));
+ env.setParallelism(1);
+
+ DataStreamSource source =
+ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
+
+ CloseableIterator iterator = source.executeAndCollect();
+ List records = new ArrayList<>();
+
+ // Consume initial snapshot
+ for (int i = 0; i < 1 && iterator.hasNext(); i++) {
+ String record = iterator.next();
+ records.add(record);
+ LOG.info("Received record: {}", record);
+ }
+
+ assertThat(records).hasSize(1);
+
+ // Insert data while source is running
+ try (Connection conn = database.getJdbcConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.execute("INSERT INTO products VALUES (2, 'Product 2', 'Test reconnection')");
+ }
+
+ // Simulate network interruption by briefly stopping container
+ LOG.info("Stopping MySQL container to simulate network interruption...");
+ MYSQL_CONTAINER.stop();
+
+ // Wait a moment to ensure disconnection
+ Thread.sleep(2000);
+
+ LOG.info("Restarting MySQL container...");
+ MYSQL_CONTAINER.start();
+
+ // Wait for container to be fully ready
+ Thread.sleep(3000);
+
+ // Try to continue consuming - should fail due to binlogFailOnReconnectionError=true
+ boolean exceptionThrown = false;
+ try {
+ long startTime = System.currentTimeMillis();
+ while (iterator.hasNext() && (System.currentTimeMillis() - startTime) < 20000) {
+ String record = iterator.next();
+ records.add(record);
+ LOG.info("Received post-reconnection record: {}", record);
+ }
+ } catch (Exception e) {
+ LOG.info(
+ "Expected exception due to binlogFailOnReconnectionError=true: {}",
+ e.getMessage());
+ exceptionThrown = true;
+ }
+
+ // Verify the job failed due to reconnection error (binlogFailOnReconnectionError=true)
+ assertThat(exceptionThrown).isTrue();
+
+ iterator.close();
+ }
+
+ @Test
+ @Timeout(value = 60, unit = TimeUnit.SECONDS)
+ void testReconnectionFailureWhenMaxRetriesExceeded() throws Exception {
+ UniqueDatabase database =
+ new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
+ database.createAndInitialize();
+
+ // Create table for testing
+ try (Connection conn = database.getJdbcConnection();
+ Statement stmt = conn.createStatement()) {
+ stmt.execute(
+ "CREATE TABLE test_table ("
+ + "id INT PRIMARY KEY, "
+ + "value VARCHAR(255)"
+ + ")");
+ stmt.execute("INSERT INTO test_table VALUES (1, 'Initial value')");
+ }
+
+ // Create Debezium properties to fail after max retries
+ Properties debeziumProps = new Properties();
+ debeziumProps.setProperty(
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key(), "true");
+
+ // Create source with very short timeout and low retries to force failure
+ MySqlSource mySqlSource =
+ MySqlSource.builder()
+ .hostname(MYSQL_CONTAINER.getHost())
+ .port(MYSQL_CONTAINER.getDatabasePort())
+ .databaseList(database.getDatabaseName())
+ .tableList(database.getDatabaseName() + ".test_table")
+ .username(database.getUsername())
+ .password(database.getPassword())
+ .serverId("5405-5409")
+ .serverTimeZone("UTC")
+ .startupOptions(StartupOptions.initial())
+ .deserializer(new StringDebeziumDeserializationSchema())
+ // Very restrictive settings to trigger failure
+ .connectTimeout(Duration.ofSeconds(1))
+ .connectMaxRetries(1)
+ .debeziumProperties(debeziumProps)
+ .build();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ env.setParallelism(1);
+
+ DataStreamSource source =
+ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
+
+ CloseableIterator iterator = source.executeAndCollect();
+
+ // Consume initial record
+ if (iterator.hasNext()) {
+ String record = iterator.next();
+ LOG.info("Received initial record: {}", record);
+ }
+
+ // Stop container for extended period to exceed retry limits
+ LOG.info("Stopping MySQL container for extended period...");
+ MYSQL_CONTAINER.stop();
+
+ // The source should eventually fail due to reconnection timeout
+ // We expect an exception to be thrown when reconnection fails
+ boolean exceptionThrown = false;
+ try {
+ // Try to continue reading - this should fail
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime < 15000) {
+ if (iterator.hasNext()) {
+ iterator.next();
+ }
+ Thread.sleep(100);
+ }
+ } catch (Exception e) {
+ LOG.info("Expected exception during reconnection failure: {}", e.getMessage());
+ exceptionThrown = true;
+ }
+
+ // Restart container for cleanup
+ MYSQL_CONTAINER.start();
+
+ iterator.close();
+
+ // The test passes if we handled the reconnection failure appropriately
+ // Either by throwing an exception or by gracefully handling the disconnection
+ LOG.info("Reconnection failure test completed, exception thrown: {}", exceptionThrown);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlReconnectionConfigTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlReconnectionConfigTest.java
new file mode 100644
index 00000000000..6f206504554
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlReconnectionConfigTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.config;
+
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for MySQL reconnection configuration in {@link MySqlSourceConfig}. */
+class MySqlReconnectionConfigTest {
+
+ @Test
+ void testReconnectionConfigurationExtraction() {
+ // Create MySqlSourceConfig with specific reconnection values
+ MySqlSourceConfig config =
+ new MySqlSourceConfigFactory()
+ .hostname("localhost")
+ .port(3306)
+ .username("root")
+ .password("password")
+ .databaseList("test_db")
+ .tableList("test_db.products")
+ .startupOptions(StartupOptions.initial())
+ .connectTimeout(Duration.ofSeconds(15))
+ .connectMaxRetries(5)
+ .createConfig(0);
+
+ // Verify the configuration values are correctly set in the MySqlSourceConfig
+ assertThat(config.getConnectTimeout()).isEqualTo(Duration.ofSeconds(15));
+ assertThat(config.getConnectMaxRetries()).isEqualTo(5);
+ assertThat(config.isBinlogFailOnReconnectionError()).isFalse();
+
+ // Verify the configuration is properly propagated to Debezium configuration
+ Configuration debeziumConfig = config.getDbzConfiguration();
+
+ // Check that Debezium gets the correct timeout value in milliseconds
+ assertThat(debeziumConfig.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS))
+ .isEqualTo(15000L);
+ assertThat(
+ debeziumConfig.getBoolean(
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key()))
+ .isFalse();
+ }
+
+ @Test
+ void testDefaultReconnectionConfiguration() {
+ // Create MySqlSourceConfig with default values
+ MySqlSourceConfig config =
+ new MySqlSourceConfigFactory()
+ .hostname("localhost")
+ .port(3306)
+ .username("root")
+ .password("password")
+ .databaseList("test_db")
+ .tableList("test_db.products")
+ .startupOptions(StartupOptions.initial())
+ .createConfig(0);
+
+ // Verify default values are applied
+ assertThat(config.getConnectTimeout())
+ .isEqualTo(MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue());
+ assertThat(config.getConnectMaxRetries())
+ .isEqualTo(MySqlSourceOptions.CONNECT_MAX_RETRIES.defaultValue());
+ assertThat(config.isBinlogFailOnReconnectionError())
+ .isEqualTo(MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.defaultValue());
+
+ Configuration debeziumConfig = config.getDbzConfiguration();
+
+ // Verify defaults are propagated to Debezium
+ assertThat(debeziumConfig.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS))
+ .isEqualTo(MySqlSourceOptions.CONNECT_TIMEOUT.defaultValue().toMillis());
+ assertThat(
+ debeziumConfig.getBoolean(
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key()))
+ .isEqualTo(MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.defaultValue());
+ }
+
+ @Test
+ void testCustomReconnectionTimeoutRange() {
+ // Test minimum timeout
+ MySqlSourceConfig minConfig =
+ new MySqlSourceConfigFactory()
+ .hostname("localhost")
+ .port(3306)
+ .username("root")
+ .password("password")
+ .databaseList("test_db")
+ .tableList("test_db.products")
+ .startupOptions(StartupOptions.initial())
+ .connectTimeout(Duration.ofMillis(250)) // Minimum allowed by MySQL
+ .createConfig(0);
+
+ assertThat(minConfig.getConnectTimeout()).isEqualTo(Duration.ofMillis(250));
+
+ // Test large timeout
+ MySqlSourceConfig maxConfig =
+ new MySqlSourceConfigFactory()
+ .hostname("localhost")
+ .port(3306)
+ .username("root")
+ .password("password")
+ .databaseList("test_db")
+ .tableList("test_db.products")
+ .startupOptions(StartupOptions.initial())
+ .connectTimeout(Duration.ofMinutes(5))
+ .createConfig(0);
+
+ assertThat(maxConfig.getConnectTimeout()).isEqualTo(Duration.ofMinutes(5));
+ }
+
+ @Test
+ void testReconnectionConfigurationConsistency() {
+ Duration customTimeout = Duration.ofSeconds(20);
+ int customRetries = 7;
+ boolean customFailOnError = true;
+
+ MySqlSourceConfig config =
+ new MySqlSourceConfigFactory()
+ .hostname("localhost")
+ .port(3306)
+ .username("root")
+ .password("password")
+ .databaseList("test_db")
+ .tableList("test_db.products")
+ .startupOptions(StartupOptions.initial())
+ .connectTimeout(customTimeout)
+ .connectMaxRetries(customRetries)
+ .binlogFailOnReconnectionError(customFailOnError)
+ .createConfig(0);
+
+ // Ensure all custom values are consistently applied
+ assertThat(config.getConnectTimeout()).isEqualTo(customTimeout);
+ assertThat(config.getConnectMaxRetries()).isEqualTo(customRetries);
+ assertThat(config.isBinlogFailOnReconnectionError()).isEqualTo(customFailOnError);
+
+ // Verify consistency between MySqlSourceConfig and Debezium configuration
+ Configuration debeziumConfig = config.getDbzConfiguration();
+
+ assertThat(debeziumConfig.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS))
+ .isEqualTo(customTimeout.toMillis());
+ assertThat(
+ debeziumConfig.getBoolean(
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key()))
+ .isEqualTo(customFailOnError);
+ }
+
+ @Test
+ void testReconnectionConfigurationFromProperties() {
+ Properties props = new Properties();
+ props.setProperty(MySqlSourceOptions.CONNECT_TIMEOUT.key(), "PT25S");
+ props.setProperty(MySqlSourceOptions.CONNECT_MAX_RETRIES.key(), "10");
+ props.setProperty(MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key(), "true");
+
+ MySqlSourceConfig config =
+ new MySqlSourceConfigFactory()
+ .hostname("localhost")
+ .port(3306)
+ .username("root")
+ .password("password")
+ .databaseList("test_db")
+ .tableList("test_db.products")
+ .startupOptions(StartupOptions.initial())
+ .debeziumProperties(props)
+ .createConfig(0);
+
+ Configuration debeziumConfig = config.getDbzConfiguration();
+
+ // Verify properties are correctly read and applied
+ assertThat(debeziumConfig.getString(MySqlSourceOptions.CONNECT_TIMEOUT.key()))
+ .isEqualTo("PT25S");
+ assertThat(debeziumConfig.getInteger(MySqlSourceOptions.CONNECT_MAX_RETRIES.key()))
+ .isEqualTo(10);
+ assertThat(
+ debeziumConfig.getBoolean(
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key()))
+ .isTrue();
+ }
+
+ @Test
+ void testBinlogFailOnReconnectionErrorFalseAllowsInfiniteRetries() {
+ MySqlSourceConfig config =
+ new MySqlSourceConfigFactory()
+ .hostname("localhost")
+ .port(3306)
+ .username("root")
+ .password("password")
+ .databaseList("test_db")
+ .tableList("test_db.products")
+ .startupOptions(StartupOptions.initial())
+ .connectMaxRetries(3) // Should be ignored when failOnError is false
+ .binlogFailOnReconnectionError(false) // Allow infinite retries
+ .createConfig(0);
+
+ // When binlogFailOnReconnectionError is false, it should allow infinite retries
+ assertThat(config.isBinlogFailOnReconnectionError()).isFalse();
+ assertThat(config.getConnectMaxRetries()).isEqualTo(3);
+
+ Configuration debeziumConfig = config.getDbzConfiguration();
+ assertThat(
+ debeziumConfig.getBoolean(
+ MySqlSourceOptions.BINLOG_FAIL_ON_RECONNECTION_ERROR.key()))
+ .isFalse();
+ }
+}