diff --git a/docs/en/connector-v2/sink/Iceberg.md b/docs/en/connector-v2/sink/Iceberg.md
index 0b32bc7734f5..0ca271a60279 100644
--- a/docs/en/connector-v2/sink/Iceberg.md
+++ b/docs/en/connector-v2/sink/Iceberg.md
@@ -10,6 +10,7 @@
> Spark
> Flink
+> SeaTunnel Zeta
## Description
diff --git a/pom.xml b/pom.xml
index 7235af15e9b7..3c9865273fb1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -85,7 +85,7 @@
2.7
2.13.3
1.18.24
- 1.24.0
+ 1.20
1.11.1
false
false
diff --git a/release-note.md b/release-note.md
index 0ded29987d68..e57decf5bf3c 100644
--- a/release-note.md
+++ b/release-note.md
@@ -186,6 +186,7 @@
- [Transform-V2] Add support CatalogTable for FilterFieldTransform (#4422)
- [Transform-V2] Add catalog support for SQL Transform plugin (#4819)
- [Connector-V2] [Assert] Support check the precision and scale of Decimal type (#6110)
+- [Connector-V2] [Iceberg] Support iceberg sink #6198
### Zeta(ST-Engine)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
index a4f082a3eae2..d9947ec324cc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml
@@ -69,8 +69,18 @@
org.glassfish.jersey.core
*
+
+
+ com.github.luben
+ zstd-jni
+
+
+ com.github.luben
+ zstd-jni
+ 1.5.5-5
+
@@ -84,6 +94,10 @@
io.debezium
debezium-embedded
+
+ com.github.luben
+ zstd-jni
+
com.zaxxer
HikariCP
diff --git a/seatunnel-connectors-v2/connector-iceberg/pom.xml b/seatunnel-connectors-v2/connector-iceberg/pom.xml
index 9a3206d2fe68..309900b09c77 100644
--- a/seatunnel-connectors-v2/connector-iceberg/pom.xml
+++ b/seatunnel-connectors-v2/connector-iceberg/pom.xml
@@ -34,8 +34,19 @@
1.13.1
1.11.3
2.3.9
+ connector-iceberg
+
+
+
+ com.github.luben
+ zstd-jni
+ 1.5.5-5
+
+
+
+
org.apache.seatunnel
@@ -54,6 +65,7 @@
iceberg-common
${iceberg.version}
+
org.apache.iceberg
iceberg-api
@@ -200,4 +212,44 @@
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+ shade
+
+ package
+
+
+
+ org.apache.avro
+
+ ${seatunnel.shade.package}.${connector.name}.org.apache.avro
+
+
+ org.apache.orc
+ ${seatunnel.shade.package}.${connector.name}.org.apache.orc
+
+
+ org.apache.parquet
+
+ ${seatunnel.shade.package}.${connector.name}.org.apache.parquet
+
+
+ shaded.parquet
+
+ ${seatunnel.shade.package}.${connector.name}.shaded.parquet
+
+
+
+
+
+
+
+
+
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
index b400347354c3..554099ef7b8c 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
@@ -26,8 +26,7 @@
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynMethods;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.Serializable;
@@ -38,11 +37,10 @@
import java.nio.file.Paths;
import java.util.List;
+@Slf4j
public class IcebergCatalogFactory implements Serializable {
private static final long serialVersionUID = -6003040601422350869L;
- private static final Logger LOG =
- LoggerFactory.getLogger(IcebergCatalogFactory.class.getName());
private static final List HADOOP_CONF_FILES =
ImmutableList.of("core-site.xml", "hdfs-site.xml", "hive-site.xml");
private CommonConfig config;
@@ -52,6 +50,8 @@ public IcebergCatalogFactory(CommonConfig config) {
}
public Catalog loadCatalog() {
+ // When using the seatunel engine, set the current class loader to prevent loading failures
+ Thread.currentThread().setContextClassLoader(IcebergCatalogFactory.class.getClassLoader());
return CatalogUtil.buildIcebergCatalog(
config.getCatalogName(), config.getCatalogProps(), loadHadoopConfig(config));
}
@@ -77,7 +77,7 @@ private Object loadHadoopConfig(CommonConfig config) {
}
if (configClass == null) {
- LOG.info("Hadoop not found on classpath, not creating Hadoop config");
+ log.info("Hadoop not found on classpath, not creating Hadoop config");
return null;
}
@@ -100,7 +100,7 @@ private Object loadHadoopConfig(CommonConfig config) {
try {
addResourceMethod.invoke(path.toUri().toURL());
} catch (IOException e) {
- LOG.warn(
+ log.warn(
"Error adding Hadoop resource {}, resource was not added",
path,
e);
@@ -109,13 +109,13 @@ private Object loadHadoopConfig(CommonConfig config) {
});
}
config.getHadoopProps().forEach(setMethod::invoke);
- LOG.info("Hadoop config initialized: {}", configClass.getName());
+ log.info("Hadoop config initialized: {}", configClass.getName());
return result;
} catch (InstantiationException
| IllegalAccessException
| NoSuchMethodException
| InvocationTargetException e) {
- LOG.warn(
+ log.warn(
"Hadoop found on classpath but could not create config, proceeding without config",
e);
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java
index 4d55d5db7bb0..591c2450d0fa 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java
@@ -19,6 +19,7 @@
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
+import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
@@ -36,7 +37,7 @@ public class IcebergTableLoader implements Closeable, Serializable {
private final IcebergCatalogFactory icebergCatalogFactory;
private final String tableIdentifierStr;
- private Catalog catalog;
+ private transient Catalog catalog;
public IcebergTableLoader(
@NonNull IcebergCatalogFactory icebergCatalogFactory,
@@ -54,12 +55,15 @@ public TableIdentifier getTableIdentifier() {
}
public IcebergTableLoader open() {
- catalog = icebergCatalogFactory.loadCatalog();
+ catalog = CachingCatalog.wrap(icebergCatalogFactory.loadCatalog());
return this;
}
public Table loadTable() {
TableIdentifier tableIdentifier = TableIdentifier.parse(tableIdentifierStr);
+ if (catalog == null) {
+ open();
+ }
return catalog.loadTable(tableIdentifier);
}
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
index 92b3fe35b5f8..a23e6a6b1e84 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
@@ -24,6 +24,7 @@
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
/** Iceberg aggregated committer */
@@ -31,8 +32,7 @@
public class IcebergAggregatedCommitter
implements SinkAggregatedCommitter {
- private transient IcebergFilesCommitter filesCommitter;
- private IcebergTableLoader tableLoader;
+ private final IcebergFilesCommitter filesCommitter;
public IcebergAggregatedCommitter(SinkConfig config) {
IcebergTableLoader tableLoader = IcebergTableLoader.create(config).open();
@@ -45,7 +45,7 @@ public List commit(
for (IcebergAggregatedCommitInfo commitInfo : aggregatedCommitInfo) {
commitFiles(commitInfo.commitInfos);
}
- return null;
+ return Collections.emptyList();
}
private void commitFiles(List commitInfos) {
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SeaTunnelRowConverter.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SeaTunnelRowConverter.java
index 8b2be3711102..a9fb4ef5ae47 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SeaTunnelRowConverter.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SeaTunnelRowConverter.java
@@ -294,7 +294,6 @@ protected List
-
-
- com.github.luben
- zstd-jni
- ${zstd-jni.version}
- provided
-
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index 79ebf8bbf01a..5c1171a82dab 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -179,10 +179,6 @@
io.netty:netty-buffer:jar
io.netty:netty-common:jar
-
- com.github.luben:zstd-jni:jar
- com.github.luben:zstd-jni:jar
-
${artifact.file.name}
/lib
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
index fe9139019dea..d4efd489d0ea 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkCDCIT.java
@@ -71,8 +71,8 @@
@Slf4j
@DisabledOnContainer(
value = {},
- type = {EngineType.SEATUNNEL, EngineType.SPARK},
- disabledReason = "")
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
@DisabledOnOs(OS.WINDOWS)
public class IcebergSinkCDCIT extends TestSuiteBase implements TestResource {
@@ -116,12 +116,15 @@ private String zstdUrl() {
protected final ContainerExtendedFactory extendedFactory =
container -> {
container.execInContainer("sh", "-c", "mkdir -p " + CATALOG_DIR);
- container.execInContainer("sh", "-c", "chown -R flink " + CATALOG_DIR);
- container.execInContainer(
- "sh",
- "-c",
- "mkdir -p /tmp/seatunnel/lib && cd /tmp/seatunnel/lib && wget "
- + zstdUrl());
+ container.execInContainer("sh", "-c", "chmod -R 777 " + CATALOG_DIR);
+ Container.ExecResult extraCommandsZSTD =
+ container.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Iceberg/lib && cd /tmp/seatunnel/plugins/Iceberg/lib && wget "
+ + zstdUrl());
+ Assertions.assertEquals(
+ 0, extraCommandsZSTD.getExitCode(), extraCommandsZSTD.getStderr());
Container.ExecResult extraCommands =
container.execInContainer(
"sh",
@@ -198,7 +201,6 @@ public void testMysqlCdcCheckDataE2e(TestContainer container)
}
return null;
});
-
insertAndCheckData(container);
upsertAndCheckData(container);
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkIT.java
index 32c3b4007ff6..20c1b02914e5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkIT.java
@@ -24,8 +24,8 @@
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
-import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
@@ -55,8 +55,8 @@
@Slf4j
@DisabledOnContainer(
- value = {},
- type = {EngineType.SEATUNNEL, EngineType.SPARK},
+ value = {TestContainerId.SPARK_2_4},
+ type = {},
disabledReason = "")
@DisabledOnOs(OS.WINDOWS)
public class IcebergSinkIT extends TestSuiteBase {
@@ -73,14 +73,12 @@ private String zstdUrl() {
protected final ContainerExtendedFactory extendedFactory =
container -> {
container.execInContainer("sh", "-c", "mkdir -p " + CATALOG_DIR);
- container.execInContainer("sh", "-c", "chown 777 " + CATALOG_DIR);
- Container.ExecResult extraCommands =
- container.execInContainer(
- "sh",
- "-c",
- "mkdir -p /tmp/seatunnel/lib && cd /tmp/seatunnel/lib && wget "
- + zstdUrl());
- Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+ container.execInContainer("sh", "-c", "chmod -R 777 " + CATALOG_DIR);
+ container.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Iceberg/lib && cd /tmp/seatunnel/plugins/Iceberg/lib && wget "
+ + zstdUrl());
};
private final String NAMESPACE_TAR = NAMESPACE + ".tar.gz";
diff --git a/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml b/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
index 85e771e5c789..3e9f4b85a39f 100644
--- a/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
+++ b/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/pom.xml
@@ -47,19 +47,7 @@
org.apache.hadoop
hadoop-client
${hadoop3.version}
-
-
- org.apache.avro
- avro
-
-
-
- org.apache.avro
- avro
- ${avro.version}
-
-