From 18fed10c7e60b53bb18951c0e27e6c517974c50b Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Thu, 20 Jul 2023 02:41:20 +0200 Subject: [PATCH] Add SMT project with copy value SMT (#50) (cherry picked from commit 639b0d5b41b827d984aae04efe594315ec2b2b91) --- kafka-connect-runtime/build.gradle | 138 +++++++++++ kafka-connect-transforms/README.md | 24 ++ kafka-connect-transforms/build.gradle | 11 + .../iceberg/connect/transforms/CopyValue.java | 152 ++++++++++++ .../connect/transforms/CopyValueTest.java | 88 +++++++ settings.gradle | 223 ++---------------- versions.toml | 63 +++++ 7 files changed, 490 insertions(+), 209 deletions(-) create mode 100644 kafka-connect-runtime/build.gradle create mode 100644 kafka-connect-transforms/README.md create mode 100644 kafka-connect-transforms/build.gradle create mode 100644 kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CopyValue.java create mode 100644 kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/CopyValueTest.java create mode 100644 versions.toml diff --git a/kafka-connect-runtime/build.gradle b/kafka-connect-runtime/build.gradle new file mode 100644 index 000000000000..5f98eb279209 --- /dev/null +++ b/kafka-connect-runtime/build.gradle @@ -0,0 +1,138 @@ +plugins { + id "distribution" +} + +configurations { + hive { + extendsFrom runtimeClasspath + } + all { + resolutionStrategy.force "net.minidev:json-smart:2.4.11" + resolutionStrategy.force "com.nimbusds:nimbus-jose-jwt:9.31" + resolutionStrategy.force "org.codehaus.jettison:jettison:1.5.4" + resolutionStrategy.force "org.xerial.snappy:snappy-java:1.1.10.1" + } +} + +dependencies { + implementation project(":iceberg-kafka-connect") + implementation project(":iceberg-kafka-connect-transforms") + implementation libs.bundles.iceberg.ext + implementation libs.bundles.aws + implementation(libs.hadoop.common) { + exclude group: "log4j" + exclude group: "org.slf4j" + exclude group: "ch.qos.reload4j" + } + + hive libs.iceberg.hive.metastore + hive(libs.hive.metastore) { + exclude group: "org.apache.avro", module: "avro" + exclude group: "org.slf4j", module: "slf4j-log4j12" + exclude group: "org.pentaho" // missing dependency + exclude group: "org.apache.hbase" + exclude group: "org.apache.logging.log4j" + exclude group: "co.cask.tephra" + exclude group: "com.google.code.findbugs", module: "jsr305" + exclude group: "org.eclipse.jetty.aggregate", module: "jetty-all" + exclude group: "org.eclipse.jetty.orbit", module: "javax.servlet" + exclude group: "org.apache.parquet", module: "parquet-hadoop-bundle" + exclude group: "com.tdunning", module: "json" + exclude group: "javax.transaction", module: "transaction-api" + exclude group: "com.zaxxer", module: "HikariCP" + exclude group: "org.apache.hadoop", module: "hadoop-yarn-server-common" + exclude group: "org.apache.hadoop", module: "hadoop-yarn-server-applicationhistoryservice" + exclude group: "org.apache.hadoop", module: "hadoop-yarn-server-resourcemanager" + exclude group: "org.apache.hadoop", module: "hadoop-yarn-server-web-proxy" + exclude group: "org.apache.hive", module: "hive-service-rpc" + exclude group: "com.github.joshelser", module: "dropwizard-metrics-hadoop-metrics2-reporter" + } + hive(libs.hadoop.client) { + exclude group: "org.apache.avro", module: "avro" + exclude group: "org.slf4j", module: "slf4j-log4j12" + } + + testImplementation libs.bundles.iceberg + testImplementation libs.bundles.aws + testImplementation libs.bundles.jackson + testImplementation libs.bundles.kafka.connect + 
+  testImplementation libs.junit.api
+  testRuntimeOnly libs.junit.engine
+
+  testImplementation libs.mockito
+  testImplementation libs.assertj
+  testImplementation libs.awaitility
+  testImplementation libs.testcontainers
+  testImplementation libs.testcontainers.kafka
+  testImplementation libs.http.client
+}
+
+processResources {
+  filter {
+    it.replace("__VERSION__", project.version)
+  }
+}
+
+distributions {
+  main {
+    contents {
+      from(processResources.destinationDir) {
+        include "manifest.json"
+      }
+      into("lib/") {
+        from configurations.runtimeClasspath
+      }
+      into("doc/") {
+        from "$rootDir/LICENSE"
+        from "$rootDir/README.md"
+      }
+      into("assets/") {
+        from "$rootDir/logos"
+      }
+    }
+  }
+  hive {
+    contents {
+      from(processResources.destinationDir) {
+        include "manifest.json"
+      }
+      into("lib/") {
+        from configurations.hive
+      }
+      into("doc/") {
+        from "$rootDir/LICENSE"
+        from "$rootDir/README.md"
+      }
+      into("assets/") {
+        from "$rootDir/logos"
+      }
+    }
+  }
+}
+
+publishing {
+  publications {
+    main(MavenPublication) {
+      artifact distZip
+    }
+    hive(MavenPublication) {
+      artifact hiveDistZip
+    }
+  }
+}
+
+tasks.jar.enabled = false
+
+tasks.distTar.enabled = false
+distZip.dependsOn processResources
+installDist.dependsOn processResources
+
+tasks.hiveDistTar.enabled = false
+hiveDistZip.dependsOn processResources
+installHiveDist.dependsOn processResources
+
+// build the install before test so it can be installed into kafka connect
+test.dependsOn installDist
+
+assemble.dependsOn distZip, hiveDistZip
diff --git a/kafka-connect-transforms/README.md b/kafka-connect-transforms/README.md
new file mode 100644
index 000000000000..b89de867b866
--- /dev/null
+++ b/kafka-connect-transforms/README.md
+# SMTs for the Apache Iceberg Sink Connector
+
+This project contains some SMTs (single message transforms) that can be useful when transforming
+Kafka data for use by the Iceberg sink connector.
+
+# CopyValue
+
+The `CopyValue` SMT copies a value from one field to a new field.
+
+## Configuration
+
+| Property         | Description       |
+|------------------|-------------------|
+| source.field     | Source field name |
+| target.field     | Target field name |
+
+## Example
+
+```json
+"transforms": "copyId",
+"transforms.copyId.type": "io.tabular.iceberg.connect.transforms.CopyValue",
+"transforms.copyId.source.field": "id",
+"transforms.copyId.target.field": "id_copy",
+```
diff --git a/kafka-connect-transforms/build.gradle b/kafka-connect-transforms/build.gradle
new file mode 100644
index 000000000000..40ec2611ade2
--- /dev/null
+++ b/kafka-connect-transforms/build.gradle
+dependencies {
+  implementation libs.slf4j
+  compileOnly libs.bundles.kafka.connect
+
+  testImplementation libs.junit.api
+  testRuntimeOnly libs.junit.engine
+}
+
+configurations {
+  testImplementation.extendsFrom compileOnly
+}
diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CopyValue.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CopyValue.java
new file mode 100644
index 000000000000..f5a78464f467
--- /dev/null
+++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/CopyValue.java
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.tabular.iceberg.connect.transforms;
+
+import static org.apache.kafka.connect.transforms.util.Requirements.requireMap;
+import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+public class CopyValue<R extends ConnectRecord<R>> implements Transformation<R> {
+
+  private interface ConfigName {
+
+    String SOURCE_FIELD = "source.field";
+    String TARGET_FIELD = "target.field";
+  }
+
+  public static final ConfigDef CONFIG_DEF =
+      new ConfigDef()
+          .define(
+              ConfigName.SOURCE_FIELD,
+              ConfigDef.Type.STRING,
+              ConfigDef.Importance.HIGH,
+              "Source field name.")
+          .define(
+              ConfigName.TARGET_FIELD,
+              ConfigDef.Type.STRING,
+              ConfigDef.Importance.HIGH,
+              "Target field name.");
+
+  private String sourceField;
+  private String targetField;
+  private Cache<Schema, Schema> schemaUpdateCache;
+
+  @Override
+  public void configure(Map<String, ?> props) {
+    final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+    sourceField = config.getString(ConfigName.SOURCE_FIELD);
+    targetField = config.getString(ConfigName.TARGET_FIELD);
+    schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
+  }
+
+  @Override
+  public R apply(R record) {
+    if (operatingValue(record) == null) {
+      return record;
+    } else if (operatingSchema(record) == null) {
+      return applySchemaless(record);
+    } else {
+      return applyWithSchema(record);
+    }
+  }
+
+  private R applySchemaless(R record) {
+    final Map<String, Object> value = requireMap(operatingValue(record), "copy value");
+
+    final Map<String, Object> updatedValue = new HashMap<>(value);
+    updatedValue.put(targetField, value.get(sourceField));
+
+    return newRecord(record, null, updatedValue);
+  }
+
+  private R applyWithSchema(R record) {
+    final Struct value = requireStruct(operatingValue(record), "copy value");
+
+    Schema updatedSchema = schemaUpdateCache.get(value.schema());
+    if (updatedSchema == null) {
+      updatedSchema = makeUpdatedSchema(value.schema());
+      schemaUpdateCache.put(value.schema(), updatedSchema);
+    }
+
+    final Struct updatedValue = new Struct(updatedSchema);
+
+    for (Field field : value.schema().fields()) {
+      updatedValue.put(field.name(), value.get(field));
+    }
+    updatedValue.put(targetField, value.get(sourceField));
+
+    return newRecord(record, updatedSchema, updatedValue);
+  }
+
+  private 
Schema makeUpdatedSchema(Schema schema) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + + for (Field field : schema.fields()) { + builder.field(field.name(), field.schema()); + } + builder.field(targetField, schema.field(sourceField).schema()); + + return builder.build(); + } + + @Override + public void close() { + schemaUpdateCache = null; + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + protected Object operatingValue(R record) { + return record.value(); + } + + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord( + record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + updatedSchema, + updatedValue, + record.timestamp()); + } +} diff --git a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/CopyValueTest.java b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/CopyValueTest.java new file mode 100644 index 000000000000..a76dea923adf --- /dev/null +++ b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/CopyValueTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package io.tabular.iceberg.connect.transforms;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.Test;
+
+public class CopyValueTest {
+
+  @Test
+  public void testCopyValueNull() {
+    try (CopyValue<SinkRecord> smt = new CopyValue<>()) {
+      SinkRecord record = new SinkRecord("topic", 0, null, null, null, null, 0);
+      SinkRecord result = smt.apply(record);
+      assertNull(result.value());
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testCopyValueSchemaless() {
+    Map<String, String> props = new HashMap<>();
+    props.put("source.field", "data");
+    props.put("target.field", "data_copy");
+
+    Map<String, Object> value = new HashMap<>();
+    value.put("id", 123L);
+    value.put("data", "foobar");
+
+    try (CopyValue<SinkRecord> smt = new CopyValue<>()) {
+      smt.configure(props);
+      SinkRecord record = new SinkRecord("topic", 0, null, null, null, value, 0);
+      SinkRecord result = smt.apply(record);
+      Map<String, Object> newValue = (Map<String, Object>) result.value();
+      assertEquals(3, newValue.size());
+      assertEquals("foobar", newValue.get("data_copy"));
+    }
+  }
+
+  @Test
+  public void testCopyValueWithSchema() {
+    Map<String, String> props = new HashMap<>();
+    props.put("source.field", "data");
+    props.put("target.field", "data_copy");
+
+    Schema schema =
+        SchemaBuilder.struct().field("id", Schema.INT64_SCHEMA).field("data", Schema.STRING_SCHEMA);
+
+    Struct value = new Struct(schema).put("id", 123L).put("data", "foobar");
+
+    try (CopyValue<SinkRecord> smt = new CopyValue<>()) {
+      smt.configure(props);
+      SinkRecord record = new SinkRecord("topic", 0, null, null, schema, value, 0);
+      SinkRecord result = smt.apply(record);
+
+      Schema newSchema = result.valueSchema();
+      assertEquals(3, newSchema.fields().size());
+      assertEquals(Schema.STRING_SCHEMA, newSchema.field("data_copy").schema());
+
+      Struct newValue = (Struct) result.value();
+      assertEquals("foobar", newValue.get("data_copy"));
+    }
+  }
+}
diff --git a/settings.gradle b/settings.gradle
index 103741389a26..1ce736247277 100644
--- a/settings.gradle
+++ b/settings.gradle
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License. 
- */
-
-rootProject.name = 'iceberg'
-include 'bom'
-include 'api'
-include 'common'
-include 'core'
-include 'data'
-include 'aliyun'
-include 'aws'
-include 'aws-bundle'
-include 'azure'
-include 'azure-bundle'
-include 'orc'
-include 'arrow'
-include 'parquet'
-include 'bundled-guava'
-include 'spark'
-include 'hive-metastore'
-include 'nessie'
-include 'gcp'
-include 'gcp-bundle'
-include 'dell'
-include 'snowflake'
-include 'delta-lake'
-include 'open-api'
-
-project(':bom').name = 'iceberg-bom'
-project(':api').name = 'iceberg-api'
-project(':common').name = 'iceberg-common'
-project(':core').name = 'iceberg-core'
-project(':data').name = 'iceberg-data'
-project(':aliyun').name = 'iceberg-aliyun'
-project(':aws').name = 'iceberg-aws'
-project(':aws-bundle').name = 'iceberg-aws-bundle'
-project(':azure').name = 'iceberg-azure'
-project(':azure-bundle').name = 'iceberg-azure-bundle'
-project(':orc').name = 'iceberg-orc'
-project(':arrow').name = 'iceberg-arrow'
-project(':parquet').name = 'iceberg-parquet'
-project(':bundled-guava').name = 'iceberg-bundled-guava'
-project(':spark').name = 'iceberg-spark'
-project(':hive-metastore').name = 'iceberg-hive-metastore'
-project(':nessie').name = 'iceberg-nessie'
-project(':gcp').name = 'iceberg-gcp'
-project(':gcp-bundle').name = 'iceberg-gcp-bundle'
-project(':dell').name = 'iceberg-dell'
-project(':snowflake').name = 'iceberg-snowflake'
-project(':delta-lake').name = 'iceberg-delta-lake'
-project(':open-api').name = 'iceberg-open-api'
-
-if (null != System.getProperty("allModules")) {
-  System.setProperty("flinkVersions", System.getProperty("knownFlinkVersions"))
-  System.setProperty("sparkVersions", System.getProperty("knownSparkVersions"))
-  System.setProperty("hiveVersions", System.getProperty("knownHiveVersions"))
-  System.setProperty("kafkaVersions", System.getProperty("knownKafkaVersions"))
-}
-
-List<String> knownFlinkVersions = System.getProperty("knownFlinkVersions").split(",")
-String flinkVersionsString = System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")
-List<String> flinkVersions = flinkVersionsString != null && !flinkVersionsString.isEmpty() ? flinkVersionsString.split(",") : []
-
-if (!knownFlinkVersions.containsAll(flinkVersions)) {
-  throw new GradleException("Found unsupported Flink versions: " + (flinkVersions - knownFlinkVersions))
-}
-
-List<String> knownHiveVersions = System.getProperty("knownHiveVersions").split(",")
-String hiveVersionsString = System.getProperty("hiveVersions") != null ? System.getProperty("hiveVersions") : System.getProperty("defaultHiveVersions")
-List<String> hiveVersions = hiveVersionsString != null && !hiveVersionsString.isEmpty() ? hiveVersionsString.split(",") : []
-
-if (!knownHiveVersions.containsAll(hiveVersions)) {
-  throw new GradleException("Found unsupported Hive versions: " + (hiveVersions - knownHiveVersions))
-}
-
-List<String> knownSparkVersions = System.getProperty("knownSparkVersions").split(",")
-String sparkVersionsString = System.getProperty("sparkVersions") != null ? System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions")
-List<String> sparkVersions = sparkVersionsString != null && !sparkVersionsString.isEmpty() ? 
sparkVersionsString.split(",") : []
-
-if (!knownSparkVersions.containsAll(sparkVersions)) {
-  throw new GradleException("Found unsupported Spark versions: " + (sparkVersions - knownSparkVersions))
-}
-
-List<String> knownKafkaVersions = System.getProperty("knownKafkaVersions").split(",")
-String kafkaVersionsString = System.getProperty("kafkaVersions") != null ? System.getProperty("kafkaVersions") : System.getProperty("defaultKafkaVersions")
-List<String> kafkaVersions = kafkaVersionsString != null && !kafkaVersionsString.isEmpty() ? kafkaVersionsString.split(",") : []
-
-if (!knownKafkaVersions.containsAll(kafkaVersions)) {
-  throw new GradleException("Found unsupported Kafka versions: " + (kafkaVersions - knownKafkaVersions))
-}
-
-List<String> knownScalaVersions = System.getProperty("knownScalaVersions").split(",")
-String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
-
-if (!knownScalaVersions.contains(scalaVersion)) {
-  throw new GradleException("Found unsupported Scala version: " + scalaVersion)
-}
-
-if (!flinkVersions.isEmpty()) {
-  include 'flink'
-  project(':flink').name = 'iceberg-flink'
-}
-
-if (flinkVersions.contains("1.18")) {
-  include ":iceberg-flink:flink-1.18"
-  include ":iceberg-flink:flink-runtime-1.18"
-  project(":iceberg-flink:flink-1.18").projectDir = file('flink/v1.18/flink')
-  project(":iceberg-flink:flink-1.18").name = "iceberg-flink-1.18"
-  project(":iceberg-flink:flink-runtime-1.18").projectDir = file('flink/v1.18/flink-runtime')
-  project(":iceberg-flink:flink-runtime-1.18").name = "iceberg-flink-runtime-1.18"
-}
-
-if (flinkVersions.contains("1.19")) {
-  include ":iceberg-flink:flink-1.19"
-  include ":iceberg-flink:flink-runtime-1.19"
-  project(":iceberg-flink:flink-1.19").projectDir = file('flink/v1.19/flink')
-  project(":iceberg-flink:flink-1.19").name = "iceberg-flink-1.19"
-  project(":iceberg-flink:flink-runtime-1.19").projectDir = file('flink/v1.19/flink-runtime')
-  project(":iceberg-flink:flink-runtime-1.19").name = "iceberg-flink-runtime-1.19"
+dependencyResolutionManagement {
+  versionCatalogs {
+    libs {
+      from(files("versions.toml"))
+    }
+  }
 }

-if (flinkVersions.contains("1.20")) {
-  include ":iceberg-flink:flink-1.20"
-  include ":iceberg-flink:flink-runtime-1.20"
-  project(":iceberg-flink:flink-1.20").projectDir = file('flink/v1.20/flink')
-  project(":iceberg-flink:flink-1.20").name = "iceberg-flink-1.20"
-  project(":iceberg-flink:flink-runtime-1.20").projectDir = file('flink/v1.20/flink-runtime')
-  project(":iceberg-flink:flink-runtime-1.20").name = "iceberg-flink-runtime-1.20"
-}
+include "iceberg-kafka-connect"
+project(":iceberg-kafka-connect").projectDir = file("kafka-connect")

-if (sparkVersions.contains("3.3")) {
-  include ":iceberg-spark:spark-3.3_${scalaVersion}"
-  include ":iceberg-spark:spark-extensions-3.3_${scalaVersion}"
-  include ":iceberg-spark:spark-runtime-3.3_${scalaVersion}"
-  project(":iceberg-spark:spark-3.3_${scalaVersion}").projectDir = file('spark/v3.3/spark')
-  project(":iceberg-spark:spark-3.3_${scalaVersion}").name = "iceberg-spark-3.3_${scalaVersion}"
-  project(":iceberg-spark:spark-extensions-3.3_${scalaVersion}").projectDir = file('spark/v3.3/spark-extensions')
-  project(":iceberg-spark:spark-extensions-3.3_${scalaVersion}").name = "iceberg-spark-extensions-3.3_${scalaVersion}"
-  project(":iceberg-spark:spark-runtime-3.3_${scalaVersion}").projectDir = file('spark/v3.3/spark-runtime')
-  project(":iceberg-spark:spark-runtime-3.3_${scalaVersion}").name 
= "iceberg-spark-runtime-3.3_${scalaVersion}" -} +include "iceberg-kafka-connect-events" +project(":iceberg-kafka-connect-events").projectDir = file("kafka-connect-events") -if (sparkVersions.contains("3.4")) { - include ":iceberg-spark:spark-3.4_${scalaVersion}" - include ":iceberg-spark:spark-extensions-3.4_${scalaVersion}" - include ":iceberg-spark:spark-runtime-3.4_${scalaVersion}" - project(":iceberg-spark:spark-3.4_${scalaVersion}").projectDir = file('spark/v3.4/spark') - project(":iceberg-spark:spark-3.4_${scalaVersion}").name = "iceberg-spark-3.4_${scalaVersion}" - project(":iceberg-spark:spark-extensions-3.4_${scalaVersion}").projectDir = file('spark/v3.4/spark-extensions') - project(":iceberg-spark:spark-extensions-3.4_${scalaVersion}").name = "iceberg-spark-extensions-3.4_${scalaVersion}" - project(":iceberg-spark:spark-runtime-3.4_${scalaVersion}").projectDir = file('spark/v3.4/spark-runtime') - project(":iceberg-spark:spark-runtime-3.4_${scalaVersion}").name = "iceberg-spark-runtime-3.4_${scalaVersion}" -} - -if (sparkVersions.contains("3.5")) { - include ":iceberg-spark:spark-3.5_${scalaVersion}" - include ":iceberg-spark:spark-extensions-3.5_${scalaVersion}" - include ":iceberg-spark:spark-runtime-3.5_${scalaVersion}" - project(":iceberg-spark:spark-3.5_${scalaVersion}").projectDir = file('spark/v3.5/spark') - project(":iceberg-spark:spark-3.5_${scalaVersion}").name = "iceberg-spark-3.5_${scalaVersion}" - project(":iceberg-spark:spark-extensions-3.5_${scalaVersion}").projectDir = file('spark/v3.5/spark-extensions') - project(":iceberg-spark:spark-extensions-3.5_${scalaVersion}").name = "iceberg-spark-extensions-3.5_${scalaVersion}" - project(":iceberg-spark:spark-runtime-3.5_${scalaVersion}").projectDir = file('spark/v3.5/spark-runtime') - project(":iceberg-spark:spark-runtime-3.5_${scalaVersion}").name = "iceberg-spark-runtime-3.5_${scalaVersion}" -} +include "iceberg-kafka-connect-transforms" +project(":iceberg-kafka-connect-transforms").projectDir = file("kafka-connect-transforms") -// hive 3 depends on hive 2, so always add hive 2 if hive3 is enabled -if (hiveVersions.contains("2") || hiveVersions.contains("3")) { - include 'mr' - include 'hive-runtime' - - project(':mr').name = 'iceberg-mr' - project(':hive-runtime').name = 'iceberg-hive-runtime' -} - -if (hiveVersions.contains("3")) { - include 'hive3' - include 'hive3-orc-bundle' - project(':hive3').name = 'iceberg-hive3' - project(':hive3-orc-bundle').name = 'iceberg-hive3-orc-bundle' -} - -if (kafkaVersions.contains("3")) { - include 'kafka-connect' - project(':kafka-connect').name = 'iceberg-kafka-connect' - - include ":iceberg-kafka-connect:kafka-connect-events" - project(":iceberg-kafka-connect:kafka-connect-events").projectDir = file('kafka-connect/kafka-connect-events') - project(":iceberg-kafka-connect:kafka-connect-events").name = "iceberg-kafka-connect-events" - - include ":iceberg-kafka-connect:kafka-connect" - project(":iceberg-kafka-connect:kafka-connect").projectDir = file('kafka-connect/kafka-connect') - project(":iceberg-kafka-connect:kafka-connect").name = "iceberg-kafka-connect" - - include ":iceberg-kafka-connect:kafka-connect-runtime" - project(":iceberg-kafka-connect:kafka-connect-runtime").projectDir = file('kafka-connect/kafka-connect-runtime') - project(":iceberg-kafka-connect:kafka-connect-runtime").name = "iceberg-kafka-connect-runtime" -} +include "iceberg-kafka-connect-runtime" +project(":iceberg-kafka-connect-runtime").projectDir = file("kafka-connect-runtime") diff --git a/versions.toml 
b/versions.toml new file mode 100644 index 000000000000..484faa593a9c --- /dev/null +++ b/versions.toml @@ -0,0 +1,63 @@ +[versions] +assertj-ver = "3.23.1" +avro-ver = "1.11.1" +awaitility-ver = "4.2.0" +aws-ver = "2.20.18" +hadoop-ver = "3.3.5" +hive-ver = "2.3.9" +http-client-ver = "5.2.1" +iceberg-ver = "1.3.1-tabular.4" +jackson-ver = "2.14.2" +junit-ver = "5.9.2" +kafka-ver = "3.4.0" +slf4j-ver = "1.7.36" +testcontainers-ver = "1.18.1" + + +[libraries] +avro = { module = "org.apache.avro:avro", version.ref = "avro-ver" } +aws-dynamodb = { module = "software.amazon.awssdk:dynamodb", version.ref = "aws-ver" } +aws-glue = { module = "software.amazon.awssdk:glue", version.ref = "aws-ver" } +aws-kms = { module = "software.amazon.awssdk:kms", version.ref = "aws-ver" } +aws-s3 = { module = "software.amazon.awssdk:s3", version.ref = "aws-ver" } +aws-sso = { module = "software.amazon.awssdk:sso", version.ref = "aws-ver" } +aws-sts = { module = "software.amazon.awssdk:sts", version.ref = "aws-ver" } +hadoop-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop-ver" } +hadoop-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop-ver" } +hive-metastore = { module = "org.apache.hive:hive-metastore", version.ref = "hive-ver" } +iceberg-api = { module = "org.apache.iceberg:iceberg-api", version.ref = "iceberg-ver" } +iceberg-aws = { module = "org.apache.iceberg:iceberg-aws", version.ref = "iceberg-ver" } +iceberg-common = { module = "org.apache.iceberg:iceberg-common", version.ref = "iceberg-ver" } +iceberg-core = { module = "org.apache.iceberg:iceberg-core", version.ref = "iceberg-ver" } +iceberg-data = { module = "org.apache.iceberg:iceberg-data", version.ref = "iceberg-ver" } +iceberg-gcp = { module = "org.apache.iceberg:iceberg-gcp", version.ref = "iceberg-ver" } +iceberg-guava = { module = "org.apache.iceberg:iceberg-bundled-guava", version.ref = "iceberg-ver" } +iceberg-hive-metastore = { module = "org.apache.iceberg:iceberg-hive-metastore", version.ref = "iceberg-ver" } +iceberg-nessie = { module = "org.apache.iceberg:iceberg-nessie", version.ref = "iceberg-ver" } +iceberg-orc = { module = "org.apache.iceberg:iceberg-orc", version.ref = "iceberg-ver" } +iceberg-parquet = { module = "org.apache.iceberg:iceberg-parquet", version.ref = "iceberg-ver" } +kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka-ver" } +kafka-connect-api = { module = "org.apache.kafka:connect-api", version.ref = "kafka-ver" } +kafka-connect-json = { module = "org.apache.kafka:connect-json", version.ref = "kafka-ver" } +kafka-connect-transforms = { module = "org.apache.kafka:connect-transforms", version.ref = "kafka-ver" } +jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson-ver" } +jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson-ver" } +slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j-ver" } + +# test dependencies +assertj = { module = "org.assertj:assertj-core", version.ref = "assertj-ver" } +awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility-ver" } +http-client = { module = "org.apache.httpcomponents.client5:httpclient5", version.ref = "http-client-ver" } +junit-api = { module = "org.junit.jupiter:junit-jupiter-api", version.ref = "junit-ver" } +junit-engine = { module = "org.junit.jupiter:junit-jupiter-engine", version.ref = "junit-ver" } +mockito = "org.mockito:mockito-core:4.8.1" +testcontainers = { module = 
"org.testcontainers:testcontainers", version.ref = "testcontainers-ver" } +testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "testcontainers-ver" } + + +[bundles] +aws = ["aws-dynamodb", "aws-glue", "aws-kms", "aws-s3", "aws-sso", "aws-sts"] +iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet"] +iceberg-ext = ["iceberg-aws", "iceberg-gcp", "iceberg-nessie"] +jackson = ["jackson-core", "jackson-databind"] +kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms"]