Commit

Add SMT project with copy value SMT (#50)
(cherry picked from commit 639b0d5b41b827d984aae04efe594315ec2b2b91)
  • Loading branch information
1 parent dbfefb0 commit 18fed10
Showing 7 changed files with 490 additions and 209 deletions.
138 changes: 138 additions & 0 deletions kafka-connect-runtime/build.gradle
@@ -0,0 +1,138 @@
plugins {
  id "distribution"
}

configurations {
  hive {
    extendsFrom runtimeClasspath
  }
  all {
    resolutionStrategy.force "net.minidev:json-smart:2.4.11"
    resolutionStrategy.force "com.nimbusds:nimbus-jose-jwt:9.31"
    resolutionStrategy.force "org.codehaus.jettison:jettison:1.5.4"
    resolutionStrategy.force "org.xerial.snappy:snappy-java:1.1.10.1"
  }
}

dependencies {
  implementation project(":iceberg-kafka-connect")
  implementation project(":iceberg-kafka-connect-transforms")
  implementation libs.bundles.iceberg.ext
  implementation libs.bundles.aws
  implementation(libs.hadoop.common) {
    exclude group: "log4j"
    exclude group: "org.slf4j"
    exclude group: "ch.qos.reload4j"
  }

  hive libs.iceberg.hive.metastore
  hive(libs.hive.metastore) {
    exclude group: "org.apache.avro", module: "avro"
    exclude group: "org.slf4j", module: "slf4j-log4j12"
    exclude group: "org.pentaho" // missing dependency
    exclude group: "org.apache.hbase"
    exclude group: "org.apache.logging.log4j"
    exclude group: "co.cask.tephra"
    exclude group: "com.google.code.findbugs", module: "jsr305"
    exclude group: "org.eclipse.jetty.aggregate", module: "jetty-all"
    exclude group: "org.eclipse.jetty.orbit", module: "javax.servlet"
    exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle"
    exclude group: "com.tdunning", module: "json"
    exclude group: "javax.transaction", module: "transaction-api"
    exclude group: "com.zaxxer", module: "HikariCP"
    exclude group: "org.apache.hadoop", module: "hadoop-yarn-server-common"
    exclude group: "org.apache.hadoop", module: "hadoop-yarn-server-applicationhistoryservice"
    exclude group: "org.apache.hadoop", module: "hadoop-yarn-server-resourcemanager"
    exclude group: "org.apache.hadoop", module: "hadoop-yarn-server-web-proxy"
    exclude group: "org.apache.hive", module: "hive-service-rpc"
    exclude group: "com.github.joshelser", module: "dropwizard-metrics-hadoop-metrics2-reporter"
  }
  hive(libs.hadoop.client) {
    exclude group: "org.apache.avro", module: "avro"
    exclude group: "org.slf4j", module: "slf4j-log4j12"
  }

  testImplementation libs.bundles.iceberg
  testImplementation libs.bundles.aws
  testImplementation libs.bundles.jackson
  testImplementation libs.bundles.kafka.connect

  testImplementation libs.junit.api
  testRuntimeOnly libs.junit.engine

  testImplementation libs.mockito
  testImplementation libs.assertj
  testImplementation libs.awaitility
  testImplementation libs.testcontainers
  testImplementation libs.testcontainers.kafka
  testImplementation libs.http.client
}

processResources {
  filter {
    it.replace("__VERSION__", project.version)
  }
}

distributions {
  main {
    contents {
      from(processResources.destinationDir) {
        include "manifest.json"
      }
      into("lib/") {
        from configurations.runtimeClasspath
      }
      into("doc/") {
        from "$rootDir/LICENSE"
        from "$rootDir/README.md"
      }
      into("assets/") {
        from "$rootDir/logos"
      }
    }
  }
  hive {
    contents {
      from(processResources.destinationDir) {
        include "manifest.json"
      }
      into("lib/") {
        from configurations.hive
      }
      into("doc/") {
        from "$rootDir/LICENSE"
        from "$rootDir/README.md"
      }
      into("assets/") {
        from "$rootDir/logos"
      }
    }
  }
}

publishing {
  publications {
    main(MavenPublication) {
      artifact distZip
    }
    hive(MavenPublication) {
      artifact hiveDistZip
    }
  }
}

tasks.jar.enabled = false

tasks.distTar.enabled = false
distZip.dependsOn processResources
installDist.dependsOn processResources

tasks.hiveDistTar.enabled = false
hiveDistZip.dependsOn processResources
installHiveDist.dependsOn processResources

// build the install before test so it can be installed into kafka connect
test.dependsOn installDist

assemble.dependsOn distZip, hiveDistZip
24 changes: 24 additions & 0 deletions kafka-connect-transforms/README.md
@@ -0,0 +1,24 @@
# SMTs for the Apache Iceberg Sink Connector

This project contains some SMTs that could be useful when transforming Kafka data for use by
the Iceberg sink connector.

# CopyValue

The `CopyValue` SMT copies a value from one field to a new field.

## Configuration

| Property | Description |
|------------------|-------------------|
| source.field | Source field name |
| target.field | Target field name |

## Example

```json
"transforms": "copyId",
"transforms.copyId.type": "io.tabular.iceberg.connect.transforms.CopyValue",
"transforms.copyId.source.field": "id",
"transforms.copyId.target.field": "id_copy",
```
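
For illustration, here is a minimal sketch of driving the transform programmatically against a schemaless (`Map`) record, the way Kafka Connect would at runtime; the topic name, offset, and field values are hypothetical:

```java
import io.tabular.iceberg.connect.transforms.CopyValue;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;

public class CopyValueSchemalessExample {
  public static void main(String[] args) {
    CopyValue<SinkRecord> smt = new CopyValue<>();
    Map<String, String> props = new HashMap<>();
    props.put("source.field", "id");
    props.put("target.field", "id_copy");
    smt.configure(props);

    // No value schema, so the transform takes the schemaless (Map) path.
    Map<String, Object> value = new HashMap<>();
    value.put("id", 123L);
    value.put("name", "example");
    SinkRecord record = new SinkRecord("events", 0, null, null, null, value, 0L);

    SinkRecord result = smt.apply(record);
    // The resulting value map contains id, name, and the new id_copy=123 entry.
    System.out.println(result.value());
  }
}
```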
11 changes: 11 additions & 0 deletions kafka-connect-transforms/build.gradle
@@ -0,0 +1,11 @@
dependencies {
  implementation libs.slf4j
  compileOnly libs.bundles.kafka.connect

  testImplementation libs.junit.api
  testRuntimeOnly libs.junit.engine
}

configurations {
  testImplementation.extendsFrom compileOnly
}
152 changes: 152 additions & 0 deletions kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CopyValue.java
@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package io.tabular.iceberg.connect.transforms;

import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;

import java.util.HashMap;
import java.util.Map;
import jdk.jfr.Experimental;
import org.apache.kafka.common.cache.Cache;
import org.apache.kafka.common.cache.LRUCache;
import org.apache.kafka.common.cache.SynchronizedCache;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

@Experimental
public class CopyValue<R extends ConnectRecord<R>> implements Transformation<R> {

  private interface ConfigName {

    String SOURCE_FIELD = "source.field";
    String TARGET_FIELD = "target.field";
  }

  public static final ConfigDef CONFIG_DEF =
      new ConfigDef()
          .define(
              ConfigName.SOURCE_FIELD,
              ConfigDef.Type.STRING,
              ConfigDef.Importance.HIGH,
              "Source field name.")
          .define(
              ConfigName.TARGET_FIELD,
              ConfigDef.Type.STRING,
              ConfigDef.Importance.HIGH,
              "Target field name.");

  private String sourceField;
  private String targetField;
  private Cache<Schema, Schema> schemaUpdateCache;

  @Override
  public void configure(Map<String, ?> props) {
    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    sourceField = config.getString(ConfigName.SOURCE_FIELD);
    targetField = config.getString(ConfigName.TARGET_FIELD);
    schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
  }

  @Override
  public R apply(R record) {
    if (operatingValue(record) == null) {
      return record;
    } else if (operatingSchema(record) == null) {
      return applySchemaless(record);
    } else {
      return applyWithSchema(record);
    }
  }

  private R applySchemaless(R record) {
    final Map<String, Object> value = requireMap(operatingValue(record), "copy value");

    final Map<String, Object> updatedValue = new HashMap<>(value);
    updatedValue.put(targetField, value.get(sourceField));

    return newRecord(record, null, updatedValue);
  }

  private R applyWithSchema(R record) {
    final Struct value = requireStruct(operatingValue(record), "copy value");

    Schema updatedSchema = schemaUpdateCache.get(value.schema());
    if (updatedSchema == null) {
      updatedSchema = makeUpdatedSchema(value.schema());
      schemaUpdateCache.put(value.schema(), updatedSchema);
    }

    final Struct updatedValue = new Struct(updatedSchema);

    for (Field field : value.schema().fields()) {
      updatedValue.put(field.name(), value.get(field));
    }
    updatedValue.put(targetField, value.get(sourceField));

    return newRecord(record, updatedSchema, updatedValue);
  }

  private Schema makeUpdatedSchema(Schema schema) {
    final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());

    for (Field field : schema.fields()) {
      builder.field(field.name(), field.schema());
    }
    builder.field(targetField, schema.field(sourceField).schema());

    return builder.build();
  }

  @Override
  public void close() {
    schemaUpdateCache = null;
  }

  @Override
  public ConfigDef config() {
    return CONFIG_DEF;
  }

  protected Schema operatingSchema(R record) {
    return record.valueSchema();
  }

  protected Object operatingValue(R record) {
    return record.value();
  }

  protected R newRecord(R record, Schema updatedSchema, Object updatedValue) {
    return record.newRecord(
        record.topic(),
        record.kafkaPartition(),
        record.keySchema(),
        record.key(),
        updatedSchema,
        updatedValue,
        record.timestamp());
  }
}
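
When a value schema is present, `applyWithSchema` builds an updated schema that appends the target field with the source field's type and caches it per source schema. A minimal test-style sketch of that path, using hypothetical field names and values:

```java
import io.tabular.iceberg.connect.transforms.CopyValue;

import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class CopyValueSchemaExample {
  public static void main(String[] args) {
    CopyValue<SinkRecord> smt = new CopyValue<>();
    smt.configure(Map.of("source.field", "id", "target.field", "id_copy"));

    Schema schema =
        SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("name", Schema.STRING_SCHEMA)
            .build();
    Struct value = new Struct(schema).put("id", 123L).put("name", "example");

    // A value schema is present, so the Struct path runs and the updated
    // schema gains an id_copy field with the same type as id.
    SinkRecord record = new SinkRecord("events", 0, null, null, schema, value, 0L);
    Struct updated = (Struct) smt.apply(record).value();

    System.out.println(updated.getInt64("id_copy")); // 123
    smt.close();
  }
}
```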