Skip to content

Commit

Permalink
Place CDC metadata in struct/map in SMTs (#116)
Browse files Browse the repository at this point in the history
(cherry picked from commit 89f533b2e689cbd1935c3bd1b82eea5e9dc0cd07)
  • Loading branch information
bryanck authored and ismailsimsek committed Jan 15, 2025
1 parent ba451e4 commit 245c9c4
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ public interface CdcConstants {
String OP_UPDATE = "U";
String OP_DELETE = "D";

String COL_CDC_OP = "_cdc_op";
String COL_CDC_TS = "_cdc_ts";
String COL_CDC_TABLE = "_cdc_table";
String COL_CDC_TARGET = "_cdc_target";
String COL_CDC_KEY = "_cdc_key";
String COL_CDC = "_cdc";
String COL_OP = "op";
String COL_TS = "ts";
String COL_OFFSET = "offset";
String COL_SOURCE = "source";
String COL_TARGET = "target";
String COL_KEY = "key";
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
Expand Down Expand Up @@ -86,20 +87,28 @@ private R applyWithSchema(R record) {
payloadSchema = value.schema().field("after").schema();
}

Schema newValueSchema = makeUpdatedSchema(payloadSchema, record.keySchema());
Struct newValue = new Struct(newValueSchema);
// create the CDC metadata
Schema cdcSchema = makeCdcSchema(record.keySchema());
Struct cdcMetadata = new Struct(cdcSchema);
cdcMetadata.put(CdcConstants.COL_OP, op);
cdcMetadata.put(CdcConstants.COL_TS, new java.util.Date(value.getInt64("ts_ms")));
if (record instanceof SinkRecord) {
cdcMetadata.put(CdcConstants.COL_OFFSET, ((SinkRecord) record).kafkaOffset());
}
setTableAndTargetFromSourceStruct(value.getStruct("source"), cdcMetadata);

for (Field field : payloadSchema.fields()) {
newValue.put(field.name(), payload.get(field));
if (record.keySchema() != null) {
cdcMetadata.put(CdcConstants.COL_KEY, record.key());
}

newValue.put(CdcConstants.COL_CDC_OP, op);
newValue.put(CdcConstants.COL_CDC_TS, new java.util.Date(value.getInt64("ts_ms")));
setTableAndTargetFromSourceStruct(value.getStruct("source"), newValue);
// create the new value
Schema newValueSchema = makeUpdatedSchema(payloadSchema, cdcSchema);
Struct newValue = new Struct(newValueSchema);

if (record.keySchema() != null) {
newValue.put(CdcConstants.COL_CDC_KEY, record.key());
for (Field field : payloadSchema.fields()) {
newValue.put(field.name(), payload.get(field));
}
newValue.put(CdcConstants.COL_CDC, cdcMetadata);

return record.newRecord(
record.topic(),
Expand Down Expand Up @@ -129,15 +138,23 @@ private R applySchemaless(R record) {
return null;
}

Map<String, Object> newValue = Maps.newHashMap((Map<String, Object>) payload);
newValue.put(CdcConstants.COL_CDC_OP, op);
newValue.put(CdcConstants.COL_CDC_TS, value.get("ts_ms"));
setTableAndTargetFromSourceMap(value.get("source"), newValue);
// create the CDC metadata
Map<String, Object> cdcMetadata = Maps.newHashMap();
cdcMetadata.put(CdcConstants.COL_OP, op);
cdcMetadata.put(CdcConstants.COL_TS, value.get("ts_ms"));
if (record instanceof SinkRecord) {
cdcMetadata.put(CdcConstants.COL_OFFSET, ((SinkRecord) record).kafkaOffset());
}
setTableAndTargetFromSourceMap(value.get("source"), cdcMetadata);

if (record.key() instanceof Map) {
newValue.put(CdcConstants.COL_CDC_KEY, record.key());
cdcMetadata.put(CdcConstants.COL_KEY, record.key());
}

// create the new value
Map<String, Object> newValue = Maps.newHashMap((Map<String, Object>) payload);
newValue.put(CdcConstants.COL_CDC, cdcMetadata);

return record.newRecord(
record.topic(),
record.kafkaPartition(),
Expand All @@ -160,7 +177,7 @@ private String mapOperation(String originalOp) {
}
}

private void setTableAndTargetFromSourceStruct(Struct source, Struct value) {
private void setTableAndTargetFromSourceStruct(Struct source, Struct cdcMetadata) {
String db;
if (source.schema().field("schema") != null) {
// prefer schema if present, e.g. for Postgres
Expand All @@ -170,11 +187,11 @@ private void setTableAndTargetFromSourceStruct(Struct source, Struct value) {
}
String table = source.getString("table");

value.put(CdcConstants.COL_CDC_TABLE, db + "." + table);
value.put(CdcConstants.COL_CDC_TARGET, target(db, table));
cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table);
cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table));
}

private void setTableAndTargetFromSourceMap(Object source, Map<String, Object> value) {
private void setTableAndTargetFromSourceMap(Object source, Map<String, Object> cdcMetadata) {
Map<String, Object> map = Requirements.requireMap(source, "Debezium transform");

String db;
Expand All @@ -186,8 +203,8 @@ private void setTableAndTargetFromSourceMap(Object source, Map<String, Object> v
}
String table = map.get("table").toString();

value.put(CdcConstants.COL_CDC_TABLE, db + "." + table);
value.put(CdcConstants.COL_CDC_TARGET, target(db, table));
cdcMetadata.put(CdcConstants.COL_SOURCE, db + "." + table);
cdcMetadata.put(CdcConstants.COL_TARGET, target(db, table));
}

private String target(String db, String table) {
Expand All @@ -196,22 +213,30 @@ private String target(String db, String table) {
: cdcTargetPattern.replace(DB_PLACEHOLDER, db).replace(TABLE_PLACEHOLDER, table);
}

private Schema makeUpdatedSchema(Schema schema, Schema keySchema) {
/**
 * Builds the schema for the "_cdc" metadata struct that is attached to each
 * transformed record: operation type, source timestamp, Kafka offset (optional),
 * source table identifier, target table identifier, and — when the record has a
 * keyed schema — the original record key.
 *
 * @param keySchema schema of the record key, or {@code null} for keyless records
 * @return the CDC metadata struct schema
 */
private Schema makeCdcSchema(Schema keySchema) {
  SchemaBuilder cdcBuilder = SchemaBuilder.struct();
  cdcBuilder.field(CdcConstants.COL_OP, Schema.STRING_SCHEMA);
  cdcBuilder.field(CdcConstants.COL_TS, Timestamp.SCHEMA);
  // Offset is optional: it is only populated when the record is a SinkRecord.
  cdcBuilder.field(CdcConstants.COL_OFFSET, Schema.OPTIONAL_INT64_SCHEMA);
  cdcBuilder.field(CdcConstants.COL_SOURCE, Schema.STRING_SCHEMA);
  cdcBuilder.field(CdcConstants.COL_TARGET, Schema.STRING_SCHEMA);

  // Only include the key field when the record actually carries a key schema.
  if (keySchema != null) {
    cdcBuilder.field(CdcConstants.COL_KEY, keySchema);
  }

  return cdcBuilder.build();
}

/**
 * Produces the schema of the transformed value: a copy of the payload schema's
 * basics and fields, with the "_cdc" metadata struct appended as a final field.
 *
 * @param schema the payload (row) schema to copy
 * @param cdcSchema the CDC metadata struct schema to append under {@code _cdc}
 * @return the combined value schema
 */
private Schema makeUpdatedSchema(Schema schema, Schema cdcSchema) {
  // Preserve name/version/doc/etc. from the payload schema.
  SchemaBuilder updated = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());

  // Carry over every payload field unchanged.
  schema.fields().forEach(f -> updated.field(f.name(), f.schema()));

  // Append the CDC metadata struct as the trailing "_cdc" field.
  updated.field(CdcConstants.COL_CDC, cdcSchema);

  return updated.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ private R applySchemaless(R record) {
return null;
}

Map<String, Object> result = Maps.newHashMap((Map<String, Object>) dataObj);

Map<String, Object> metadata = (Map<String, Object>) metadataObj;

String dmsOp = metadata.get("operation").toString();
Expand All @@ -72,19 +70,25 @@ private R applySchemaless(R record) {
op = CdcConstants.OP_INSERT;
}

result.put(CdcConstants.COL_CDC_OP, op);
result.put(
CdcConstants.COL_CDC_TABLE,
// create the CDC metadata
Map<String, Object> cdcMetadata = Maps.newHashMap();
cdcMetadata.put(CdcConstants.COL_OP, op);
cdcMetadata.put(CdcConstants.COL_TS, metadata.get("timestamp"));
cdcMetadata.put(
CdcConstants.COL_SOURCE,
String.format("%s.%s", metadata.get("schema-name"), metadata.get("table-name")));
result.put(CdcConstants.COL_CDC_TS, metadata.get("timestamp"));

// create the new value
Map<String, Object> newValue = Maps.newHashMap((Map<String, Object>) dataObj);
newValue.put(CdcConstants.COL_CDC, cdcMetadata);

return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
null,
result,
newValue,
record.timestamp());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class DebeziumTransformTest {
.field("db", Schema.STRING_SCHEMA)
.field("schema", Schema.STRING_SCHEMA)
.field("table", Schema.STRING_SCHEMA)
.field("txId", Schema.INT64_SCHEMA)
.build();

private static final Schema VALUE_SCHEMA =
Expand Down Expand Up @@ -84,10 +83,12 @@ public void testDebeziumTransformSchemaless() {
Map<String, Object> value = (Map<String, Object>) result.value();

assertThat(value.get("account_id")).isEqualTo(1);
assertThat(value.get("_cdc_table")).isEqualTo("schema.tbl");
assertThat(value.get("_cdc_target")).isEqualTo("schema_x.tbl_x");
assertThat(value.get("_cdc_op")).isEqualTo("U");
assertThat(value.get("_cdc_key")).isInstanceOf(Map.class);

Map<String, Object> cdcMetadata = (Map<String, Object>) value.get("_cdc");
assertThat(cdcMetadata.get("op")).isEqualTo("U");
assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl");
assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x");
assertThat(cdcMetadata.get("key")).isInstanceOf(Map.class);
}
}

Expand All @@ -105,10 +106,12 @@ public void testDebeziumTransformWithSchema() {
Struct value = (Struct) result.value();

assertThat(value.get("account_id")).isEqualTo(1L);
assertThat(value.get("_cdc_table")).isEqualTo("schema.tbl");
assertThat(value.get("_cdc_target")).isEqualTo("schema_x.tbl_x");
assertThat(value.get("_cdc_op")).isEqualTo("U");
assertThat(value.get("_cdc_key")).isInstanceOf(Struct.class);

Struct cdcMetadata = value.getStruct("_cdc");
assertThat(cdcMetadata.get("op")).isEqualTo("U");
assertThat(cdcMetadata.get("source")).isEqualTo("schema.tbl");
assertThat(cdcMetadata.get("target")).isEqualTo("schema_x.tbl_x");
assertThat(cdcMetadata.get("key")).isInstanceOf(Struct.class);
}
}

Expand All @@ -117,8 +120,7 @@ private Map<String, Object> createDebeziumEventMap(String operation) {
ImmutableMap.of(
"db", "db",
"schema", "schema",
"table", "tbl",
"txId", 12345);
"table", "tbl");

Map<String, Object> data =
ImmutableMap.of(
Expand All @@ -136,11 +138,7 @@ private Map<String, Object> createDebeziumEventMap(String operation) {

private Struct createDebeziumEventStruct(String operation) {
Struct source =
new Struct(SOURCE_SCHEMA)
.put("db", "db")
.put("schema", "schema")
.put("table", "tbl")
.put("txId", 12345L);
new Struct(SOURCE_SCHEMA).put("db", "db").put("schema", "schema").put("table", "tbl");

Struct data =
new Struct(ROW_SCHEMA)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public void testDmsTransform() {
Map<String, Object> value = (Map<String, Object>) result.value();

assertThat(value.get("account_id")).isEqualTo(1);
assertThat(value.get("_cdc_table")).isEqualTo("db.tbl");
assertThat(value.get("_cdc_op")).isEqualTo("U");

Map<String, Object> cdcMetadata = (Map<String, Object>) value.get("_cdc");
assertThat(cdcMetadata.get("op")).isEqualTo("U");
assertThat(cdcMetadata.get("source")).isEqualTo("db.tbl");
}
}

Expand Down

0 comments on commit 245c9c4

Please sign in to comment.