
[Feature][Connector-V2][Elasticsearch] Support interval (spaced) writes, friendlier to near-real-time data sources #4347 #4349

Closed
wants to merge 8 commits
6 changes: 6 additions & 0 deletions docs/en/connector-v2/sink/Elasticsearch.md
@@ -30,6 +30,7 @@ Engine Supported
| password | string | no | |
| max_retry_count | int | no | 3 |
| max_batch_size | int | no | 10 |
| batch_interval_ms | int | no | -1 |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostnames | boolean | no | true |
| tls_keystore_path | string | no | - |
@@ -75,6 +76,10 @@ one bulk request max try size

batch bulk doc max size

### batch_interval_ms [int]

Batch write interval in milliseconds. Disabled by default (`-1`).
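For illustration, a minimal sink block that enables a one-second interval flush might look like the sketch below (hosts and index are placeholder values):

```bash
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel"
    max_batch_size = 10
    batch_interval_ms = 1000
  }
}
```

Buffered rows are then flushed either when `max_batch_size` is reached or when `batch_interval_ms` elapses since the last flush, whichever comes first.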

### tls_verify_certificate [boolean]

Enable certificates validation for HTTPS endpoints
@@ -183,4 +188,5 @@ sink {

- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3673](https://github.com/apache/incubator-seatunnel/pull/3673))
- [Feature] Support https protocol & compatible with opensearch ([3997](https://github.com/apache/incubator-seatunnel/pull/3997))
- [Feature] Support interval write ([4349](https://github.com/apache/incubator-seatunnel/pull/4349))

4 changes: 2 additions & 2 deletions docs/en/faq.md
@@ -218,7 +218,7 @@ For example, if you want to set the JDK version to JDK8, there are two cases:
}
```
- The Yarn cluster does not have JDK8 deployed. In this case, start SeaTunnel with JDK8 attached; for detailed operations, see [here](https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html).

## What should I do if OOM always appears when running SeaTunnel in Spark local[*] mode?

@@ -331,7 +331,7 @@ spark-submit --verbose

## How to use SeaTunnel to synchronize data across HDFS clusters?

Just configure hdfs-site.xml properly; refer to [here](https://www.cnblogs.com/suanec/p/7828139.html).

## I want to learn the source code of SeaTunnel, where should I start?

SinkConfig.java
@@ -58,6 +58,13 @@ public class SinkConfig {
.defaultValue(10)
.withDescription("batch bulk doc max size");

@SuppressWarnings("checkstyle:MagicNumber")
public static final Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
.intType()
.defaultValue(-1)
.withDescription("batch interval in milliseconds, disabled by default");

Member: Why do you need a default?

Contributor (author): We do not enable it by default until the user manually configures it.

@SuppressWarnings("checkstyle:MagicNumber")
public static final Option<Integer> MAX_RETRY_COUNT =
Options.key("max_retry_count")
ElasticsearchSink.java
@@ -25,6 +25,8 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
@@ -33,6 +35,7 @@

import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.BATCH_INTERVAL_MS;

@AutoService(SeaTunnelSink.class)
public class ElasticsearchSink
@@ -49,6 +52,8 @@ public class ElasticsearchSink

private int maxRetryCount = MAX_RETRY_COUNT.defaultValue();

private int batchIntervalMs = BATCH_INTERVAL_MS.defaultValue();

@Override
public String getPluginName() {
return "Elasticsearch";
@@ -63,6 +68,16 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (pluginConfig.hasPath(MAX_RETRY_COUNT.key())) {
maxRetryCount = pluginConfig.getInt(MAX_RETRY_COUNT.key());
}
if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) {
batchIntervalMs = pluginConfig.getInt(BATCH_INTERVAL_MS.key());
}
if (maxBatchSize <= 0 || maxRetryCount <= 0) {
throw new ElasticsearchConnectorException(
CommonErrorCode.ILLEGAL_ARGUMENT,
"Invalid parameter, should be a positive integer greater than zero. "
+ "Check the following parameters: "
+ "max_batch_size, max_retry_count");
}
}

@Override
@@ -79,6 +94,6 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState> createWriter(
SinkWriter.Context context) {
return new ElasticsearchSinkWriter(
context, seaTunnelRowType, pluginConfig, maxBatchSize, maxRetryCount, batchIntervalMs);
}
}
ElasticsearchSinkWriter.java
@@ -42,6 +42,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch.
@@ -54,20 +59,28 @@ public class ElasticsearchSinkWriter

private final int maxBatchSize;

private final int batchIntervalMs;

private final SeaTunnelRowSerializer seaTunnelRowSerializer;
private final List<String> requestEsList;
private EsRestClient esRestClient;
private RetryMaterial retryMaterial;
private static final long DEFAULT_SLEEP_TIME_MS = 200L;

private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient volatile boolean isClose; // volatile: written by close(), read by the scheduler thread

public ElasticsearchSinkWriter(
SinkWriter.Context context,
SeaTunnelRowType seaTunnelRowType,
Config pluginConfig,
int maxBatchSize,
int maxRetryCount) {
int maxRetryCount,
int batchIntervalMs) {
this.context = context;
this.maxBatchSize = maxBatchSize;
this.batchIntervalMs = batchIntervalMs;

IndexInfo indexInfo = new IndexInfo(pluginConfig);
esRestClient = EsRestClient.createInstance(pluginConfig);
@@ -78,6 +91,11 @@ public ElasticsearchSinkWriter(
this.requestEsList = new ArrayList<>(maxBatchSize);
this.retryMaterial =
new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS);
// Initialize the interval flush
if (this.batchIntervalMs > 0) {
startScheduler();
log.info("The initial scheduling is complete batch_interval_ms :" + batchIntervalMs);
}
}

@Override
@@ -93,6 +111,32 @@ public void write(SeaTunnelRow element) {
}
}

public void startScheduler() {
// Hoist the counter out of the thread factory so each thread gets a distinct
// name; creating the AtomicInteger inside the lambda would reset it per thread.
AtomicInteger threadCount = new AtomicInteger(0);
this.scheduler =
Executors.newScheduledThreadPool(
1,
runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
thread.setName(
"sink-elasticsearch-interval-" + threadCount.incrementAndGet());
return thread;
});
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
// Synchronized with write()/bulkEsWithRetry() so the periodic flush
// never runs concurrently with a size-triggered flush.
synchronized (ElasticsearchSinkWriter.this) {
if (!requestEsList.isEmpty() && !isClose) {
bulkEsWithRetry(this.esRestClient, this.requestEsList);
}
}
},
this.batchIntervalMs,
this.batchIntervalMs,
TimeUnit.MILLISECONDS);
}

@Override
public Optional<ElasticsearchCommitInfo> prepareCommit() {
bulkEsWithRetry(this.esRestClient, this.requestEsList);
@@ -115,6 +159,9 @@ public synchronized void bulkEsWithRetry(
ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
"bulk es error: " + bulkResponse.getResponse());
}
log.info("bulk es successfully written, rowNum: {}", requestEsList.size());
return bulkResponse;
}
return null;

Contributor: I support not introducing a ScheduledThreadPool; instead, run Thread.sleep(batchIntervalMs) after the bulk request.

Contributor (author): I think the scheduler is a bit friendlier to writes from real-time programs: even if the accumulated rows are fewer than the current batch size, they still get flushed to ES.
CC @Hisoka-X

Member: There doesn't seem to be a better way to flush data without a ScheduledTask. Running Thread.sleep(batchIntervalMs) after the bulk request would make data flush more slowly, not faster.
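To make the tradeoff in this thread concrete, the Thread.sleep alternative would look roughly like the sketch below. This is hypothetical code, not part of this PR, and the serializer call is an assumption; the point is that the flush condition is unchanged, so rows trickling in below maxBatchSize are never flushed early, while the scheduled task flushes them after batchIntervalMs regardless of batch fill:

```java
// Hypothetical alternative: throttle by sleeping after each bulk request instead
// of scheduling a periodic flush. The flush still fires only when the buffer
// reaches maxBatchSize, so a small residual batch can wait indefinitely, and the
// sleep adds latency to the write path.
public void write(SeaTunnelRow element) {
    String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
    requestEsList.add(indexRequestRow);
    if (requestEsList.size() >= maxBatchSize) {
        bulkEsWithRetry(this.esRestClient, this.requestEsList);
        try {
            Thread.sleep(batchIntervalMs); // pause between bulks
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```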
@@ -132,6 +179,12 @@ public synchronized void bulkEsWithRetry(
@Override
public void close() throws IOException {
bulkEsWithRetry(this.esRestClient, this.requestEsList);
this.isClose = true;
if (esRestClient != null) {
esRestClient.close();
}
if (this.scheduler != null) {
this.scheduler.shutdown();
}
}
}