[Feature][Connector-V2][Elasticsearch] Elasticsearch supports spaced write and is relatively real-time data source friendly #4347 #4349
Conversation
…write and is relatively real-time data source friendly apache#4347
() -> {
    synchronized (ElasticsearchSinkWriter.this) {
        if (requestEsList.size() > 0 && !isClose) {
            log.info(
Move log into bulkEsWithRetry
if (requestEsList.size() >= maxBatchSize) {
    log.info("Batch write completion row :" + requestEsList.size());
Move log into bulkEsWithRetry
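The reviewer's point is that both flush paths (the size-triggered one in write() and the timer-triggered one) end up calling bulkEsWithRetry, so the row-count log belongs there rather than at each call site. A minimal sketch of the idea; the class and field names mirror the snippets above but the signatures are hypothetical, not the connector's actual code:

```java
import java.util.ArrayList;
import java.util.List;

public class BulkLogSketch {
    final List<String> requestEsList = new ArrayList<>();
    int bulkCalls = 0;

    // Single place to log the flushed row count, shared by every flush path.
    synchronized void bulkEsWithRetry(List<String> requests) {
        System.out.println("Batch write completion rows: " + requests.size());
        bulkCalls++;
        requests.clear(); // stands in for the real bulk request + retry logic
    }

    synchronized void write(String row, int maxBatchSize) {
        requestEsList.add(row);
        if (requestEsList.size() >= maxBatchSize) {
            bulkEsWithRetry(requestEsList); // no log here; bulkEsWithRetry logs
        }
    }

    public static void main(String[] args) {
        BulkLogSketch writer = new BulkLogSketch();
        for (int i = 0; i < 3; i++) {
            writer.write("row-" + i, 3);
        }
        System.out.println("bulk calls: " + writer.bulkCalls);
    }
}
```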
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
// Whether pre-initialization is required
private transient boolean isOpen;
This field is not necessary
Can you point out the number of rows, boss?
@@ -89,11 +104,50 @@ public void write(SeaTunnelRow element) {
String indexRequestRow = seaTunnelRowSerializer.serializeRow(element);
requestEsList.add(indexRequestRow);
// Initialize the interval flush
tryOpen();
Move it into the constructor.
done~
…write and is relatively real-time data source friendly apache#4347
@@ -116,6 +157,9 @@ public synchronized void bulkEsWithRetry(
        ElasticsearchConnectorErrorCode.BULK_RESPONSE_ERROR,
        "bulk es error: " + bulkResponse.getResponse());
}
log.info(
I'd prefer not to introduce a ScheduledThreadPool, but instead to run Thread.sleep(batchIntervalMs) after the bulk request.
I think this is a little more write-friendly for real-time programs: even if the consumed data is less than the current batch size, it still gets flushed to ES.
CC @Hisoka-X
There doesn't seem to be a better way to flush data without a ScheduledTask. Running Thread.sleep(batchIntervalMs) after the bulk request would make the data flush slower, not faster.
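The trade-off under discussion can be sketched like this: a ScheduledExecutorService flushes whatever is buffered on a fixed interval, independently of write volume, whereas a Thread.sleep after each bulk call only delays the writer without flushing small batches sooner. This is an illustrative sketch under assumed names (IntervalFlushSketch, startScheduler), not the connector's actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IntervalFlushSketch {
    final List<String> buffer = new ArrayList<>();
    final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    final CountDownLatch flushed = new CountDownLatch(1);

    // Periodic flush, analogous to a batch_interval_ms setting.
    void startScheduler(long intervalMs) {
        scheduler.scheduleAtFixedRate(() -> {
            synchronized (this) {
                if (!buffer.isEmpty()) {
                    // In the real writer this would call bulkEsWithRetry(...)
                    System.out.println("interval flush of " + buffer.size() + " rows");
                    buffer.clear();
                    flushed.countDown();
                }
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void write(String row) {
        buffer.add(row);
    }

    // Helper so callers can wait for the first timer-driven flush.
    boolean awaitFlush() {
        try {
            return flushed.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        IntervalFlushSketch writer = new IntervalFlushSketch();
        writer.startScheduler(50);
        writer.write("row-1");
        // Even though the batch is far below maxBatchSize, the timer flushes it.
        System.out.println(writer.awaitFlush() ? "flushed by timer" : "timed out");
        writer.scheduler.shutdownNow();
    }
}
```

With only Thread.sleep after the bulk request, a single buffered row would sit until the size threshold is reached, which is exactly the latency this PR tries to avoid.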
public static final Option<Integer> BATCH_INTERVAL_MS =
        Options.key("batch_interval_ms")
                .intType()
                .defaultValue(1000)
Would it be better to set the default value to 0?
> Would it be better to set the default value to 0?

For this interval setting I referred to Flink; I think it's a little more user-friendly.
Yes, zero is better; it means the feature is disabled. Only when batch_interval_ms is greater than zero should you create the ScheduledTask.
You mean I should set batch_interval_ms to 0 by default? @Hisoka-X
Yes, 0 means the feature is disabled by default. If users want to enable it, they just configure it.
done
done
I have resubmitted with the feature disabled by default; the user must configure it manually to enable it.
@Hisoka-X
public static final Option<Integer> BATCH_INTERVAL_MS =
        Options.key("batch_interval_ms")
                .intType()
                .defaultValue(-1)
Why do you need a default value?
> Why do you need a default value?

We do not enable it by default; it stays off until the user manually configures it.
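The consensus the reviewers converge on is a simple gate: the interval flush is off unless batch_interval_ms is explicitly set to a positive value, and only then is the ScheduledTask created. A minimal sketch of that decision logic, with a hypothetical class name:

```java
public class BatchIntervalGate {
    // Only create the ScheduledTask when batch_interval_ms is configured
    // to a positive value; a non-positive default keeps the feature off.
    static boolean intervalFlushEnabled(int batchIntervalMs) {
        return batchIntervalMs > 0;
    }

    public static void main(String[] args) {
        System.out.println(intervalFlushEnabled(-1));   // default: disabled
        System.out.println(intervalFlushEnabled(0));    // disabled
        System.out.println(intervalFlushEnabled(1000)); // enabled
    }
}
```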
@@ -51,6 +54,8 @@
private int maxRetryCount = MAX_RETRY_COUNT.defaultValue();

private int batchIntervalMs = BATCH_INTERVAL_MS.defaultValue();
Suggested change:
- private int batchIntervalMs = BATCH_INTERVAL_MS.defaultValue();
+ private Integer batchIntervalMs = BATCH_INTERVAL_MS.defaultValue();
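The suggested change from int to Integer likely guards against a null default: unboxing a null Integer into an int field throws a NullPointerException, while a boxed field can simply stay null. A small illustration of the pitfall (the defaultValue() method here is hypothetical stand-in, not the connector's Option API):

```java
public class UnboxingPitfall {
    // Stand-in for an Option whose default value happens to be unset.
    static Integer defaultValue() {
        return null;
    }

    public static void main(String[] args) {
        Integer boxed = defaultValue(); // fine: the field just stays null
        System.out.println("boxed ok: " + boxed);
        try {
            int primitive = defaultValue(); // auto-unboxing of null throws
            System.out.println(primitive);
        } catch (NullPointerException e) {
            System.out.println("unboxing null throws NPE");
        }
    }
}
```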
@@ -94,6 +112,32 @@ public void write(SeaTunnelRow element) {
    }
}

public void open() {
Suggested change:
- public void open() {
+ public void startScheduler() {
Please review, @hailin0
…write and is relatively real-time data source friendly apache#4347 apache#4349
# Conflicts:
#	seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java
#	seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
-1.
I don't think adding additional threads to write data in the sink is a good approach. It makes resource usage uncontrollable, and it also loses the original intent of the committer interface design.

Do you have any good ideas, boss?
It's not a good change, and for now we do not have a strong need for it, so I will close it. If you have any questions, please send an e-mail to
…write and is relatively real-time data source friendly #4347
Purpose of this pull request
Check list
New License Guide
release-note