Skip to content

Commit

Permalink
[Improve][Connector] Improve hive connector with error data sink (#490)
Browse files Browse the repository at this point in the history
  • Loading branch information
zixi0825 authored Jan 13, 2025
1 parent b90bc0b commit 5261a56
Show file tree
Hide file tree
Showing 30 changed files with 142 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class ConfigConstants {
public static final String DATA_DATE = "data_date";
public static final String REGEXP_PATTERN = "regexp_pattern";
public static final String ERROR_OUTPUT_PATH = "error_output_path";
public static final String ERROR_DATA_CONNECTOR_TYPE = "error_data_connector_type";
public static final String INDEX = "index";
public static final String PATH = "path";
public static final String HDFS_FILE = "hdfs_file";
Expand Down Expand Up @@ -111,7 +112,6 @@ public class ConfigConstants {
public static final String DATA_DIR = "data_dir";

public static final String ENABLE_SPARK_HIVE_SUPPORT = "enable_spark_hive_support";
public static final String ENABLE_USE_VIEW = "enable_use_view";

public static final String FILE = "file";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static String formatSql(String sql) {
return sql;
}

private static Map<String, Object> getResultObjectMap(ResultSet rs, ResultSetMetaData metaData) throws SQLException {
public static Map<String, Object> getResultObjectMap(ResultSet rs, ResultSetMetaData metaData) throws SQLException {
Map<String, Object> map = new LinkedHashMap<>();

for (int i = 1; i <= metaData.getColumnCount(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@

import io.datavines.common.enums.DataType;
import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.entity.ResultList;
import io.datavines.connector.api.entity.StructField;
import org.apache.commons.collections4.CollectionUtils;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -138,4 +142,6 @@ default String getInsertAsSelectStatementFromSql(String srcTable, String targetD
String getErrorDataScript(Map<String, String> configMap);

String getValidateResultDataScript(Map<String, String> configMap);

ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.local.api.entity;
package io.datavines.connector.api.entity;

public class QueryColumn {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.local.api.entity;
package io.datavines.connector.api.entity;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.local.api.entity;
package io.datavines.connector.api.entity;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.Dialect;
import io.datavines.connector.api.entity.ResultList;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -61,4 +65,9 @@ public String getValidateResultDataScript(Map<String, String> configMap) {
}
return null;
}

@Override
public ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,21 @@
*/
package io.datavines.connector.plugin;

import io.datavines.common.datasource.jdbc.utils.HiveSqlUtils;
import io.datavines.connector.api.entity.ResultList;
import lombok.extern.slf4j.Slf4j;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static io.datavines.common.ConfigConstants.STRING_TYPE;

@Slf4j
public class HiveDialect extends JdbcDialect {

@Override
Expand All @@ -35,7 +46,12 @@ public String getDriver() {
}

@Override
public boolean invalidateItemCanOutput() {
return false;
public ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs,String sourceTable, int start, int end) throws SQLException {
List<Map<String, Object>> resultList = new ArrayList<>();
String sql = "select * from " + sourceTable + " LIMIT " + start + ", " + (end-start);
ResultSet errorDataResultSet = sourceConnectionStatement.executeQuery(sql);
ResultSetMetaData metaData = rs.getMetaData();
resultList.add(HiveSqlUtils.getResultObjectMap(errorDataResultSet, metaData));
return new ResultList(resultList);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,7 @@ protected InputParam getInputParam(String field, String title, String placeholde

protected List<PluginParams> getOtherParams(boolean isEn) {

List<PluginParams> list = new ArrayList<>();

InputParam enableExternalCatalog = getInputParam("enable_use_view",
isEn ? "enable.use.view" : "允许使用视图",
isEn ? "please enter true or false" : "请填入 true 或者 false", 2, null,
"false");

list.add(enableExternalCatalog);
return list;
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

import io.datavines.common.utils.StringUtils;
import io.datavines.connector.api.Dialect;
import io.datavines.connector.api.entity.ResultList;
import io.datavines.connector.plugin.utils.SqlUtils;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -80,4 +85,8 @@ public String getValidateResultDataScript(Map<String, String> configMap) {
return null;
}

@Override
public ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException {
return SqlUtils.getPageFromResultSet(rs, start, end);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ public static List<StructField> getSchema(ResultSet resultSet, Dialect dialect,
boolean isNullable = metaData.isNullable(i + 1) != ResultSetMetaData.columnNoNulls;

StructField field = new StructField();
String[] columns = columnName.split("\\.");
if (columns.length > 1) {
columnName = columns[columns.length - 1];
}
field.setName(columnName.toLowerCase());
field.setDataType(typeConverter.convert(typeName));
field.setNullable(isNullable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.datavines.engine.local.api.utils;
package io.datavines.connector.plugin.utils;

import io.datavines.common.utils.StringUtils;
import io.datavines.engine.local.api.entity.QueryColumn;
import io.datavines.engine.local.api.entity.ResultList;
import io.datavines.engine.local.api.entity.ResultListWithColumns;
import io.datavines.connector.api.entity.QueryColumn;
import io.datavines.connector.api.entity.ResultList;
import io.datavines.connector.api.entity.ResultListWithColumns;
import lombok.extern.slf4j.Slf4j;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.Alias;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Table;
import net.sf.jsqlparser.statement.select.*;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;

import java.sql.*;
import java.util.*;

import static io.datavines.common.CommonConstants.DOT;
import static org.apache.commons.lang3.StringUtils.EMPTY;

@Slf4j
public class SqlUtils {

protected static Logger log = LoggerFactory.getLogger(SqlUtils.class);

public static ResultListWithColumns getListWithHeaderFromResultSet(ResultSet rs, Set<String> queryFromsAndJoins) throws SQLException {

ResultListWithColumns resultListWithColumns = new ResultListWithColumns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@
import io.datavines.common.exception.DataVinesException;
import io.datavines.common.utils.StringUtils;
import io.datavines.engine.api.env.Execution;
import io.datavines.engine.local.api.entity.ResultList;
import io.datavines.connector.api.entity.ResultList;
import io.datavines.engine.local.api.utils.LoggerFactory;
import io.datavines.engine.local.api.utils.SqlUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;

Expand Down Expand Up @@ -58,7 +57,6 @@ public void execute(List<LocalSource> sources, List<LocalTransform> transforms,
return;
}

List<String> invalidItemTableSet = new ArrayList<>();
String preSql = null;
String postSql = null;
try {
Expand Down Expand Up @@ -116,14 +114,11 @@ public void execute(List<LocalSource> sources, List<LocalTransform> transforms,

List<ResultList> taskResult = new ArrayList<>();
List<ResultList> actualValue = new ArrayList<>();
transforms.forEach(localTransform -> {
for (LocalTransform localTransform : transforms) {
if (localRuntimeEnvironment.isStop()) {
break;
}
switch (TransformType.of(localTransform.getConfig().getString(PLUGIN_TYPE))){
case INVALIDATE_ITEMS:
if (StringUtils.isNotEmpty(localTransform.getConfig().getString(INVALIDATE_ITEMS_TABLE))) {
invalidItemTableSet.add(localTransform.getConfig().getString(INVALIDATE_ITEMS_TABLE));
}
localTransform.process(localRuntimeEnvironment);
break;
case ACTUAL_VALUE:
ResultList actualValueResult = localTransform.process(localRuntimeEnvironment);
actualValue.add(actualValueResult);
Expand All @@ -138,9 +133,12 @@ public void execute(List<LocalSource> sources, List<LocalTransform> transforms,
default:
break;
}
});
}

for (LocalSink localSink : sinks) {
if (localRuntimeEnvironment.isStop()) {
break;
}
switch (SinkType.of(localSink.getConfig().getString(PLUGIN_TYPE))){
case ERROR_DATA:
localSink.output(null, localRuntimeEnvironment);
Expand All @@ -159,14 +157,6 @@ public void execute(List<LocalSource> sources, List<LocalTransform> transforms,
} catch (Exception e) {
log.error("execute error", e);
throw e;
} finally {
for (String invalidItemTable : invalidItemTableSet) {
try {
SqlUtils.dropView(invalidItemTable, localRuntimeEnvironment.getSourceConnection().getConnection());
} catch (SQLException sqlException) {
log.error("drop view error: ", sqlException);
}
}
}

post(postSql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.datavines.engine.api.env.RuntimeEnvironment;
import io.datavines.engine.local.api.entity.ConnectionHolder;
import io.datavines.engine.local.api.utils.LoggerFactory;
import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;

import java.sql.Statement;
Expand All @@ -30,14 +32,24 @@ public class LocalRuntimeEnvironment implements RuntimeEnvironment {

protected Logger log = LoggerFactory.getLogger(LocalRuntimeEnvironment.class);

@Setter
@Getter
private ConnectionHolder sourceConnection;

@Setter
@Getter
private ConnectionHolder targetConnection;

@Setter
@Getter
private ConnectionHolder metadataConnection;

@Setter
private Statement currentStatement;

@Getter
private boolean stop;

@Override
public void prepare() {

Expand All @@ -63,33 +75,9 @@ public CheckResult checkConfig() {
return null;
}

public ConnectionHolder getSourceConnection() {
return sourceConnection;
}

public void setSourceConnection(ConnectionHolder sourceConnection) {
this.sourceConnection = sourceConnection;
}

public ConnectionHolder getMetadataConnection() {
return metadataConnection;
}

public void setMetadataConnection(ConnectionHolder metadataConnection) {
this.metadataConnection = metadataConnection;
}

public ConnectionHolder getTargetConnection() {
return targetConnection;
}

public void setTargetConnection(ConnectionHolder targetConnection) {
this.targetConnection = targetConnection;
}

public void close() throws Exception {
if (currentStatement != null) {
currentStatement.close();
currentStatement.cancel();
}

if (sourceConnection != null) {
Expand All @@ -103,9 +91,8 @@ public void close() throws Exception {
if (metadataConnection != null) {
metadataConnection.close();
}
}

public void setCurrentStatement(Statement statement) {
this.currentStatement = statement;
stop = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.datavines.common.config.Config;
import io.datavines.common.utils.StringUtils;
import io.datavines.engine.api.component.Component;
import io.datavines.engine.local.api.entity.ResultList;
import io.datavines.connector.api.entity.ResultList;

import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
*/
package io.datavines.engine.local.api;

import io.datavines.common.exception.DataVinesException;
import io.datavines.engine.api.component.Component;
import io.datavines.engine.local.api.entity.ResultList;
import io.datavines.connector.api.entity.ResultList;

public interface LocalTransform extends Component {

ResultList process(LocalRuntimeEnvironment env);
ResultList process(LocalRuntimeEnvironment env) throws DataVinesException;
}
Loading

0 comments on commit 5261a56

Please sign in to comment.