Skip to content

Commit 2f64f78

Browse files
committed
Add mysql table operation metrics
1 parent c1a7d0b commit 2f64f78

File tree

3 files changed

+88
-6
lines changed

3 files changed

+88
-6
lines changed

docs/content/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -941,6 +941,14 @@ Notice:
941941
1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
942942
2. For MySQL, the `namespace` will be set to the default value "", and the group name will be like `test_database.test_table`.
943943

944+
The mysql-cdc connector offers six additional metrics that track data change records by operation type: a count and a per-second rate for each of `INSERT`, `UPDATE`, and `DELETE`.
945+
- `numRecordsOutByDataChangeRecordInsert`: The number of `INSERT` data change records.
946+
- `numRecordsOutByDataChangeRecordUpdate`: The number of `UPDATE` data change records.
947+
- `numRecordsOutByDataChangeRecordDelete`: The number of `DELETE` data change records.
948+
- `numRecordsOutByRateDataChangeRecordInsert`: The number of `INSERT` data change records per second.
949+
- `numRecordsOutByRateDataChangeRecordUpdate`: The number of `UPDATE` data change records per second.
950+
- `numRecordsOutByRateDataChangeRecordDelete`: The number of `DELETE` data change records per second.
951+
944952
Data Type Mapping
945953
----------------
946954

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/metrics/MySqlSourceReaderMetrics.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,33 @@
1717

1818
package org.apache.flink.cdc.connectors.mysql.source.metrics;
1919

20+
import org.apache.flink.api.java.tuple.Tuple2;
21+
import org.apache.flink.cdc.common.event.OperationType;
2022
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
23+
import org.apache.flink.metrics.Counter;
2124
import org.apache.flink.metrics.Gauge;
25+
import org.apache.flink.metrics.Meter;
26+
import org.apache.flink.metrics.MeterView;
2227
import org.apache.flink.metrics.MetricGroup;
2328
import org.apache.flink.runtime.metrics.MetricNames;
2429

30+
import io.debezium.relational.TableId;
31+
32+
import java.util.Map;
33+
import java.util.concurrent.ConcurrentHashMap;
34+
2535
/** A collection class for handling metrics in {@link MySqlSourceReader}. */
2636
public class MySqlSourceReaderMetrics {
2737

2838
public static final long UNDEFINED = -1;
39+
private static final Map<OperationType, String> DATA_CHANGE_RECORD_MAP =
40+
new ConcurrentHashMap<OperationType, String>() {
41+
{
42+
put(OperationType.INSERT, "DataChangeRecordInsert");
43+
put(OperationType.UPDATE, "DataChangeRecordUpdate");
44+
put(OperationType.DELETE, "DataChangeRecordDelete");
45+
}
46+
};
2947

3048
private final MetricGroup metricGroup;
3149

@@ -35,6 +53,11 @@ public class MySqlSourceReaderMetrics {
3553
*/
3654
private volatile long fetchDelay = UNDEFINED;
3755

56+
private final Map<Tuple2<TableId, OperationType>, Counter> numRecordsOutByDataChangeRecordMap =
57+
new ConcurrentHashMap();
58+
private final Map<Tuple2<TableId, OperationType>, Meter>
59+
numRecordsOutByRateDataChangeRecordMap = new ConcurrentHashMap();
60+
3861
public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
3962
this.metricGroup = metricGroup;
4063
}
@@ -51,4 +74,34 @@ public long getFetchDelay() {
5174
public void recordFetchDelay(long fetchDelay) {
5275
this.fetchDelay = fetchDelay;
5376
}
77+
78+
public void numRecordsOutByDataChangeRecord(TableId tableId, OperationType op) {
79+
Tuple2<TableId, OperationType> metricMapKey = new Tuple2<>(tableId, op);
80+
81+
Counter counter =
82+
numRecordsOutByDataChangeRecordMap.compute(
83+
metricMapKey,
84+
(keyForCounter, existingCounter) -> {
85+
if (existingCounter == null) {
86+
MetricGroup tableMetricGroup =
87+
metricGroup.addGroup("table", tableId.identifier());
88+
Counter newCounter =
89+
tableMetricGroup.counter(
90+
MetricNames.IO_NUM_RECORDS_OUT
91+
+ DATA_CHANGE_RECORD_MAP.get(op));
92+
numRecordsOutByRateDataChangeRecordMap.computeIfAbsent(
93+
metricMapKey,
94+
keyForMeter ->
95+
tableMetricGroup.meter(
96+
MetricNames.IO_NUM_RECORDS_OUT_RATE
97+
+ DATA_CHANGE_RECORD_MAP.get(op),
98+
new MeterView(newCounter)));
99+
return newCounter;
100+
} else {
101+
return existingCounter;
102+
}
103+
});
104+
105+
counter.inc();
106+
}
54107
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
package org.apache.flink.cdc.connectors.mysql.source.reader;
1919

2020
import org.apache.flink.api.connector.source.SourceOutput;
21+
import org.apache.flink.cdc.common.event.OperationType;
2122
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
2223
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
23-
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
2424
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
2525
import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords;
2626
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
@@ -29,9 +29,12 @@
2929
import org.apache.flink.connector.base.source.reader.RecordEmitter;
3030
import org.apache.flink.util.Collector;
3131

32+
import io.debezium.data.Envelope;
3233
import io.debezium.document.Array;
34+
import io.debezium.relational.TableId;
3335
import io.debezium.relational.history.HistoryRecord;
3436
import io.debezium.relational.history.TableChanges;
37+
import org.apache.kafka.connect.data.Struct;
3538
import org.apache.kafka.connect.source.SourceRecord;
3639
import org.slf4j.Logger;
3740
import org.slf4j.LoggerFactory;
@@ -121,15 +124,33 @@ private void emitElement(SourceRecord element, SourceOutput<T> output) throws Ex
121124
debeziumDeserializationSchema.deserialize(element, outputCollector);
122125
}
123126

124-
public void applySplit(MySqlSplit split) {}
125-
126-
private void reportMetrics(SourceRecord element) {
127+
private void reportMetrics(SourceRecord record) {
128+
Struct value = (Struct) record.value();
129+
if (value != null) {
130+
TableId tableId = RecordUtils.getTableId(record);
131+
Envelope.Operation op =
132+
Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
133+
switch (op) {
134+
case CREATE:
135+
sourceReaderMetrics.numRecordsOutByDataChangeRecord(
136+
tableId, OperationType.INSERT);
137+
break;
138+
case UPDATE:
139+
sourceReaderMetrics.numRecordsOutByDataChangeRecord(
140+
tableId, OperationType.UPDATE);
141+
break;
142+
case DELETE:
143+
sourceReaderMetrics.numRecordsOutByDataChangeRecord(
144+
tableId, OperationType.DELETE);
145+
break;
146+
}
147+
}
127148

128-
Long messageTimestamp = RecordUtils.getMessageTimestamp(element);
149+
Long messageTimestamp = RecordUtils.getMessageTimestamp(record);
129150

130151
if (messageTimestamp != null && messageTimestamp > 0L) {
131152
// report fetch delay
132-
Long fetchTimestamp = RecordUtils.getFetchTimestamp(element);
153+
Long fetchTimestamp = RecordUtils.getFetchTimestamp(record);
133154
if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) {
134155
// report fetch delay
135156
sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);

0 commit comments

Comments
 (0)