# Config to set target in Debezium SMT (#111)

(cherry picked from commit bf82d607dc2b5e816c8b6f59bcbdc48281154e98)

bryanck authored and ismailsimsek committed Jan 15, 2025
1 parent 932fe3c · commit ba451e4

Showing 6 changed files with 60 additions and 42 deletions.
## kafka-connect-transforms/README.md
14 changes: 5 additions & 9 deletions

````diff
@@ -33,21 +33,17 @@ It will promote the `data` element fields to top level and add the following metadata fields:
 
 ## Configuration
 
-The SMT currently has no configuration. It can be used with the sink's CDC feature, e.g.
-```
-"iceberg.tables.cdcField": "_cdc_op",
-```
+The SMT currently has no configuration.
 
 # DebeziumTransform
 _(Experimental)_
 
 The `DebeziumTransform` SMT transforms a Debezium formatted message for use by the sink's CDC feature.
 It will promote the `before` or `after` element fields to top level and add the following metadata fields:
-`_cdc_op`, `_cdc_ts`, and `_cdc_table`.
+`_cdc_op`, `_cdc_ts`, `_cdc_table`, `_cdc_key`, and `_cdc_target`.
 
 ## Configuration
 
-The SMT currently has no configuration. It can be used with the sink's CDC feature, e.g.
-```
-"iceberg.tables.cdcField": "_cdc_op",
-```
+| Property           | Description                                                                       |
+|--------------------|-----------------------------------------------------------------------------------|
+| cdc.target.pattern | Pattern to use for setting the CDC target field value, default is `{db}.{table}` |
````
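For readers wiring this up, a minimal sketch of how the new option is set through standard Kafka Connect SMT properties (the transform alias `debezium` is hypothetical; the class name and `cdc.target.pattern` key come from this commit):

```
"transforms": "debezium",
"transforms.debezium.type": "io.tabular.iceberg.connect.transforms.DebeziumTransform",
"transforms.debezium.cdc.target.pattern": "{db}.{table}",
```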
## CdcConstants.java

```diff
@@ -27,5 +27,6 @@ public interface CdcConstants {
   String COL_CDC_OP = "_cdc_op";
   String COL_CDC_TS = "_cdc_ts";
   String COL_CDC_TABLE = "_cdc_table";
+  String COL_CDC_TARGET = "_cdc_target";
   String COL_CDC_KEY = "_cdc_key";
 }
```
## CopyValue.java

```diff
@@ -19,7 +19,6 @@
 package io.tabular.iceberg.connect.transforms;
 
 import java.util.Map;
-import jdk.jfr.Experimental;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.kafka.common.cache.Cache;
 import org.apache.kafka.common.cache.LRUCache;
@@ -35,27 +34,17 @@
 import org.apache.kafka.connect.transforms.util.SchemaUtil;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
 
-@Experimental
 public class CopyValue<R extends ConnectRecord<R>> implements Transformation<R> {
 
-  private interface ConfigName {
-
-    String SOURCE_FIELD = "source.field";
-    String TARGET_FIELD = "target.field";
-  }
+  private static final String SOURCE_FIELD = "source.field";
+  private static final String TARGET_FIELD = "target.field";
 
   public static final ConfigDef CONFIG_DEF =
       new ConfigDef()
           .define(
-              ConfigName.SOURCE_FIELD,
-              ConfigDef.Type.STRING,
-              ConfigDef.Importance.HIGH,
-              "Source field name.")
+              SOURCE_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Source field name.")
           .define(
-              ConfigName.TARGET_FIELD,
-              ConfigDef.Type.STRING,
-              ConfigDef.Importance.HIGH,
-              "Target field name.");
+              TARGET_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Target field name.");
 
   private String sourceField;
   private String targetField;
@@ -64,8 +53,8 @@ private interface ConfigName {
   @Override
   public void configure(Map<String, ?> props) {
     SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
-    sourceField = config.getString(ConfigName.SOURCE_FIELD);
-    targetField = config.getString(ConfigName.TARGET_FIELD);
+    sourceField = config.getString(SOURCE_FIELD);
+    targetField = config.getString(TARGET_FIELD);
     schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
   }
```
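`CopyValue` itself is unchanged in behavior here; for context, a hypothetical SMT stanza using the two settings defined in `CONFIG_DEF` above (the alias `copyId` and the field names are illustrative only):

```
"transforms": "copyId",
"transforms.copyId.type": "io.tabular.iceberg.connect.transforms.CopyValue",
"transforms.copyId.source.field": "id",
"transforms.copyId.target.field": "id_copy",
```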
## DebeziumTransform.java

```diff
@@ -19,9 +19,9 @@
 package io.tabular.iceberg.connect.transforms;
 
 import java.util.Map;
-import jdk.jfr.Experimental;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
@@ -31,14 +31,34 @@
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.util.Requirements;
 import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Experimental
 public class DebeziumTransform<R extends ConnectRecord<R>> implements Transformation<R> {
 
   private static final Logger LOG = LoggerFactory.getLogger(DebeziumTransform.class.getName());
-  private static final ConfigDef EMPTY_CONFIG = new ConfigDef();
+
+  private static final String CDC_TARGET_PATTERN = "cdc.target.pattern";
+  private static final String DB_PLACEHOLDER = "{db}";
+  private static final String TABLE_PLACEHOLDER = "{table}";
+
+  public static final ConfigDef CONFIG_DEF =
+      new ConfigDef()
+          .define(
+              CDC_TARGET_PATTERN,
+              ConfigDef.Type.STRING,
+              null,
+              Importance.MEDIUM,
+              "Pattern to use for setting the CDC target field value.");
+
+  private String cdcTargetPattern;
+
+  @Override
+  public void configure(Map<String, ?> props) {
+    SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+    cdcTargetPattern = config.getString(CDC_TARGET_PATTERN);
+  }
 
   @Override
   public R apply(R record) {
@@ -75,7 +95,7 @@ private R applyWithSchema(R record) {
 
     newValue.put(CdcConstants.COL_CDC_OP, op);
     newValue.put(CdcConstants.COL_CDC_TS, new java.util.Date(value.getInt64("ts_ms")));
-    newValue.put(CdcConstants.COL_CDC_TABLE, tableNameFromSourceStruct(value.getStruct("source")));
+    setTableAndTargetFromSourceStruct(value.getStruct("source"), newValue);
 
     if (record.keySchema() != null) {
       newValue.put(CdcConstants.COL_CDC_KEY, record.key());
@@ -112,7 +132,7 @@ private R applySchemaless(R record) {
     Map<String, Object> newValue = Maps.newHashMap((Map<String, Object>) payload);
     newValue.put(CdcConstants.COL_CDC_OP, op);
     newValue.put(CdcConstants.COL_CDC_TS, value.get("ts_ms"));
-    newValue.put(CdcConstants.COL_CDC_TABLE, tableNameFromSourceMap(value.get("source")));
+    setTableAndTargetFromSourceMap(value.get("source"), newValue);
 
     if (record.key() instanceof Map) {
       newValue.put(CdcConstants.COL_CDC_KEY, record.key());
@@ -140,7 +160,7 @@ private String mapOperation(String originalOp) {
     }
   }
 
-  private String tableNameFromSourceStruct(Struct source) {
+  private void setTableAndTargetFromSourceStruct(Struct source, Struct value) {
     String db;
     if (source.schema().field("schema") != null) {
       // prefer schema if present, e.g. for Postgres
@@ -149,10 +169,12 @@ private String tableNameFromSourceStruct(Struct source) {
       db = source.getString("db");
     }
     String table = source.getString("table");
-    return db + "." + table;
+
+    value.put(CdcConstants.COL_CDC_TABLE, db + "." + table);
+    value.put(CdcConstants.COL_CDC_TARGET, target(db, table));
   }
 
-  private String tableNameFromSourceMap(Object source) {
+  private void setTableAndTargetFromSourceMap(Object source, Map<String, Object> value) {
     Map<String, Object> map = Requirements.requireMap(source, "Debezium transform");
 
     String db;
@@ -163,7 +185,15 @@ private String tableNameFromSourceMap(Object source) {
       db = map.get("db").toString();
     }
     String table = map.get("table").toString();
-    return db + "." + table;
+
+    value.put(CdcConstants.COL_CDC_TABLE, db + "." + table);
+    value.put(CdcConstants.COL_CDC_TARGET, target(db, table));
+  }
+
+  private String target(String db, String table) {
+    return cdcTargetPattern == null || cdcTargetPattern.isEmpty()
+        ? db + "." + table
+        : cdcTargetPattern.replace(DB_PLACEHOLDER, db).replace(TABLE_PLACEHOLDER, table);
   }
 
@@ -176,7 +206,8 @@ private Schema makeUpdatedSchema(Schema schema, Schema keySchema) {
     builder
         .field(CdcConstants.COL_CDC_OP, Schema.STRING_SCHEMA)
         .field(CdcConstants.COL_CDC_TS, Timestamp.SCHEMA)
-        .field(CdcConstants.COL_CDC_TABLE, Schema.STRING_SCHEMA);
+        .field(CdcConstants.COL_CDC_TABLE, Schema.STRING_SCHEMA)
+        .field(CdcConstants.COL_CDC_TARGET, Schema.STRING_SCHEMA);
 
     if (keySchema != null) {
       builder.field(CdcConstants.COL_CDC_KEY, keySchema);
@@ -187,12 +218,9 @@ private Schema makeUpdatedSchema(Schema schema, Schema keySchema) {
 
   @Override
   public ConfigDef config() {
-    return EMPTY_CONFIG;
+    return CONFIG_DEF;
   }
 
   @Override
   public void close() {}
-
-  @Override
-  public void configure(Map<String, ?> configs) {}
 }
```
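To make the new substitution concrete, a standalone sketch that mirrors the `target()` helper added above (the class, database, and table names here are invented for illustration):

```java
public class TargetPatternDemo {
  public static void main(String[] args) {
    String db = "inventory";                 // e.g. Debezium source.db
    String table = "customers";              // e.g. Debezium source.table
    String pattern = "{db}_raw.{table}_cdc"; // hypothetical cdc.target.pattern value

    // Same literal placeholder replacement as the target() method above
    String target = pattern.replace("{db}", db).replace("{table}", table);
    System.out.println(target);           // inventory_raw.customers_cdc

    // With cdc.target.pattern unset or empty, the transform falls back to db + "." + table
    System.out.println(db + "." + table); // inventory.customers
  }
}
```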
## DmsTransform.java

```diff
@@ -19,7 +19,6 @@
 package io.tabular.iceberg.connect.transforms;
 
 import java.util.Map;
-import jdk.jfr.Experimental;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
@@ -28,7 +27,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Experimental
 public class DmsTransform<R extends ConnectRecord<R>> implements Transformation<R> {
 
   private static final Logger LOG = LoggerFactory.getLogger(DmsTransform.class.getName());
```
## Transform tests

```diff
@@ -73,6 +73,8 @@ public void testDmsTransformNull() {
   @SuppressWarnings("unchecked")
   public void testDebeziumTransformSchemaless() {
     try (DebeziumTransform<SinkRecord> smt = new DebeziumTransform<>()) {
+      smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x"));
+
       Map<String, Object> event = createDebeziumEventMap("u");
       Map<String, Object> key = ImmutableMap.of("account_id", 1L);
       SinkRecord record = new SinkRecord("topic", 0, null, key, null, event, 0);
@@ -83,6 +85,7 @@
 
       assertThat(value.get("account_id")).isEqualTo(1);
       assertThat(value.get("_cdc_table")).isEqualTo("schema.tbl");
+      assertThat(value.get("_cdc_target")).isEqualTo("schema_x.tbl_x");
       assertThat(value.get("_cdc_op")).isEqualTo("U");
       assertThat(value.get("_cdc_key")).isInstanceOf(Map.class);
     }
@@ -91,6 +94,8 @@ public void testDebeziumTransformSchemaless() {
   @Test
   public void testDebeziumTransformWithSchema() {
     try (DebeziumTransform<SinkRecord> smt = new DebeziumTransform<>()) {
+      smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x"));
+
       Struct event = createDebeziumEventStruct("u");
       Struct key = new Struct(KEY_SCHEMA).put("account_id", 1L);
       SinkRecord record = new SinkRecord("topic", 0, KEY_SCHEMA, key, VALUE_SCHEMA, event, 0);
@@ -101,6 +106,7 @@
 
       assertThat(value.get("account_id")).isEqualTo(1L);
       assertThat(value.get("_cdc_table")).isEqualTo("schema.tbl");
+      assertThat(value.get("_cdc_target")).isEqualTo("schema_x.tbl_x");
       assertThat(value.get("_cdc_op")).isEqualTo("U");
       assertThat(value.get("_cdc_key")).isInstanceOf(Struct.class);
     }
```
