From 9de8a7676515836155c8060e76aa1d8047efe2d0 Mon Sep 17 00:00:00 2001 From: Nick Del Nano Date: Fri, 12 Dec 2025 09:18:57 -0500 Subject: [PATCH] Add mysql table operation metrics --- .../connectors/flink-sources/mysql-cdc.md | 8 +++ .../metrics/MySqlSourceReaderMetrics.java | 53 +++++++++++++++++++ .../source/reader/MySqlRecordEmitter.java | 32 +++++++++-- 3 files changed, 88 insertions(+), 5 deletions(-) diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 2752298c258..ec1e8df41d1 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -941,6 +941,14 @@ Notice: 1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name. 2. For MySQL, the `namespace` will be set to the default value "", and the group name will be like `test_database.test_table`. +The mysql-cdc connector offers six additional metrics: a count and a per-second rate for each type of data change record (`INSERT`, `UPDATE`, `DELETE`). +- `numRecordsOutByDataChangeRecordInsert`: The number of `INSERT` data change records. +- `numRecordsOutByDataChangeRecordUpdate`: The number of `UPDATE` data change records. +- `numRecordsOutByDataChangeRecordDelete`: The number of `DELETE` data change records. +- `numRecordsOutByRateDataChangeRecordInsert`: The number of `INSERT` data change records per second. +- `numRecordsOutByRateDataChangeRecordUpdate`: The number of `UPDATE` data change records per second. +- `numRecordsOutByRateDataChangeRecordDelete`: The number of `DELETE` data change records per second. 
+ Data Type Mapping ---------------- diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java index 76ef2ed66f4..83b32657d06 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java @@ -17,15 +17,33 @@ package org.apache.flink.cdc.connectors.mysql.source.metrics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.event.OperationType; import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricNames; +import io.debezium.relational.TableId; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** A collection class for handling metrics in {@link MySqlSourceReader}. 
*/ public class MySqlSourceReaderMetrics { public static final long UNDEFINED = -1; + private static final Map<OperationType, String> DATA_CHANGE_RECORD_MAP = + new ConcurrentHashMap<OperationType, String>() { + { + put(OperationType.INSERT, "DataChangeRecordInsert"); + put(OperationType.UPDATE, "DataChangeRecordUpdate"); + put(OperationType.DELETE, "DataChangeRecordDelete"); + } + }; private final MetricGroup metricGroup; @@ -35,6 +53,11 @@ public class MySqlSourceReaderMetrics { */ private volatile long fetchDelay = UNDEFINED; + private final Map<Tuple2<TableId, OperationType>, Counter> numRecordsOutByDataChangeRecordMap = + new ConcurrentHashMap<>(); + private final Map<Tuple2<TableId, OperationType>, Meter> + numRecordsOutByRateDataChangeRecordMap = new ConcurrentHashMap<>(); + public MySqlSourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; } @@ -51,4 +74,34 @@ public long getFetchDelay() { public void recordFetchDelay(long fetchDelay) { this.fetchDelay = fetchDelay; } + + public void numRecordsOutByDataChangeRecord(TableId tableId, OperationType op) { + Tuple2<TableId, OperationType> metricMapKey = new Tuple2<>(tableId, op); + + Counter counter = + numRecordsOutByDataChangeRecordMap.compute( + metricMapKey, + (keyForCounter, existingCounter) -> { + if (existingCounter == null) { + MetricGroup tableMetricGroup = + metricGroup.addGroup("table", tableId.identifier()); + Counter newCounter = + tableMetricGroup.counter( + MetricNames.IO_NUM_RECORDS_OUT + + DATA_CHANGE_RECORD_MAP.get(op)); + numRecordsOutByRateDataChangeRecordMap.computeIfAbsent( + metricMapKey, + keyForMeter -> + tableMetricGroup.meter( + MetricNames.IO_NUM_RECORDS_OUT_RATE + + DATA_CHANGE_RECORD_MAP.get(op), + new MeterView(newCounter))); + return newCounter; + } else { + return existingCounter; + } + }); + + counter.inc(); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java index 449e7f608f2..5c9ef499d63 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.connectors.mysql.source.reader; import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.cdc.common.event.OperationType; import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; @@ -29,9 +30,12 @@ import org.apache.flink.connector.base.source.reader.RecordEmitter; import org.apache.flink.util.Collector; +import io.debezium.data.Envelope; import io.debezium.document.Array; +import io.debezium.relational.TableId; import io.debezium.relational.history.HistoryRecord; import io.debezium.relational.history.TableChanges; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,15 +125,33 @@ private void emitElement(SourceRecord element, SourceOutput output) throws Ex debeziumDeserializationSchema.deserialize(element, outputCollector); } - public void applySplit(MySqlSplit split) {} - - private void reportMetrics(SourceRecord element) { + private void reportMetrics(SourceRecord record) { + Struct value = (Struct) record.value(); + if (value != null) { + TableId tableId = RecordUtils.getTableId(record); + Envelope.Operation op = + 
Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION)); + switch (op) { + case CREATE: + sourceReaderMetrics.numRecordsOutByDataChangeRecord( + tableId, OperationType.INSERT); + break; + case UPDATE: + sourceReaderMetrics.numRecordsOutByDataChangeRecord( + tableId, OperationType.UPDATE); + break; + case DELETE: + sourceReaderMetrics.numRecordsOutByDataChangeRecord( + tableId, OperationType.DELETE); + break; + } + } - Long messageTimestamp = RecordUtils.getMessageTimestamp(element); + Long messageTimestamp = RecordUtils.getMessageTimestamp(record); if (messageTimestamp != null && messageTimestamp > 0L) { // report fetch delay - Long fetchTimestamp = RecordUtils.getFetchTimestamp(element); + Long fetchTimestamp = RecordUtils.getFetchTimestamp(record); if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) { // report fetch delay sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);