Skip to content

Commit

Permalink
[JDBC] [Config] Add JDBC Fetch Size Config And Custom Postgres Prepar…
Browse files Browse the repository at this point in the history
…eStatement (#3478)

* [JDBC] [Config] Add JDBC Fetch Size Config
  • Loading branch information
Hisoka-X authored Nov 21, 2022
1 parent 9589a7f commit d60a705
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 2 deletions.
7 changes: 7 additions & 0 deletions docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ supports query SQL and can achieve projection effect.
| partition_upper_bound | Long | No | - |
| partition_lower_bound | Long | No | - |
| partition_num | Int | No | job parallelism |
| fetch_size | Int | No | 0 |
| common-options | | No | - |


Expand Down Expand Up @@ -77,6 +78,11 @@ The partition_column min value for scan, if not set SeaTunnel will query databas

The number of partition count, only support positive integer. default value is job parallelism

### fetch_size [int]

For queries that return a large number of objects, you can configure the row fetch size used in the query to
improve performance by reducing the number database hits required to satisfy the selection criteria. Zero means use jdbc default value.

### common options

Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
Expand Down Expand Up @@ -151,3 +157,4 @@ parallel:

- [BugFix] Fix jdbc split bug ([3220](https://github.com/apache/incubator-seatunnel/pull/3220))
- [Feature] Support Tablestore Source ([3309](https://github.com/apache/incubator-seatunnel/pull/3309))
- [Feature] Support JDBC Fetch Size Config ([3478](https://github.com/apache/incubator-seatunnel/pull/3478))
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class JdbcConfig implements Serializable {

public static final Option<Integer> BATCH_SIZE = Options.key("batch_size").intType().noDefaultValue().withDescription("batch size");

public static final Option<Integer> FETCH_SIZE = Options.key("fetch_size").intType().defaultValue(0).withDescription("For queries that return a large number of objects, " +
"you can configure the row fetch size used in the query to improve performance by reducing the number database hits required to satisfy the selection criteria. Zero means use jdbc default value.");

public static final Option<Integer> BATCH_INTERVAL_MS = Options.key("batch_interval_ms").intType().noDefaultValue().withDescription("batch interval milliSecond");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class JdbcSourceOptions implements Serializable {
private String partitionColumn;
private Long partitionUpperBound;
private Long partitionLowerBound;
private int fetchSize = JdbcConfig.FETCH_SIZE.defaultValue();
private Integer partitionNumber;

public JdbcSourceOptions(Config config) {
Expand All @@ -54,6 +55,9 @@ public JdbcSourceOptions(Config config) {
if (config.hasPath(JdbcConfig.PARTITION_NUM.key())) {
this.partitionNumber = config.getInt(JdbcConfig.PARTITION_NUM.key());
}
if (config.hasPath(JdbcConfig.FETCH_SIZE.key())) {
this.fetchSize = config.getInt(JdbcConfig.FETCH_SIZE.key());
}
}

public JdbcConnectionOptions getJdbcConnectionOptions() {
Expand All @@ -75,4 +79,8 @@ public Optional<Long> getPartitionLowerBound() {
public Optional<Integer> getPartitionNumber() {
return Optional.ofNullable(partitionNumber);
}

public int getFetchSize() {
return fetchSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,18 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

public class PostgresDialect implements JdbcDialect {

public static final int DEFAULT_POSTGRES_FETCH_SIZE = 128;

@Override
public String dialectName() {
return "PostgreSQL";
Expand Down Expand Up @@ -53,4 +60,17 @@ public Optional<String> getUpsertStatement(String tableName, String[] fieldNames
getInsertIntoStatement(tableName, fieldNames), uniqueColumns, updateClause);
return Optional.of(upsertSQL);
}

@Override
public PreparedStatement creatPreparedStatement(Connection connection, String queryTemplate, int fetchSize) throws SQLException {
// use cursor mode, reference: https://jdbc.postgresql.org/documentation/query/#getting-results-based-on-a-cursor
connection.setAutoCommit(false);
PreparedStatement statement = connection.prepareStatement(queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
if (fetchSize > 0) {
statement.setFetchSize(fetchSize);
} else {
statement.setFetchSize(DEFAULT_POSTGRES_FETCH_SIZE);
}
return statement;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
jdbcDialect,
typeInfo,
query,
0,
true
jdbcSourceOptions.getFetchSize(),
jdbcSourceOptions.getJdbcConnectionOptions().isAutoCommit()
);
}

Expand Down

0 comments on commit d60a705

Please sign in to comment.