From 5261a56f723fbcaf2de40bd1b8cac3c28c9c23e7 Mon Sep 17 00:00:00 2001 From: zixi0825 Date: Mon, 13 Jan 2025 21:27:46 +0800 Subject: [PATCH] [Improve][Connector] Improve hive connector with error data sink (#490) --- .../io/datavines/common/ConfigConstants.java | 2 +- .../datasource/jdbc/utils/HiveSqlUtils.java | 2 +- .../io/datavines/connector/api/Dialect.java | 6 +++ .../connector}/api/entity/QueryColumn.java | 2 +- .../connector}/api/entity/ResultList.java | 2 +- .../api/entity/ResultListWithColumns.java | 2 +- .../connector/plugin/FileDialect.java | 9 ++++ .../connector/plugin/HiveDialect.java | 20 ++++++- .../connector/plugin/JdbcConfigBuilder.java | 10 +--- .../connector/plugin/JdbcDialect.java | 9 ++++ .../connector/plugin/utils/JdbcUtils.java | 4 ++ .../connector/plugin}/utils/SqlUtils.java | 13 +++-- .../engine/local/api/LocalExecution.java | 28 ++++------ .../local/api/LocalRuntimeEnvironment.java | 43 ++++++--------- .../datavines/engine/local/api/LocalSink.java | 2 +- .../engine/local/api/LocalTransform.java | 5 +- .../engine/local/api/utils/FileUtils.java | 4 +- .../config/BaseLocalConfigurationBuilder.java | 53 ++++--------------- .../LocalMultiTableAccuracyMetricBuilder.java | 26 ++++----- .../config/LocalSingleTableMetricBuilder.java | 26 ++++----- .../engine/local/connector/LocalFileSink.java | 6 +-- .../engine/local/connector/BaseJdbcSink.java | 2 +- .../executor/BaseDataSinkExecutor.java | 4 +- .../executor/ErrorDataSinkExecutor.java | 29 ++++------ .../transform/sql/ActualValueExecutor.java | 4 +- .../transform/sql/ExpectedValueExecutor.java | 4 +- .../transform/sql/ITransformExecutor.java | 2 +- .../sql/InvalidateItemsExecutor.java | 53 ------------------- .../local/transform/sql/SqlTransform.java | 9 ++-- .../api/controller/OpenApiController.java | 2 +- 30 files changed, 142 insertions(+), 241 deletions(-) rename {datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local => datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector}/api/entity/QueryColumn.java (97%) rename {datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local => datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector}/api/entity/ResultList.java (96%) rename {datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local => datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector}/api/entity/ResultListWithColumns.java (96%) rename {datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api => datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin}/utils/SqlUtils.java (96%) delete mode 100644 datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-transform/src/main/java/io/datavines/engine/local/transform/sql/InvalidateItemsExecutor.java diff --git a/datavines-common/src/main/java/io/datavines/common/ConfigConstants.java b/datavines-common/src/main/java/io/datavines/common/ConfigConstants.java index 07ffffa78..4865328e8 100644 --- a/datavines-common/src/main/java/io/datavines/common/ConfigConstants.java +++ b/datavines-common/src/main/java/io/datavines/common/ConfigConstants.java @@ -62,6 +62,7 @@ public class ConfigConstants { public static final String DATA_DATE = "data_date"; public static final String REGEXP_PATTERN = "regexp_pattern"; public static final String ERROR_OUTPUT_PATH = "error_output_path"; + public static final String ERROR_DATA_CONNECTOR_TYPE = "error_data_connector_type"; public static final String INDEX = "index"; public static final String PATH = "path"; public static final String HDFS_FILE = "hdfs_file"; @@ -111,7 +112,6 @@ public class ConfigConstants { public static final String DATA_DIR = "data_dir"; public static final String ENABLE_SPARK_HIVE_SUPPORT = "enable_spark_hive_support"; - public static final String ENABLE_USE_VIEW = "enable_use_view"; public static final String FILE = "file"; diff --git a/datavines-common/src/main/java/io/datavines/common/datasource/jdbc/utils/HiveSqlUtils.java b/datavines-common/src/main/java/io/datavines/common/datasource/jdbc/utils/HiveSqlUtils.java index cbcbe1303..46319d97f 100644 --- a/datavines-common/src/main/java/io/datavines/common/datasource/jdbc/utils/HiveSqlUtils.java +++ b/datavines-common/src/main/java/io/datavines/common/datasource/jdbc/utils/HiveSqlUtils.java @@ -109,7 +109,7 @@ public static String formatSql(String sql) { return sql; } - private static Map getResultObjectMap(ResultSet rs, ResultSetMetaData metaData) throws SQLException { + public static Map getResultObjectMap(ResultSet rs, ResultSetMetaData metaData) throws SQLException { Map map = new LinkedHashMap<>(); for (int i = 1; i <= metaData.getColumnCount(); i++) { diff --git a/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/Dialect.java b/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/Dialect.java index b4627fb19..ecd658562 100644 --- a/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/Dialect.java +++ b/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/Dialect.java @@ -18,9 +18,13 @@ import io.datavines.common.enums.DataType; import io.datavines.common.utils.StringUtils; +import io.datavines.connector.api.entity.ResultList; import io.datavines.connector.api.entity.StructField; import org.apache.commons.collections4.CollectionUtils; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -138,4 +142,6 @@ default String getInsertAsSelectStatementFromSql(String srcTable, String targetD String getErrorDataScript(Map configMap); String getValidateResultDataScript(Map configMap); + + ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException; } diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/entity/QueryColumn.java b/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/entity/QueryColumn.java similarity index 97% rename from datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/entity/QueryColumn.java rename to datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/entity/QueryColumn.java index 337c634c7..aee8aaad9 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/entity/QueryColumn.java +++ b/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/entity/QueryColumn.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.datavines.engine.local.api.entity; +package io.datavines.connector.api.entity; public class QueryColumn { diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/entity/ResultList.java b/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/entity/ResultList.java similarity index 96% rename from datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/entity/ResultList.java rename to datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/entity/ResultList.java index f7d52ba91..a95938963 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/entity/ResultList.java +++ b/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/entity/ResultList.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.datavines.engine.local.api.entity; +package io.datavines.connector.api.entity; import java.io.Serializable; import java.util.ArrayList; diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/entity/ResultListWithColumns.java b/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/entity/ResultListWithColumns.java similarity index 96% rename from datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/entity/ResultListWithColumns.java rename to datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/entity/ResultListWithColumns.java index 852871668..e8b61d787 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/entity/ResultListWithColumns.java +++ b/datavines-connector/datavines-connector-api/src/main/java/io/datavines/connector/api/entity/ResultListWithColumns.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.datavines.engine.local.api.entity; +package io.datavines.connector.api.entity; import java.util.List; import java.util.Map; diff --git a/datavines-connector/datavines-connector-plugins/datavines-connector-file/src/main/java/io/datavines/connector/plugin/FileDialect.java b/datavines-connector/datavines-connector-plugins/datavines-connector-file/src/main/java/io/datavines/connector/plugin/FileDialect.java index f737d9609..82af1cac2 100644 --- a/datavines-connector/datavines-connector-plugins/datavines-connector-file/src/main/java/io/datavines/connector/plugin/FileDialect.java +++ b/datavines-connector/datavines-connector-plugins/datavines-connector-file/src/main/java/io/datavines/connector/plugin/FileDialect.java @@ -18,7 +18,11 @@ import io.datavines.common.utils.StringUtils; import io.datavines.connector.api.Dialect; +import io.datavines.connector.api.entity.ResultList; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.List; import java.util.Map; @@ -61,4 +65,9 @@ public String getValidateResultDataScript(Map configMap) { } return null; } + + @Override + public ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException { + return null; + } } diff --git a/datavines-connector/datavines-connector-plugins/datavines-connector-hive/src/main/java/io/datavines/connector/plugin/HiveDialect.java b/datavines-connector/datavines-connector-plugins/datavines-connector-hive/src/main/java/io/datavines/connector/plugin/HiveDialect.java index 27cf38f7a..fe0c814dd 100644 --- a/datavines-connector/datavines-connector-plugins/datavines-connector-hive/src/main/java/io/datavines/connector/plugin/HiveDialect.java +++ b/datavines-connector/datavines-connector-plugins/datavines-connector-hive/src/main/java/io/datavines/connector/plugin/HiveDialect.java @@ -16,10 +16,21 @@ */ package io.datavines.connector.plugin; +import io.datavines.common.datasource.jdbc.utils.HiveSqlUtils; +import io.datavines.connector.api.entity.ResultList; +import lombok.extern.slf4j.Slf4j; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import static io.datavines.common.ConfigConstants.STRING_TYPE; +@Slf4j public class HiveDialect extends JdbcDialect { @Override @@ -35,7 +46,12 @@ public String getDriver() { } @Override - public boolean invalidateItemCanOutput() { - return false; + public ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs,String sourceTable, int start, int end) throws SQLException { + List> resultList = new ArrayList<>(); + String sql = "select * from " + sourceTable + " LIMIT " + start + ", " + (end-start); + ResultSet errorDataResultSet = sourceConnectionStatement.executeQuery(sql); + ResultSetMetaData metaData = rs.getMetaData(); + resultList.add(HiveSqlUtils.getResultObjectMap(errorDataResultSet, metaData)); + return new ResultList(resultList); } } diff --git a/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/JdbcConfigBuilder.java b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/JdbcConfigBuilder.java index cc068e78e..831496f08 100644 --- a/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/JdbcConfigBuilder.java +++ b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/JdbcConfigBuilder.java @@ -144,15 +144,7 @@ protected InputParam getInputParam(String field, String title, String placeholde protected List getOtherParams(boolean isEn) { - List list = new ArrayList<>(); - - InputParam enableExternalCatalog = getInputParam("enable_use_view", - isEn ? "enable.use.view" : "允许使用视图", - isEn ? "please enter true or false" : "请填入 true 或者 false", 2, null, - "false"); - - list.add(enableExternalCatalog); - return list; + return null; } } diff --git a/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/JdbcDialect.java b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/JdbcDialect.java index 5b0c47c6d..f9f6951ad 100644 --- a/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/JdbcDialect.java +++ b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/JdbcDialect.java @@ -18,7 +18,12 @@ import io.datavines.common.utils.StringUtils; import io.datavines.connector.api.Dialect; +import io.datavines.connector.api.entity.ResultList; +import io.datavines.connector.plugin.utils.SqlUtils; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -80,4 +85,8 @@ public String getValidateResultDataScript(Map configMap) { return null; } + @Override + public ResultList getPageFromResultSet(Statement sourceConnectionStatement, ResultSet rs, String sourceTable, int start, int end) throws SQLException { + return SqlUtils.getPageFromResultSet(rs, start, end); + } } diff --git a/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/utils/JdbcUtils.java b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/utils/JdbcUtils.java index fa7f88760..730ba7090 100644 --- a/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/utils/JdbcUtils.java +++ b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/utils/JdbcUtils.java @@ -101,6 +101,10 @@ public static List getSchema(ResultSet resultSet, Dialect dialect, boolean isNullable = metaData.isNullable(i + 1) != ResultSetMetaData.columnNoNulls; StructField field = new StructField(); + String[] columns = columnName.split("\\."); + if (columns.length > 1) { + columnName = columns[columns.length - 1]; + } field.setName(columnName.toLowerCase()); field.setDataType(typeConverter.convert(typeName)); field.setNullable(isNullable); diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/utils/SqlUtils.java b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/utils/SqlUtils.java similarity index 96% rename from datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/utils/SqlUtils.java rename to datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/utils/SqlUtils.java index b1ac4e45a..4681c2048 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/utils/SqlUtils.java +++ b/datavines-connector/datavines-connector-plugins/datavines-connector-jdbc/src/main/java/io/datavines/connector/plugin/utils/SqlUtils.java @@ -14,19 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.datavines.engine.local.api.utils; +package io.datavines.connector.plugin.utils; import io.datavines.common.utils.StringUtils; -import io.datavines.engine.local.api.entity.QueryColumn; -import io.datavines.engine.local.api.entity.ResultList; -import io.datavines.engine.local.api.entity.ResultListWithColumns; +import io.datavines.connector.api.entity.QueryColumn; +import io.datavines.connector.api.entity.ResultList; +import io.datavines.connector.api.entity.ResultListWithColumns; +import lombok.extern.slf4j.Slf4j; import net.sf.jsqlparser.JSQLParserException; import net.sf.jsqlparser.expression.Alias; import net.sf.jsqlparser.parser.CCJSqlParserUtil; import net.sf.jsqlparser.schema.Table; import net.sf.jsqlparser.statement.select.*; import org.apache.commons.collections4.CollectionUtils; -import org.slf4j.Logger; import java.sql.*; import java.util.*; @@ -34,10 +34,9 @@ import static io.datavines.common.CommonConstants.DOT; import static org.apache.commons.lang3.StringUtils.EMPTY; +@Slf4j public class SqlUtils { - protected static Logger log = LoggerFactory.getLogger(SqlUtils.class); - public static ResultListWithColumns getListWithHeaderFromResultSet(ResultSet rs, Set queryFromsAndJoins) throws SQLException { ResultListWithColumns resultListWithColumns = new ResultListWithColumns(); diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalExecution.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalExecution.java index 2bec54f09..aa90dca61 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalExecution.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalExecution.java @@ -22,9 +22,8 @@ import io.datavines.common.exception.DataVinesException; import io.datavines.common.utils.StringUtils; import io.datavines.engine.api.env.Execution; -import io.datavines.engine.local.api.entity.ResultList; +import io.datavines.connector.api.entity.ResultList; import io.datavines.engine.local.api.utils.LoggerFactory; -import io.datavines.engine.local.api.utils.SqlUtils; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; @@ -58,7 +57,6 @@ public void execute(List sources, List transforms, return; } - List invalidItemTableSet = new ArrayList<>(); String preSql = null; String postSql = null; try { @@ -116,14 +114,11 @@ public void execute(List sources, List transforms, List taskResult = new ArrayList<>(); List actualValue = new ArrayList<>(); - transforms.forEach(localTransform -> { + for (LocalTransform localTransform : transforms) { + if (localRuntimeEnvironment.isStop()) { + break; + } switch (TransformType.of(localTransform.getConfig().getString(PLUGIN_TYPE))){ - case INVALIDATE_ITEMS: - if (StringUtils.isNotEmpty(localTransform.getConfig().getString(INVALIDATE_ITEMS_TABLE))) { - invalidItemTableSet.add(localTransform.getConfig().getString(INVALIDATE_ITEMS_TABLE)); - } - localTransform.process(localRuntimeEnvironment); - break; case ACTUAL_VALUE: ResultList actualValueResult = localTransform.process(localRuntimeEnvironment); actualValue.add(actualValueResult); @@ -138,9 +133,12 @@ public void execute(List sources, List transforms, default: break; } - }); + } for (LocalSink localSink : sinks) { + if (localRuntimeEnvironment.isStop()) { + break; + } switch (SinkType.of(localSink.getConfig().getString(PLUGIN_TYPE))){ case ERROR_DATA: localSink.output(null, localRuntimeEnvironment); @@ -159,14 +157,6 @@ public void execute(List sources, List transforms, } catch (Exception e) { log.error("execute error", e); throw e; - } finally { - for (String invalidItemTable : invalidItemTableSet) { - try { - SqlUtils.dropView(invalidItemTable, localRuntimeEnvironment.getSourceConnection().getConnection()); - } catch (SQLException sqlException) { - log.error("drop view error: ", sqlException); - } - } } post(postSql); diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalRuntimeEnvironment.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalRuntimeEnvironment.java index a2784c64b..d157868e4 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalRuntimeEnvironment.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalRuntimeEnvironment.java @@ -22,6 +22,8 @@ import io.datavines.engine.api.env.RuntimeEnvironment; import io.datavines.engine.local.api.entity.ConnectionHolder; import io.datavines.engine.local.api.utils.LoggerFactory; +import lombok.Getter; +import lombok.Setter; import org.slf4j.Logger; import java.sql.Statement; @@ -30,14 +32,24 @@ public class LocalRuntimeEnvironment implements RuntimeEnvironment { protected Logger log = LoggerFactory.getLogger(LocalRuntimeEnvironment.class); + @Setter + @Getter private ConnectionHolder sourceConnection; + @Setter + @Getter private ConnectionHolder targetConnection; + @Setter + @Getter private ConnectionHolder metadataConnection; + @Setter private Statement currentStatement; + @Getter + private boolean stop; + @Override public void prepare() { @@ -63,33 +75,9 @@ public CheckResult checkConfig() { return null; } - public ConnectionHolder getSourceConnection() { - return sourceConnection; - } - - public void setSourceConnection(ConnectionHolder sourceConnection) { - this.sourceConnection = sourceConnection; - } - - public ConnectionHolder getMetadataConnection() { - return metadataConnection; - } - - public void setMetadataConnection(ConnectionHolder metadataConnection) { - this.metadataConnection = metadataConnection; - } - - public ConnectionHolder getTargetConnection() { - return targetConnection; - } - - public void setTargetConnection(ConnectionHolder targetConnection) { - this.targetConnection = targetConnection; - } - public void close() throws Exception { if (currentStatement != null) { - currentStatement.close(); + currentStatement.cancel(); } if (sourceConnection != null) { @@ -103,9 +91,8 @@ public void close() throws Exception { if (metadataConnection != null) { metadataConnection.close(); } - } - public void setCurrentStatement(Statement statement) { - this.currentStatement = statement; + stop = true; } + } diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalSink.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalSink.java index f3d5df0cf..7b4c98d38 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalSink.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalSink.java @@ -19,7 +19,7 @@ import io.datavines.common.config.Config; import io.datavines.common.utils.StringUtils; import io.datavines.engine.api.component.Component; -import io.datavines.engine.local.api.entity.ResultList; +import io.datavines.connector.api.entity.ResultList; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalTransform.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalTransform.java index dd6b99c30..08c2c652e 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalTransform.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/LocalTransform.java @@ -16,10 +16,11 @@ */ package io.datavines.engine.local.api; +import io.datavines.common.exception.DataVinesException; import io.datavines.engine.api.component.Component; -import io.datavines.engine.local.api.entity.ResultList; +import io.datavines.connector.api.entity.ResultList; public interface LocalTransform extends Component { - ResultList process(LocalRuntimeEnvironment env); + ResultList process(LocalRuntimeEnvironment env) throws DataVinesException; } diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/utils/FileUtils.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/utils/FileUtils.java index 7a7a25004..22e3d2d07 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/utils/FileUtils.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-api/src/main/java/io/datavines/engine/local/api/utils/FileUtils.java @@ -17,8 +17,8 @@ package io.datavines.engine.local.api.utils; import io.datavines.connector.api.TypeConverter; -import io.datavines.engine.local.api.entity.QueryColumn; -import io.datavines.engine.local.api.entity.ResultListWithColumns; +import io.datavines.connector.api.entity.QueryColumn; +import io.datavines.connector.api.entity.ResultListWithColumns; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/BaseLocalConfigurationBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/BaseLocalConfigurationBuilder.java index b18f02a57..b6253e42f 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/BaseLocalConfigurationBuilder.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/BaseLocalConfigurationBuilder.java @@ -66,10 +66,6 @@ protected List getSourceConfigs() throws DataVinesException { .getPluginLoader(ConnectorFactory.class) .getNewPlugin(connectorParameter.getType()); - if (connectorParameter.getParameters().get(ENABLE_USE_VIEW) != null) { - metricInputParameter.put(ENABLE_USE_VIEW, String.valueOf(connectorParameter.getParameters().get(ENABLE_USE_VIEW))); - } - Map connectorParameterMap = new HashMap<>(connectorParameter.getParameters()); connectorParameterMap.putAll(metricInputParameter); connectorParameterMap = connectorFactory.getConnectorParameterConverter().converter(connectorParameterMap); @@ -207,24 +203,19 @@ public void buildTransformConfigs() { invalidateItemCanOutput &= sqlMetric.isInvalidateItemsCanOutput(); metricInputParameter.put(INVALIDATE_ITEM_CAN_OUTPUT, String.valueOf(invalidateItemCanOutput)); - boolean isEnableUseView = false; - if (metricInputParameter.get(ENABLE_USE_VIEW) != null) { - isEnableUseView = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW)); - } - - if (isEnableUseView) { - // generate invalidate item execute sql - if (sqlMetric.getInvalidateItems(metricInputParameter) != null) { - ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter); - metricInputParameter.put(INVALIDATE_ITEMS_TABLE, invalidateItemExecuteSql.getResultTable()); - invalidateItemExecuteSql.setResultTable(invalidateItemExecuteSql.getResultTable()); + if (sqlMetric.getInvalidateItems(metricInputParameter) != null) { + // generate actual value execute sql + ExecuteSql actualValueExecuteSql = sqlMetric.getDirectActualValue(metricInputParameter); + if (actualValueExecuteSql != null) { + actualValueExecuteSql.setResultTable(sqlMetric.getDirectActualValue(metricInputParameter).getResultTable()); MetricParserUtils.setTransformerConfig( metricInputParameter, transformConfigs, - invalidateItemExecuteSql, - TransformType.INVALIDATE_ITEMS.getDescription()); + actualValueExecuteSql, + TransformType.ACTUAL_VALUE.getDescription()); + metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable()); } - + } else { // generate actual value execute sql ExecuteSql actualValueExecuteSql = sqlMetric.getActualValue(metricInputParameter); if (actualValueExecuteSql != null) { @@ -236,32 +227,6 @@ public void buildTransformConfigs() { TransformType.ACTUAL_VALUE.getDescription()); metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable()); } - } else { - if (sqlMetric.getInvalidateItems(metricInputParameter) != null) { - // generate actual value execute sql - ExecuteSql actualValueExecuteSql = sqlMetric.getDirectActualValue(metricInputParameter); - if (actualValueExecuteSql != null) { - actualValueExecuteSql.setResultTable(sqlMetric.getDirectActualValue(metricInputParameter).getResultTable()); - MetricParserUtils.setTransformerConfig( - metricInputParameter, - transformConfigs, - actualValueExecuteSql, - TransformType.ACTUAL_VALUE.getDescription()); - metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable()); - } - } else { - // generate actual value execute sql - ExecuteSql actualValueExecuteSql = sqlMetric.getActualValue(metricInputParameter); - if (actualValueExecuteSql != null) { - actualValueExecuteSql.setResultTable(sqlMetric.getActualValue(metricInputParameter).getResultTable()); - MetricParserUtils.setTransformerConfig( - metricInputParameter, - transformConfigs, - actualValueExecuteSql, - TransformType.ACTUAL_VALUE.getDescription()); - metricInputParameter.put(ACTUAL_TABLE, sqlMetric.getActualValue(metricInputParameter).getResultTable()); - } - } } // generate expected value transform sql diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/LocalMultiTableAccuracyMetricBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/LocalMultiTableAccuracyMetricBuilder.java index 9c5137211..c37357e55 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/LocalMultiTableAccuracyMetricBuilder.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/LocalMultiTableAccuracyMetricBuilder.java @@ -125,27 +125,21 @@ public void buildSinkConfigs() throws DataVinesException { connectorParameterMap.put(ERROR_DATA_FILE_NAME, jobExecutionInfo.getErrorDataFileName()); connectorParameterMap.put(ERROR_DATA_DIR, metricInputParameter.get(ERROR_DATA_DIR)); connectorParameterMap.put(METRIC_NAME, metricInputParameter.get(METRIC_NAME)); - boolean isEnableUseView = false; - if (metricInputParameter.get(ENABLE_USE_VIEW) != null) { - isEnableUseView = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW)); - } - if (isEnableUseView) { - connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE)); - } else { - String metricType = parameter.getMetricType(); - SqlMetric sqlMetric = PluginLoader - .getPluginLoader(SqlMetric.class) - .getNewPlugin(metricType); - MetricParserUtils.operateInputParameter(metricInputParameter, sqlMetric, jobExecutionInfo); - if (sqlMetric.getInvalidateItems(metricInputParameter) != null) { - ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter); - connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, "(" + ParameterUtils.convertParameterPlaceholders(invalidateItemExecuteSql.getSql(), metricInputParameter) + ") t"); - } + String metricType = parameter.getMetricType(); + SqlMetric sqlMetric = PluginLoader + .getPluginLoader(SqlMetric.class) + .getNewPlugin(metricType); + MetricParserUtils.operateInputParameter(metricInputParameter, sqlMetric, jobExecutionInfo); + if (sqlMetric.getInvalidateItems(metricInputParameter) != null) { + ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter); + connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, "(" + ParameterUtils.convertParameterPlaceholders(invalidateItemExecuteSql.getSql(), metricInputParameter) + ") t"); } + connectorParameterMap.put(INVALIDATE_ITEM_CAN_OUTPUT, metricInputParameter.get(INVALIDATE_ITEM_CAN_OUTPUT)); // use to get source type converter in sink connectorParameterMap.put(SRC_CONNECTOR_TYPE, metricInputParameter.get(SRC_CONNECTOR_TYPE)); + connectorParameterMap.put(ERROR_DATA_CONNECTOR_TYPE, jobExecutionInfo.getErrorDataStorageType()); connectorParameterMap.put(JOB_EXECUTION_ID, metricInputParameter.get(JOB_EXECUTION_ID)); connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver()); errorDataSinkConfig.setConfig(connectorParameterMap); diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/LocalSingleTableMetricBuilder.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/LocalSingleTableMetricBuilder.java index 45868afda..3933e13bb 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/LocalSingleTableMetricBuilder.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-config/src/main/java/io/datavines/engine/local/config/LocalSingleTableMetricBuilder.java @@ -97,29 +97,21 @@ public void buildSinkConfigs() throws DataVinesException { connectorParameterMap.put(ERROR_DATA_FILE_NAME, jobExecutionInfo.getErrorDataFileName()); connectorParameterMap.put(ERROR_DATA_DIR, metricInputParameter.get(ERROR_DATA_DIR)); connectorParameterMap.put(METRIC_NAME, metricInputParameter.get(METRIC_NAME)); - boolean isEnableUseView = false; - if (metricInputParameter.get(ENABLE_USE_VIEW) != null) { - isEnableUseView = Boolean.parseBoolean(metricInputParameter.get(ENABLE_USE_VIEW)); - } - if (isEnableUseView) { - connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, metricInputParameter.get(INVALIDATE_ITEMS_TABLE)); - } else { - String metricType = parameter.getMetricType(); - SqlMetric sqlMetric = PluginLoader - .getPluginLoader(SqlMetric.class) - .getNewPlugin(metricType); - MetricParserUtils.operateInputParameter(metricInputParameter, sqlMetric, jobExecutionInfo); - if (sqlMetric.getInvalidateItems(metricInputParameter) != null) { - ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter); - connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, "(" + ParameterUtils.convertParameterPlaceholders(invalidateItemExecuteSql.getSql(), metricInputParameter) + ") t"); - } + String metricType = parameter.getMetricType(); + SqlMetric sqlMetric = PluginLoader + .getPluginLoader(SqlMetric.class) + .getNewPlugin(metricType); + MetricParserUtils.operateInputParameter(metricInputParameter, sqlMetric, jobExecutionInfo); + if (sqlMetric.getInvalidateItems(metricInputParameter) != null) { + ExecuteSql invalidateItemExecuteSql = sqlMetric.getInvalidateItems(metricInputParameter); + connectorParameterMap.put(INVALIDATE_ITEMS_TABLE, "(" + ParameterUtils.convertParameterPlaceholders(invalidateItemExecuteSql.getSql(), metricInputParameter) + ") t"); } - connectorParameterMap.put(ENABLE_USE_VIEW, isEnableUseView); connectorParameterMap.put(INVALIDATE_ITEM_CAN_OUTPUT, metricInputParameter.get(INVALIDATE_ITEM_CAN_OUTPUT)); // use to get source type converter in sink connectorParameterMap.put(SRC_CONNECTOR_TYPE, metricInputParameter.get(SRC_CONNECTOR_TYPE)); + connectorParameterMap.put(ERROR_DATA_CONNECTOR_TYPE, jobExecutionInfo.getErrorDataStorageType()); connectorParameterMap.put(JOB_EXECUTION_ID, metricInputParameter.get(JOB_EXECUTION_ID)); connectorParameterMap.put(DRIVER, connectorFactory.getDialect().getDriver()); errorDataSinkConfig.setConfig(connectorParameterMap); diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-file/src/main/java/io/datavines/engine/local/connector/LocalFileSink.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-file/src/main/java/io/datavines/engine/local/connector/LocalFileSink.java index a3025d21e..d0f127ba0 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-file/src/main/java/io/datavines/engine/local/connector/LocalFileSink.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-file/src/main/java/io/datavines/engine/local/connector/LocalFileSink.java @@ -26,11 +26,11 @@ import io.datavines.engine.api.env.RuntimeEnvironment; import io.datavines.engine.local.api.LocalRuntimeEnvironment; import io.datavines.engine.local.api.LocalSink; -import io.datavines.engine.local.api.entity.ResultList; -import io.datavines.engine.local.api.entity.ResultListWithColumns; +import io.datavines.connector.api.entity.ResultList; +import io.datavines.connector.api.entity.ResultListWithColumns; import io.datavines.engine.local.api.utils.FileUtils; import io.datavines.engine.local.api.utils.LoggerFactory; -import io.datavines.engine.local.api.utils.SqlUtils; +import io.datavines.connector.plugin.utils.SqlUtils; import io.datavines.spi.PluginLoader; import org.slf4j.Logger; diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/BaseJdbcSink.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/BaseJdbcSink.java index f476f43dd..4bb2c128c 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/BaseJdbcSink.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/BaseJdbcSink.java @@ -23,7 +23,7 @@ import io.datavines.engine.api.env.RuntimeEnvironment; import io.datavines.engine.local.api.LocalRuntimeEnvironment; import io.datavines.engine.local.api.LocalSink; -import io.datavines.engine.local.api.entity.ResultList; +import io.datavines.connector.api.entity.ResultList; import io.datavines.engine.local.api.utils.LoggerFactory; import io.datavines.engine.local.connector.executor.*; import org.slf4j.Logger; diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/executor/BaseDataSinkExecutor.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/executor/BaseDataSinkExecutor.java index 71c4de09f..31801506d 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/executor/BaseDataSinkExecutor.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/executor/BaseDataSinkExecutor.java @@ -21,7 +21,7 @@ import io.datavines.common.utils.StringUtils; import io.datavines.engine.local.api.LocalRuntimeEnvironment; import io.datavines.engine.local.api.utils.LoggerFactory; -import io.datavines.engine.local.api.utils.SqlUtils; +import io.datavines.connector.plugin.utils.SqlUtils; import org.slf4j.Logger; import java.sql.SQLException; @@ -87,7 +87,7 @@ private boolean checkTableExist(LocalRuntimeEnvironment env, String tableName) t //定义一个变量标示 boolean flag = false ; //一个查询该表所有的语句。 - String sql = "SELECT COUNT(*) FROM "+ tableName ; + String sql = "SELECT 1 FROM "+ tableName ; try (Statement statement = env.getMetadataConnection().getConnection().createStatement()) { statement.executeQuery(sql); flag = true; diff --git a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/executor/ErrorDataSinkExecutor.java b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/executor/ErrorDataSinkExecutor.java index bcbbf9b18..08f8bc856 100644 --- a/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/executor/ErrorDataSinkExecutor.java +++ b/datavines-engine/datavines-engine-plugins/datavines-engine-local/datavines-engine-local-connector-jdbc/src/main/java/io/datavines/engine/local/connector/executor/ErrorDataSinkExecutor.java @@ -28,9 +28,9 @@ import io.datavines.connector.plugin.utils.JdbcUtils; import io.datavines.engine.local.api.LocalRuntimeEnvironment; import io.datavines.engine.local.api.entity.ConnectionHolder; -import io.datavines.engine.local.api.entity.ResultList; +import io.datavines.connector.api.entity.ResultList; import io.datavines.engine.local.api.utils.LoggerFactory; -import io.datavines.engine.local.api.utils.SqlUtils; +import io.datavines.connector.plugin.utils.SqlUtils; import io.datavines.spi.PluginLoader; import org.slf4j.Logger; @@ -105,22 +105,14 @@ private void sinkErrorDataToDataSource() { } String srcConnectorType = config.getString(SRC_CONNECTOR_TYPE); - boolean isEnableUseView = config.getBoolean(ENABLE_USE_VIEW); + ConnectorFactory connectorFactory = PluginLoader.getPluginLoader(ConnectorFactory.class).getOrCreatePlugin(srcConnectorType); Dialect dialect = connectorFactory.getDialect(); if (!checkTableExist(getConnectionHolder().getConnection(), dialect.quoteIdentifier(targetDatabase)+"."+dialect.quoteIdentifier(targetTable), dialect)) { - if (isEnableUseView) { - sourceConnectionStatement.execute(dialect.getCreateTableAsSelectStatement(sourceTable, targetDatabase, targetTable)); - } else { - sourceConnectionStatement.execute(dialect.getCreateTableAsSelectStatementFromSql(sourceTable, targetDatabase, targetTable)); - } + sourceConnectionStatement.execute(dialect.getCreateTableAsSelectStatementFromSql(sourceTable, targetDatabase, targetTable)); } else { - if (isEnableUseView) { - sourceConnectionStatement.execute(dialect.getInsertAsSelectStatement(sourceTable, targetDatabase, targetTable)); - } else { - sourceConnectionStatement.execute(dialect.getInsertAsSelectStatementFromSql(sourceTable, targetDatabase, targetTable)); - } + sourceConnectionStatement.execute(dialect.getInsertAsSelectStatementFromSql(sourceTable, targetDatabase, targetTable)); } } catch (Exception e) { @@ -148,7 +140,7 @@ private void sinkErrorData() { env.setCurrentStatement(sourceConnectionStatement); String srcConnectorType = config.getString(SRC_CONNECTOR_TYPE); ConnectorFactory connectorFactory = PluginLoader.getPluginLoader(ConnectorFactory.class).getOrCreatePlugin(srcConnectorType); - + ConnectorFactory errorDataConnectorFactory = PluginLoader.getPluginLoader(ConnectorFactory.class).getOrCreatePlugin(config.getString(ERROR_DATA_CONNECTOR_TYPE)); int count = 0; //执行统计行数语句 countResultSet = sourceConnectionStatement.executeQuery(connectorFactory.getDialect().getCountQuery(sourceTable)); @@ -164,10 +156,11 @@ private void sinkErrorData() { TypeConverter typeConverter = connectorFactory.getTypeConverter(); Dialect dialect = connectorFactory.getDialect(); + Dialect errorDataConnectorDialect = errorDataConnectorFactory.getDialect(); String targetTableName = config.getString(ERROR_DATA_FILE_NAME); List columns = getTableSchema(sourceConnectionStatement, config, typeConverter); - if (!checkTableExist(getConnectionHolder().getConnection(), targetTableName, dialect)) { - createTable(typeConverter, dialect, targetTableName, columns); + if (!checkTableExist(getConnectionHolder().getConnection(), targetTableName, errorDataConnectorDialect)) { + createTable(typeConverter, errorDataConnectorDialect, targetTableName, columns); } //根据行数进行分页查询。分批写到文件里面 int pageSize = 1000; @@ -175,7 +168,7 @@ private void sinkErrorData() { errorDataResultSet = sourceConnectionStatement.executeQuery(connectorFactory.getDialect().getSelectQuery(sourceTable)); errorDataStorageConnection = getConnectionHolder().getConnection(); - String insertStatement = JdbcUtils.getInsertStatement(targetTableName, columns, dialect); + String insertStatement = JdbcUtils.getInsertStatement(targetTableName, columns, errorDataConnectorDialect); if (StringUtils.isEmpty(insertStatement)) { return; } @@ -185,8 +178,8 @@ private void sinkErrorData() { for (int i=0; i row: resultList.getResultList()) { for (int j=0; j