[Feature][Connector-V2][Elasticsearch] Support interval-based writes, friendlier to real-time data sources #4347 #4349

Closed

wants to merge 8 commits
5 changes: 5 additions & 0 deletions docs/en/connector-v2/sink/Elasticsearch.md
@@ -30,6 +30,7 @@ Engine Supported
| password | string | no | |
| max_retry_count | int | no | 3 |
| max_batch_size | int | no | 10 |
| batch_interval_ms | int | no | 1000 |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostnames | boolean | no | true |
| tls_keystore_path | string | no | - |
@@ -75,6 +76,10 @@

one bulk request max try size

batch bulk doc max size

### batch_interval_ms [int]

batch flush interval in milliseconds
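For illustration, a sink block combining this option with the batch settings above might look like the following. The `hosts` and `index` values are placeholders, and those two option names are assumptions drawn from the rest of this connector's option table, not from this diff:

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel_test"
    max_batch_size = 10
    max_retry_count = 3
    # flush buffered docs every second even when max_batch_size is not reached
    batch_interval_ms = 1000
  }
}
```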

### tls_verify_certificate [boolean]

Enable certificates validation for HTTPS endpoints
4 changes: 2 additions & 2 deletions docs/en/faq.md
@@ -223,7 +223,7 @@ For example, if you want to set the JDK version to JDK8, there are two cases:
}
```
- Yarn cluster does not deploy JDK8. In this case, start SeaTunnel with JDK8 attached. For detailed operations, see the link below:
https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html
[here](https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html).

## What should I do if OOM always appears when running SeaTunnel in Spark local[*] mode?

@@ -336,7 +336,7 @@ spark-submit --verbose

## How to use SeaTunnel to synchronize data across HDFS clusters?

Just configure hdfs-site.xml properly, refer to: https://www.cnblogs.com/suanec/p/7828139.html
Just configure hdfs-site.xml properly, refer to: [here](https://www.cnblogs.com/suanec/p/7828139.html)

## I want to learn the source code of SeaTunnel, where should I start?

@@ -58,6 +58,13 @@ public class SinkConfig {
.defaultValue(10)
.withDescription("batch bulk doc max size");

@SuppressWarnings("checkstyle:MagicNumber")
public static final Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
.intType()
.defaultValue(1000)
Contributor: Would it be better to set the defaultValue to 0?

Contributor Author:

> Would it be better to set the defaultValue to 0?

For this setting I referred to Flink; I think it is a little more user-friendly.

Member: Yes, zero is better: it means the feature is off. Only when batch_interval_ms is greater than zero should the ScheduledTask be created.

Contributor Author (@zhilinli123, Mar 22, 2023): You mean I should set batch_interval_ms to 0 by default? @Hisoka-X

Member: Yes, 0 means the feature is off by default. If users want to enable it, they just configure it.

Contributor Author: Done.

Contributor Author: Done. I have resubmitted with the feature disabled by default; the user must configure it explicitly to enable it. @Hisoka-X
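The semantics the reviewers converged on (0 disables interval flushing; only a positive value starts the scheduled task) can be sketched as a small guard. The class and method names below are invented for illustration and are not part of the PR:

```java
// Hypothetical guard mirroring the review outcome: batch_interval_ms defaults
// to 0, which keeps interval flushing off; only a positive value enables it.
public class BatchIntervalGuard {
    public static boolean intervalFlushEnabled(int batchIntervalMs) {
        if (batchIntervalMs < 0) {
            // Mirrors the connector's parameter validation: negatives rejected.
            throw new IllegalArgumentException("batch_interval_ms must be >= 0");
        }
        return batchIntervalMs > 0; // 0 means the feature stays disabled
    }

    public static void main(String[] args) {
        System.out.println(intervalFlushEnabled(0));    // feature off by default
        System.out.println(intervalFlushEnabled(1000)); // periodic flush enabled
    }
}
```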

.withDescription("batch flush interval in milliseconds");

@SuppressWarnings("checkstyle:MagicNumber")
public static final Option<Integer> MAX_RETRY_COUNT =
Options.key("max_retry_count")
@@ -25,6 +25,8 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
@@ -33,6 +35,7 @@

import java.util.Collections;

import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.BATCH_INTERVAL_MS;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;

@@ -51,6 +54,8 @@ public class ElasticsearchSink

private int maxRetryCount = MAX_RETRY_COUNT.defaultValue();

private int batchIntervalMs = BATCH_INTERVAL_MS.defaultValue();
Member suggested change:

- private int batchIntervalMs = BATCH_INTERVAL_MS.defaultValue();
+ private Integer batchIntervalMs = BATCH_INTERVAL_MS.defaultValue();


@Override
public String getPluginName() {
return "Elasticsearch";
@@ -65,6 +70,16 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (pluginConfig.hasPath(MAX_RETRY_COUNT.key())) {
maxRetryCount = pluginConfig.getInt(MAX_RETRY_COUNT.key());
}
if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) {
batchIntervalMs = pluginConfig.getInt(BATCH_INTERVAL_MS.key());
}
        if (maxBatchSize < 0 || maxRetryCount < 0 || batchIntervalMs < 0) {
            throw new ElasticsearchConnectorException(
                    CommonErrorCode.ILLEGAL_ARGUMENT,
                    "Invalid parameter: max_batch_size, batch_interval_ms and "
                            + "max_retry_count must be non-negative integers");
        }
}

@Override
@@ -86,6 +101,7 @@ public SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkState>
pluginConfig,
maxBatchSize,
maxRetryCount,
batchIntervalMs,
Collections.emptyList());
}
}
@@ -42,6 +42,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch.
@@ -54,21 +59,29 @@ public class ElasticsearchSinkWriter

private final int maxBatchSize;

private final int batchIntervalMs;

private final SeaTunnelRowSerializer seaTunnelRowSerializer;
private final List<String> requestEsList;
private EsRestClient esRestClient;
private RetryMaterial retryMaterial;
private static final long DEFAULT_SLEEP_TIME_MS = 200L;

private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient boolean isClose;

public ElasticsearchSinkWriter(
SinkWriter.Context context,
SeaTunnelRowType seaTunnelRowType,
Config pluginConfig,
int maxBatchSize,
int maxRetryCount,
int batchIntervalMs,
List<ElasticsearchSinkState> elasticsearchStates) {
this.context = context;
this.maxBatchSize = maxBatchSize;
this.batchIntervalMs = batchIntervalMs;

IndexInfo indexInfo = new IndexInfo(pluginConfig);
esRestClient = EsRestClient.createInstance(pluginConfig);
@@ -79,6 +92,8 @@ public ElasticsearchSinkWriter(
this.requestEsList = new ArrayList<>(maxBatchSize);
this.retryMaterial =
new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS);
// Initialize the interval flush
open();
}

@Override
@@ -94,6 +109,32 @@ public void write(SeaTunnelRow element) {
}
}

public void open() {
Member suggested change:

- public void open() {
+ public void startScheduler() {

Contributor Author: Please review. @hailin0

        // Hoisted out of the ThreadFactory lambda: creating the counter inside
        // newThread() would reset it to 0 on every call, so the name suffix
        // would always be 1.
        AtomicInteger cnt = new AtomicInteger(0);
        this.scheduler =
                Executors.newScheduledThreadPool(
                        1,
                        runnable -> {
                            Thread thread = new Thread(runnable);
                            thread.setDaemon(true);
                            thread.setName(
                                    "sink-elasticsearch-interval" + "-" + cnt.incrementAndGet());
                            return thread;
                        });
this.scheduledFuture =
this.scheduler.scheduleWithFixedDelay(
() -> {
synchronized (ElasticsearchSinkWriter.this) {
if (requestEsList.size() > 0 && !isClose) {
bulkEsWithRetry(this.esRestClient, this.requestEsList);
}
}
},
this.batchIntervalMs,
this.batchIntervalMs,
TimeUnit.MILLISECONDS);
}
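As a standalone sketch of the same pattern (all names here are invented; the real writer runs bulkEsWithRetry instead of counting down a latch), a single daemon thread scheduled with a fixed delay can be exercised like this:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the writer's periodic flush: one daemon thread fires a
// flush task at a fixed delay until the scheduler is shut down.
public class FixedDelayFlushDemo {
    public static boolean runDemo() throws InterruptedException {
        CountDownLatch threeTicks = new CountDownLatch(3);
        ScheduledExecutorService scheduler =
                Executors.newScheduledThreadPool(
                        1,
                        runnable -> {
                            Thread t = new Thread(runnable);
                            t.setDaemon(true); // do not keep the JVM alive
                            t.setName("sink-elasticsearch-interval-demo");
                            return t;
                        });
        scheduler.scheduleWithFixedDelay(
                threeTicks::countDown, // stand-in for the real bulk flush
                10, 10, TimeUnit.MILLISECONDS);
        // Wait for three flush ticks, then stop the scheduler.
        boolean flushed = threeTicks.await(2, TimeUnit.SECONDS);
        scheduler.shutdown();
        return flushed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo() ? "flushed" : "timed out");
    }
}
```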

@Override
public Optional<ElasticsearchCommitInfo> prepareCommit() {
bulkEsWithRetry(this.esRestClient, this.requestEsList);
@@ -116,6 +157,9 @@ public synchronized void bulkEsWithRetry(
ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
"bulk es error: " + bulkResponse.getResponse());
}
Contributor: I would support not introducing a ScheduledThreadPool, but instead running Thread.sleep(batchIntervalMs) after the bulk request.

Contributor Author: I think the scheduler is a bit more write-friendly for real-time programs; even when the consumed data is less than the current batch, it still gets flushed to ES. CC @Hisoka-X

Member: There doesn't seem to be a better way to flush the data without a ScheduledTask. Running Thread.sleep(batchIntervalMs) after the bulk request would make the data flush slower, not faster.

                    log.info(
                            "bulk es successfully written to the rowNum: "
                                    + requestEsList.size());
return bulkResponse;
}
return null;
@@ -133,6 +177,12 @@
@Override
public void close() throws IOException {
        bulkEsWithRetry(this.esRestClient, this.requestEsList);
        this.isClose = true;
        // Close the client once, after the final flush; the unguarded
        // duplicate close() call before the null check is removed.
        if (esRestClient != null) {
            esRestClient.close();
        }
        if (this.scheduler != null) {
            this.scheduler.shutdown();
        }
}
}