Skip to content

Commit

Permalink
jdbc连接器:xugu自动建表语句替换PreparedStatement为createStatement,避免binlog识别不到
Browse files Browse the repository at this point in the history
  • Loading branch information
LeonYoah committed Jun 7, 2024
1 parent dba212d commit 830bca1
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,10 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu.XuguTypeMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -61,6 +59,7 @@

@Slf4j
public class XuguCatalog extends AbstractJdbcCatalog {
private static final Logger LOG = LoggerFactory.getLogger(XuguCatalog.class);

private static final XuguDataTypeConvertor DATA_TYPE_CONVERTOR =
new XuguDataTypeConvertor();
Expand Down Expand Up @@ -146,6 +145,15 @@ protected String getListDatabaseSql() {
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
return new XuguCreateTableSqlBuilder(table).build(tablePath);
}
protected boolean executeInternal(String url, String sql) throws SQLException {
LOG.info("Execute sql : {}", sql);
// try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
// return ps.execute();
// }
try (Statement stmt = getConnection(url).createStatement()) {
return stmt.execute(sql);
}
}

@Override
protected String getDropTableSql(TablePath tablePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public static JdbcSinkConfig of(ReadonlyConfig config) {
builder.isExactlyOnce(config.get(JdbcOptions.IS_EXACTLY_ONCE));
config.getOptional(JdbcOptions.PRIMARY_KEYS).ifPresent(builder::primaryKeys);
config.getOptional(JdbcOptions.DATABASE).ifPresent(builder::database);
// TODO 优化多余的空格
config.getOptional(JdbcOptions.TABLE).ifPresent(builder::table);
builder.enableUpsert(config.get(ENABLE_UPSERT));
builder.isPrimaryKeyUpdated(config.get(IS_PRIMARY_KEY_UPDATED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static List<JdbcSourceTableConfig> of(ReadonlyConfig connectorConfig) {
} else {
JdbcSourceTableConfig tableProperty =
JdbcSourceTableConfig.builder()
// TODO 优化多余的空格
.tablePath(connectorConfig.get(JdbcSourceOptions.TABLE_PATH))
.query(connectorConfig.get(JdbcOptions.QUERY))
.partitionColumn(connectorConfig.get(JdbcOptions.PARTITION_COLUMN))
Expand Down

0 comments on commit 830bca1

Please sign in to comment.