Skip to content

Commit

Permalink
Merge branch 'refs/heads/feature/2.3.4-xugu-to-2.3.6' into xugu-2.3.6
Browse files Browse the repository at this point in the history
# Conflicts:
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/dm/DamengCatalog.java
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java
#	seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
#	seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
#	seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/ftp/ftp-hive.sh
  • Loading branch information
LeonYoah committed Aug 19, 2024
2 parents e2e98a2 + 3655956 commit bf2d5c7
Show file tree
Hide file tree
Showing 101 changed files with 7,713 additions and 279 deletions.
4 changes: 2 additions & 2 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ seatunnel.source.Hive = connector-hive
seatunnel.source.Clickhouse = connector-clickhouse
seatunnel.sink.Clickhouse = connector-clickhouse
seatunnel.sink.ClickhouseFile = connector-clickhouse
seatunnel.source.Jdbc = connector-jdbc
seatunnel.sink.Jdbc = connector-jdbc
seatunnel.source.MappingJdbc = connector-jdbc
seatunnel.sink.MappingJdbc = connector-jdbc
seatunnel.source.Kudu = connector-kudu
seatunnel.sink.Kudu = connector-kudu
seatunnel.sink.EmailSink = connector-email
Expand Down
4 changes: 4 additions & 0 deletions seatunnel-connectors-v2/connector-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,16 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

@Slf4j
public class HiveMetaStoreProxy {
private HiveMetaStoreClient hiveMetaStoreClient;
private static volatile HiveMetaStoreProxy INSTANCE = null;
private static final List<String> HADOOP_CONF_FILES = ImmutableList.of("hive-site.xml");
private static final List<String> HADOOP_CONF_FILES =
ImmutableList.of("hive-site.xml", "core-site.xml", "hdfs-site.xml");

private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI);
Expand Down Expand Up @@ -80,9 +83,16 @@ private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
if (StringUtils.isNotBlank(hiveSitePath)) {
hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
}
// Try to load from hadoopConf
Optional<Map<String, String>> hadoopConf =
readonlyConfig.getOptional(HiveConfig.HADOOP_CONF);
hadoopConf.ifPresent(stringStringMap -> stringStringMap.forEach(hiveConf::set));

