Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2][Clickhouse] Clickhouse supports spaced write and is relatively real-time data source friendly #4328

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/en/connector-v2/sink/Clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Write data to Clickhouse can also be done using JDBC
| password | string | yes | - |
| clickhouse.config | map | no | |
| bulk_size | string | no | 20000 |
| batch_interval_ms | string | no | 5000 |
| split_mode | string | no | false |
| sharding_key | string | no | - |
| primary_key | string | no | - |
Expand Down Expand Up @@ -66,6 +67,10 @@ In addition to the above mandatory parameters that must be specified by `clickho

The number of rows written through [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the `default is 20000` .

### batch_interval_ms [string]

Write flush the interval milliSecond `default is 5000` .

### split_mode [boolean]

This mode only support clickhouse table which engine is 'Distributed'.And `internal_replication` option
Expand Down
36 changes: 18 additions & 18 deletions docs/en/start-v2/locally/quick-start-flink.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,24 @@ is a sign that the command ran successfully or not.
The SeaTunnel console will prints some logs as below:

```shell
fields : name, age
types : STRING, INT
row=1 : elWaB, 1984352560
row=2 : uAtnp, 762961563
row=3 : TQEIB, 2042675010
row=4 : DcFjo, 593971283
row=5 : SenEb, 2099913608
row=6 : DHjkg, 1928005856
row=7 : eScCM, 526029657
row=8 : sgOeE, 600878991
row=9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144
fields : name, age
types : STRING, INT
row= 1 : elWaB, 1984352560
row= 2 : uAtnp, 762961563
row= 3 : TQEIB, 2042675010
row= 4 : DcFjo, 593971283
row= 5 : SenEb, 2099913608
row= 6 : DHjkg, 1928005856
row= 7 : eScCM, 526029657
row= 8 : sgOeE, 600878991
row= 9 : gwdvw, 1951126920
row=10 : nSiKE, 488708928
row=11 : xubpl, 1420202810
row=12 : rHZqb, 331185742
row=13 : rciGD, 1112878259
row=14 : qLhdI, 1457046294
row=15 : ZTkRx, 1240668386
row=16 : SGZCr, 94186144
```

## What's More
Expand Down
16 changes: 8 additions & 8 deletions docs/en/start-v2/locally/quick-start-seatunnel-engine.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ The SeaTunnel console will prints some logs as below:
```shell
2022-12-19 11:01:45,417 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - output rowType: name<STRING>, age<INT>
2022-12-19 11:01:46,489 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: CpiOd, 8520946
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=2: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: eQqTs, 1256802974
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=3: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: UsRgO, 2053193072
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=4: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jDQJj, 1993016602
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=5: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: rqdKp, 1392682764
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=6: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: wCoWN, 986999925
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=7: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: qomTU, 72775247
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=8: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: jcqXR, 1074529204
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=9: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: AkWIO, 1961723427
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=10: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: hBoib, 929089763
2022-12-19 11:01:46,490 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=11: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: GSvzm, 827085798
2022-12-19 11:01:46,491 INFO org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=12: SeaTunnelRow#tableId=-1 SeaTunnelRow#kind=INSERT: NNAYI, 94307133
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ public class ClickhouseConfig {
.defaultValue(20000)
.withDescription("Bulk size of clickhouse jdbc");

/** Data flush interval of clickhouse jdbc */
public static final Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
.intType()
.defaultValue(5000)
.withDescription("batch interval milliSecond");

public static final Option<String> SQL =
Options.key("sql")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,8 @@ public class ReaderOption implements Serializable {
private Map<String, String> tableSchema;
@Setter private SeaTunnelRowType seaTunnelRowType;
private Properties properties;
// Data write batch size
private int bulkSize;
// Data write interval
private int batchIntervalMs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.Properties;

import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BATCH_INTERVAL_MS;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
Expand Down Expand Up @@ -100,6 +101,7 @@ public void prepare(Config config) throws PrepareFailException {
Map<String, Object> defaultConfig =
ImmutableMap.<String, Object>builder()
.put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
.put(BATCH_INTERVAL_MS.key(), BATCH_INTERVAL_MS.defaultValue())
.put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
.build();

Expand Down Expand Up @@ -213,6 +215,7 @@ public void prepare(Config config) throws PrepareFailException {
.tableEngine(table.getEngine())
.tableSchema(tableSchema)
.bulkSize(config.getInt(BULK_SIZE.key()))
.batchIntervalMs(config.getInt(BATCH_INTERVAL_MS.key()))
.primaryKeys(primaryKeys)
.supportUpsert(supportUpsert)
.allowExperimentalLightweightDelete(allowExperimentalLightweightDelete)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

@Slf4j
Expand All @@ -54,6 +59,11 @@ public class ClickhouseSinkWriter
private final ShardRouter shardRouter;
private final transient ClickhouseProxy proxy;
private final Map<Shard, ClickhouseBatchStatement> statementMap;
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
// Whether pre-initialization is required
private transient boolean isOpen;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is not necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a historical commentary Please check the latest

private transient boolean isClose;

ClickhouseSinkWriter(ReaderOption option, Context context) {
this.option = option;
Expand Down Expand Up @@ -81,9 +91,12 @@ public void write(SeaTunnelRow element) throws IOException {
// add into batch
addIntoBatch(element, clickHouseStatement);
sizeHolder.setValue(sizeHolder.getValue() + 1);
// Initialize the interval flush
tryOpen(sizeHolder, clickHouseStatement);
// flush batch
if (sizeHolder.getValue() >= option.getBulkSize()) {
flush(clickHouseStatement);
log.info("Batch write completion row :" + sizeHolder.getValue());
sizeHolder.setValue(0);
}
}
Expand All @@ -99,6 +112,10 @@ public void abortPrepare() {}
@Override
public void close() throws IOException {
this.proxy.close();
this.isClose = true;
if (this.scheduler != null) {
this.scheduler.shutdown();
}
for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
try (ClickHouseConnectionImpl needClosedConnection =
batchStatement.getClickHouseConnection();
Expand All @@ -118,6 +135,60 @@ public void close() throws IOException {
}
}

private void tryOpen(IntHolder sizeHolder, JdbcBatchStatementExecutor clickHouseStatement) {
if (!isOpen) {
isOpen = true;
open(sizeHolder, clickHouseStatement);
}
}

public void open(IntHolder sizeHolder, JdbcBatchStatementExecutor clickHouseStatement) {
if (option.getBatchIntervalMs() < 0) {
throw new ClickhouseConnectorException(
CommonErrorCode.WRITER_OPERATION_FAILED,
"batch_interval_ms : must be milliseconds greater than zero");
}
if (option.getBulkSize() < 0) {
throw new ClickhouseConnectorException(
CommonErrorCode.WRITER_OPERATION_FAILED,
"bulk_size : It must be an integer greater than 0");
}
log.info(
"Batch configuration takes effect bulk_size : "
+ option.getBulkSize()
+ " batch_interval_ms : "
+ option.getBatchIntervalMs());
this.scheduler =
Executors.newScheduledThreadPool(
1,
runnable -> {
AtomicInteger cnt = new AtomicInteger(0);
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName(
"sink-clickhouse-interval" + "-" + cnt.incrementAndGet());
return thread;
});
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (ClickhouseSinkWriter.this) {
if (sizeHolder.getValue() > 0
&& clickHouseStatement != null
&& !isClose) {
flush(clickHouseStatement);
log.info(
"Write complete rows at batch intervals :"
+ sizeHolder.getValue());
sizeHolder.setValue(0);
}
}
},
option.getBatchIntervalMs(),
option.getBatchIntervalMs(),
TimeUnit.MILLISECONDS);
}

private void addIntoBatch(SeaTunnelRow row, JdbcBatchStatementExecutor clickHouseStatement) {
try {
clickHouseStatement.addToBatch(row);
Expand Down
Loading