diff --git a/kafka-connect-transforms/README.md b/kafka-connect-transforms/README.md index e6e2f46eb50f..9ee378fd174c 100644 --- a/kafka-connect-transforms/README.md +++ b/kafka-connect-transforms/README.md @@ -33,21 +33,17 @@ It will promote the `data` element fields to top level and add the following met ## Configuration -The SMT currently has no configuration. It can be used with the sink's CDC feature, e.g. -``` -"iceberg.tables.cdcField": "_cdc_op", -``` +The SMT currently has no configuration. # DebeziumTransform _(Experimental)_ The `DebeziumTransform` SMT transforms a Debezium formatted message for use by the sink's CDC feature. It will promote the `before` or `after` element fields to top level and add the following metadata fields: -`_cdc_op`, `_cdc_ts`, and `_cdc_table`. +`_cdc_op`, `_cdc_ts`, `_cdc_table`, `_cdc_key`, and `_cdc_target`. ## Configuration -The SMT currently has no configuration. It can be used with the sink's CDC feature, e.g. -``` -"iceberg.tables.cdcField": "_cdc_op", -``` +| Property | Description | +|---------------------|-----------------------------------------------------------------------------------| +| cdc.target.pattern | Pattern to use for setting the CDC target field value, default is `{db}.{table}` | diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CdcConstants.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CdcConstants.java index 7735c39fee49..c68661198ddc 100644 --- a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CdcConstants.java +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CdcConstants.java @@ -27,5 +27,6 @@ public interface CdcConstants { String COL_CDC_OP = "_cdc_op"; String COL_CDC_TS = "_cdc_ts"; String COL_CDC_TABLE = "_cdc_table"; + String COL_CDC_TARGET = "_cdc_target"; String COL_CDC_KEY = "_cdc_key"; } diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CopyValue.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CopyValue.java index f590d3efdbd0..3d787364f140 100644 --- a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CopyValue.java +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CopyValue.java @@ -19,7 +19,6 @@ package io.tabular.iceberg.connect.transforms; import java.util.Map; -import jdk.jfr.Experimental; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.cache.Cache; import org.apache.kafka.common.cache.LRUCache; @@ -35,27 +34,17 @@ import org.apache.kafka.connect.transforms.util.SchemaUtil; import org.apache.kafka.connect.transforms.util.SimpleConfig; -@Experimental public class CopyValue> implements Transformation { - private interface ConfigName { - - String SOURCE_FIELD = "source.field"; - String TARGET_FIELD = "target.field"; - } + private static final String SOURCE_FIELD = "source.field"; + private static final String TARGET_FIELD = "target.field"; public static final ConfigDef CONFIG_DEF = new ConfigDef() .define( - ConfigName.SOURCE_FIELD, - ConfigDef.Type.STRING, - ConfigDef.Importance.HIGH, - "Source field name.") + SOURCE_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Source field name.") .define( - ConfigName.TARGET_FIELD, - ConfigDef.Type.STRING, - ConfigDef.Importance.HIGH, - "Target field name."); + TARGET_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "Target field name."); private String sourceField; private String targetField; @@ -64,8 +53,8 @@ private interface ConfigName { @Override public void configure(Map props) { SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); - sourceField = config.getString(ConfigName.SOURCE_FIELD); - targetField = config.getString(ConfigName.TARGET_FIELD); + sourceField = config.getString(SOURCE_FIELD); + targetField = config.getString(TARGET_FIELD); schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16)); } diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java index ce2762779fe7..3376fc436f12 100644 --- a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DebeziumTransform.java @@ -19,9 +19,9 @@ package io.tabular.iceberg.connect.transforms; import java.util.Map; -import jdk.jfr.Experimental; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -31,14 +31,34 @@ import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.util.Requirements; import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Experimental public class DebeziumTransform> implements Transformation { private static final Logger LOG = LoggerFactory.getLogger(DebeziumTransform.class.getName()); - private static final ConfigDef EMPTY_CONFIG = new ConfigDef(); + + private static final String CDC_TARGET_PATTERN = "cdc.target.pattern"; + private static final String DB_PLACEHOLDER = "{db}"; + private static final String TABLE_PLACEHOLDER = "{table}"; + + public static final ConfigDef CONFIG_DEF = + new ConfigDef() + .define( + CDC_TARGET_PATTERN, + ConfigDef.Type.STRING, + null, + Importance.MEDIUM, + "Pattern to use for setting the CDC target field value."); + + private String cdcTargetPattern; + + @Override + public void configure(Map props) { + SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + cdcTargetPattern = config.getString(CDC_TARGET_PATTERN); + } @Override public R apply(R record) { @@ -75,7 +95,7 @@ private R applyWithSchema(R record) { newValue.put(CdcConstants.COL_CDC_OP, op); newValue.put(CdcConstants.COL_CDC_TS, new java.util.Date(value.getInt64("ts_ms"))); - newValue.put(CdcConstants.COL_CDC_TABLE, tableNameFromSourceStruct(value.getStruct("source"))); + setTableAndTargetFromSourceStruct(value.getStruct("source"), newValue); if (record.keySchema() != null) { newValue.put(CdcConstants.COL_CDC_KEY, record.key()); @@ -112,7 +132,7 @@ private R applySchemaless(R record) { Map newValue = Maps.newHashMap((Map) payload); newValue.put(CdcConstants.COL_CDC_OP, op); newValue.put(CdcConstants.COL_CDC_TS, value.get("ts_ms")); - newValue.put(CdcConstants.COL_CDC_TABLE, tableNameFromSourceMap(value.get("source"))); + setTableAndTargetFromSourceMap(value.get("source"), newValue); if (record.key() instanceof Map) { newValue.put(CdcConstants.COL_CDC_KEY, record.key()); @@ -140,7 +160,7 @@ private String mapOperation(String originalOp) { } } - private String tableNameFromSourceStruct(Struct source) { + private void setTableAndTargetFromSourceStruct(Struct source, Struct value) { String db; if (source.schema().field("schema") != null) { // prefer schema if present, e.g. for Postgres @@ -149,10 +169,12 @@ private String tableNameFromSourceStruct(Struct source) { db = source.getString("db"); } String table = source.getString("table"); - return db + "." + table; + + value.put(CdcConstants.COL_CDC_TABLE, db + "." + table); + value.put(CdcConstants.COL_CDC_TARGET, target(db, table)); } - private String tableNameFromSourceMap(Object source) { + private void setTableAndTargetFromSourceMap(Object source, Map value) { Map map = Requirements.requireMap(source, "Debezium transform"); String db; @@ -163,7 +185,15 @@ private String tableNameFromSourceMap(Object source) { db = map.get("db").toString(); } String table = map.get("table").toString(); - return db + "." + table; + + value.put(CdcConstants.COL_CDC_TABLE, db + "." + table); + value.put(CdcConstants.COL_CDC_TARGET, target(db, table)); + } + + private String target(String db, String table) { + return cdcTargetPattern == null || cdcTargetPattern.isEmpty() + ? db + "." + table + : cdcTargetPattern.replace(DB_PLACEHOLDER, db).replace(TABLE_PLACEHOLDER, table); } private Schema makeUpdatedSchema(Schema schema, Schema keySchema) { @@ -176,7 +206,8 @@ private Schema makeUpdatedSchema(Schema schema, Schema keySchema) { builder .field(CdcConstants.COL_CDC_OP, Schema.STRING_SCHEMA) .field(CdcConstants.COL_CDC_TS, Timestamp.SCHEMA) - .field(CdcConstants.COL_CDC_TABLE, Schema.STRING_SCHEMA); + .field(CdcConstants.COL_CDC_TABLE, Schema.STRING_SCHEMA) + .field(CdcConstants.COL_CDC_TARGET, Schema.STRING_SCHEMA); if (keySchema != null) { builder.field(CdcConstants.COL_CDC_KEY, keySchema); @@ -187,12 +218,9 @@ private Schema makeUpdatedSchema(Schema schema, Schema keySchema) { @Override public ConfigDef config() { - return EMPTY_CONFIG; + return CONFIG_DEF; } @Override public void close() {} - - @Override - public void configure(Map configs) {} } diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DmsTransform.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DmsTransform.java index 130275137bbe..db88ce1922e3 100644 --- a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DmsTransform.java +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/DmsTransform.java @@ -19,7 +19,6 @@ package io.tabular.iceberg.connect.transforms; import java.util.Map; -import jdk.jfr.Experimental; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; @@ -28,7 +27,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Experimental public class DmsTransform> implements Transformation { private static final Logger LOG = LoggerFactory.getLogger(DmsTransform.class.getName()); diff --git a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java index fa7f8914b911..46e01d1186f6 100644 --- a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java +++ b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/DebeziumTransformTest.java @@ -73,6 +73,8 @@ public void testDmsTransformNull() { @SuppressWarnings("unchecked") public void testDebeziumTransformSchemaless() { try (DebeziumTransform smt = new DebeziumTransform<>()) { + smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); + Map event = createDebeziumEventMap("u"); Map key = ImmutableMap.of("account_id", 1L); SinkRecord record = new SinkRecord("topic", 0, null, key, null, event, 0); @@ -83,6 +85,7 @@ public void testDebeziumTransformSchemaless() { assertThat(value.get("account_id")).isEqualTo(1); assertThat(value.get("_cdc_table")).isEqualTo("schema.tbl"); + assertThat(value.get("_cdc_target")).isEqualTo("schema_x.tbl_x"); assertThat(value.get("_cdc_op")).isEqualTo("U"); assertThat(value.get("_cdc_key")).isInstanceOf(Map.class); } @@ -91,6 +94,8 @@ public void testDebeziumTransformSchemaless() { @Test public void testDebeziumTransformWithSchema() { try (DebeziumTransform smt = new DebeziumTransform<>()) { + smt.configure(ImmutableMap.of("cdc.target.pattern", "{db}_x.{table}_x")); + Struct event = createDebeziumEventStruct("u"); Struct key = new Struct(KEY_SCHEMA).put("account_id", 1L); SinkRecord record = new SinkRecord("topic", 0, KEY_SCHEMA, key, VALUE_SCHEMA, event, 0); @@ -101,6 +106,7 @@ public void testDebeziumTransformWithSchema() { assertThat(value.get("account_id")).isEqualTo(1L); assertThat(value.get("_cdc_table")).isEqualTo("schema.tbl"); + assertThat(value.get("_cdc_target")).isEqualTo("schema_x.tbl_x"); assertThat(value.get("_cdc_op")).isEqualTo("U"); assertThat(value.get("_cdc_key")).isInstanceOf(Struct.class); }