log.info("hive client conf:{}", hiveConf);
if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) {
hiveConf.set("hive.metastore.sasl.enabled", "true");
hiveConf.set("hadoop.security.authentication", "kerberos");
// hiveConf.set("hadoop.rpc.protection", "privacy");
// login Kerberos
Configuration authConf = new Configuration();
authConf.set("hadoop.security.authentication", "kerberos");
Expand Down
37 changes: 37 additions & 0 deletions seatunnel-connectors-v2/connector-http/connector-http-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.4</httpcore.version>
<guava-retrying.version>2.0.0</guava-retrying.version>
<hikvision-version>1.1.3</hikvision-version>
</properties>

<dependencies>
Expand Down Expand Up @@ -69,5 +70,41 @@
<artifactId>json-path</artifactId>
<version>${json-path.version}</version>
</dependency>
<dependency>
<groupId>com.hikvision.ga</groupId>
<artifactId>artemis-http-client</artifactId>
<version>${hikvision-version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spring Web dependency -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.3.20</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.30</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.7</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@

package org.apache.seatunnel.connectors.seatunnel.http.client;

import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
import org.apache.seatunnel.connectors.seatunnel.http.sdk.aksk.service.impl.SigSignerJavaImpl;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
Expand All @@ -33,13 +38,18 @@
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.message.BasicHeaderElementIterator;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.protocol.HTTP;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.apache.http.util.EntityUtils;

import com.github.rholder.retry.Attempt;
Expand All @@ -50,8 +60,15 @@
import com.github.rholder.retry.WaitStrategies;
import lombok.extern.slf4j.Slf4j;

import javax.net.ssl.SSLContext;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -62,17 +79,54 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader.AUTHCODE;
import static org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader.SANGFOR;
import static org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader.SDK;

@Slf4j
public class HttpClientProvider implements AutoCloseable {
private static final String ENCODING = "UTF-8";
private static final String APPLICATION_JSON = "application/json";
public static final String APPLICATION_JSON = "application/json";
private static final int INITIAL_CAPACITY = 16;
private RequestConfig requestConfig;
private final CloseableHttpClient httpClient;
private final Retryer<CloseableHttpResponse> retryer;
protected HttpParameter httpParameter;

public HttpClientProvider(HttpParameter httpParameter) {
this.httpClient = HttpClients.createDefault();
this.httpParameter = httpParameter;
try {
SSLContext sslContext =
SSLContexts.custom()
.loadTrustMaterial(
null,
new TrustStrategy() {
@Override
public boolean isTrusted(
X509Certificate[] x509Certificates, String s)
throws CertificateException {
return true;
}
})
.build();
HttpClientBuilder httpClientBuilder =
HttpClients.custom()
.setSSLContext(sslContext)
.setSSLHostnameVerifier(new NoopHostnameVerifier());
HttpClientBuilder defaultHttpClientBuilder = HttpClientBuilder.create();

if (httpParameter.isSkipSslVerification()) {
this.httpClient = httpClientBuilder.build();
} else {
this.httpClient = defaultHttpClientBuilder.build();
}
} catch (NoSuchAlgorithmException e) {
throw new SeaTunnelException(e);
} catch (KeyManagementException e) {
throw new SeaTunnelException(e);
} catch (KeyStoreException e) {
throw new SeaTunnelException(e);
}
this.retryer = buildRetryer(httpParameter);
this.requestConfig =
RequestConfig.custom()
Expand Down Expand Up @@ -114,15 +168,16 @@ public HttpResponse execute(
String method,
Map<String, String> headers,
Map<String, String> params,
String body)
String body,
Map<String, String> otherSdk)
throws Exception {
// convert method option to uppercase
method = method.toUpperCase(Locale.ROOT);
if (HttpPost.METHOD_NAME.equals(method)) {
return doPost(url, headers, params, body);
return doPost(url, headers, params, body, otherSdk);
}
if (HttpGet.METHOD_NAME.equals(method)) {
return doGet(url, headers, params);
return doGet(url, headers, params, otherSdk);
}
if (HttpPut.METHOD_NAME.equals(method)) {
return doPut(url, params);
Expand All @@ -131,7 +186,7 @@ public HttpResponse execute(
return doDelete(url, params);
}
// if http method that user assigned is not support by http provider, default do get
return doGet(url, headers, params);
return doGet(url, headers, params, otherSdk);
}

/**
Expand All @@ -142,7 +197,7 @@ public HttpResponse execute(
* @throws Exception information
*/
public HttpResponse doGet(String url) throws Exception {
return doGet(url, Collections.emptyMap(), Collections.emptyMap());
return doGet(url, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
}

/**
Expand All @@ -154,7 +209,7 @@ public HttpResponse doGet(String url) throws Exception {
* @throws Exception information
*/
public HttpResponse doGet(String url, Map<String, String> params) throws Exception {
return doGet(url, Collections.emptyMap(), params);
return doGet(url, Collections.emptyMap(), params, Collections.emptyMap());
}

/**
Expand All @@ -166,7 +221,11 @@ public HttpResponse doGet(String url, Map<String, String> params) throws Excepti
* @return http response result
* @throws Exception information
*/
public HttpResponse doGet(String url, Map<String, String> headers, Map<String, String> params)
public HttpResponse doGet(
String url,
Map<String, String> headers,
Map<String, String> params,
Map<String, String> otherSdk)
throws Exception {
// Create access address
URIBuilder uriBuilder = new URIBuilder(url);
Expand Down Expand Up @@ -274,19 +333,50 @@ public HttpResponse doPost(String url, Map<String, String> headers, String body)
* @throws Exception information
*/
public HttpResponse doPost(
String url, Map<String, String> headers, Map<String, String> params, String body)
String url,
Map<String, String> headers,
Map<String, String> params,
String body,
Map<String, String> otherSdk)
throws Exception {
// create a new http get
HttpPost httpPost = new HttpPost(url);
HttpPost httpPost;
String contentType = "";
if (MapUtils.isNotEmpty(headers)) {
contentType = headers.get("Content-Type");
}
// 处理params位置 ,如果非form表单请求,那么将params拼接的url里面
if (APPLICATION_JSON.equals(contentType)) {
// Create access address
URIBuilder uriBuilder = new URIBuilder(url);
// add parameter to uri
addParameters(uriBuilder, params);
// create a new http get
httpPost = new HttpPost(uriBuilder.build());
} else {
// create a new http get
httpPost = new HttpPost(url);
// set request params
addParameters(httpPost, params);
}
// set default request config
httpPost.setConfig(requestConfig);
// set request header
addHeaders(httpPost, headers);
// set request params
addParameters(httpPost, params);
// return http response
// 深信服加密
if (MapUtils.isNotEmpty(otherSdk)) {
if (SANGFOR.equals(otherSdk.get(SDK))) {
String authCode = otherSdk.get(AUTHCODE);
SigSignerJavaImpl sigSignerJava = new SigSignerJavaImpl(authCode);
addBodyFromSangFor(httpPost, body);
sigSignerJava.sign(httpPost);
return getResponse(httpPost);
}
}

// add body in request
addBody(httpPost, body);
// return http response

return getResponse(httpPost);
}

Expand Down Expand Up @@ -362,7 +452,20 @@ private HttpResponse getResponse(HttpRequestBase request) throws Exception {
if (httpResponse.getEntity() != null) {
content = EntityUtils.toString(httpResponse.getEntity(), ENCODING);
}
return new HttpResponse(httpResponse.getStatusLine().getStatusCode(), content);
HeaderElementIterator it =
new BasicHeaderElementIterator(httpResponse.headerIterator("Set-Cookie"));
String cookies = "";
String cookiesKey = this.httpParameter.getCookiesKey();
while (it.hasNext()) {
HeaderElement elem = it.nextElement();
String name = elem.getName();
String value = elem.getValue();
if (cookiesKey.equals(name) && value != null) {
cookies = name + "=" + value;
}
}
return new HttpResponse(
httpResponse.getStatusLine().getStatusCode(), content, cookies);
}
}
return new HttpResponse(HttpStatus.SC_INTERNAL_SERVER_ERROR);
Expand Down Expand Up @@ -425,6 +528,19 @@ private void addBody(HttpEntityEnclosingRequestBase request, String body) {
request.setEntity(entity);
}

// sanFor sdk需要自己拼接请求头 ,所以无需自己添加请求头
private void addBodyFromSangFor(HttpEntityEnclosingRequestBase request, String body) {
if (checkAlreadyHaveContentType(request)) {
return;
}
if (StringUtils.isBlank(body)) {
body = "";
}
StringEntity entity = new StringEntity(body, ContentType.APPLICATION_JSON);
entity.setContentEncoding(ENCODING);
request.setEntity(entity);
}

@Override
public void close() throws IOException {
if (Objects.nonNull(httpClient)) {
Expand Down
Loading

0 comments on commit bf2d5c7

Please sign in to comment.