diff --git a/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java b/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java index f9904a33cdf..d1551f96cb4 100644 --- a/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java +++ b/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/xugu/XuguCatalog.java @@ -32,12 +32,10 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; +import java.sql.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -61,6 +59,7 @@ @Slf4j public class XuguCatalog extends AbstractJdbcCatalog { + private static final Logger LOG = LoggerFactory.getLogger(XuguCatalog.class); private static final XuguDataTypeConvertor DATA_TYPE_CONVERTOR = new XuguDataTypeConvertor(); @@ -146,6 +145,15 @@ protected String getListDatabaseSql() { protected String getCreateTableSql(TablePath tablePath, CatalogTable table) { return new XuguCreateTableSqlBuilder(table).build(tablePath); } + protected boolean executeInternal(String url, String sql) throws SQLException { + LOG.info("Execute sql : {}", sql); +// try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) { +// return ps.execute(); +// } + try (Statement stmt = getConnection(url).createStatement()) { + return stmt.execute(sql); + } + } @Override protected String getDropTableSql(TablePath tablePath) { diff --git a/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java b/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java index 738185c8fe9..371a8538a3b 100644 --- a/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java +++ b/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSinkConfig.java @@ -53,6 +53,7 @@ public static JdbcSinkConfig of(ReadonlyConfig config) { builder.isExactlyOnce(config.get(JdbcOptions.IS_EXACTLY_ONCE)); config.getOptional(JdbcOptions.PRIMARY_KEYS).ifPresent(builder::primaryKeys); config.getOptional(JdbcOptions.DATABASE).ifPresent(builder::database); + // TODO 优化多余的空格 config.getOptional(JdbcOptions.TABLE).ifPresent(builder::table); builder.enableUpsert(config.get(ENABLE_UPSERT)); builder.isPrimaryKeyUpdated(config.get(IS_PRIMARY_KEY_UPDATED)); diff --git a/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java b/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java index b4a6e890dfc..071e991a2f4 100644 --- a/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java +++ b/seatunnel-connectors-v2/connector-mapping-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceTableConfig.java @@ -73,6 +73,7 @@ public static List of(ReadonlyConfig connectorConfig) { } else { JdbcSourceTableConfig tableProperty = JdbcSourceTableConfig.builder() + // TODO 优化多余的空格 .tablePath(connectorConfig.get(JdbcSourceOptions.TABLE_PATH)) .query(connectorConfig.get(JdbcOptions.QUERY)) .partitionColumn(connectorConfig.get(JdbcOptions.PARTITION_COLUMN))