> getCredential(CredentialKeyProvider provid
}
}
} else {
- Credentials cred = doGetCredentials(provider, credentials, StringUtils.EMPTY);
+ Credentials cred = doGetCredentials(provider, credentials, "");
if (cred != null) {
- res.add(new Pair(StringUtils.EMPTY, cred));
+ res.add(new Pair("", cred));
}
}
return res;
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
deleted file mode 100644
index daf3c6a2e06..00000000000
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHive.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hive.security;
-
-import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_CREDENTIALS;
-import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_CREDENTIALS_CONFIG_KEYS;
-
-import java.util.Map;
-
-import org.apache.storm.common.AbstractHadoopAutoCreds;
-
-/**
- * Auto credentials plugin for Hive implementation. This class provides a way to automatically
- * push credentials to a topology and to retrieve them in the worker.
- */
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class AutoHive extends AbstractHadoopAutoCreds {
- @Override
- public void doPrepare(Map<String, Object> conf) {
- }
-
- @Override
- protected String getConfigKeyString() {
- return HIVE_CREDENTIALS_CONFIG_KEYS;
- }
-
- @Override
- public String getCredentialKey(String configKey) {
- return HIVE_CREDENTIALS + configKey;
- }
-
-}
-
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
deleted file mode 100644
index 662053f7d67..00000000000
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveCommand.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hive.security;
-
-import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_KEYTAB_FILE_KEY;
-import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_PRINCIPAL_KEY;
-
-import java.util.HashMap;
-import java.util.Map;
-import javax.security.auth.Subject;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.storm.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Command tool of Hive credential renewer.
- */
-@Deprecated(since = "2.7.1", forRemoval = true)
-public final class AutoHiveCommand {
- private static final Logger LOG = LoggerFactory.getLogger(AutoHiveCommand.class);
-
- private AutoHiveCommand() {
- }
-
- @SuppressWarnings("unchecked")
- public static void main(String[] args) throws Exception {
- Map<String, Object> conf = new HashMap<>();
- conf.put(HIVE_PRINCIPAL_KEY, args[1]); // hive principal storm-hive@WITZEN.COM
- conf.put(HIVE_KEYTAB_FILE_KEY, args[2]); // storm hive keytab /etc/security/keytabs/storm-hive.keytab
- // hive.metastore.uris : "thrift://pm-eng1-cluster1.field.hortonworks.com:9083"
- conf.put(HiveConf.ConfVars.METASTOREURIS.varname, args[3]);
-
- AutoHive autoHive = new AutoHive();
- autoHive.prepare(conf);
- AutoHiveNimbus autoHiveNimbus = new AutoHiveNimbus();
- autoHiveNimbus.prepare(conf);
-
- Map<String, String> creds = new HashMap<>();
- autoHiveNimbus.populateCredentials(creds, conf, args[0]);
- LOG.info("Got Hive credentials" + autoHive.getCredentials(creds));
-
- Subject subject = new Subject();
- autoHive.populateSubject(subject, creds);
- LOG.info("Got a Subject " + subject);
-
- autoHiveNimbus.renew(creds, conf, args[0]);
- LOG.info("Renewed credentials" + autoHive.getCredentials(creds));
- }
-
-}
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java
deleted file mode 100644
index 947034edf60..00000000000
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/AutoHiveNimbus.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hive.security;
-
-import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_CREDENTIALS;
-import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_CREDENTIALS_CONFIG_KEYS;
-import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_KEYTAB_FILE_KEY;
-import static org.apache.storm.hive.security.HiveSecurityUtil.HIVE_PRINCIPAL_KEY;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.math3.util.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hive.hcatalog.api.HCatClient;
-import org.apache.hive.hcatalog.common.HCatException;
-import org.apache.storm.common.AbstractHadoopNimbusPluginAutoCreds;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Auto credentials nimbus plugin for Hive implementation. This class automatically
- * gets Hive delegation tokens and pushes them to the user's topology.
- */
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class AutoHiveNimbus extends AbstractHadoopNimbusPluginAutoCreds {
- private static final Logger LOG = LoggerFactory.getLogger(AutoHiveNimbus.class);
-
- public String hiveKeytab;
- public String hivePrincipal;
- @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
- public String metaStoreURI;
-
- @Override
- public void doPrepare(Map<String, Object> conf) {
- if (conf.containsKey(HIVE_KEYTAB_FILE_KEY) && conf.containsKey(HIVE_PRINCIPAL_KEY)) {
- hiveKeytab = (String) conf.get(HIVE_KEYTAB_FILE_KEY);
- hivePrincipal = (String) conf.get(HIVE_PRINCIPAL_KEY);
- metaStoreURI = (String) conf.get(HiveConf.ConfVars.METASTOREURIS.varname);
- }
- }
-
- @Override
- protected String getConfigKeyString() {
- return HIVE_CREDENTIALS_CONFIG_KEYS;
- }
-
- @Override
- public void shutdown() {
- //no op.
- }
-
- @Override
- protected byte[] getHadoopCredentials(Map<String, Object> conf, String configKey, final String topologyOwnerPrincipal) {
- Configuration configuration = getHadoopConfiguration(conf, configKey);
- return getHadoopCredentials(conf, configuration, topologyOwnerPrincipal);
- }
-
- @Override
- protected byte[] getHadoopCredentials(Map<String, Object> conf, final String topologyOwnerPrincipal) {
- Configuration configuration = new Configuration();
- return getHadoopCredentials(conf, configuration, topologyOwnerPrincipal);
- }
-
- @SuppressWarnings("unchecked")
- protected byte[] getHadoopCredentials(Map<String, Object> conf, final Configuration configuration, final String topologySubmitterUser) {
- try {
- if (UserGroupInformation.isSecurityEnabled()) {
- String hiveMetaStoreUri = getMetaStoreUri(configuration);
- String hiveMetaStorePrincipal = getMetaStorePrincipal(configuration);
- HiveConf hcatConf = createHiveConf(hiveMetaStoreUri, hiveMetaStorePrincipal);
- login(configuration);
-
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(topologySubmitterUser, currentUser);
- try {
- Token<DelegationTokenIdentifier> delegationTokenId =
- getDelegationToken(hcatConf, hiveMetaStorePrincipal, topologySubmitterUser);
- proxyUser.addToken(delegationTokenId);
- LOG.info("Obtained Hive tokens, adding to user credentials.");
-
- Credentials credential = proxyUser.getCredentials();
- ByteArrayOutputStream bao = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(bao);
- credential.write(out);
- out.flush();
- out.close();
- return bao.toByteArray();
- } catch (Exception ex) {
- LOG.debug(" Exception" + ex.getMessage());
- throw ex;
- }
- } else {
- throw new RuntimeException("Security is not enabled for Hadoop");
- }
- } catch (Exception ex) {
- throw new RuntimeException("Failed to get delegation tokens.", ex);
- }
- }
-
- private Configuration getHadoopConfiguration(Map<String, Object> topoConf, String configKey) {
- Configuration configuration = new Configuration();
- fillHadoopConfiguration(topoConf, configKey, configuration);
- return configuration;
- }
-
- public HiveConf createHiveConf(String metaStoreUri, String hiveMetaStorePrincipal) throws IOException {
- HiveConf hcatConf = new HiveConf();
- hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
- hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
- hcatConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
- hcatConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
- hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, hiveMetaStorePrincipal);
- return hcatConf;
- }
-
- private Token<DelegationTokenIdentifier> getDelegationToken(HiveConf hcatConf,
- String metaStoreServicePrincipal,
- String topologySubmitterUser) throws IOException {
- LOG.info("Creating delegation tokens for principal={}", metaStoreServicePrincipal);
-
- HCatClient hcatClient = null;
- try {
- hcatClient = HCatClient.create(hcatConf);
- String delegationToken = hcatClient.getDelegationToken(topologySubmitterUser, metaStoreServicePrincipal);
- Token<DelegationTokenIdentifier> delegationTokenId = new Token<DelegationTokenIdentifier>();
- delegationTokenId.decodeFromUrlString(delegationToken);
-
- DelegationTokenIdentifier d = new DelegationTokenIdentifier();
- d.readFields(new DataInputStream(new ByteArrayInputStream(
- delegationTokenId.getIdentifier())));
- LOG.info("Created Delegation Token for : " + d.getUser());
-
- return delegationTokenId;
- } finally {
- if (hcatClient != null) {
- hcatClient.close();
- }
- }
- }
-
- private String getMetaStoreUri(Configuration configuration) {
- if (configuration.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) {
- return metaStoreURI;
- } else {
- return configuration.get(HiveConf.ConfVars.METASTOREURIS.varname);
- }
- }
-
- private String getMetaStorePrincipal(Configuration configuration) {
- if (configuration.get(HIVE_PRINCIPAL_KEY) == null) {
- return hivePrincipal;
- } else {
- return configuration.get(HIVE_PRINCIPAL_KEY);
- }
- }
-
- private void login(Configuration configuration) throws IOException {
- if (configuration.get(HIVE_KEYTAB_FILE_KEY) == null) {
- configuration.set(HIVE_KEYTAB_FILE_KEY, hiveKeytab);
- }
- if (configuration.get(HIVE_PRINCIPAL_KEY) == null) {
- configuration.set(HIVE_PRINCIPAL_KEY, hivePrincipal);
- }
- SecurityUtil.login(configuration, HIVE_KEYTAB_FILE_KEY, HIVE_PRINCIPAL_KEY);
- LOG.info("Logged into hive with principal {}", configuration.get(HIVE_PRINCIPAL_KEY));
- }
-
- @Override
- public void doRenew(Map<String, String> credentials, Map<String, Object> topologyConf, final String topologyOwnerPrincipal) {
- List<String> configKeys = getConfigKeys(topologyConf);
- for (Pair<String, Credentials> cred : getCredentials(credentials, configKeys)) {
- try {
- Configuration configuration = getHadoopConfiguration(topologyConf, cred.getFirst());
- String hiveMetaStoreUri = getMetaStoreUri(configuration);
- String hiveMetaStorePrincipal = getMetaStorePrincipal(configuration);
-
- Collection<Token<? extends TokenIdentifier>> tokens = cred.getSecond().getAllTokens();
- login(configuration);
-
- if (tokens != null && !tokens.isEmpty()) {
- for (Token<? extends TokenIdentifier> token : tokens) {
- long expiration = renewToken(token, hiveMetaStoreUri, hiveMetaStorePrincipal);
- LOG.info("Hive delegation token renewed, new expiration time {}", expiration);
- }
- } else {
- LOG.debug("No tokens found for credentials, skipping renewal.");
- }
- } catch (Exception e) {
- LOG.warn("could not renew the credentials, one of the possible reason is tokens are beyond "
- + "renewal period so attempting to get new tokens.",
- e);
- populateCredentials(credentials, topologyConf);
- }
- }
- }
-
- private long renewToken(Token<? extends TokenIdentifier> token, String metaStoreUri, String hiveMetaStorePrincipal) {
- HCatClient hcatClient = null;
- if (UserGroupInformation.isSecurityEnabled()) {
- try {
- String tokenStr = token.encodeToUrlString();
- HiveConf hcatConf = createHiveConf(metaStoreUri, hiveMetaStorePrincipal);
- LOG.debug("renewing delegation tokens for principal={}", hiveMetaStorePrincipal);
- hcatClient = HCatClient.create(hcatConf);
- Long expiryTime = hcatClient.renewDelegationToken(tokenStr);
- LOG.info("Renewed delegation token. new expiryTime={}", expiryTime);
- return expiryTime;
- } catch (Exception ex) {
- throw new RuntimeException("Failed to renew delegation tokens.", ex);
- } finally {
- if (hcatClient != null) {
- try {
- hcatClient.close();
- } catch (HCatException e) {
- LOG.error(" Exception", e);
- }
- }
- }
- } else {
- throw new RuntimeException("Security is not enabled for Hadoop");
- }
- }
-
- @Override
- public String getCredentialKey(String configKey) {
- return HIVE_CREDENTIALS + configKey;
- }
-
-}
-
diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/HiveSecurityUtil.java b/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/HiveSecurityUtil.java
deleted file mode 100644
index c3424e10649..00000000000
--- a/external/storm-autocreds/src/main/java/org/apache/storm/hive/security/HiveSecurityUtil.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.hive.security;
-
-/**
- * This class provides util methods for the storm-hive connector communicating
- * with secured Hive.
- */
-@Deprecated(since = "2.7.1", forRemoval = true)
-public final class HiveSecurityUtil {
- public static final String HIVE_KEYTAB_FILE_KEY = "hive.keytab.file";
- public static final String HIVE_PRINCIPAL_KEY = "hive.kerberos.principal";
- public static final String HIVE_CREDENTIALS_CONFIG_KEYS = "hiveCredentialsConfigKeys";
- public static final String HIVE_CREDENTIALS = "HIVE_CREDENTIALS";
-
- private HiveSecurityUtil() {
- }
-}
diff --git a/external/storm-hive/README.md b/external/storm-hive/README.md
deleted file mode 100644
index 5e5df1ff265..00000000000
--- a/external/storm-hive/README.md
+++ /dev/null
@@ -1,189 +0,0 @@
-# Storm Hive Bolt & Trident State
-
- Hive offers a streaming API that allows data to be written continuously into Hive. The incoming data
- can be committed continuously in small batches of records into an existing Hive partition or table. Once the data
- is committed it is immediately visible to all Hive queries. More info on the Hive Streaming API:
- https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest
-
- With the help of the Hive Streaming API, HiveBolt and HiveState allow users to stream data from Storm into Hive directly.
- To use the Hive Streaming API, users need to create a bucketed table stored in ORC format. Example below:
-
- ```code
- create table test_table ( id INT, name STRING, phone STRING, street STRING) partitioned by (city STRING, state STRING) stored as orc tblproperties ("orc.compress"="NONE");
- ```
-
-
-## HiveBolt (org.apache.storm.hive.bolt.HiveBolt)
-
-HiveBolt streams tuples directly into Hive. Tuples are written using Hive transactions.
-Partitions to which HiveBolt streams can be pre-created, or optionally
-HiveBolt can create them if they are missing. Fields from tuples are mapped to table columns.
-Users should make sure that tuple field names match the table column names.
-
-```java
-DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames));
-HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper);
-HiveBolt hiveBolt = new HiveBolt(hiveOptions);
-```
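-
-For reference, a minimal sketch of wiring the bolt into a topology might look like the following (the spout and component ids are illustrative; MyEventSpout stands in for any spout emitting the mapped fields):
-
-```java
-TopologyBuilder builder = new TopologyBuilder();
-builder.setSpout("event-spout", new MyEventSpout());   // placeholder spout emitting fields matching colNames
-builder.setBolt("hive-bolt", hiveBolt, 2).shuffleGrouping("event-spout");
-```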
-
-### RecordHiveMapper
- This class maps Tuple field names to Hive table column names.
- There are two implementations available:
-
-
- + DelimitedRecordHiveMapper (org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper)
- + JsonRecordHiveMapper (org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper)
-
- ```java
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames))
- .withPartitionFields(new Fields(partNames));
- or
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames))
- .withTimeAsPartitionField("YYYY/MM/DD");
- ```
-
-|Arg | Description | Type
-|--- |--- |---
-|withColumnFields| field names in a tuple to be mapped to table column names | Fields (required) |
-|withPartitionFields| field names in a tuple to be mapped to Hive table partitions | Fields |
-|withTimeAsPartitionField| users can select the current system time as a partition field in the Hive table| String. Date format|
-
-### HiveOptions (org.apache.storm.hive.common.HiveOptions)
-
-HiveBolt takes in HiveOptions as a constructor arg.
-
- ```java
- HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
- .withTxnsPerBatch(10)
- .withBatchSize(1000)
- .withIdleTimeout(10)
- ```
-
-
-HiveOptions params
-
-|Arg |Description | Type
-|--- |--- |---
-|metaStoreURI | hive meta store URI (can be found in hive-site.xml) | String (required) |
-|dbName | database name | String (required) |
-|tblName | table name | String (required) |
-|mapper| Mapper class to map Tuple field names to Table column names | DelimitedRecordHiveMapper or JsonRecordHiveMapper (required) |
-|withTxnsPerBatch | Hive grants a *batch of transactions* instead of single transactions to streaming clients like HiveBolt. This setting configures the number of desired transactions per transaction batch. Data from all transactions in a single batch ends up in a single file. HiveBolt will write a maximum of batchSize events in each transaction in the batch. This setting, in conjunction with batchSize, provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.| Integer. default 100 |
-|withMaxOpenConnections| Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.| Integer. default 10|
-|withBatchSize| Max number of events written to Hive in a single Hive transaction| Integer. default 15000|
-|withCallTimeout| (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort. | Integer. default 0|
-|withHeartBeatInterval| (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.| Integer. default 60 |
-|withAutoCreatePartitions| HiveBolt will automatically create the necessary Hive partitions to stream to. |Boolean. default true |
-|withKerberosPrincipal| Kerberos user principal for accessing secure Hive | String|
-|withKerberosKeytab| Kerberos keytab for accessing secure Hive | String |
-|withTickTupleInterval| (In seconds) If > 0 then the Hive Bolt will periodically flush transaction batches. Enabling this is recommended to avoid tuple timeouts while waiting for a batch to fill up.| Integer. default 15|
-
-
-
-## HiveState (org.apache.storm.hive.trident.HiveTrident)
-
-Hive Trident state also follows a similar pattern to HiveBolt; it takes HiveOptions as an argument.
-
-```code
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames))
- .withTimeAsPartitionField("YYYY/MM/DD");
-
- HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
- .withTxnsPerBatch(10)
- .withBatchSize(1000)
- .withIdleTimeout(10)
-
- StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
- TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
-```
-
-
-## Working with Secure Hive
-If your topology is going to interact with secure Hive, your bolts/states need to be authenticated by the Hive Server. We
-currently have 2 options to support this:
-
-### Using keytabs on all worker hosts
-If you have distributed the keytab files for the hive user on all potential worker hosts then you can use this method. You should specify the
-hive configs using the HiveOptions.withKerberosKeytab() and HiveOptions.withKerberosPrincipal() methods, as shown in the sketch below.
-
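-A minimal sketch of building a HiveBolt with these options (the keytab path and principal below are placeholders, not values shipped with this module):
-
-```java
-DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
-        .withColumnFields(new Fields(colNames));
-HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
-        .withKerberosKeytab("/etc/security/keytabs/storm.keytab")   // placeholder keytab path
-        .withKerberosPrincipal("storm@EXAMPLE.COM");                // placeholder principal
-HiveBolt hiveBolt = new HiveBolt(hiveOptions);
-```
-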
-On worker hosts the bolt/trident-state code will use the keytab file and principal provided in the config to authenticate with
-Hive. This method is somewhat risky as you need to ensure all workers have the keytab file at the same location, and you need
-to remember this as you bring up new hosts in the cluster.
-
-
-### Using Hive MetaStore delegation tokens
-Your administrator can configure nimbus to automatically get delegation tokens on behalf of the topology submitter user.
-Since Hive depends on HDFS, we should also configure HDFS delegation tokens.
-
-More details about Hadoop Tokens here: https://github.com/apache/storm/blob/master/docs/storm-hdfs.md
-
-The nimbus should be started with the following configurations:
-
-```
-nimbus.autocredential.plugins.classes : ["org.apache.storm.hive.security.AutoHive", "org.apache.storm.hdfs.security.AutoHDFS"]
-nimbus.credential.renewers.classes : ["org.apache.storm.hive.security.AutoHive", "org.apache.storm.hdfs.security.AutoHDFS"]
-nimbus.credential.renewers.freq.secs : 82800 (23 hours)
-
-hive.keytab.file: "/path/to/keytab/on/nimbus" (Keytab of The Hive metastore thrift server service principal. This is used to impersonate other users.)
-hive.kerberos.principal: "hive-metastore/_HOST@EXAMPLE.com" (The service principal for the metastore thrift server.)
-hive.metastore.uris: "thrift://server:9083"
-
-//hdfs configs
-hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of hdfs super user that can impersonate other users.)
-hdfs.kerberos.principal: "superuser@EXAMPLE.com"
-```
-
-
-Your topology configuration should have:
-
-```
-topology.auto-credentials :["org.apache.storm.hive.security.AutoHive", "org.apache.storm.hdfs.security.AutoHDFS"]
-```
-
-If nimbus did not have the above configuration you need to add it and then restart nimbus. Ensure the hadoop configuration
-files (core-site.xml, hdfs-site.xml and hive-site.xml) and the storm-hive connector jar with all its dependencies are present in nimbus's classpath.
-
-As an alternative to adding the configuration files (core-site.xml, hdfs-site.xml and hive-site.xml) to the classpath, you could specify the configurations
-as a part of the topology configuration, e.g. in your custom storm.yaml (or via the -c option while submitting the topology):
-
-
-```
-hiveCredentialsConfigKeys : ["hivecluster1", "hivecluster2"] (the hive clusters you want to fetch the tokens from)
-"hivecluster1": {"config1": "value1", "config2": "value2", ... } (A map of config key-values specific to cluster1)
-"hivecluster2": {"config1": "value1", "hive.keytab.file": "/path/to/keytab/for/cluster2/on/nimubs", "hive.kerberos.principal": "cluster2user@EXAMPLE.com", "hive.metastore.uris": "thrift://server:9083"} (here along with other configs, we have custom keytab and principal for "cluster2" which will override the keytab/principal specified at topology level)
-
-hdfsCredentialsConfigKeys : ["hdfscluster1", "hdfscluster2"] (the hdfs clusters you want to fetch the tokens from)
-"hdfscluster1": {"config1": "value1", "config2": "value2", ... } (A map of config key-values specific to cluster1)
-"hdfscluster2": {"config1": "value1", "hdfs.keytab.file": "/path/to/keytab/for/cluster2/on/nimubs", "hdfs.kerberos.principal": "cluster2user@EXAMPLE.com"} (here along with other configs, we have custom keytab and principal for "cluster2" which will override the keytab/principal specified at topology level)
-```
-
-Instead of specifying key values you may also directly specify the resource files, for example:
-
-```
-"cluster1": {"resources": ["/path/to/core-site1.xml", "/path/to/hdfs-site1.xml", "/path/to/hive-site1.xml"]}
-"cluster2": {"resources": ["/path/to/core-site2.xml", "/path/to/hdfs-site2.xml", "/path/to/hive-site2.xml"]}
-```
-
-Storm will download the tokens separately for each of the clusters, populate them into the subject, and also renew the tokens periodically. This way it is possible to run multiple bolts connecting to separate Hive clusters within the same topology.
-
-Nimbus will use the keytab and principal specified in the config to authenticate with the Hive metastore. From then on, for every
-topology submission, nimbus will impersonate the topology submitter user and acquire delegation tokens on behalf of that
-user. If the topology was started with topology.auto-credentials set to AutoHive, nimbus will push the
-delegation tokens to all the workers for your topology and the Hive bolt/state will authenticate with the Hive Server using
-these tokens.
-
-As nimbus is impersonating the topology submitter user, you need to ensure that the user specified in hive.kerberos.principal
-has permissions to acquire tokens on behalf of other users.
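-
-For illustration only, the same topology-side setting can also be supplied programmatically when building the submission config (a sketch; the class list mirrors the YAML above):
-
-```java
-Config conf = new Config();
-conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS,
-         Arrays.asList("org.apache.storm.hive.security.AutoHive",
-                       "org.apache.storm.hdfs.security.AutoHDFS"));
-// pass 'conf' to StormSubmitter.submitTopology(...) when submitting the topology
-```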
-
-## Committer Sponsors
- * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org))
- * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
- * Bobby Evans ([bobby@apache.org](mailto:bobby@apache.org))
-
-
-
-
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
deleted file mode 100644
index 8b6696322bf..00000000000
--- a/external/storm-hive/pom.xml
+++ /dev/null
@@ -1,200 +0,0 @@
-
-
-
- 4.0.0
-
-
- storm
- org.apache.storm
- 2.8.0-SNAPSHOT
- ../../pom.xml
-
-
- jar
- storm-hive
- storm-hive
-
-
- harshach
- Sriharsha Chintalapani
- mail@harsha.io
-
-
-
-
-
- org.apache.storm
- storm-client
- ${project.version}
- ${provided.scope}
-
-
- org.apache.storm
- storm-client
- ${project.version}
- test-jar
- test
-
-
- org.apache.hive.hcatalog
- hive-hcatalog-streaming
- ${hive.version}
-
-
- org.pentaho
- pentaho-aggdesigner-algorithm
-
-
- org.slf4j
- slf4j-log4j12
-
-
- org.apache.logging.log4j
- log4j-slf4j-impl
-
-
- org.apache.calcite
- calcite-core
-
-
- org.apache.calcite
- calcite-avatica
-
-
- junit
- junit
-
-
- org.springframework
- spring-test
-
-
- org.springframework
- spring-core
-
-
-
- org.apache.hive
- hive-cli
-
-
- org.apache.hadoop
- hadoop-common
-
-
- org.apache.zookeeper
- zookeeper
-
-
- org.apache.curator
- apache-curator
-
-
- org.apache.curator
- curator-framework
-
-
- org.eclipse.jetty
- *
-
-
- org.bouncycastle
- bcpkix-jdk15on
-
-
- org.bouncycastle
- bcprov-jdk15on
-
-
-
-
- net.minidev
- json-smart
-
-
- org.mockito
- mockito-core
-
-
- org.hamcrest
- hamcrest
-
-
- org.apache.storm
- storm-autocreds
- ${project.version}
-
-
- com.google.guava
- guava
-
-
-
-
-
- org.apache.maven.plugins
- maven-surefire-plugin
-
- true
- 1
-
-
-
- maven-clean-plugin
-
-
- cleanup
- test-compile
-
- clean
-
-
- true
-
-
- ./metastore_db/
-
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-jar-plugin
-
-
-
- test-jar
-
-
-
-
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
-
-
-
- org.apache.maven.plugins
- maven-pmd-plugin
-
-
-
-
-
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
deleted file mode 100644
index 14c6195cb23..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.bolt;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.SerializationError;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.storm.Config;
-import org.apache.storm.hive.common.HiveOptions;
-import org.apache.storm.hive.common.HiveUtils;
-import org.apache.storm.hive.common.HiveWriter;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.BatchHelper;
-import org.apache.storm.utils.TupleUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class HiveBolt extends BaseRichBolt {
- private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
- @VisibleForTesting
- Map<HiveEndPoint, HiveWriter> allWriters;
- private OutputCollector collector;
- private HiveOptions options;
- private ExecutorService callTimeoutPool;
- private transient Timer heartBeatTimer;
- private AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
- private UserGroupInformation ugi = null;
- private BatchHelper batchHelper;
- private boolean tokenAuthEnabled;
-
- public HiveBolt(HiveOptions options) {
- this.options = options;
- }
-
- @Override
- public void prepare(Map<String, Object> conf, TopologyContext topologyContext, OutputCollector collector) {
- try {
- tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf);
- try {
- ugi = HiveUtils.authenticate(tokenAuthEnabled, options.getKerberosKeytab(), options.getKerberosPrincipal());
- } catch (HiveUtils.AuthenticationFailed ex) {
- LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex);
- throw new IllegalArgumentException(ex);
- }
-
- this.collector = collector;
- this.batchHelper = new BatchHelper(options.getBatchSize(), collector);
- allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>();
- String timeoutName = "hive-bolt-%d";
- this.callTimeoutPool = Executors.newFixedThreadPool(1,
- new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
-
- sendHeartBeat.set(true);
- heartBeatTimer = new Timer(topologyContext.getThisTaskId() + "-hb-timer", true);
- setupHeartBeatTimer();
-
- } catch (Exception e) {
- LOG.warn("unable to make connection to hive ", e);
- }
- }
-
- @Override
- public void execute(Tuple tuple) {
- try {
- if (batchHelper.shouldHandle(tuple)) {
- List<String> partitionVals = options.getMapper().mapPartitions(tuple);
- HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
- HiveWriter writer = getOrCreateWriter(endPoint);
- writer.write(options.getMapper().mapRecord(tuple));
- batchHelper.addBatch(tuple);
- }
-
- if (batchHelper.shouldFlush()) {
- flushAllWriters(true);
- LOG.info("acknowledging tuples after writers flushed ");
- batchHelper.ack();
- }
- if (TupleUtils.isTick(tuple)) {
- retireIdleWriters();
- }
- } catch (SerializationError se) {
- LOG.info("Serialization exception occurred, tuple is acknowledged but not written to Hive.", tuple);
- this.collector.reportError(se);
- collector.ack(tuple);
- } catch (Exception e) {
- batchHelper.fail(e);
- abortAndCloseWriters();
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
- }
-
- @Override
- public void cleanup() {
- sendHeartBeat.set(false);
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- try {
- HiveWriter w = entry.getValue();
- w.flushAndClose();
- } catch (Exception ex) {
- LOG.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.",
- ex);
- if (ex instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- ExecutorService[] toShutdown = { callTimeoutPool };
- for (ExecutorService execService : toShutdown) {
- execService.shutdown();
- try {
- while (!execService.isTerminated()) {
- execService.awaitTermination(
- options.getCallTimeOut(), TimeUnit.MILLISECONDS);
- }
- } catch (InterruptedException ex) {
- LOG.warn("shutdown interrupted on " + execService, ex);
- }
- }
-
- callTimeoutPool = null;
- if (heartBeatTimer != null) {
- heartBeatTimer.cancel();
- }
- super.cleanup();
- LOG.info("Hive Bolt stopped");
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = super.getComponentConfiguration();
- if (conf == null) {
- conf = new Config();
- }
-
- if (options.getTickTupleInterval() > 0) {
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, options.getTickTupleInterval());
- }
-
- return conf;
- }
-
- private void setupHeartBeatTimer() {
- if (options.getHeartBeatInterval() > 0) {
- heartBeatTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- if (sendHeartBeat.get()) {
- LOG.debug("Start sending heartbeat on all writers");
- sendHeartBeatOnAllWriters();
- setupHeartBeatTimer();
- }
- } catch (Exception e) {
- LOG.warn("Failed to heartbeat on HiveWriter ", e);
- }
- }
- }, options.getHeartBeatInterval() * 1000);
- }
- }
-
- private void sendHeartBeatOnAllWriters() throws InterruptedException {
- for (HiveWriter writer : allWriters.values()) {
- writer.heartBeat();
- }
- }
-
- void flushAllWriters(boolean rollToNext)
- throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
- for (HiveWriter writer : allWriters.values()) {
- writer.flush(rollToNext);
- }
- }
-
- void abortAndCloseWriters() {
- try {
- abortAllWriters();
- closeAllWriters();
- } catch (Exception ie) {
- LOG.warn("unable to close hive connections. ", ie);
- }
- }
-
- /**
- * Abort current Txn on all writers.
- */
- private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- try {
- entry.getValue().abort();
- } catch (Exception e) {
- LOG.error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() + " due to exception ", e);
- }
- }
- }
-
- /**
- * Closes all writers and remove them from cache.
- */
- private void closeAllWriters() {
- //1) Retire writers
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- try {
- entry.getValue().close();
- } catch (Exception e) {
- LOG.warn("unable to close writers. ", e);
- }
- }
- //2) Clear cache
- allWriters.clear();
- }
-
- @VisibleForTesting
- HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
- throws HiveWriter.ConnectFailure, InterruptedException {
- try {
- HiveWriter writer = allWriters.get(endPoint);
- if (writer == null) {
- LOG.debug("Creating Writer to Hive end point : " + endPoint);
- writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, tokenAuthEnabled);
- if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
- LOG.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", allWriters.size(),
- options.getMaxOpenConnections());
- int retired = retireIdleWriters();
- if (retired == 0) {
- retireEldestWriter();
- }
- }
- allWriters.put(endPoint, writer);
- HiveUtils.logAllHiveEndPoints(allWriters);
- }
- return writer;
- } catch (HiveWriter.ConnectFailure e) {
- LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
- throw e;
- }
- }
-
- /**
- * Locate writer that has not been used for longest time and retire it.
- */
- private void retireEldestWriter() {
- LOG.info("Attempting close eldest writers");
- long oldestTimeStamp = System.currentTimeMillis();
- HiveEndPoint eldest = null;
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- if (entry.getValue().getLastUsed() < oldestTimeStamp) {
- eldest = entry.getKey();
- oldestTimeStamp = entry.getValue().getLastUsed();
- }
- }
- try {
- LOG.info("Closing least used Writer to Hive end point : " + eldest);
- allWriters.remove(eldest).flushAndClose();
- } catch (IOException e) {
- LOG.warn("Failed to close writer for end point: " + eldest, e);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
- }
- }
-
- /**
- * Locate all writers past idle timeout and retire them.
- * @return number of writers retired
- */
- private int retireIdleWriters() {
- LOG.info("Attempting close idle writers");
- int count = 0;
- long now = System.currentTimeMillis();
-
- //1) Find retirement candidates
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
- ++count;
- retire(entry.getKey());
- }
- }
- return count;
- }
-
- private void retire(HiveEndPoint ep) {
- try {
- HiveWriter writer = allWriters.remove(ep);
- if (writer != null) {
- LOG.info("Closing idle Writer to Hive end point : {}", ep);
- writer.flushAndClose();
- }
- } catch (IOException e) {
- LOG.warn("Failed to close writer for end point: {}. Error: " + ep, e);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
- }
- }
-
-}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
deleted file mode 100644
index 0e3af046384..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/DelimitedRecordHiveMapper.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.bolt.mapper;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.RecordWriter;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.hive.hcatalog.streaming.TransactionBatch;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class DelimitedRecordHiveMapper implements HiveMapper {
- private static final Logger LOG = LoggerFactory.getLogger(DelimitedRecordHiveMapper.class);
- private static final String DEFAULT_FIELD_DELIMITER = ",";
- private Fields columnFields;
- private Fields partitionFields;
- private String[] columnNames;
- private String timeFormat;
- private String fieldDelimiter = DEFAULT_FIELD_DELIMITER;
- private SimpleDateFormat parseDate;
-
- public DelimitedRecordHiveMapper() {
- }
-
- public DelimitedRecordHiveMapper withColumnFields(Fields columnFields) {
- this.columnFields = columnFields;
- List<String> tempColumnNamesList = this.columnFields.toList();
- columnNames = new String[tempColumnNamesList.size()];
- tempColumnNamesList.toArray(columnNames);
- return this;
- }
-
- public DelimitedRecordHiveMapper withPartitionFields(Fields partitionFields) {
- this.partitionFields = partitionFields;
- return this;
- }
-
- public DelimitedRecordHiveMapper withFieldDelimiter(String delimiter) {
- this.fieldDelimiter = delimiter;
- return this;
- }
-
- public DelimitedRecordHiveMapper withTimeAsPartitionField(String timeFormat) {
- this.timeFormat = timeFormat;
- parseDate = new SimpleDateFormat(timeFormat);
- return this;
- }
-
- @Override
- public RecordWriter createRecordWriter(HiveEndPoint endPoint)
- throws StreamingException, IOException, ClassNotFoundException {
- return new DelimitedInputWriter(columnNames, fieldDelimiter, endPoint);
- }
-
- @Override
- public void write(TransactionBatch txnBatch, Tuple tuple)
- throws StreamingException, IOException, InterruptedException {
- txnBatch.write(mapRecord(tuple));
- }
-
- @Override
- public List<String> mapPartitions(Tuple tuple) {
- List<String> partitionList = new ArrayList<String>();
- if (this.partitionFields != null) {
- for (String field : this.partitionFields) {
- partitionList.add(tuple.getStringByField(field));
- }
- }
- if (this.timeFormat != null) {
- partitionList.add(getPartitionsByTimeFormat());
- }
- return partitionList;
- }
-
- @Override
- public List<String> mapPartitions(TridentTuple tuple) {
- List<String> partitionList = new ArrayList<String>();
- if (this.partitionFields != null) {
- for (String field : this.partitionFields) {
- partitionList.add(tuple.getStringByField(field));
- }
- }
- if (this.timeFormat != null) {
- partitionList.add(getPartitionsByTimeFormat());
- }
- return partitionList;
- }
-
- @Override
- public byte[] mapRecord(Tuple tuple) {
- StringBuilder builder = new StringBuilder();
- if (this.columnFields != null) {
- for (String field : this.columnFields) {
- builder.append(tuple.getValueByField(field));
- builder.append(fieldDelimiter);
- }
- }
- return builder.toString().getBytes();
- }
-
- @Override
- public byte[] mapRecord(TridentTuple tuple) {
- StringBuilder builder = new StringBuilder();
- if (this.columnFields != null) {
- for (String field : this.columnFields) {
- builder.append(tuple.getValueByField(field));
- builder.append(fieldDelimiter);
- }
- }
- return builder.toString().getBytes();
- }
-
- private String getPartitionsByTimeFormat() {
- return parseDate.format(System.currentTimeMillis());
- }
-
- @VisibleForTesting
- public String getFieldDelimiter() {
- return fieldDelimiter;
- }
-}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
deleted file mode 100644
index bb9e5f0f084..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/HiveMapper.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.bolt.mapper;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.RecordWriter;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.hive.hcatalog.streaming.TransactionBatch;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * Maps a <code>org.apache.storm.tuple.Tuple</code> object
- * to a row in a Hive table.
- */
-@Deprecated(since = "2.7.1", forRemoval = true)
-public interface HiveMapper extends Serializable {
-
- /**
- * Given an endPoint, returns a RecordWriter with columnNames.
- */
-
- RecordWriter createRecordWriter(HiveEndPoint endPoint)
- throws StreamingException, IOException, ClassNotFoundException;
-
- void write(TransactionBatch txnBatch, Tuple tuple)
- throws StreamingException, IOException, InterruptedException;
-
- /**
- * Given a tuple, return a hive partition values list.
- */
- List<String> mapPartitions(Tuple tuple);
-
- /**
- * Given a TridentTuple, return a hive partition values list.
- */
- List<String> mapPartitions(TridentTuple tuple);
-
- /**
- * Given a tuple, maps to a HiveRecord based on columnFields.
- */
- byte[] mapRecord(Tuple tuple);
-
- /**
- * Given a TridentTuple, maps to a HiveRecord based on columnFields.
- */
- byte[] mapRecord(TridentTuple tuple);
-
-}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
deleted file mode 100644
index a380704f337..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/mapper/JsonRecordHiveMapper.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.bolt.mapper;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-import net.minidev.json.JSONObject;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.RecordWriter;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.hive.hcatalog.streaming.StrictJsonWriter;
-import org.apache.hive.hcatalog.streaming.TransactionBatch;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class JsonRecordHiveMapper implements HiveMapper {
- private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHiveMapper.class);
- private Fields columnFields;
- private Fields partitionFields;
- private String timeFormat;
- private SimpleDateFormat parseDate;
-
- public JsonRecordHiveMapper() {
- }
-
- public JsonRecordHiveMapper withColumnFields(Fields columnFields) {
- this.columnFields = columnFields;
- return this;
- }
-
- public JsonRecordHiveMapper withPartitionFields(Fields partitionFields) {
- this.partitionFields = partitionFields;
- return this;
- }
-
- public JsonRecordHiveMapper withTimeAsPartitionField(String timeFormat) {
- this.timeFormat = timeFormat;
- parseDate = new SimpleDateFormat(timeFormat);
- return this;
- }
-
- @Override
- public RecordWriter createRecordWriter(HiveEndPoint endPoint)
- throws StreamingException, IOException, ClassNotFoundException {
- return new StrictJsonWriter(endPoint);
- }
-
- @Override
- public void write(TransactionBatch txnBatch, Tuple tuple)
- throws StreamingException, IOException, InterruptedException {
- txnBatch.write(mapRecord(tuple));
- }
-
- @Override
- public List<String> mapPartitions(Tuple tuple) {
- List<String> partitionList = new ArrayList<String>();
- if (this.partitionFields != null) {
- for (String field : this.partitionFields) {
- partitionList.add(tuple.getStringByField(field));
- }
- }
- if (this.timeFormat != null) {
- partitionList.add(getPartitionsByTimeFormat());
- }
- return partitionList;
- }
-
- @Override
- public List<String> mapPartitions(TridentTuple tuple) {
- List<String> partitionList = new ArrayList<String>();
- if (this.partitionFields != null) {
- for (String field : this.partitionFields) {
- partitionList.add(tuple.getStringByField(field));
- }
- }
- if (this.timeFormat != null) {
- partitionList.add(getPartitionsByTimeFormat());
- }
- return partitionList;
- }
-
- @Override
- public byte[] mapRecord(Tuple tuple) {
- JSONObject obj = new JSONObject();
- if (this.columnFields != null) {
- for (String field : this.columnFields) {
- obj.put(field, tuple.getValueByField(field));
- }
- }
- return obj.toJSONString().getBytes();
- }
-
- @Override
- public byte[] mapRecord(TridentTuple tuple) {
- JSONObject obj = new JSONObject();
- if (this.columnFields != null) {
- for (String field : this.columnFields) {
- obj.put(field, tuple.getValueByField(field));
- }
- }
- return obj.toJSONString().getBytes();
- }
-
- private String getPartitionsByTimeFormat() {
- return parseDate.format(System.currentTimeMillis());
- }
-}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
deleted file mode 100644
index 1d4f042d225..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveOptions.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.common;
-
-import java.io.Serializable;
-import org.apache.storm.hive.bolt.mapper.HiveMapper;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class HiveOptions implements Serializable {
- /**
- * Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
- */
- public static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15;
-
- protected HiveMapper mapper;
- protected String databaseName;
- protected String tableName;
- @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
- protected String metaStoreURI;
- protected Integer txnsPerBatch = 100;
- protected Integer maxOpenConnections = 10;
- protected Integer batchSize = 15000;
- protected Integer idleTimeout = 60000;
- protected Integer callTimeout = 0;
- protected Integer heartBeatInterval = 60;
- protected Boolean autoCreatePartitions = true;
- protected String kerberosPrincipal;
- protected String kerberosKeytab;
- protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS;
-
- public HiveOptions(String metaStoreUri, String databaseName, String tableName, HiveMapper mapper) {
- this.metaStoreURI = metaStoreUri;
- this.databaseName = databaseName;
- this.tableName = tableName;
- this.mapper = mapper;
- }
-
- public HiveOptions withTickTupleInterval(Integer tickInterval) {
- this.tickTupleInterval = tickInterval;
- return this;
- }
-
- public HiveOptions withTxnsPerBatch(Integer txnsPerBatch) {
- this.txnsPerBatch = txnsPerBatch;
- return this;
- }
-
- public HiveOptions withMaxOpenConnections(Integer maxOpenConnections) {
- this.maxOpenConnections = maxOpenConnections;
- return this;
- }
-
- public HiveOptions withBatchSize(Integer batchSize) {
- this.batchSize = batchSize;
- return this;
- }
-
- public HiveOptions withIdleTimeout(Integer idleTimeout) {
- this.idleTimeout = idleTimeout;
- return this;
- }
-
- public HiveOptions withCallTimeout(Integer callTimeout) {
- this.callTimeout = callTimeout;
- return this;
- }
-
- public HiveOptions withHeartBeatInterval(Integer heartBeatInterval) {
- this.heartBeatInterval = heartBeatInterval;
- return this;
- }
-
- public HiveOptions withAutoCreatePartitions(Boolean autoCreatePartitions) {
- this.autoCreatePartitions = autoCreatePartitions;
- return this;
- }
-
- public HiveOptions withKerberosKeytab(String kerberosKeytab) {
- this.kerberosKeytab = kerberosKeytab;
- return this;
- }
-
- public HiveOptions withKerberosPrincipal(String kerberosPrincipal) {
- this.kerberosPrincipal = kerberosPrincipal;
- return this;
- }
-
- @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
- public String getMetaStoreURI() {
- return metaStoreURI;
- }
-
- public String getDatabaseName() {
- return databaseName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public HiveMapper getMapper() {
- return mapper;
- }
-
- public Integer getBatchSize() {
- return batchSize;
- }
-
- public Integer getCallTimeOut() {
- return callTimeout;
- }
-
- public Integer getHeartBeatInterval() {
- return heartBeatInterval;
- }
-
- public Integer getMaxOpenConnections() {
- return maxOpenConnections;
- }
-
- public Integer getIdleTimeout() {
- return idleTimeout;
- }
-
- public Integer getTxnsPerBatch() {
- return txnsPerBatch;
- }
-
- public Boolean getAutoCreatePartitions() {
- return autoCreatePartitions;
- }
-
- public String getKerberosPrincipal() {
- return kerberosPrincipal;
- }
-
- public String getKerberosKeytab() {
- return kerberosKeytab;
- }
-
- public Integer getTickTupleInterval() {
- return tickTupleInterval;
- }
-}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
deleted file mode 100644
index 0265ad76984..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.common;
-
-import static org.apache.storm.Config.TOPOLOGY_AUTO_CREDENTIALS;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.hcatalog.streaming.ConnectionError;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.storm.hive.security.AutoHive;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class HiveUtils {
- private static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);
-
- public static HiveEndPoint makeEndPoint(List<String> partitionVals, HiveOptions options) throws ConnectionError {
- if (partitionVals == null) {
- return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), null);
- }
- return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
- }
-
- public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi,
- HiveOptions options, boolean tokenAuthEnabled)
- throws HiveWriter.ConnectFailure, InterruptedException {
- return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
- options.getCallTimeOut(), callTimeoutPool, options.getMapper(), ugi, tokenAuthEnabled);
- }
-
- public static synchronized UserGroupInformation authenticate(boolean isTokenAuthEnabled, String keytab, String principal) throws
- AuthenticationFailed {
-
- if (isTokenAuthEnabled) {
- return getCurrentUser(principal);
- }
-
- boolean kerberosEnabled = false;
-
- if (principal == null && keytab == null) {
- kerberosEnabled = false;
- } else if (principal != null && keytab != null) {
- kerberosEnabled = true;
- } else {
- throw new IllegalArgumentException("To enable Kerberos, need to set both KerberosPrincipal and KerberosKeytab");
- }
-
- if (kerberosEnabled) {
- File kfile = new File(keytab);
-
- if (!(kfile.isFile() && kfile.canRead())) {
- throw new IllegalArgumentException("The keyTab file: " + keytab + " is nonexistent or can't read. "
- + "Please specify a readable keytab file for Kerberos auth.");
- }
-
- try {
- principal = SecurityUtil.getServerPrincipal(principal, "");
- } catch (Exception e) {
- throw new AuthenticationFailed("Host lookup error when resolving principal " + principal, e);
- }
-
- try {
- UserGroupInformation.loginUserFromKeytab(principal, keytab);
- return UserGroupInformation.getLoginUser();
- } catch (IOException e) {
- throw new AuthenticationFailed("Login failed for principal " + principal, e);
- }
- }
-
- return null;
-
- }
-
- public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> allWriters) {
- for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- LOG.info("cached writers {} ", entry.getValue());
- }
- }
-
- public static boolean isTokenAuthEnabled(Map<String, Object> conf) {
- return conf.get(TOPOLOGY_AUTO_CREDENTIALS) != null
- && (((List) conf.get(TOPOLOGY_AUTO_CREDENTIALS)).contains(AutoHive.class.getName()));
- }
-
- private static UserGroupInformation getCurrentUser(String principal) throws AuthenticationFailed {
- try {
- return UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw new AuthenticationFailed("Login failed for principal " + principal, e);
- }
- }
-
- public static class AuthenticationFailed extends Exception {
- public AuthenticationFailed(String reason, Exception cause) {
- super("Kerberos Authentication Failed. " + reason, cause);
- }
- }
-}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java b/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
deleted file mode 100644
index dde370a6932..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/common/HiveWriter.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.common;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.RecordWriter;
-import org.apache.hive.hcatalog.streaming.SerializationError;
-import org.apache.hive.hcatalog.streaming.StreamingConnection;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.hive.hcatalog.streaming.StreamingIOFailure;
-import org.apache.hive.hcatalog.streaming.TransactionBatch;
-import org.apache.storm.hive.bolt.mapper.HiveMapper;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class HiveWriter {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(HiveWriter.class);
-
- private final HiveEndPoint endPoint;
- private final StreamingConnection connection;
- private final int txnsPerBatch;
- private final RecordWriter recordWriter;
- private final ExecutorService callTimeoutPool;
- private final long callTimeout;
- private final Object txnBatchLock = new Object();
- protected boolean closed; // flag indicating HiveWriter was closed
- private TransactionBatch txnBatch;
- private long lastUsed; // time of last flush on this writer
- private boolean autoCreatePartitions;
- private UserGroupInformation ugi;
- private int totalRecords = 0;
-
- public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
- boolean autoCreatePartitions, long callTimeout,
- ExecutorService callTimeoutPool, HiveMapper mapper,
- UserGroupInformation ugi, boolean tokenAuthEnabled)
- throws InterruptedException, ConnectFailure {
- try {
- this.autoCreatePartitions = autoCreatePartitions;
- this.callTimeout = callTimeout;
- this.callTimeoutPool = callTimeoutPool;
- this.endPoint = endPoint;
- this.ugi = ugi;
- this.connection = newConnection(ugi, tokenAuthEnabled);
- this.txnsPerBatch = txnsPerBatch;
- this.recordWriter = getRecordWriter(mapper, tokenAuthEnabled);
- this.txnBatch = nextTxnBatch(recordWriter);
- this.closed = false;
- this.lastUsed = System.currentTimeMillis();
- } catch (InterruptedException e) {
- throw e;
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new ConnectFailure(endPoint, e);
- }
- }
-
- /**
- * If the current thread has been interrupted, then throws an
- * exception.
- */
- private static void checkAndThrowInterruptedException()
- throws InterruptedException {
- if (Thread.currentThread().interrupted()) {
- throw new InterruptedException("Timed out before Hive call was made. "
- + "Your callTimeout might be set too low or Hive calls are "
- + "taking too long.");
- }
- }
-
- public RecordWriter getRecordWriter(final HiveMapper mapper, final boolean tokenAuthEnabled) throws Exception {
- if (!tokenAuthEnabled) {
- return mapper.createRecordWriter(endPoint);
- }
-
- try {
- return ugi.doAs(
- new PrivilegedExceptionAction<RecordWriter>() {
- @Override
- public RecordWriter run() throws StreamingException, IOException, ClassNotFoundException {
- return mapper.createRecordWriter(endPoint);
- }
- }
- );
- } catch (Exception e) {
- throw new ConnectFailure(endPoint, e);
- }
- }
-
- private HiveConf createHiveConf(String metaStoreUri, boolean tokenAuthEnabled) {
- if (!tokenAuthEnabled) {
- return null;
- }
-
- HiveConf hcatConf = new HiveConf();
- hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
- hcatConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
- return hcatConf;
- }
-
- @Override
- public String toString() {
- return "{ "
- + "endPoint = " + endPoint.toString()
- + ", TransactionBatch = " + txnBatch.toString() + " }";
- }
-
- /**
- * Write data.
- */
- public synchronized void write(final byte[] record)
- throws WriteFailure, SerializationError, InterruptedException {
- if (closed) {
- throw new IllegalStateException("This hive streaming writer was closed "
- + "and thus no longer able to write : "
- + endPoint);
- }
- // write the tuple
- try {
- LOG.debug("Writing event to {}", endPoint);
- callWithTimeout(new CallRunner<Void>() {
- @Override
- public Void call() throws StreamingException, InterruptedException {
- txnBatch.write(record);
- totalRecords++;
- return null;
- }
- });
- } catch (SerializationError se) {
- throw new SerializationError(endPoint.toString() + " SerializationError", se);
- } catch (StreamingException e) {
- throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
- } catch (TimeoutException e) {
- throw new WriteFailure(endPoint, txnBatch.getCurrentTxnId(), e);
- }
- }
-
- /**
- * Commits the current Txn if totalRecords > 0.
- * If 'rollToNext' is true, switches to the next Txn in the batch, or to a
- * new TxnBatch if the current batch is exhausted.
- */
- public void flush(boolean rollToNext)
- throws CommitFailure, TxnBatchFailure, TxnFailure, InterruptedException {
- // if there are no records do not call flush
- if (totalRecords <= 0) {
- return;
- }
- try {
- synchronized (txnBatchLock) {
- commitTxn();
- nextTxn(rollToNext);
- totalRecords = 0;
- lastUsed = System.currentTimeMillis();
- }
- } catch (StreamingException e) {
- throw new TxnFailure(txnBatch, e);
- }
- }
-
- /** Sends a heartbeat on the current transaction batch to keep the current and
- * remaining txns alive, using the call-timeout pool.
- */
- public void heartBeat() throws InterruptedException {
- // 1) schedule the heartbeat on one thread in pool
- synchronized (txnBatchLock) {
- try {
- callWithTimeout(new CallRunner<Void>() {
- @Override
- public Void call() throws Exception {
- try {
- LOG.info("Sending heartbeat on batch " + txnBatch);
- txnBatch.heartbeat();
- } catch (StreamingException e) {
- LOG.warn("Heartbeat error on batch " + txnBatch, e);
- }
- return null;
- }
- });
- } catch (InterruptedException e) {
- throw e;
- } catch (Exception e) {
- LOG.warn("Unable to send heartbeat on Txn Batch " + txnBatch, e);
- // Suppressing exceptions as we don't care for errors on heartbeats
- }
- }
- }
-
- /**
- * returns totalRecords written so far in a transaction.
- */
- public int getTotalRecords() {
- return totalRecords;
- }
-
- /**
- * Flush and Close current transactionBatch.
- */
- public void flushAndClose() throws TxnBatchFailure, TxnFailure, CommitFailure,
- IOException, InterruptedException {
- flush(false);
- close();
- }
-
- /**
- * Close the Transaction Batch and connection.
- */
- public void close() throws IOException, InterruptedException {
- closeTxnBatch();
- closeConnection();
- closed = true;
- }
-
- private void closeConnection() throws InterruptedException {
- LOG.info("Closing connection to end point : {}", endPoint);
- try {
- callWithTimeout(new CallRunner<Void>() {
- @Override
- public Void call() throws Exception {
- connection.close(); // could block
- return null;
- }
- });
- } catch (Exception e) {
- LOG.warn("Error closing connection to EndPoint : " + endPoint, e);
- // Suppressing exceptions as we don't care for errors on connection close
- }
- }
-
- private void commitTxn() throws CommitFailure, InterruptedException {
- LOG.debug("Committing Txn id {} to {}", txnBatch.getCurrentTxnId(), endPoint);
- try {
- callWithTimeout(new CallRunner<Void>() {
- @Override
- public Void call() throws Exception {
- txnBatch.commit(); // could block
- return null;
- }
- });
- } catch (StreamingException e) {
- throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
- } catch (TimeoutException e) {
- throw new CommitFailure(endPoint, txnBatch.getCurrentTxnId(), e);
- }
- }
-
- @VisibleForTesting
- StreamingConnection newConnection(final UserGroupInformation ugi, final boolean tokenAuthEnabled)
- throws InterruptedException, ConnectFailure {
- try {
- return callWithTimeout(new CallRunner<StreamingConnection>() {
- @Override
- public StreamingConnection call() throws Exception {
- return endPoint
- .newConnection(autoCreatePartitions, createHiveConf(endPoint.metaStoreUri, tokenAuthEnabled), ugi); // could block
- }
- });
- } catch (StreamingException e) {
- throw new ConnectFailure(endPoint, e);
- } catch (TimeoutException e) {
- throw new ConnectFailure(endPoint, e);
- }
- }
-
- private TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
- throws InterruptedException, TxnBatchFailure {
- LOG.debug("Fetching new Txn Batch for {}", endPoint);
- TransactionBatch batch = null;
- try {
- batch = callWithTimeout(new CallRunner<TransactionBatch>() {
- @Override
- public TransactionBatch call() throws Exception {
- return connection.fetchTransactionBatch(txnsPerBatch, recordWriter); // could block
- }
- });
- batch.beginNextTransaction();
- LOG.debug("Acquired {}. Switching to first txn", batch);
- } catch (TimeoutException e) {
- throw new TxnBatchFailure(endPoint, e);
- } catch (StreamingException e) {
- throw new TxnBatchFailure(endPoint, e);
- }
- return batch;
- }
-
- private void closeTxnBatch() throws InterruptedException {
- try {
- LOG.debug("Closing Txn Batch {}", txnBatch);
- callWithTimeout(new CallRunner<Void>() {
- @Override
- public Void call() throws Exception {
- if (txnBatch != null) {
- txnBatch.close(); // could block
- }
- return null;
- }
- });
- } catch (InterruptedException e) {
- throw e;
- } catch (Exception e) {
- LOG.warn("Error closing txn batch " + txnBatch, e);
- }
- }
-
- /**
- * Aborts the current Txn and switches to the next Txn.
- * @throws StreamingException if a new Transaction Batch cannot be acquired or the switch to the next Txn fails
- */
- public void abort() throws StreamingException, TxnBatchFailure, InterruptedException {
- synchronized (txnBatchLock) {
- abortTxn();
- nextTxn(true); // roll to next
- }
- }
-
- /**
- * Aborts current Txn in the txnBatch.
- */
- private void abortTxn() throws InterruptedException {
- LOG.info("Aborting Txn id {} on End Point {}", txnBatch.getCurrentTxnId(), endPoint);
- try {
- callWithTimeout(new CallRunner<Void>() {
- @Override
- public Void call() throws StreamingException, InterruptedException {
- txnBatch.abort(); // could block
- return null;
- }
- });
- } catch (InterruptedException e) {
- throw e;
- } catch (TimeoutException e) {
- LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
- } catch (Exception e) {
- LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
- // Suppressing exceptions as we don't care for errors on abort
- }
- }
-
- /**
- * If there are remaining transactions in the current txnBatch, begins the next transaction;
- * otherwise closes the batch and, if rollToNext is set, acquires a new txnBatch.
- */
- private void nextTxn(boolean rollToNext) throws StreamingException, InterruptedException, TxnBatchFailure {
- if (txnBatch.remainingTransactions() == 0) {
- closeTxnBatch();
- txnBatch = null;
- if (rollToNext) {
- txnBatch = nextTxnBatch(recordWriter);
- }
- } else if (rollToNext) {
- LOG.debug("Switching to next Txn for {}", endPoint);
- txnBatch.beginNextTransaction(); // does not block
- }
- }
-
- /**
- * Execute the callable on a separate thread and wait for the completion
- * for the specified amount of time in milliseconds. In case of timeout,
- * cancels the callable and rethrows the TimeoutException.
- */
- private <T> T callWithTimeout(final CallRunner<T> callRunner)
- throws TimeoutException, StreamingException, InterruptedException {
- Future<T> future = callTimeoutPool.submit(new Callable<T>() {
- @Override
- public T call() throws Exception {
- return callRunner.call();
- }
- });
- try {
- if (callTimeout > 0) {
- return future.get(callTimeout, TimeUnit.MILLISECONDS);
- } else {
- return future.get();
- }
- } catch (TimeoutException timeoutException) {
- future.cancel(true);
- throw timeoutException;
- } catch (ExecutionException e1) {
- Throwable cause = e1.getCause();
- if (cause instanceof IOException) {
- throw new StreamingIOFailure("I/O Failure", (IOException) cause);
- } else if (cause instanceof StreamingException) {
- throw (StreamingException) cause;
- } else if (cause instanceof InterruptedException) {
- throw (InterruptedException) cause;
- } else if (cause instanceof RuntimeException) {
- throw (RuntimeException) cause;
- } else if (cause instanceof TimeoutException) {
- throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
- } else {
- throw new RuntimeException(e1);
- }
- }
- }
-
- public long getLastUsed() {
- return lastUsed;
- }
-
- private byte[] generateRecord(Tuple tuple) {
- StringBuilder buf = new StringBuilder();
- for (Object o : tuple.getValues()) {
- buf.append(o);
- buf.append(",");
- }
- return buf.toString().getBytes();
- }
-
- /**
- * Simple interface whose call() method is called by
- * {@link #callWithTimeout} in a new thread inside a
- * {@linkplain java.security.PrivilegedExceptionAction#run()} call.
- */
- private interface CallRunner<T> {
- T call() throws Exception;
- }
-
- public static class Failure extends Exception {
- public Failure(String message, Throwable cause) {
- super(message, cause);
- }
- }
-
- public static class WriteFailure extends Failure {
- public WriteFailure(HiveEndPoint endPoint, Long currentTxnId, Throwable cause) {
- super("Failed writing to : " + endPoint + ". TxnID : " + currentTxnId, cause);
- }
- }
-
- public static class CommitFailure extends Failure {
- public CommitFailure(HiveEndPoint endPoint, Long txnId, Throwable cause) {
- super("Commit of Txn " + txnId + " failed on EndPoint: " + endPoint, cause);
- }
- }
-
- public static class ConnectFailure extends Failure {
- public ConnectFailure(HiveEndPoint ep, Throwable cause) {
- super("Failed connecting to EndPoint " + ep, cause);
- }
- }
-
- public static class TxnBatchFailure extends Failure {
- public TxnBatchFailure(HiveEndPoint ep, Throwable cause) {
- super("Failed acquiring Transaction Batch from EndPoint: " + ep, cause);
- }
- }
-
- public static class TxnFailure extends Failure {
- public TxnFailure(TransactionBatch txnBatch, Throwable cause) {
- super("Failed switching to next Txn in TxnBatch " + txnBatch, cause);
- }
- }
-}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
deleted file mode 100644
index 60d1a937eb4..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.trident;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.hive.hcatalog.streaming.StreamingException;
-import org.apache.storm.hive.common.HiveOptions;
-import org.apache.storm.hive.common.HiveUtils;
-import org.apache.storm.hive.common.HiveWriter;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class HiveState implements State {
- private static final Logger LOG = LoggerFactory.getLogger(HiveState.class);
- private HiveOptions options;
- private Integer currentBatchSize;
- private ExecutorService callTimeoutPool;
- private transient Timer heartBeatTimer;
- private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
- private Boolean sendHeartBeat = true;
- private UserGroupInformation ugi = null;
- private Boolean kerberosEnabled = false;
- private Map<HiveEndPoint, HiveWriter> allWriters;
- private boolean tokenAuthEnabled;
-
- public HiveState(HiveOptions options) {
- this.options = options;
- this.currentBatchSize = 0;
- }
-
-
- @Override
- public void beginCommit(Long txId) {
- }
-
- @Override
- public void commit(Long txId) {
- try {
- flushAllWriters();
- currentBatchSize = 0;
- } catch (HiveWriter.TxnFailure | InterruptedException | HiveWriter.CommitFailure | HiveWriter.TxnBatchFailure ex) {
- LOG.warn("Commit failed. Failing the batch.", ex);
- throw new FailedException(ex);
- }
- }
-
- public void prepare(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- try {
- tokenAuthEnabled = HiveUtils.isTokenAuthEnabled(conf);
- try {
- ugi = HiveUtils.authenticate(tokenAuthEnabled, options.getKerberosKeytab(), options.getKerberosPrincipal());
- } catch (HiveUtils.AuthenticationFailed ex) {
- LOG.error("Hive kerberos authentication failed " + ex.getMessage(), ex);
- throw new IllegalArgumentException(ex);
- }
-
- allWriters = new ConcurrentHashMap<HiveEndPoint, HiveWriter>();
- String timeoutName = "hive-bolt-%d";
- this.callTimeoutPool = Executors.newFixedThreadPool(1,
- new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
- heartBeatTimer = new Timer("hive-hb-timer", true);
- setupHeartBeatTimer();
- } catch (Exception e) {
- LOG.warn("unable to make connection to hive ", e);
- }
- }
-
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- try {
- writeTuples(tuples);
- } catch (Exception e) {
- abortAndCloseWriters();
- LOG.warn("hive streaming failed.", e);
- throw new FailedException(e);
- }
- }
-
- private void writeTuples(List<TridentTuple> tuples)
- throws Exception {
- for (TridentTuple tuple : tuples) {
- List<String> partitionVals = options.getMapper().mapPartitions(tuple);
- HiveEndPoint endPoint = HiveUtils.makeEndPoint(partitionVals, options);
- HiveWriter writer = getOrCreateWriter(endPoint);
- writer.write(options.getMapper().mapRecord(tuple));
- currentBatchSize++;
- if (currentBatchSize >= options.getBatchSize()) {
- flushAllWriters();
- currentBatchSize = 0;
- }
- }
- }
-
- private void abortAndCloseWriters() {
- try {
- sendHeartBeat = false;
- abortAllWriters();
- closeAllWriters();
- } catch (Exception ie) {
- LOG.warn("unable to close hive connections. ", ie);
- }
- }
-
- /**
- * Abort current Txn on all writers.
- */
- private void abortAllWriters() throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- entry.getValue().abort();
- }
- }
-
-
- /**
- * Closes all writers and removes them from the cache.
- * The writer cache is empty afterwards.
- */
- private void closeAllWriters() throws InterruptedException, IOException {
- //1) Retire writers
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- entry.getValue().close();
- }
- //2) Clear cache
- allWriters.clear();
- }
-
- private void setupHeartBeatTimer() {
- if (options.getHeartBeatInterval() > 0) {
- heartBeatTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- try {
- if (sendHeartBeat) {
- LOG.debug("Start sending heartbeat on all writers");
- sendHeartBeatOnAllWriters();
- setupHeartBeatTimer();
- }
- } catch (Exception e) {
- LOG.warn("Failed to heartbeat on HiveWriter ", e);
- }
- }
- }, options.getHeartBeatInterval() * 1000);
- }
- }
-
- private void flushAllWriters()
- throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
- for (HiveWriter writer : allWriters.values()) {
- writer.flush(true);
- }
- }
-
- private void sendHeartBeatOnAllWriters() throws InterruptedException {
- for (HiveWriter writer : allWriters.values()) {
- writer.heartBeat();
- }
- }
-
- private HiveWriter getOrCreateWriter(HiveEndPoint endPoint)
- throws HiveWriter.ConnectFailure, InterruptedException {
- try {
- HiveWriter writer = allWriters.get(endPoint);
- if (writer == null) {
- LOG.info("Creating Writer to Hive end point : " + endPoint);
- writer = HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, tokenAuthEnabled);
- if (allWriters.size() > (options.getMaxOpenConnections() - 1)) {
- int retired = retireIdleWriters();
- if (retired == 0) {
- retireEldestWriter();
- }
- }
- allWriters.put(endPoint, writer);
- }
- return writer;
- } catch (HiveWriter.ConnectFailure e) {
- LOG.error("Failed to create HiveWriter for endpoint: " + endPoint, e);
- throw e;
- }
-
- }
-
-
- /**
- * Locates the writer that has not been used for the longest time and retires it.
- */
- private void retireEldestWriter() {
- long oldestTimeStamp = System.currentTimeMillis();
- HiveEndPoint eldest = null;
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- if (entry.getValue().getLastUsed() < oldestTimeStamp) {
- eldest = entry.getKey();
- oldestTimeStamp = entry.getValue().getLastUsed();
- }
- }
- try {
- LOG.info("Closing least used Writer to Hive end point : " + eldest);
- allWriters.remove(eldest).flushAndClose();
- } catch (IOException e) {
- LOG.warn("Failed to close writer for end point: " + eldest, e);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- LOG.warn("Interrupted when attempting to close writer for end point: " + eldest, e);
- }
- }
-
- /**
- * Locate all writers past idle timeout and retire them.
- * @return number of writers retired
- */
- private int retireIdleWriters() {
- int count = 0;
- long now = System.currentTimeMillis();
- ArrayList<HiveEndPoint> retirees = new ArrayList<>();
-
- //1) Find retirement candidates
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- if (now - entry.getValue().getLastUsed() > options.getIdleTimeout()) {
- ++count;
- retirees.add(entry.getKey());
- }
- }
- //2) Retire them
- for (HiveEndPoint ep : retirees) {
- try {
- LOG.info("Closing idle Writer to Hive end point : {}", ep);
- allWriters.remove(ep).flushAndClose();
- } catch (IOException e) {
- LOG.warn("Failed to close writer for end point: {}. Error: " + ep, e);
- } catch (InterruptedException e) {
- LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- LOG.warn("Interrupted when attempting to close writer for end point: " + ep, e);
- }
- }
- return count;
- }
-
- public void cleanup() {
- for (Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
- try {
- sendHeartBeat = false;
- HiveWriter w = entry.getValue();
- LOG.info("Flushing writer to {}", w);
- w.flush(false);
- LOG.info("Closing writer to {}", w);
- w.close();
- } catch (Exception ex) {
- LOG.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.",
- ex);
- if (ex instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- ExecutorService[] toShutdown = { callTimeoutPool };
- for (ExecutorService execService : toShutdown) {
- execService.shutdown();
- try {
- while (!execService.isTerminated()) {
- execService.awaitTermination(
- options.getCallTimeOut(), TimeUnit.MILLISECONDS);
- }
- } catch (InterruptedException ex) {
- LOG.warn("shutdown interrupted on " + execService, ex);
- }
- }
- heartBeatTimer.cancel();
- callTimeoutPool = null;
- }
-
-}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
deleted file mode 100644
index d587bfb566d..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveStateFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.trident;
-
-import java.util.Map;
-import org.apache.storm.hive.common.HiveOptions;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class HiveStateFactory implements StateFactory {
- private static final Logger LOG = LoggerFactory.getLogger(HiveStateFactory.class);
- private HiveOptions options;
-
- public HiveStateFactory() {}
-
- /**
- * The options for connecting to Hive.
- */
- public HiveStateFactory withOptions(HiveOptions options) {
- if (options.getTickTupleInterval() != HiveOptions.DEFAULT_TICK_TUPLE_INTERVAL_SECS) {
- LOG.error("Tick tuple interval will be ignored for trident."
- + " The Hive writers are flushed after each batch.");
- }
- this.options = options;
- return this;
- }
-
- @Override
- public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- LOG.info("makeState(partitionIndex={}, numPartitions={})", partitionIndex, numPartitions);
- HiveState state = new HiveState(this.options);
- state.prepare(conf, metrics, partitionIndex, numPartitions);
- return state;
- }
-}
diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
deleted file mode 100644
index 47beda81ca3..00000000000
--- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveUpdater.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.trident;
-
-import java.util.List;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
- public class HiveUpdater extends BaseStateUpdater<HiveState> {
- @Override
- public void updateState(HiveState state, List<TridentTuple> tuples, TridentCollector collector) {
- state.updateState(tuples, collector);
- }
-}
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
deleted file mode 100644
index 39a87b91436..00000000000
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/HiveSetupUtil.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.bolt;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.thrift.TException;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class HiveSetupUtil {
- private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
-
- public static HiveConf getHiveConf() {
- HiveConf conf = new HiveConf();
- // String metastoreDBLocation = "jdbc:derby:databaseName=/tmp/metastore_db;create=true";
- // conf.set("javax.jdo.option.ConnectionDriverName","org.apache.derby.jdbc.EmbeddedDriver");
- // conf.set("javax.jdo.option.ConnectionURL",metastoreDBLocation);
- conf.set("fs.raw.impl", RawFileSystem.class.getName());
- conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr);
- conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
- return conf;
- }
-
- public static void createDbAndTable(HiveConf conf, String databaseName,
- String tableName, List<String> partVals,
- String[] colNames, String[] colTypes,
- String[] partNames, String dbLocation)
- throws Exception {
- IMetaStoreClient client = new HiveMetaStoreClient(conf);
- try {
- Database db = new Database();
- db.setName(databaseName);
- db.setLocationUri(dbLocation);
- client.createDatabase(db);
-
- Table tbl = new Table();
- tbl.setDbName(databaseName);
- tbl.setTableName(tableName);
- tbl.setTableType(TableType.MANAGED_TABLE.toString());
- StorageDescriptor sd = new StorageDescriptor();
- sd.setCols(getTableColumns(colNames, colTypes));
- sd.setNumBuckets(1);
- sd.setLocation(dbLocation + Path.SEPARATOR + tableName);
- if (partNames != null && partNames.length != 0) {
- tbl.setPartitionKeys(getPartitionKeys(partNames));
- }
-
- tbl.setSd(sd);
-
- sd.setBucketCols(new ArrayList<String>(2));
- sd.setSerdeInfo(new SerDeInfo());
- sd.getSerdeInfo().setName(tbl.getTableName());
- sd.getSerdeInfo().setParameters(new HashMap<String, String>());
- sd.getSerdeInfo().getParameters().put(serdeConstants.SERIALIZATION_FORMAT, "1");
-
- sd.getSerdeInfo().setSerializationLib(OrcSerde.class.getName());
- sd.setInputFormat(OrcInputFormat.class.getName());
- sd.setOutputFormat(OrcOutputFormat.class.getName());
-
- Map<String, String> tableParams = new HashMap<>();
- tbl.setParameters(tableParams);
- client.createTable(tbl);
- try {
- if (partVals != null && partVals.size() > 0) {
- addPartition(client, tbl, partVals);
- }
- } catch (AlreadyExistsException e) {
- }
- } finally {
- client.close();
- }
- }
-
- // delete db and all tables in it
- public static void dropDB(HiveConf conf, String databaseName) throws HiveException, MetaException {
- IMetaStoreClient client = new HiveMetaStoreClient(conf);
- try {
- for (String table : client.listTableNamesByFilter(databaseName, "", (short) -1)) {
- client.dropTable(databaseName, table, true, true);
- }
- client.dropDatabase(databaseName);
- } catch (TException e) {
- client.close();
- }
- }
-
- private static void addPartition(IMetaStoreClient client, Table tbl
- , List<String> partValues)
- throws IOException, TException {
- Partition part = new Partition();
- part.setDbName(tbl.getDbName());
- part.setTableName(tbl.getTableName());
- StorageDescriptor sd = new StorageDescriptor(tbl.getSd());
- sd.setLocation(sd.getLocation() + Path.SEPARATOR + makePartPath(tbl.getPartitionKeys(), partValues));
- part.setSd(sd);
- part.setValues(partValues);
- client.add_partition(part);
- }
-
- private static String makePartPath(List<FieldSchema> partKeys, List<String> partVals) {
- if (partKeys.size() != partVals.size()) {
- throw new IllegalArgumentException("Partition values:" + partVals +
- ", does not match the partition Keys in table :" + partKeys);
- }
- StringBuffer buff = new StringBuffer(partKeys.size() * 20);
- int i = 0;
- for (FieldSchema schema : partKeys) {
- buff.append(schema.getName());
- buff.append("=");
- buff.append(partVals.get(i));
- if (i != partKeys.size() - 1) {
- buff.append(Path.SEPARATOR);
- }
- ++i;
- }
- return buff.toString();
- }
-
- private static List<FieldSchema> getTableColumns(String[] colNames, String[] colTypes) {
- List<FieldSchema> fields = new ArrayList<>();
- for (int i = 0; i < colNames.length; ++i) {
- fields.add(new FieldSchema(colNames[i], colTypes[i], ""));
- }
- return fields;
- }
-
- private static List<FieldSchema> getPartitionKeys(String[] partNames) {
- List<FieldSchema> fields = new ArrayList<>();
- for (int i = 0; i < partNames.length; ++i) {
- fields.add(new FieldSchema(partNames[i], serdeConstants.STRING_TYPE_NAME, ""));
- }
- return fields;
- }
-
- public static class RawFileSystem extends RawLocalFileSystem {
- private static final URI NAME;
-
- static {
- try {
- NAME = new URI("raw:///");
- } catch (URISyntaxException se) {
- throw new IllegalArgumentException("bad uri", se);
- }
- }
-
- @Override
- public URI getUri() {
- return NAME;
- }
-
- @Override
- public FileStatus getFileStatus(Path path) throws IOException {
- File file = pathToFile(path);
- if (!file.exists()) {
- throw new FileNotFoundException("Can't find " + path);
- }
- // get close enough
- short mod = 0;
- if (file.canRead()) {
- mod |= 0444;
- }
- if (file.canWrite()) {
- mod |= 0200;
- }
- if (file.canExecute()) {
- mod |= 0111;
- }
- ShimLoader.getHadoopShims();
- return new FileStatus(file.length(), file.isDirectory(), 1, 1024,
- file.lastModified(), file.lastModified(),
- FsPermission.createImmutable(mod), "owen", "users", path);
- }
- }
-
-}
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
deleted file mode 100644
index 9997ba14171..00000000000
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ /dev/null
@@ -1,527 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.hive.bolt;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hive.hcatalog.streaming.HiveEndPoint;
-import org.apache.storm.Config;
-import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
-import org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper;
-import org.apache.storm.hive.common.HiveOptions;
-import org.apache.storm.hive.common.HiveWriter;
-import org.apache.storm.task.GeneralTopologyContext;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.TupleImpl;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.MockTupleHelpers;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.jupiter.api.Assertions.*;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
-
-@Deprecated(since = "2.7.1", forRemoval = true)
-public class TestHiveBolt {
- final static String dbName = "testdb";
- final static String tblName = "test_table";
- final static String dbName1 = "testdb1";
- final static String tblName1 = "test_table1";
- final static String PART1_NAME = "city";
- final static String PART2_NAME = "state";
- final static String[] partNames = { PART1_NAME, PART2_NAME };
- private static final String COL1 = "id";
- private static final String COL2 = "msg";
- private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
- final String partitionVals = "sunnyvale,ca";
- final String[] colNames = { COL1, COL2 };
- final String[] colNames1 = { COL2, COL1 };
- final String metaStoreURI;
- private final HiveConf conf;
- private String[] colTypes = { serdeConstants.INT_TYPE_NAME, serdeConstants.STRING_TYPE_NAME };
- private Config config = new Config();
- private TestingHiveBolt bolt;
- ;
- private ObjectMapper objectMapper = new ObjectMapper();
- @Mock
- private OutputCollector collector;
-
- public TestHiveBolt() throws Exception {
- //metaStoreURI = "jdbc:derby:;databaseName="+System.getProperty("java.io.tmpdir") +"metastore_db;create=true";
- metaStoreURI = null;
- conf = HiveSetupUtil.getHiveConf();
- TxnDbUtil.setConfValues(conf);
- if (metaStoreURI != null) {
- conf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreURI);
- }
- }
-
- @BeforeEach
- public void setup() throws Exception {
- MockitoAnnotations.initMocks(this);
- }
-
- @Test
- public void testWithByteArrayIdandMessage()
- throws Exception {
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames))
- .withPartitionFields(new Fields(partNames));
- HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
- .withTxnsPerBatch(2)
- .withBatchSize(2);
-
- bolt = new TestingHiveBolt(hiveOptions);
- bolt.prepare(config, null, collector);
-
- Integer id = 100;
- String msg = "test-123";
- String city = "sunnyvale";
- String state = "ca";
-
- Set<Tuple> tupleSet = new HashSet<>();
- for (int i = 0; i < 4; i++) {
- Tuple tuple = generateTestTuple(id, msg, city, state);
- bolt.execute(tuple);
- tupleSet.add(tuple);
- }
-
- List<String> partVals = Lists.newArrayList(city, state);
-
- for (Tuple t : tupleSet) {
- verify(collector).ack(t);
- }
-
- assertEquals(4, bolt.getRecordWritten(partVals).size());
-
- bolt.cleanup();
- }
-
- @Test
- public void testWithoutPartitions()
- throws Exception {
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames));
- HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName1, tblName1, mapper)
- .withTxnsPerBatch(2).withBatchSize(2).withAutoCreatePartitions(false);
-
- bolt = new TestingHiveBolt(hiveOptions);
- bolt.prepare(config, null, collector);
-
- Integer id = 100;
- String msg = "test-123";
- String city = "sunnyvale";
- String state = "ca";
-
- Set<Tuple> tupleSet = new HashSet<>();
- for (int i = 0; i < 4; i++) {
- Tuple tuple = generateTestTuple(id, msg, city, state);
- bolt.execute(tuple);
- tupleSet.add(tuple);
- }
-
- List<String> partVals = Collections.emptyList();
-
- for (Tuple t : tupleSet) {
- verify(collector).ack(t);
- }
-
- List<byte[]> recordWritten = bolt.getRecordWritten(partVals);
- assertNotNull(recordWritten);
- assertEquals(4, recordWritten.size());
-
- bolt.cleanup();
- }
-
- @Test
- public void testWithTimeformat()
- throws Exception {
- String timeFormat = "yyyy/MM/dd";
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames))
- .withTimeAsPartitionField(timeFormat);
- HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName1, tblName1, mapper)
- .withTxnsPerBatch(2)
- .withBatchSize(1)
- .withMaxOpenConnections(1);
-
- bolt = new TestingHiveBolt(hiveOptions);
- bolt.prepare(config, null, collector);
-
- Integer id = 100;
- String msg = "test-123";
- Date d = new Date();
- SimpleDateFormat parseDate = new SimpleDateFormat(timeFormat);
- String today = parseDate.format(d.getTime());
-
- List<Tuple> tuples = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- Tuple tuple = generateTestTuple(id, msg, null, null);
- tuples.add(tuple);
- bolt.execute(tuple);
- }
-
- for (Tuple t : tuples) {
- verify(collector).ack(t);
- }
-
- List<String> partVals = Lists.newArrayList(today);
-
- List<byte[]> recordsWritten = bolt.getRecordWritten(partVals);
- assertNotNull(recordsWritten);
- assertEquals(2, recordsWritten.size());
-
- byte[] mapped = generateDelimiteredRecord(Lists.newArrayList(id, msg), mapper.getFieldDelimiter());
-
- for (byte[] record : recordsWritten) {
- assertArrayEquals(mapped, record);
- }
-
- bolt.cleanup();
- }
-
- @Test
- public void testData()
- throws Exception {
- DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
- .withColumnFields(new Fields(colNames))
- .withPartitionFields(new Fields(partNames));
- HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
- .withTxnsPerBatch(2)
- .withBatchSize(1);
-
- bolt = new TestingHiveBolt(hiveOptions);
- bolt.prepare(config, null, new OutputCollector(collector));
-
- Integer id = 1;
- String msg = "SJC";
- String city = "Sunnyvale";
- String state = "CA";
-
- Tuple tuple1 = generateTestTuple(id, msg, city, state);
-
- bolt.execute(tuple1);
- verify(collector).ack(tuple1);
-
- List<String> partVals = Lists.newArrayList(city, state);
-
- List<byte[]> recordsWritten = bolt.getRecordWritten(partVals);
- assertNotNull(recordsWritten);
- assertEquals(1, recordsWritten.size());
-
- byte[] mapped = generateDelimiteredRecord(Lists.newArrayList(id, msg), mapper.getFieldDelimiter());
- assertArrayEquals(mapped, recordsWritten.get(0));
-
- bolt.cleanup();
- }
-
- @Test
- public void testJsonWriter()
- throws Exception {
- // json record doesn't need columns to be in the same order
- // as table in hive.
- JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
- .withColumnFields(new Fields(colNames1))
- .withPartitionFields(new Fields(partNames));
- HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
- .withTxnsPerBatch(2)
- .withBatchSize(1);
-
- bolt = new TestingHiveBolt(hiveOptions);
- bolt.prepare(config, null, collector);
-
- Integer id = 1;
- String msg = "SJC";
- String city = "Sunnyvale";
- String state = "CA";
-
- Tuple tuple1 = generateTestTuple(id, msg, city, state);
-
- bolt.execute(tuple1);
- verify(collector).ack(tuple1);
-
- List<String> partVals = Lists.newArrayList(city, state);
-
- List<byte[]> recordsWritten = bolt.getRecordWritten(partVals);
- assertNotNull(recordsWritten);
- assertEquals(1, recordsWritten.size());
-
- byte[] written = recordsWritten.get(0);
-
- Map writtenMap = objectMapper.readValue(new String(written), new TypeReference