sink(ticdc): optimize buffer sink flush from O(N^2) to O(N) #3899
Signed-off-by: Neil Shen <[email protected]>
Codecov Report
@@               Coverage Diff                @@
##             master      #3899        +/-   ##
================================================
- Coverage   57.0741%   55.2297%   -1.8445%
================================================
  Files           478        480         +2
  Lines         56551      58445      +1894
================================================
+ Hits          32276      32279         +3
- Misses        20978      22871      +1893
+ Partials       3297       3295         -2
Can you add some description about how it works?
Updated, see description.
/run-kafka-integration-test
startEmit := time.Now()
// find all rows before resolvedTs and emit to backend sink
for i := 0; i < batchSize; i++ {
	tableID, resolvedTs := batch[i].tableID, batch[i].resolvedTs
If a batch contains many flush events for one table, would it improve performance to find the max resolvedTs per table first, so that we do not need to search and flush multiple times?
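The suggestion in this comment could be sketched roughly as follows. This is only an illustration, not code from the PR: `flushRequest` and `maxResolvedTsPerTable` are hypothetical names, and the field types are assumed. The idea is to collapse the batch so that each table is flushed once, at the largest resolvedTs requested for it.

```go
package main

// flushRequest is a hypothetical stand-in for one entry of the batch
// (the PR only shows that entries carry tableID and resolvedTs).
type flushRequest struct {
	tableID    int64
	resolvedTs uint64
}

// maxResolvedTsPerTable collapses a batch so that every table appears once,
// mapped to the largest resolvedTs seen for that table in the batch.
func maxResolvedTsPerTable(batch []flushRequest) map[int64]uint64 {
	latest := make(map[int64]uint64, len(batch))
	for _, req := range batch {
		if ts, ok := latest[req.tableID]; !ok || req.resolvedTs > ts {
			latest[req.tableID] = req.resolvedTs
		}
	}
	return latest
}
```

With such a map, the emit loop would iterate over the map entries instead of the raw batch, so a table with many pending flush requests is searched and flushed a single time.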
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
	return m.getCheckpointTs(tableID), nil
}
m.flushMu.Lock()
FYI, the MQ sink is not thread-safe.
Manager flushes bufferSink concurrently, and bufferSink itself is thread-safe.
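For readers unfamiliar with the pattern in the hunk above, here is a minimal sketch of a compare-and-swap flush guard. It is not the PR's implementation: `guardedFlusher`, its fields, and `flush` are hypothetical, and the real code also takes `flushMu`. The sketch only shows how a caller that loses the CAS race returns the last known checkpoint instead of starting a second concurrent flush.

```go
package main

import "sync/atomic"

// guardedFlusher is a hypothetical type illustrating a CAS-based flush guard.
type guardedFlusher struct {
	flushing     int64  // 1 while a flush is in progress, 0 otherwise
	checkpointTs uint64 // last timestamp known to be flushed
}

// flush advances the checkpoint to resolvedTs unless another flush is
// already running, in which case it returns the current checkpoint as-is.
func (g *guardedFlusher) flush(resolvedTs uint64) uint64 {
	if !atomic.CompareAndSwapInt64(&g.flushing, 0, 1) {
		// Someone else is flushing; report what has been flushed so far.
		return atomic.LoadUint64(&g.checkpointTs)
	}
	defer atomic.StoreInt64(&g.flushing, 0)

	// Stand-in for the real work of writing to the backend sink.
	if resolvedTs > atomic.LoadUint64(&g.checkpointTs) {
		atomic.StoreUint64(&g.checkpointTs, resolvedTs)
	}
	return atomic.LoadUint64(&g.checkpointTs)
}
```

The guard makes concurrent callers cheap, since they never block, but it does not by itself make the underlying sink safe to call from several goroutines; that is the concern raised in the comment above.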
/merge
This pull request has been accepted and is ready to merge. Commit hash: f7fbff2
Signed-off-by: ti-chi-bot <[email protected]>
In response to a cherrypick label: new pull request created: #3947.
In response to a cherrypick label: new pull request created: #3948.
In response to a cherrypick label: new pull request created: #3949.
In response to a cherrypick label: new pull request created: #3950.
In response to a cherrypick label: new pull request created: #3951.
What problem does this PR solve?
Optimize buffer sink flush from O(N^2) to O(N).
O(N^2): It flushes all tables for every flushEvent, and each table will periodically generate flushEvents.
O(N): It flushes tables according to flushEvent.tableID.
Close https://github.com/pingcap/ticdc/issues/3900
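To make the complexity claim concrete, the following sketch counts per-table flush calls under both strategies. It is a toy model, not the buffer sink's code: `countingSink`, `flushAllPerEvent`, and `flushByTableID` are hypothetical names, and the `flushEvent` fields are assumed from the description.

```go
package main

// flushEvent is a hypothetical model of the event named in the description.
type flushEvent struct {
	tableID    int64
	resolvedTs uint64
}

// countingSink is a toy sink that only counts per-table flush calls.
type countingSink struct {
	tables     []int64
	flushCalls int
}

// flushTable stands in for flushing one table up to resolvedTs.
func (s *countingSink) flushTable(tableID int64, resolvedTs uint64) {
	s.flushCalls++
}

// flushAllPerEvent models the old behaviour: every event flushes every table,
// so N tables each emitting one event cost N*N flush calls.
func flushAllPerEvent(s *countingSink, events []flushEvent) {
	for _, ev := range events {
		for _, id := range s.tables {
			s.flushTable(id, ev.resolvedTs)
		}
	}
}

// flushByTableID models the new behaviour: each event flushes only the table
// it names, so N events cost N flush calls.
func flushByTableID(s *countingSink, events []flushEvent) {
	for _, ev := range events {
		s.flushTable(ev.tableID, ev.resolvedTs)
	}
}
```

For example, with 4 tables that each generate one flushEvent, the first function performs 16 per-table flushes and the second performs 4.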
Benchmark:
Check List
Tests
Related changes
Release note