diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index 147fd766a9f..df5b493884f 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -182,6 +182,29 @@ sink {
}
```
+### Example 2: Kerberos
+
+```bash
+sink {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+ metastore_uri = "thrift://metastore:9083"
+ hive_site_path = "/tmp/hive-site.xml"
+ kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
+ kerberos_keytab_path = "/tmp/hive.keytab"
+ krb5_path = "/tmp/krb5.conf"
+ }
+}
+```
+
+Description:
+
+- `hive_site_path`: The path to the `hive-site.xml` file.
+- `kerberos_principal`: The principal for Kerberos authentication.
+- `kerberos_keytab_path`: The keytab file path for Kerberos authentication.
+- `krb5_path`: The path to the `krb5.conf` file used for Kerberos authentication.
+
+
## Hive on s3
### Step 1
diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md
index 5669906c3b9..6667ccc8eeb 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -138,6 +138,30 @@ Source plugin common parameters, please refer to [Source Common Options](../sour
```
+### Example 3: Kerberos
+
+```bash
+source {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ result_table_name = hive_source
+ hive_site_path = "/tmp/hive-site.xml"
+ kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
+ kerberos_keytab_path = "/tmp/hive.keytab"
+ krb5_path = "/tmp/krb5.conf"
+ }
+}
+```
+
+Description:
+
+- `hive_site_path`: The path to the `hive-site.xml` file.
+- `kerberos_principal`: The principal for Kerberos authentication.
+- `kerberos_keytab_path`: The keytab file path for Kerberos authentication.
+- `krb5_path`: The path to the `krb5.conf` file used for Kerberos authentication.
+
## Hive on s3
### Step 1
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
index 486ad0b8b65..cadf95c1109 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveContainer.java
@@ -17,9 +17,13 @@
package org.apache.seatunnel.e2e.connector.hive;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -28,6 +32,7 @@
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.DockerLoggerFactory;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.SQLException;
@@ -70,24 +75,58 @@ public String getMetastoreUri() {
return String.format("thrift://%s:%s", getHost(), getMappedPort(HMS_PORT));
}
- public String getHiveJdbcUri() {
- return String.format(
- "jdbc:hive2://%s:%s/default", getHost(), getMappedPort(HIVE_SERVER_PORT));
+ public String getHiveJdbcUri(boolean enableKerberos) {
+ if (enableKerberos) {
+ return String.format(
+ "jdbc:hive2://%s:%s/default;principal=hive/metastore.seatunnel@EXAMPLE.COM",
+ getHost(), getMappedPort(HIVE_SERVER_PORT));
+ } else {
+ return String.format(
+ "jdbc:hive2://%s:%s/default", getHost(), getMappedPort(HIVE_SERVER_PORT));
+ }
}
public HiveMetaStoreClient createMetaStoreClient() throws MetaException {
+ return this.createMetaStoreClient(false);
+ }
+
+ public HiveMetaStoreClient createMetaStoreClient(boolean enableKerberos) throws MetaException {
HiveConf conf = new HiveConf();
conf.set("hive.metastore.uris", getMetastoreUri());
-
+ if (enableKerberos) {
+ conf.addResource("kerberos/hive-site.xml");
+ }
return new HiveMetaStoreClient(conf);
}
public Connection getConnection()
throws ClassNotFoundException, InstantiationException, IllegalAccessException,
SQLException {
- Driver driver = loadHiveJdbcDriver();
+ return getConnection(false);
+ }
- return driver.connect(getHiveJdbcUri(), getJdbcConnectionConfig());
+ public Connection getConnection(boolean enableKerberos)
+ throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+ SQLException {
+ Driver driver = loadHiveJdbcDriver();
+ if (!enableKerberos) {
+ return driver.connect(getHiveJdbcUri(false), getJdbcConnectionConfig());
+ }
+ Configuration authConf = new Configuration();
+ authConf.set("hadoop.security.authentication", "kerberos");
+ Configuration configuration = new Configuration();
+ System.setProperty(
+ "java.security.krb5.conf",
+ ContainerUtil.getResourcesFile("/kerberos/krb5_local.conf").getPath());
+ configuration.set("hadoop.security.authentication", "KERBEROS");
+ try {
+ UserGroupInformation.setConfiguration(configuration);
+ UserGroupInformation.loginUserFromKeytab(
+ "hive/metastore.seatunnel@EXAMPLE.COM", "/tmp/hive.keytab");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return driver.connect(getHiveJdbcUri(true), getJdbcConnectionConfig());
}
public Driver loadHiveJdbcDriver()
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
index 5973d697582..bfa83dfb3b9 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
@@ -180,6 +180,7 @@ public void startUp() throws Exception {
.await()
.atMost(360, TimeUnit.SECONDS)
.pollDelay(Duration.ofSeconds(10L))
+ .pollInterval(Duration.ofSeconds(3L))
.untilAsserted(this::initializeConnection);
prepareTable();
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
new file mode 100644
index 00000000000..c2fca452fa8
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveKerberosIT.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hive;
+
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK})
+@Slf4j
+public class HiveKerberosIT extends SeaTunnelContainer {
+
+ // It is necessary to set up a separate network with a fixed name, otherwise network issues may
+ // cause Kerberos authentication failure
+ Network NETWORK =
+ Network.builder()
+ .createNetworkCmdModifier(cmd -> cmd.withName("SEATUNNEL"))
+ .enableIpv6(false)
+ .build();
+
+ private static final String CREATE_SQL =
+ "CREATE TABLE test_hive_sink_on_hdfs_with_kerberos"
+ + "("
+ + " pk_id BIGINT,"
+ + " name STRING,"
+ + " score INT"
+ + ")";
+
+ private static final String HMS_HOST = "metastore";
+ private static final String HIVE_SERVER_HOST = "hiveserver2";
+ private GenericContainer<?> kerberosContainer;
+ private static final String KERBEROS_IMAGE_NAME = "zhangshenghang/kerberos-server:1.0";
+
+ private String hiveExeUrl() {
+ return "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar";
+ }
+
+ private String libFb303Url() {
+ return "https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar";
+ }
+
+ private String hadoopAwsUrl() {
+ return "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.1.4/hadoop-aws-3.1.4.jar";
+ }
+
+ private String aliyunSdkOssUrl() {
+ return "https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.4.1/aliyun-sdk-oss-3.4.1.jar";
+ }
+
+ private String jdomUrl() {
+ return "https://repo1.maven.org/maven2/org/jdom/jdom/1.1/jdom-1.1.jar";
+ }
+
+ private String hadoopAliyunUrl() {
+ return "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.1.4/hadoop-aliyun-3.1.4.jar";
+ }
+
+ private String hadoopCosUrl() {
+ return "https://repo1.maven.org/maven2/com/qcloud/cos/hadoop-cos/2.6.5-8.0.2/hadoop-cos-2.6.5-8.0.2.jar";
+ }
+
+ private HiveContainer hiveServerContainer;
+ private HiveContainer hmsContainer;
+ private Connection hiveConnection;
+ private String pluginHiveDir = "/tmp/seatunnel/plugins/Hive/lib";
+
+ protected void downloadHivePluginJar() throws IOException, InterruptedException {
+ Container.ExecResult downloadHiveExeCommands =
+ server.execInContainer(
+ "sh",
+ "-c",
+ "mkdir -p "
+ + pluginHiveDir
+ + " && cd "
+ + pluginHiveDir
+ + " && wget "
+ + hiveExeUrl());
+ Assertions.assertEquals(
+ 0, downloadHiveExeCommands.getExitCode(), downloadHiveExeCommands.getStderr());
+ Container.ExecResult downloadLibFb303Commands =
+ server.execInContainer(
+ "sh", "-c", "cd " + pluginHiveDir + " && wget " + libFb303Url());
+ Assertions.assertEquals(
+ 0, downloadLibFb303Commands.getExitCode(), downloadLibFb303Commands.getStderr());
+ // The jar of s3
+ Container.ExecResult downloadS3Commands =
+ server.execInContainer(
+ "sh", "-c", "cd " + pluginHiveDir + " && wget " + hadoopAwsUrl());
+ Assertions.assertEquals(
+ 0, downloadS3Commands.getExitCode(), downloadS3Commands.getStderr());
+ // The jar of oss
+ Container.ExecResult downloadOssCommands =
+ server.execInContainer(
+ "sh",
+ "-c",
+ "cd "
+ + pluginHiveDir
+ + " && wget "
+ + aliyunSdkOssUrl()
+ + " && wget "
+ + jdomUrl()
+ + " && wget "
+ + hadoopAliyunUrl());
+ Assertions.assertEquals(
+ 0, downloadOssCommands.getExitCode(), downloadOssCommands.getStderr());
+ // The jar of cos
+ Container.ExecResult downloadCosCommands =
+ server.execInContainer(
+ "sh", "-c", "cd " + pluginHiveDir + " && wget " + hadoopCosUrl());
+ Assertions.assertEquals(
+ 0, downloadCosCommands.getExitCode(), downloadCosCommands.getStderr());
+ }
+
+ @BeforeEach
+ @Override
+ public void startUp() throws Exception {
+
+ kerberosContainer =
+ new GenericContainer<>(KERBEROS_IMAGE_NAME)
+ .withNetwork(NETWORK)
+ .withExposedPorts(88, 749)
+ .withCreateContainerCmdModifier(cmd -> cmd.withHostName("kerberos"))
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(KERBEROS_IMAGE_NAME)));
+ kerberosContainer.setPortBindings(Arrays.asList("88/udp:88/udp", "749:749"));
+ Startables.deepStart(Stream.of(kerberosContainer)).join();
+ log.info("Kerberos just started");
+
+ // Copy the keytab file from kerberos container to local
+ given().ignoreExceptions()
+ .await()
+ .atMost(30, TimeUnit.SECONDS)
+ .pollDelay(Duration.ofSeconds(1L))
+ .untilAsserted(
+ () ->
+ kerberosContainer.copyFileFromContainer(
+ "/tmp/hive.keytab", "/tmp/hive.keytab"));
+
+ hmsContainer =
+ HiveContainer.hmsStandalone()
+ .withCreateContainerCmdModifier(cmd -> cmd.withName(HMS_HOST))
+ .withNetwork(NETWORK)
+ .withFileSystemBind(
+ ContainerUtil.getResourcesFile("/kerberos/krb5.conf").getPath(),
+ "/etc/krb5.conf")
+ .withFileSystemBind("/tmp/hive.keytab", "/tmp/hive.keytab")
+ .withFileSystemBind(
+ ContainerUtil.getResourcesFile("/kerberos/hive-site.xml").getPath(),
+ "/opt/hive/conf/hive-site.xml")
+ .withFileSystemBind(
+ ContainerUtil.getResourcesFile("/kerberos/core-site.xml").getPath(),
+ "/opt/hive/conf/core-site.xml")
+ .withNetworkAliases(HMS_HOST);
+ hmsContainer.setPortBindings(Collections.singletonList("9083:9083"));
+
+ Startables.deepStart(Stream.of(hmsContainer)).join();
+ log.info("HMS just started");
+
+ hiveServerContainer =
+ HiveContainer.hiveServer()
+ .withNetwork(NETWORK)
+ .withCreateContainerCmdModifier(cmd -> cmd.withName(HIVE_SERVER_HOST))
+ .withNetworkAliases(HIVE_SERVER_HOST)
+ .withFileSystemBind(
+ ContainerUtil.getResourcesFile("/kerberos/krb5.conf").getPath(),
+ "/etc/krb5.conf")
+ .withFileSystemBind("/tmp/hive.keytab", "/tmp/hive.keytab")
+ .withFileSystemBind(
+ ContainerUtil.getResourcesFile("/kerberos/hive-site.xml").getPath(),
+ "/opt/hive/conf/hive-site.xml")
+ .withFileSystemBind(
+ ContainerUtil.getResourcesFile("/kerberos/core-site.xml").getPath(),
+ "/opt/hive/conf/core-site.xml")
+ .withFileSystemBind("/tmp/data", "/opt/hive/data")
+ // If there are any issues, you can open the kerberos debug log to view
+ // more information: -Dsun.security.krb5.debug=true
+ .withEnv("SERVICE_OPTS", "-Dhive.metastore.uris=thrift://metastore:9083")
+ .withEnv("IS_RESUME", "true")
+ .dependsOn(hmsContainer);
+ hiveServerContainer.setPortBindings(Collections.singletonList("10000:10000"));
+
+ Startables.deepStart(Stream.of(hiveServerContainer)).join();
+
+ log.info("HiveServer2 just started");
+
+ given().ignoreExceptions()
+ .await()
+ .atMost(3600, TimeUnit.SECONDS)
+ .pollDelay(Duration.ofSeconds(10L))
+ .pollInterval(Duration.ofSeconds(3L))
+ .untilAsserted(this::initializeConnection);
+
+ prepareTable();
+
+ // Set the fixed network to SeatunnelContainer
+ super.startUp(this.NETWORK);
+ // Load the hive plugin jar
+ this.downloadHivePluginJar();
+ }
+
+ @AfterEach
+ @Override
+ public void tearDown() throws Exception {
+ if (hmsContainer != null) {
+ log.info(hmsContainer.execInContainer("cat", "/tmp/hive/hive.log").getStdout());
+ hmsContainer.close();
+ }
+ if (hiveServerContainer != null) {
+ log.info(hiveServerContainer.execInContainer("cat", "/tmp/hive/hive.log").getStdout());
+ hiveServerContainer.close();
+ }
+ }
+
+ private void initializeConnection()
+ throws ClassNotFoundException, InstantiationException, IllegalAccessException,
+ SQLException {
+ this.hiveConnection = this.hiveServerContainer.getConnection(true);
+ }
+
+ private void prepareTable() throws Exception {
+ log.info(
+ String.format(
+ "Databases are %s",
+ this.hmsContainer.createMetaStoreClient(true).getAllDatabases()));
+ try (Statement statement = this.hiveConnection.createStatement()) {
+ statement.execute(CREATE_SQL);
+ } catch (Exception exception) {
+ log.error(ExceptionUtils.getMessage(exception));
+ throw exception;
+ }
+ }
+
+ private void executeJob(TestContainer container, String job1, String job2)
+ throws IOException, InterruptedException {
+ Container.ExecResult execResult = container.executeJob(job1);
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ Container.ExecResult readResult = container.executeJob(job2);
+ Assertions.assertEquals(0, readResult.getExitCode());
+ }
+
+ @Test
+ public void testFakeSinkHiveOnHDFS() throws Exception {
+ copyAbsolutePathToContainer("/tmp/hive.keytab", "/tmp/hive.keytab");
+ copyFileToContainer("/kerberos/krb5.conf", "/tmp/krb5.conf");
+ copyFileToContainer("/kerberos/hive-site.xml", "/tmp/hive-site.xml");
+
+ Container.ExecResult fakeToHiveWithKerberosResult =
+ executeJob("/fake_to_hive_on_hdfs_with_kerberos.conf");
+ Assertions.assertEquals(0, fakeToHiveWithKerberosResult.getExitCode());
+
+ Container.ExecResult hiveToAssertWithKerberosResult =
+ executeJob("/hive_on_hdfs_to_assert_with_kerberos.conf");
+ Assertions.assertEquals(0, hiveToAssertWithKerberosResult.getExitCode());
+
+ Container.ExecResult fakeToHiveResult = executeJob("/fake_to_hive_on_hdfs.conf");
+ Assertions.assertEquals(1, fakeToHiveResult.getExitCode());
+ Assertions.assertTrue(
+ fakeToHiveResult
+ .getStderr()
+ .contains("Get hive table information from hive metastore service failed"));
+
+ Container.ExecResult hiveToAssertResult = executeJob("/hive_on_hdfs_to_assert.conf");
+ Assertions.assertEquals(1, hiveToAssertResult.getExitCode());
+ Assertions.assertTrue(
+ hiveToAssertResult
+ .getStderr()
+ .contains("Get hive table information from hive metastore service failed"));
+ }
+
+ @TestTemplate
+ @Disabled(
+ "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this test, please set up your own environment in the test case file, hadoop_hive_conf_path_local and ip below")
+ public void testFakeSinkHiveOnS3(TestContainer container) throws Exception {
+ executeJob(container, "/fake_to_hive_on_s3.conf", "/hive_on_s3_to_assert.conf");
+ }
+
+ @TestTemplate
+ @Disabled(
+ "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this test, please set up your own environment in the test case file, hadoop_hive_conf_path_local and ip below")
+ public void testFakeSinkHiveOnOSS(TestContainer container) throws Exception {
+ executeJob(container, "/fake_to_hive_on_oss.conf", "/hive_on_oss_to_assert.conf");
+ }
+
+ @TestTemplate
+ @Disabled(
+ "[HDFS/COS/OSS/S3] is not available in CI, if you want to run this test, please set up your own environment in the test case file, hadoop_hive_conf_path_local and ip below")
+ public void testFakeSinkHiveOnCos(TestContainer container) throws Exception {
+ executeJob(container, "/fake_to_hive_on_cos.conf", "/hive_on_cos_to_assert.conf");
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs_with_kerberos.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs_with_kerberos.conf
new file mode 100644
index 00000000000..d74b396a621
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_on_hdfs_with_kerberos.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+ metastore_uri = "thrift://metastore:9083"
+ hive_site_path = "/tmp/hive-site.xml"
+ kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
+ kerberos_keytab_path = "/tmp/hive.keytab"
+ krb5_path = "/tmp/krb5.conf"
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert_with_kerberos.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert_with_kerberos.conf
new file mode 100644
index 00000000000..59c768e4fbb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_on_hdfs_to_assert_with_kerberos.conf
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_with_kerberos"
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ result_table_name = hive_source
+ hive_site_path = "/tmp/hive-site.xml"
+ kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM"
+ kerberos_keytab_path = "/tmp/hive.keytab"
+ krb5_path = "/tmp/krb5.conf"
+ }
+}
+
+sink {
+ Assert {
+ source_table_name = hive_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/core-site.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/core-site.xml
new file mode 100644
index 00000000000..ed5df9b0f54
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/core-site.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+     license agreements; see the NOTICE file. Licensed under the Apache License, Version 2.0. -->
+<configuration>
+    <property>
+        <name>hadoop.security.authorization</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hadoop.security.authentication</name>
+        <value>kerberos</value>
+    </property>
+</configuration>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/hive-site.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/hive-site.xml
new file mode 100644
index 00000000000..2dd37b52561
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/hive-site.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+     license agreements; see the NOTICE file. Licensed under the Apache License, Version 2.0. -->
+<configuration>
+    <property>
+        <name>hive.server2.authentication</name>
+        <value>KERBEROS</value>
+    </property>
+    <property>
+        <name>hive.server2.authentication.kerberos.principal</name>
+        <value>hive/metastore.seatunnel@EXAMPLE.COM</value>
+    </property>
+    <property>
+        <name>hive.server2.authentication.kerberos.keytab</name>
+        <value>/tmp/hive.keytab</value>
+    </property>
+    <property>
+        <name>hive.security.authenticator.manager</name>
+        <value>org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator</value>
+    </property>
+    <property>
+        <name>hive.metastore.sasl.enabled</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hive.metastore.kerberos.keytab.file</name>
+        <value>/tmp/hive.keytab</value>
+    </property>
+    <property>
+        <name>hive.metastore.kerberos.principal</name>
+        <value>hive/metastore.seatunnel@EXAMPLE.COM</value>
+    </property>
+    <property>
+        <name>hive.exec.scratchdir</name>
+        <value>/opt/hive/scratch_dir</value>
+    </property>
+    <property>
+        <name>hive.user.install.directory</name>
+        <value>/opt/hive/install_dir</value>
+    </property>
+    <property>
+        <name>tez.runtime.optimize.local.fetch</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hive.exec.submit.local.task.via.child</name>
+        <value>false</value>
+    </property>
+    <property>
+        <name>mapreduce.framework.name</name>
+        <value>local</value>
+    </property>
+    <property>
+        <name>tez.local.mode</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>hive.execution.engine</name>
+        <value>tez</value>
+    </property>
+    <property>
+        <name>metastore.warehouse.dir</name>
+        <value>/opt/hive/data/warehouse</value>
+    </property>
+    <property>
+        <name>metastore.metastore.event.db.notification.api.auth</name>
+        <value>false</value>
+    </property>
+</configuration>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5.conf
new file mode 100755
index 00000000000..2b09b1c3e50
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[libdefaults]
+ default_realm = EXAMPLE.COM
+ dns_lookup_realm = true
+ dns_lookup_kdc = true
+ ticket_lifetime = 24h
+ forwardable = true
+
+[realms]
+ EXAMPLE.COM = {
+ kdc = kerberos:88
+ admin_server = kerberos:749
+ }
+
+[domain_realm]
+ .example.com = EXAMPLE.COM
+ example.com = EXAMPLE.COM
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5_local.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5_local.conf
new file mode 100755
index 00000000000..bd136e9e8d2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/kerberos/krb5_local.conf
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+[libdefaults]
+ default_realm = EXAMPLE.COM
+ dns_lookup_realm = true
+ dns_lookup_kdc = true
+ ticket_lifetime = 24h
+ forwardable = true
+
+[realms]
+ EXAMPLE.COM = {
+ kdc = localhost:88
+ admin_server = localhost:749
+ }
+
+[domain_realm]
+ .example.com = EXAMPLE.COM
+ example.com = EXAMPLE.COM
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
index fd49c7b46e5..f815ecb6b23 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/TestContainer.java
@@ -80,4 +80,6 @@ default String getJobStatus(String jobId) {
String getServerLogs();
void copyFileToContainer(String path, String targetPath);
+
+ void copyAbsolutePathToContainer(String path, String targetPath);
}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 47b3de5ff5d..007a5de4c75 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -34,6 +34,7 @@
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -180,4 +181,9 @@ public void copyFileToContainer(String path, String targetPath) {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath, jobManager);
}
+
+ @Override
+ public void copyAbsolutePathToContainer(String path, String targetPath) {
+ ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, jobManager);
+ }
}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
index ea8bcd8788b..54804d10575 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/ConnectorPackageServiceContainer.java
@@ -245,4 +245,9 @@ public void copyFileToContainer(String path, String targetPath) {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath, server1);
}
+
+ @Override
+ public void copyAbsolutePathToContainer(String path, String targetPath) {
+ ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, server1);
+ }
}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index b9ff54b6c34..51947c9e9d5 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -39,6 +39,7 @@
import org.junit.jupiter.api.Assertions;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerLoggerFactory;
@@ -86,7 +87,21 @@ public void startUp() throws Exception {
server = createSeaTunnelServer();
}
+ /**
+ * Start up the seatunnel server with the given network.
+ *
+ * @param NETWORK the network to use
+ */
+ public void startUp(Network NETWORK) throws Exception {
+ server = createSeaTunnelServer(NETWORK);
+ }
+
private GenericContainer<?> createSeaTunnelServer() throws IOException, InterruptedException {
+ return createSeaTunnelServer(NETWORK);
+ }
+
+ private GenericContainer<?> createSeaTunnelServer(Network NETWORK)
+ throws IOException, InterruptedException {
GenericContainer<?> server =
new GenericContainer<>(getDockerImage())
.withNetwork(NETWORK)
@@ -523,4 +538,9 @@ public void copyFileToContainer(String path, String targetPath) {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath, server);
}
+
+ @Override
+ public void copyAbsolutePathToContainer(String path, String targetPath) {
+ ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, server);
+ }
}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index b13851582c2..d6c08f1231b 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -31,6 +31,7 @@
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -131,4 +132,9 @@ public void copyFileToContainer(String path, String targetPath) {
ContainerUtil.copyFileIntoContainers(
ContainerUtil.getResourcesFile(path).toPath(), targetPath, master);
}
+
+ @Override
+ public void copyAbsolutePathToContainer(String path, String targetPath) {
+ ContainerUtil.copyFileIntoContainers(Paths.get(path), targetPath, master);
+ }
}