Skip to content

Commit

Permalink
[Improvement] add starrocks jdbc dialect
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie committed Aug 9, 2024
1 parent 876d2f0 commit 214d5f9
Show file tree
Hide file tree
Showing 18 changed files with 186 additions and 16 deletions.
3 changes: 2 additions & 1 deletion docs/en/connector-v2/source/Jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ supports query SQL and can achieve projection effect.
## Options

| name | type | required | default value | description |
|--------------------------------------------|---------|----------|-----------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|--------------------------------------------|---------|----------|-----------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---|
| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/test |
| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, if you use MySQL the value is `com.mysql.cj.jdbc.Driver`. |
| user | String | No | - | userName |
Expand All @@ -60,6 +60,7 @@ supports query SQL and can achieve projection effect.
| table_list | Array | No | - | The list of tables to be read, you can use this configuration instead of `table_path` |
| where_condition | String | No | - | Common row filter conditions for all tables/queries, must start with `where`. for example `where id > 100` |
| split.size | Int | No | 8096 | How many rows in one split, captured tables are split into multiple splits when read of table. |
| dialect.name | String | No | default | One JDBC drive can connect to multiple databases, like mysql drive can use to connector MySQL and StarRocks.<br/> With this parameter you can choose the dedicated jdbc dialect implement. <br/> Now only support `starrocks` | |
| split.even-distribution.factor.lower-bound | Double | No | 0.05 | Not recommended for use.<br/> The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05. |
| split.even-distribution.factor.upper-bound | Double | No | 100 | Not recommended for use.<br/> The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0. |
| split.sample-sharding.threshold | Int | No | 1000 | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public interface JdbcOptions {
Option<String> DRIVER =
Options.key("driver").stringType().noDefaultValue().withDescription("driver");

Option<String> DIALECT_NAME =
Options.key("dialect.name")
.stringType()
.defaultValue("default")
.withDescription(
"The dialect name, when use same jdbc drive, use this parameter to choose the special implement.");

Option<Integer> CONNECTION_CHECK_TIMEOUT_SEC =
Options.key("connection_check_timeout_sec")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class JdbcSinkConfig implements Serializable {
private static final long serialVersionUID = 2L;

private JdbcConnectionConfig jdbcConnectionConfig;
private String dialectName;
private boolean isExactlyOnce;
private String simpleSql;
private String database;
Expand All @@ -48,6 +49,7 @@ public class JdbcSinkConfig implements Serializable {
public static JdbcSinkConfig of(ReadonlyConfig config) {
JdbcSinkConfigBuilder builder = JdbcSinkConfig.builder();
builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config));
builder.dialectName(config.get(JdbcOptions.DIALECT_NAME));
builder.isExactlyOnce(config.get(JdbcOptions.IS_EXACTLY_ONCE));
config.getOptional(JdbcOptions.PRIMARY_KEYS).ifPresent(builder::primaryKeys);
config.getOptional(JdbcOptions.DATABASE).ifPresent(builder::database);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class JdbcSourceConfig implements Serializable {
private String whereConditionClause;
public String compatibleMode;
private int fetchSize;
private String dialectName;

private boolean useDynamicSplitter;
private int splitSize;
Expand All @@ -56,14 +57,14 @@ public static JdbcSourceConfig of(ReadonlyConfig config) {
builder.useDynamicSplitter(isOldVersion ? false : true);

builder.splitSize(config.get(JdbcSourceOptions.SPLIT_SIZE));
builder.dialectName(config.get(JdbcOptions.DIALECT_NAME));
builder.splitEvenDistributionFactorUpperBound(
config.get(JdbcSourceOptions.SPLIT_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND));
builder.splitEvenDistributionFactorLowerBound(
config.get(JdbcSourceOptions.SPLIT_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND));
builder.splitSampleShardingThreshold(
config.get(JdbcSourceOptions.SPLIT_SAMPLE_SHARDING_THRESHOLD));
builder.splitInverseSamplingRate(config.get(JdbcSourceOptions.SPLIT_INVERSE_SAMPLING_RATE));

config.getOptional(JdbcSourceOptions.WHERE_CONDITION)
.ifPresent(
whereConditionClause -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public class JdbcInputFormat implements Serializable {
public JdbcInputFormat(JdbcSourceConfig config, Map<TablePath, CatalogTable> tables) {
this.jdbcDialect =
JdbcDialectLoader.load(
config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
config.getJdbcConnectionConfig().getUrl(),
config.getCompatibleMode(),
config.getDialectName());
this.chunkSplitter = ChunkSplitter.create(config);
this.jdbcRowConverter = jdbcDialect.getRowConverter();
this.tables = tables;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class DatabaseIdentifier {
public static final String INFORMIX = "Informix";
public static final String KINGBASE = "KingBase";
public static final String MYSQL = "MySQL";
public static final String STARROCKS = "StarRocks";
public static final String ORACLE = "Oracle";
public static final String PHOENIX = "Phoenix";
public static final String POSTGRESQL = "Postgres";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;

import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;

/**
* A factory to create a specific {@link JdbcDialect}
*
Expand All @@ -35,6 +37,10 @@ public interface JdbcDialectFactory {
*/
boolean acceptsURL(String url);

default String getDialectName() {
return JdbcOptions.DIALECT_NAME.defaultValue();
}

/** @return Creates a new instance of the {@link JdbcDialect}. */
JdbcDialect create();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,21 @@ public final class JdbcDialectLoader {

private JdbcDialectLoader() {}

public static JdbcDialect load(String url, String compatibleMode) {
return load(url, compatibleMode, "");
public static JdbcDialect load(String url, String compatibleMode, String dialectName) {
return load(url, compatibleMode, "", dialectName);
}

/**
* Loads the unique JDBC Dialect that can handle the given database url.
*
* @param url A database URL.
* @param compatibleMode The compatible mode.
* @return The loaded dialect.
* @throws IllegalStateException if the loader cannot find exactly one dialect that can
* unambiguously process the given database URL.
* @return The loaded dialect.
*/
public static JdbcDialect load(String url, String compatibleMode, String fieldIde) {
public static JdbcDialect load(
String url, String compatibleMode, String fieldIde, String dialectName) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
List<JdbcDialectFactory> foundFactories = discoverFactories(cl);

Expand All @@ -62,7 +63,9 @@ public static JdbcDialect load(String url, String compatibleMode, String fieldId
}

final List<JdbcDialectFactory> matchingFactories =
foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList());
foundFactories.stream()
.filter(f -> f.acceptsURL(url) && f.getDialectName().equals(dialectName))
.collect(Collectors.toList());

if (matchingFactories.isEmpty()) {
throw new JdbcConnectorException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.starrocks;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;

public class StarRocksDialect extends MysqlDialect {

public StarRocksDialect() {}

public StarRocksDialect(String fieldIde) {
this.fieldIde = fieldIde;
}

@Override
public String dialectName() {
return DatabaseIdentifier.STARROCKS;
}

@Override
public String hashModForField(String functionName, String fieldName, int mod) {
return "ABS(md5sum_numeric(" + quoteIdentifier(fieldName) + ") % " + mod + ")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.starrocks;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

import javax.annotation.Nonnull;

@AutoService(JdbcDialectFactory.class)
public class StarRocksDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:mysql:");
}

@Override
public String getDialectName() {
return "starrocks";
}

@Override
public JdbcDialect create() {
return new StarRocksDialect();
}

@Override
public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) {
return new StarRocksDialect(fieldIde);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ public TableSink createSink(TableSinkFactoryContext context) {
JdbcDialectLoader.load(
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
fieldIdeEnum == null ? null : fieldIdeEnum.getValue());
fieldIdeEnum == null ? null : fieldIdeEnum.getValue(),
sinkConfig.getDialectName());
dialect.connectionUrlParse(
sinkConfig.getJdbcConnectionConfig().getUrl(),
sinkConfig.getJdbcConnectionConfig().getProperties(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ public ChunkSplitter(JdbcSourceConfig config) {
this.fetchSize = config.getFetchSize();
this.jdbcDialect =
JdbcDialectLoader.load(
config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
config.getJdbcConnectionConfig().getUrl(),
config.getCompatibleMode(),
config.getDialectName());
this.connectionProvider =
jdbcDialect.getJdbcConnectionProvider(config.getJdbcConnectionConfig());
}
Expand Down
Loading

0 comments on commit 214d5f9

Please sign in to comment.