From 0200ea880c4f6f2a1717994469d8ca50b6afde39 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Tue, 14 Mar 2023 11:12:55 +0800 Subject: [PATCH 1/6] [Feature][Connector-V2][Elasticsearch] Elasticsearch supports spaced write and is relatively real-time data source friendly #4347 --- docs/en/connector-v2/sink/Elasticsearch.md | 5 ++ .../elasticsearch/config/SinkConfig.java | 7 +++ .../elasticsearch/sink/ElasticsearchSink.java | 16 +++++ .../sink/ElasticsearchSinkWriter.java | 62 ++++++++++++++++++- 4 files changed, 89 insertions(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/Elasticsearch.md b/docs/en/connector-v2/sink/Elasticsearch.md index 19cb59325b9..93e171ead13 100644 --- a/docs/en/connector-v2/sink/Elasticsearch.md +++ b/docs/en/connector-v2/sink/Elasticsearch.md @@ -30,6 +30,7 @@ Engine Supported | password | string | no | | | max_retry_count | int | no | 3 | | max_batch_size | int | no | 10 | +| batch_interval_ms | int | no | 1000 | | tls_verify_certificate | boolean | no | true | | tls_verify_hostnames | boolean | no | true | | tls_keystore_path | string | no | - | @@ -75,6 +76,10 @@ one bulk request max try size batch bulk doc max size +### batch_interval_ms [int] + +batch interval milliSecond + ### tls_verify_certificate [boolean] Enable certificates validation for HTTPS endpoints diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java index 34899ea6e39..26986fffd4e 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java @@ -58,6 +58,13 @@ public class SinkConfig { .defaultValue(10) .withDescription("batch bulk doc max size"); + @SuppressWarnings("checkstyle:MagicNumber") + public static final Option BATCH_INTERVAL_MS = + Options.key("batch_interval_ms") + .intType() + .defaultValue(1000) + .withDescription("batch interval milliSecond"); + @SuppressWarnings("checkstyle:MagicNumber") public static final Option MAX_RETRY_COUNT = Options.key("max_retry_count") diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java index f21d67a727a..5d4e2a1d1a0 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java @@ -25,6 +25,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState; @@ -33,6 +35,7 @@ import java.util.Collections; +import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.BATCH_INTERVAL_MS; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE; import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT; @@ -51,6 +54,8 @@ public class ElasticsearchSink private int maxRetryCount = MAX_RETRY_COUNT.defaultValue(); + private int batchIntervalMs = BATCH_INTERVAL_MS.defaultValue(); + @Override public String getPluginName() { return "Elasticsearch"; @@ -65,6 +70,16 @@ public void prepare(Config pluginConfig) throws PrepareFailException { if (pluginConfig.hasPath(MAX_RETRY_COUNT.key())) { maxRetryCount = pluginConfig.getInt(MAX_RETRY_COUNT.key()); } + if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) { + batchIntervalMs = pluginConfig.getInt(BATCH_INTERVAL_MS.key()); + } + if (maxBatchSize < 0 || maxRetryCount < 0 || batchIntervalMs < 0) { + throw new ElasticsearchConnectorException( + CommonErrorCode.ILLEGAL_ARGUMENT, + "An invalid parameter should be a positive integer greater than zero " + + "Check the following parameters " + + "max_batch_size、batch_interval_ms、max_retry_count "); + } } @Override @@ -86,6 +101,7 @@ public SinkWriter pluginConfig, maxBatchSize, maxRetryCount, + batchIntervalMs, Collections.emptyList()); } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java index 34d752c8796..db091d7c5ab 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java @@ -42,6 +42,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch. @@ -54,21 +59,31 @@ public class ElasticsearchSinkWriter private final int maxBatchSize; + private final int batchIntervalMs; + private final SeaTunnelRowSerializer seaTunnelRowSerializer; private final List requestEsList; private EsRestClient esRestClient; private RetryMaterial retryMaterial; private static final long DEFAULT_SLEEP_TIME_MS = 200L; + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture scheduledFuture; + // Whether pre-initialization is required + private transient boolean isOpen; + private transient boolean isClose; + public ElasticsearchSinkWriter( SinkWriter.Context context, SeaTunnelRowType seaTunnelRowType, Config pluginConfig, int maxBatchSize, int maxRetryCount, + int batchIntervalMs, List elasticsearchStates) { this.context = context; this.maxBatchSize = maxBatchSize; + this.batchIntervalMs = batchIntervalMs; IndexInfo indexInfo = new IndexInfo(pluginConfig); esRestClient = EsRestClient.createInstance(pluginConfig); @@ -89,11 +104,50 @@ public void write(SeaTunnelRow element) { String indexRequestRow = seaTunnelRowSerializer.serializeRow(element); requestEsList.add(indexRequestRow); + // Initialize the interval flush + tryOpen(); if (requestEsList.size() >= maxBatchSize) { + log.info("Batch write completion row :" + requestEsList.size()); bulkEsWithRetry(this.esRestClient, this.requestEsList); } } + private void tryOpen() { + if (!isOpen) { + isOpen = true; + open(); + } + } + + public void open() { + this.scheduler = + Executors.newScheduledThreadPool( + 1, + runnable -> { + AtomicInteger cnt = new AtomicInteger(0); + Thread thread = new Thread(runnable); + thread.setDaemon(true); + thread.setName( + "sink-elasticsearch-interval" + "-" + cnt.incrementAndGet()); + return thread; + }); + this.scheduledFuture = + this.scheduler.scheduleWithFixedDelay( + () -> { + synchronized (ElasticsearchSinkWriter.this) { + if (requestEsList.size() > 0 && !isClose) { + log.info( + "Write complete rows at batch intervals :" + + requestEsList.size()); + bulkEsWithRetry(this.esRestClient, this.requestEsList); + } + } + }, + this.batchIntervalMs, + this.batchIntervalMs, + TimeUnit.MILLISECONDS); + } + @Override public Optional prepareCommit() { bulkEsWithRetry(this.esRestClient, this.requestEsList); @@ -133,6 +187,12 @@ public synchronized void bulkEsWithRetry( @Override public void close() throws IOException { bulkEsWithRetry(this.esRestClient, this.requestEsList); - esRestClient.close(); + this.isClose = true; + if (esRestClient != null) { + esRestClient.close(); + } + if (this.scheduler != null) { + this.scheduler.shutdown(); + } } } From 8d3dd6fb0be9370241d61b8f200b37ce324bce12 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Tue, 14 Mar 2023 11:41:31 +0800 Subject: [PATCH 2/6] [Feature][Connector-V2][Elasticsearch] Elasticsearch supports spaced write and is relatively real-time data source friendly #4347 --- docs/en/faq.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/faq.md b/docs/en/faq.md index b3f698e19b2..942e0efaff5 100644 --- a/docs/en/faq.md +++ b/docs/en/faq.md @@ -223,7 +223,7 @@ For example, if you want to set the JDK version to JDK8, there are two cases: } ``` - Yarn cluster does not deploy JDK8. At this time, when you start SeaTunnel attached with JDK8.For detailed operations, see the link below: - https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html + [here](https://www.cnblogs.com/jasondan/p/spark-specific-jdk-version.html). ## What should I do if OOM always appears when running SeaTunnel in Spark local[*] mode? @@ -336,7 +336,7 @@ spark-submit --verbose ## How to use SeaTunnel to synchronize data across HDFS clusters? -Just configure hdfs-site.xml properly, refer to: https://www.cnblogs.com/suanec/p/7828139.html +Just configure hdfs-site.xml properly, refer to: [here](https://www.cnblogs.com/suanec/p/7828139.html) ## I want to learn the source code of SeaTunnel, where should I start? From aad46d8323e918990f4a2f5934c29e882141a578 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Wed, 15 Mar 2023 11:45:39 +0800 Subject: [PATCH 3/6] [Feature][Connector-V2][Elasticsearch] Elasticsearch supports spaced write and is relatively real-time data source friendly #4347 --- .../sink/ElasticsearchSinkWriter.java | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java index db091d7c5ab..6d6db475aa0 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java @@ -69,8 +69,6 @@ public class ElasticsearchSinkWriter private transient ScheduledExecutorService scheduler; private transient ScheduledFuture scheduledFuture; - // Whether pre-initialization is required - private transient boolean isOpen; private transient boolean isClose; public ElasticsearchSinkWriter( @@ -94,6 +92,8 @@ public ElasticsearchSinkWriter( this.requestEsList = new ArrayList<>(maxBatchSize); this.retryMaterial = new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS); + // Initialize the interval flush + open(); } @Override @@ -104,21 +104,11 @@ public void write(SeaTunnelRow element) { String indexRequestRow = seaTunnelRowSerializer.serializeRow(element); requestEsList.add(indexRequestRow); - // Initialize the interval flush - tryOpen(); if (requestEsList.size() >= maxBatchSize) { - log.info("Batch write completion row :" + requestEsList.size()); bulkEsWithRetry(this.esRestClient, this.requestEsList); } } - private void tryOpen() { - if (!isOpen) { - isOpen = true; - open(); - } - } - public void open() { this.scheduler = Executors.newScheduledThreadPool( @@ -136,9 +126,6 @@ public void open() { () -> { synchronized (ElasticsearchSinkWriter.this) { if (requestEsList.size() > 0 && !isClose) { - log.info( - "Write complete rows at batch intervals :" - + requestEsList.size()); bulkEsWithRetry(this.esRestClient, this.requestEsList); } } @@ -170,6 +157,9 @@ public synchronized void bulkEsWithRetry( ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR, "bulk es error: " + bulkResponse.getResponse()); } + log.info( + "bulk es successfully written to the rowNum: " + + requestEsList.size()); return bulkResponse; } return null; From d3b73d5e7b1253cb971022d88158e24342298657 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Wed, 22 Mar 2023 15:25:55 +0800 Subject: [PATCH 4/6] [Bug] [connector-v2] [Eleasticsearch] document description error #4370 --- docs/en/connector-v2/sink/Elasticsearch.md | 5 +++-- .../seatunnel/elasticsearch/config/SinkConfig.java | 4 ++-- .../seatunnel/elasticsearch/sink/ElasticsearchSink.java | 4 ++-- .../elasticsearch/sink/ElasticsearchSinkWriter.java | 5 ++++- 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/docs/en/connector-v2/sink/Elasticsearch.md b/docs/en/connector-v2/sink/Elasticsearch.md index 93e171ead13..cd189f04e65 100644 --- a/docs/en/connector-v2/sink/Elasticsearch.md +++ b/docs/en/connector-v2/sink/Elasticsearch.md @@ -30,7 +30,7 @@ Engine Supported | password | string | no | | | max_retry_count | int | no | 3 | | max_batch_size | int | no | 10 | -| batch_interval_ms | int | no | 1000 | +| batch_interval_ms | int | no | -1 | | tls_verify_certificate | boolean | no | true | | tls_verify_hostnames | boolean | no | true | | tls_keystore_path | string | no | - | @@ -78,7 +78,7 @@ batch bulk doc max size ### batch_interval_ms [int] -batch interval milliSecond +batch interval milliSecond Disable by default ### tls_verify_certificate [boolean] @@ -188,4 +188,5 @@ sink { - [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3673](https://github.com/apache/incubator-seatunnel/pull/3673)) - [Feature] Support https protocol & compatible with opensearch ([3997](https://github.com/apache/incubator-seatunnel/pull/3997)) +- [Feature] Support Interval write ([4349](https://github.com/apache/incubator-seatunnel/pull/4349)) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java index 26986fffd4e..997d5af58e4 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java @@ -62,8 +62,8 @@ public class SinkConfig { public static final Option BATCH_INTERVAL_MS = Options.key("batch_interval_ms") .intType() - .defaultValue(1000) - .withDescription("batch interval milliSecond"); + .defaultValue(-1) + .withDescription("batch interval milliSecond Disable by default"); @SuppressWarnings("checkstyle:MagicNumber") public static final Option MAX_RETRY_COUNT = diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java index 5d4e2a1d1a0..bb7db2b099d 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java @@ -73,12 +73,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException { if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) { batchIntervalMs = pluginConfig.getInt(BATCH_INTERVAL_MS.key()); } - if (maxBatchSize < 0 || maxRetryCount < 0 || batchIntervalMs < 0) { + if (maxBatchSize < 0 || maxRetryCount < 0) { throw new ElasticsearchConnectorException( CommonErrorCode.ILLEGAL_ARGUMENT, "An invalid parameter should be a positive integer greater than zero " + "Check the following parameters " - + "max_batch_size、batch_interval_ms、max_retry_count "); + + "max_batch_size、max_retry_count "); } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java index 6d6db475aa0..c2d2c27977c 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java @@ -93,7 +93,10 @@ public ElasticsearchSinkWriter( this.retryMaterial = new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS); // Initialize the interval flush - open(); + if(this.batchIntervalMs>0){ + open(); + log.info("The initial scheduling is complete batch_interval_ms :"+batchIntervalMs); + } } @Override From d7461e9fe79a29aeb76874db3e98962cd84a21ad Mon Sep 17 00:00:00 2001 From: zhilinli Date: Wed, 22 Mar 2023 17:42:30 +0800 Subject: [PATCH 5/6] [Bug] [connector-v2] [Eleasticsearch] document description error #4370 --- .../seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java index c2d2c27977c..4700aec7967 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java @@ -93,9 +93,9 @@ public ElasticsearchSinkWriter( this.retryMaterial = new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS); // Initialize the interval flush - if(this.batchIntervalMs>0){ + if (this.batchIntervalMs > 0) { open(); - log.info("The initial scheduling is complete batch_interval_ms :"+batchIntervalMs); + log.info("The initial scheduling is complete batch_interval_ms :" + batchIntervalMs); } } From 89636df538241caa4418ce804e62f4bdb11ba141 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Mon, 27 Mar 2023 18:28:31 +0800 Subject: [PATCH 6/6] [Feature][Connector-V2][Elasticsearch] Elasticsearch supports spaced write and is relatively real-time data source friendly #4347 #4349 --- .../seatunnel/elasticsearch/sink/ElasticsearchSink.java | 2 +- .../seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java index bb7db2b099d..252a23917b4 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java @@ -54,7 +54,7 @@ public class ElasticsearchSink private int maxRetryCount = MAX_RETRY_COUNT.defaultValue(); - private int batchIntervalMs = BATCH_INTERVAL_MS.defaultValue(); + private Integer batchIntervalMs = BATCH_INTERVAL_MS.defaultValue(); @Override public String getPluginName() { diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java index 4700aec7967..b54a5b91600 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java @@ -94,7 +94,7 @@ public ElasticsearchSinkWriter( new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS); // Initialize the interval flush if (this.batchIntervalMs > 0) { - open(); + startScheduler(); log.info("The initial scheduling is complete batch_interval_ms :" + batchIntervalMs); } } @@ -112,7 +112,7 @@ public void write(SeaTunnelRow element) { } } - public void open() { + public void startScheduler() { this.scheduler = Executors.newScheduledThreadPool( 1,