8 changes: 8 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -941,6 +941,14 @@ Notice:
1. The group name is `namespace.schema.table`, where `namespace` is the actual database name, `schema` is the actual schema name, and `table` is the actual table name.
2. For MySQL, the `namespace` will be set to the default value "", and the group name will be like `test_database.test_table`.

The mysql-cdc connector offers six additional metrics, a counter and a per-second rate for each type of data change record. All six are registered under the per-table group described above; a sample identifier is shown after this list.
- `numRecordsOutByDataChangeRecordInsert`: The number of `INSERT` data change records.
- `numRecordsOutByDataChangeRecordUpdate`: The number of `UPDATE` data change records.
- `numRecordsOutByDataChangeRecordDelete`: The number of `DELETE` data change records.
- `numRecordsOutByRateDataChangeRecordInsert`: The number of `INSERT` data change records per second.
- `numRecordsOutByRateDataChangeRecordUpdate`: The number of `UPDATE` data change records per second.
- `numRecordsOutByRateDataChangeRecordDelete`: The number of `DELETE` data change records per second.
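
With Flink's default operator scope format (configurable via `metrics.scope.operator`), the fully-qualified identifier of one of these metrics would look roughly as follows; the exact prefix depends on your scope configuration and reporter:

```
<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.table.test_database.test_table.numRecordsOutByDataChangeRecordInsert
```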

Data Type Mapping
----------------

@@ -17,15 +17,33 @@

package org.apache.flink.cdc.connectors.mysql.source.metrics;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSourceReader;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.MetricNames;

import io.debezium.relational.TableId;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** A collection class for handling metrics in {@link MySqlSourceReader}. */
public class MySqlSourceReaderMetrics {

public static final long UNDEFINED = -1;
    // Suffix appended to the base metric name for each data change operation type.
    private static final Map<OperationType, String> DATA_CHANGE_RECORD_MAP =
            new ConcurrentHashMap<>();

    static {
        DATA_CHANGE_RECORD_MAP.put(OperationType.INSERT, "DataChangeRecordInsert");
        DATA_CHANGE_RECORD_MAP.put(OperationType.UPDATE, "DataChangeRecordUpdate");
        DATA_CHANGE_RECORD_MAP.put(OperationType.DELETE, "DataChangeRecordDelete");
    }

private final MetricGroup metricGroup;

@@ -35,6 +53,11 @@ public class MySqlSourceReaderMetrics {
*/
private volatile long fetchDelay = UNDEFINED;

    // Per-table counters and rate meters for emitted data change records, keyed by
    // (table id, operation type).
    private final Map<Tuple2<TableId, OperationType>, Counter> numRecordsOutByDataChangeRecordMap =
            new ConcurrentHashMap<>();
    private final Map<Tuple2<TableId, OperationType>, Meter>
            numRecordsOutByRateDataChangeRecordMap = new ConcurrentHashMap<>();

public MySqlSourceReaderMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}
@@ -51,4 +74,34 @@ public long getFetchDelay() {
public void recordFetchDelay(long fetchDelay) {
this.fetchDelay = fetchDelay;
}

    /**
     * Increments the counter for the given table and data change operation, registering the
     * counter and its companion per-second meter on first use.
     */
    public void numRecordsOutByDataChangeRecord(TableId tableId, OperationType op) {
        Tuple2<TableId, OperationType> metricMapKey = new Tuple2<>(tableId, op);

        Counter counter =
                numRecordsOutByDataChangeRecordMap.computeIfAbsent(
                        metricMapKey,
                        keyForCounter -> {
                            MetricGroup tableMetricGroup =
                                    metricGroup.addGroup("table", tableId.identifier());
                            Counter newCounter =
                                    tableMetricGroup.counter(
                                            MetricNames.IO_NUM_RECORDS_OUT
                                                    + DATA_CHANGE_RECORD_MAP.get(op));
                            // The meter wraps the counter, so the per-second rate is derived
                            // from the same count.
                            numRecordsOutByRateDataChangeRecordMap.computeIfAbsent(
                                    metricMapKey,
                                    keyForMeter ->
                                            tableMetricGroup.meter(
                                                    MetricNames.IO_NUM_RECORDS_OUT_RATE
                                                            + DATA_CHANGE_RECORD_MAP.get(op),
                                                    new MeterView(newCounter)));
                            return newCounter;
                        });

        counter.inc();
    }
}
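// Illustrative usage sketch, not part of this change: given any MetricGroup
// (below, Flink's no-op UnregisteredMetricsGroup from flink-metrics-core),
// the first call for a (table, operation) pair registers the counter and its
// per-second meter, and every subsequent call only increments the counter.
//
//     MySqlSourceReaderMetrics metrics =
//             new MySqlSourceReaderMetrics(new UnregisteredMetricsGroup());
//     TableId tableId = new TableId("test_database", null, "test_table");
//     metrics.numRecordsOutByDataChangeRecord(tableId, OperationType.INSERT);
//     metrics.numRecordsOutByDataChangeRecord(tableId, OperationType.INSERT);
//     // The INSERT counter for table test_database.test_table now reads 2.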
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.mysql.source.reader;

import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.cdc.common.event.OperationType;
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
@@ -29,9 +30,12 @@
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;

import io.debezium.data.Envelope;
import io.debezium.document.Array;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,15 +125,33 @@ private void emitElement(SourceRecord element, SourceOutput<T> output) throws Ex
debeziumDeserializationSchema.deserialize(element, outputCollector);
}

public void applySplit(MySqlSplit split) {}

    private void reportMetrics(SourceRecord record) {
        Struct value = (Struct) record.value();
        // Data change records carry a Debezium envelope with an operation field; guard
        // against records that do not.
        if (value != null && value.schema().field(Envelope.FieldName.OPERATION) != null) {
            TableId tableId = RecordUtils.getTableId(record);
            Envelope.Operation op =
                    Envelope.Operation.forCode(value.getString(Envelope.FieldName.OPERATION));
            switch (op) {
                case CREATE:
                    sourceReaderMetrics.numRecordsOutByDataChangeRecord(
                            tableId, OperationType.INSERT);
                    break;
                case UPDATE:
                    sourceReaderMetrics.numRecordsOutByDataChangeRecord(
                            tableId, OperationType.UPDATE);
                    break;
                case DELETE:
                    sourceReaderMetrics.numRecordsOutByDataChangeRecord(
                            tableId, OperationType.DELETE);
                    break;
                default:
                    // READ (snapshot) and other operations are not counted here.
                    break;
            }
        }

        Long messageTimestamp = RecordUtils.getMessageTimestamp(record);

        if (messageTimestamp != null && messageTimestamp > 0L) {
            Long fetchTimestamp = RecordUtils.getFetchTimestamp(record);
            if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) {
                // report fetch delay
                sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);