Skip to content

Commit

Permalink
Add key to Debezium CDC metadata (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Oct 7, 2023
1 parent 21f390c commit 5812322
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public interface CdcConstants {
String COL_CDC_OP = "_cdc_op";
String COL_CDC_TS = "_cdc_ts";
String COL_CDC_TABLE = "_cdc_table";
String COL_CDC_KEY = "_cdc_key";
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,21 @@ private R applyWithSchema(R record) {
payloadSchema = value.schema().field("after").schema();
}

Schema newValueSchema = makeUpdatedSchema(payloadSchema);
Schema newValueSchema = makeUpdatedSchema(payloadSchema, record.keySchema());
Struct newValue = new Struct(newValueSchema);

for (Field field : payloadSchema.fields()) {
newValue.put(field.name(), payload.get(field));
}

newValue.put(CdcConstants.COL_CDC_OP, op);
newValue.put(CdcConstants.COL_CDC_TS, new java.util.Date(value.getInt64("ts_ms")));
newValue.put(CdcConstants.COL_CDC_TABLE, tableNameFromSourceStruct(value.getStruct("source")));

if (record.keySchema() != null) {
newValue.put(CdcConstants.COL_CDC_KEY, record.key());
}

return record.newRecord(
record.topic(),
record.kafkaPartition(),
Expand Down Expand Up @@ -109,6 +114,10 @@ private R applySchemaless(R record) {
newValue.put(CdcConstants.COL_CDC_TS, value.get("ts_ms"));
newValue.put(CdcConstants.COL_CDC_TABLE, tableNameFromSourceMap(value.get("source")));

if (record.key() instanceof Map) {
newValue.put(CdcConstants.COL_CDC_KEY, record.key());
}

return record.newRecord(
record.topic(),
record.kafkaPartition(),
Expand Down Expand Up @@ -157,15 +166,21 @@ private String tableNameFromSourceMap(Object source) {
return db + "." + table;
}

private Schema makeUpdatedSchema(Schema schema) {
private Schema makeUpdatedSchema(Schema schema, Schema keySchema) {
SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());

for (Field field : schema.fields()) {
builder.field(field.name(), field.schema());
}
builder.field(CdcConstants.COL_CDC_OP, Schema.STRING_SCHEMA);
builder.field(CdcConstants.COL_CDC_TS, Timestamp.SCHEMA);
builder.field(CdcConstants.COL_CDC_TABLE, Schema.STRING_SCHEMA);

builder
.field(CdcConstants.COL_CDC_OP, Schema.STRING_SCHEMA)
.field(CdcConstants.COL_CDC_TS, Timestamp.SCHEMA)
.field(CdcConstants.COL_CDC_TABLE, Schema.STRING_SCHEMA);

if (keySchema != null) {
builder.field(CdcConstants.COL_CDC_KEY, keySchema);
}

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@

public class DebeziumTransformTest {

private static final Schema KEY_SCHEMA =
SchemaBuilder.struct().field("account_id", Schema.INT64_SCHEMA).build();

private static final Schema ROW_SCHEMA =
SchemaBuilder.struct()
.field("account_id", Schema.INT64_SCHEMA)
Expand Down Expand Up @@ -71,7 +74,8 @@ public void testDmsTransformNull() {
public void testDebeziumTransformSchemaless() {
try (DebeziumTransform<SinkRecord> smt = new DebeziumTransform<>()) {
Map<String, Object> event = createDebeziumEventMap("u");
SinkRecord record = new SinkRecord("topic", 0, null, null, null, event, 0);
Map<String, Object> key = ImmutableMap.of("account_id", 1L);
SinkRecord record = new SinkRecord("topic", 0, null, key, null, event, 0);

SinkRecord result = smt.apply(record);
assertThat(result.value()).isInstanceOf(Map.class);
Expand All @@ -80,15 +84,16 @@ public void testDebeziumTransformSchemaless() {
assertThat(value.get("account_id")).isEqualTo(1);
assertThat(value.get("_cdc_table")).isEqualTo("schema.tbl");
assertThat(value.get("_cdc_op")).isEqualTo("U");
assertThat(value.get("_cdc_key")).isInstanceOf(Map.class);
}
}

@Test
@SuppressWarnings("unchecked")
public void testDebeziumTransformWithSchema() {
try (DebeziumTransform<SinkRecord> smt = new DebeziumTransform<>()) {
Struct event = createDebeziumEventStruct("u");
SinkRecord record = new SinkRecord("topic", 0, null, null, VALUE_SCHEMA, event, 0);
Struct key = new Struct(KEY_SCHEMA).put("account_id", 1L);
SinkRecord record = new SinkRecord("topic", 0, KEY_SCHEMA, key, VALUE_SCHEMA, event, 0);

SinkRecord result = smt.apply(record);
assertThat(result.value()).isInstanceOf(Struct.class);
Expand All @@ -97,6 +102,7 @@ public void testDebeziumTransformWithSchema() {
assertThat(value.get("account_id")).isEqualTo(1L);
assertThat(value.get("_cdc_table")).isEqualTo("schema.tbl");
assertThat(value.get("_cdc_op")).isEqualTo("U");
assertThat(value.get("_cdc_key")).isInstanceOf(Struct.class);
}
}

Expand Down

0 comments on commit 5812322

Please sign in to comment.