Skip to content

Commit

Permalink
[Improve][Connector-V2] Close all ResultSet after used (apache#7389)
Browse files Browse the repository at this point in the history
* [Improve][Connector-V2] Close all ResultSet after used

* update
  • Loading branch information
Hisoka-X authored Aug 16, 2024
1 parent 7130382 commit 853e973
Show file tree
Hide file tree
Showing 35 changed files with 326 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,24 @@ default Optional<PrimaryKey> getPrimaryKey(JdbcConnection jdbcConnection, TableI

DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();

// seq -> column name
List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
String pkName = null;

// According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
ResultSet rs =
metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(), tableId.table());

// seq -> column name
List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
String pkName = null;
while (rs.next()) {
// all the PK_NAME should be the same
pkName = rs.getString("PK_NAME");
String columnName = rs.getString("COLUMN_NAME");
int keySeq = rs.getInt("KEY_SEQ");
// KEY_SEQ is 1-based index
primaryKeyColumns.add(Pair.of(keySeq, columnName));
try (ResultSet rs =
metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(), tableId.table())) {
while (rs.next()) {
// all the PK_NAME should be the same
pkName = rs.getString("PK_NAME");
String columnName = rs.getString("COLUMN_NAME");
int keySeq = rs.getInt("KEY_SEQ");
// KEY_SEQ is 1-based index
primaryKeyColumns.add(Pair.of(keySeq, columnName));
}
}
// initialize size
List<String> pkFields =
Expand Down Expand Up @@ -121,41 +123,42 @@ default List<ConstraintKey> getConstraintKeys(JdbcConnection jdbcConnection, Tab
throws SQLException {
DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();

ResultSet resultSet =
try (ResultSet resultSet =
metaData.getIndexInfo(
tableId.catalog(), tableId.schema(), tableId.table(), false, false);
// index name -> index
Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
while (resultSet.next()) {
String columnName = resultSet.getString("COLUMN_NAME");
if (columnName == null) {
continue;
tableId.catalog(), tableId.schema(), tableId.table(), false, false)) {
// index name -> index
Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
while (resultSet.next()) {
String columnName = resultSet.getString("COLUMN_NAME");
if (columnName == null) {
continue;
}

String indexName = resultSet.getString("INDEX_NAME");
boolean noUnique = resultSet.getBoolean("NON_UNIQUE");

ConstraintKey constraintKey =
constraintKeyMap.computeIfAbsent(
indexName,
s -> {
ConstraintKey.ConstraintType constraintType =
ConstraintKey.ConstraintType.INDEX_KEY;
if (!noUnique) {
constraintType = ConstraintKey.ConstraintType.UNIQUE_KEY;
}
return ConstraintKey.of(
constraintType, indexName, new ArrayList<>());
});

ConstraintKey.ColumnSortType sortType =
"A".equals(resultSet.getString("ASC_OR_DESC"))
? ConstraintKey.ColumnSortType.ASC
: ConstraintKey.ColumnSortType.DESC;
ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
new ConstraintKey.ConstraintKeyColumn(columnName, sortType);
constraintKey.getColumnNames().add(constraintKeyColumn);
}

String indexName = resultSet.getString("INDEX_NAME");
boolean noUnique = resultSet.getBoolean("NON_UNIQUE");

ConstraintKey constraintKey =
constraintKeyMap.computeIfAbsent(
indexName,
s -> {
ConstraintKey.ConstraintType constraintType =
ConstraintKey.ConstraintType.INDEX_KEY;
if (!noUnique) {
constraintType = ConstraintKey.ConstraintType.UNIQUE_KEY;
}
return ConstraintKey.of(
constraintType, indexName, new ArrayList<>());
});

ConstraintKey.ColumnSortType sortType =
"A".equals(resultSet.getString("ASC_OR_DESC"))
? ConstraintKey.ColumnSortType.ASC
: ConstraintKey.ColumnSortType.DESC;
ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
new ConstraintKey.ConstraintKeyColumn(columnName, sortType);
constraintKey.getColumnNames().add(constraintKeyColumn);
return new ArrayList<>(constraintKeyMap.values());
}
return new ArrayList<>(constraintKeyMap.values());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ private boolean clickhouseServerEnableExperimentalLightweightDelete(
return false;
}
String configKey = "allow_experimental_lightweight_delete";
try (Statement stmt = clickhouseConnection.createStatement()) {
ResultSet resultSet = stmt.executeQuery("SHOW SETTINGS ILIKE '%" + configKey + "%'");
try (Statement stmt = clickhouseConnection.createStatement();
ResultSet resultSet =
stmt.executeQuery("SHOW SETTINGS ILIKE '%" + configKey + "%'")) {
while (resultSet.next()) {
String name = resultSet.getString("name");
if (name.equalsIgnoreCase(configKey)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ public void open() throws CatalogException {
private String getDorisVersion() throws SQLException {
String dorisVersion = null;
try (PreparedStatement preparedStatement =
conn.prepareStatement(DorisCatalogUtil.QUERY_DORIS_VERSION_QUERY)) {
ResultSet resultSet = preparedStatement.executeQuery();
conn.prepareStatement(DorisCatalogUtil.QUERY_DORIS_VERSION_QUERY);
ResultSet resultSet = preparedStatement.executeQuery()) {

while (resultSet.next()) {
dorisVersion = resultSet.getString(2);
}
Expand Down Expand Up @@ -180,8 +181,9 @@ public String getDefaultDatabase() throws CatalogException {
public boolean databaseExists(String databaseName) throws CatalogException {
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.DATABASE_QUERY)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
return rs.next();
try (ResultSet rs = ps.executeQuery()) {
return rs.next();
}
} catch (SQLException e) {
throw new CatalogException("check database exists failed", e);
}
Expand All @@ -190,8 +192,8 @@ public boolean databaseExists(String databaseName) throws CatalogException {
@Override
public List<String> listDatabases() throws CatalogException {
List<String> databases = new ArrayList<>();
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.ALL_DATABASES_QUERY)) {
ResultSet rs = ps.executeQuery();
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.ALL_DATABASES_QUERY);
ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String database = rs.getString(1);
databases.add(database);
Expand All @@ -210,10 +212,11 @@ public List<String> listTables(String databaseName)
try (PreparedStatement ps =
conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_DATABASE_QUERY)) {
ps.setString(1, databaseName);
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String table = rs.getString(1);
tables.add(table);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String table = rs.getString(1);
tables.add(table);
}
}
} catch (SQLException e) {
throw new CatalogException(
Expand All @@ -229,8 +232,9 @@ public boolean tableExists(TablePath tablePath) throws CatalogException {
conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_IDENTIFIER_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
ResultSet rs = ps.executeQuery();
return rs.next();
try (ResultSet rs = ps.executeQuery()) {
return rs.next();
}
} catch (SQLException e) {
throw new CatalogException(
String.format("check table [%s] exists failed", tablePath.getFullName()), e);
Expand All @@ -248,18 +252,19 @@ public CatalogTable getTable(TablePath tablePath)
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
ResultSet rs = ps.executeQuery();
Map<String, String> options = connectorOptions();
buildTableSchemaWithErrorCheck(
tablePath, rs, builder, options, Collections.emptyList());
return CatalogTable.of(
TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
builder.build(),
options,
Collections.emptyList(),
"",
catalogName);
try (ResultSet rs = ps.executeQuery()) {
Map<String, String> options = connectorOptions();
buildTableSchemaWithErrorCheck(
tablePath, rs, builder, options, Collections.emptyList());
return CatalogTable.of(
TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
builder.build(),
options,
Collections.emptyList(),
"",
catalogName);
}
} catch (SeaTunnelRuntimeException e) {
throw e;
} catch (Exception e) {
Expand All @@ -279,17 +284,18 @@ public CatalogTable getTable(TablePath tablePath, List<String> fieldNames)
try (PreparedStatement ps = conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
ps.setString(1, tablePath.getDatabaseName());
ps.setString(2, tablePath.getTableName());
ResultSet rs = ps.executeQuery();
Map<String, String> options = connectorOptions();
buildTableSchemaWithErrorCheck(tablePath, rs, builder, options, fieldNames);
return CatalogTable.of(
TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
builder.build(),
options,
Collections.emptyList(),
"",
catalogName);
try (ResultSet rs = ps.executeQuery()) {
Map<String, String> options = connectorOptions();
buildTableSchemaWithErrorCheck(tablePath, rs, builder, options, fieldNames);
return CatalogTable.of(
TableIdentifier.of(
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
builder.build(),
options,
Collections.emptyList(),
"",
catalogName);
}
} catch (SeaTunnelRuntimeException e) {
throw e;
} catch (Exception e) {
Expand Down Expand Up @@ -480,8 +486,8 @@ public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
public boolean isExistsData(TablePath tablePath) {
String tableName = tablePath.getFullName();
String sql = String.format("select * from %s limit 1;", tableName);
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ResultSet resultSet = ps.executeQuery();
try (PreparedStatement ps = conn.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery()) {
return resultSet.next();
} catch (SQLException e) {
throw new CatalogException(String.format("Failed executeSql error %s", sql), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,9 @@ public interface ResultSetConsumer<T> {

protected List<String> queryString(String url, String sql, ResultSetConsumer<String> consumer)
throws SQLException {
try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
try (PreparedStatement ps = getConnection(url).prepareStatement(sql);
ResultSet rs = ps.executeQuery()) {
List<String> result = new ArrayList<>();
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String value = consumer.apply(rs);
if (value != null) {
Expand Down Expand Up @@ -643,8 +643,9 @@ public boolean isExistsData(TablePath tablePath) {
String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
Connection connection = getConnection(dbUrl);
String sql = getExistDataSql(tablePath);
try (PreparedStatement ps = connection.prepareStatement(sql)) {
ResultSet resultSet = ps.executeQuery();
try (PreparedStatement ps = connection.prepareStatement(sql);
ResultSet resultSet = ps.executeQuery()) {

return resultSet.next();
} catch (SQLException e) {
throw new CatalogException(String.format("Failed executeSql error %s", sql), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,12 @@ protected String getDropDatabaseSql(String databaseName) {
@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
Statement statement = defaultConnection.createStatement();
ResultSetMetaData metaData = statement.executeQuery(sqlQuery).getMetaData();
return CatalogUtils.getCatalogTable(
metaData, new OceanBaseMySqlTypeMapper(typeConverter), sqlQuery);
try (Statement statement = defaultConnection.createStatement();
ResultSet resultSet = statement.executeQuery(sqlQuery)) {
ResultSetMetaData metaData = resultSet.getMetaData();
return CatalogUtils.getCatalogTable(
metaData, new OceanBaseMySqlTypeMapper(typeConverter), sqlQuery);
}
}

@Override
Expand Down
Loading

0 comments on commit 853e973

Please sign in to comment.