-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Improve][Connector-file-base] Improved file allocation algorithm for subtasks. #8453
[Improve][Connector-file-base] Improved file allocation algorithm for subtasks. #8453
Conversation
@@ -107,8 +110,7 @@ private void assignSplit(int taskId) { | |||
context.assignSplit(taskId, currentTaskSplits); | |||
// save the state of assigned splits | |||
assignedSplit.addAll(currentTaskSplits); | |||
// remove the assigned splits from pending splits | |||
currentTaskSplits.forEach(split -> pendingSplit.remove(split)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why delete this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the principle of the poll-based file allocation strategy I consider is based on the location of the split in the collection of pendingSplit files, using assignCount and parallelism modulus to determine which task should be assigned to the split. The premise of this method is that the pendingSplit set remains unchanged during allocation. If you remove allocated splits from the pendingSplit collection, it can cause a change in the location of other splits, potentially changing their owners and causing problems.
I know that the original purpose of adding this line of code is to prevent the double allocation of split, but in my opinion, the submitted code calculates the module based on assignCount and parallelism, and only when the result is consistent with the taskId value, the split is assigned to the corresponding task, which will not cause the double allocation of files. It was checked in unit tests.
So that's why I deleted this line of code. If I need to keep it, I can try to redesign the code flow. If you have any questions, please contact me in time. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get your point, thanks for your explain. This update LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should change the fields name. pendingSplit
-> allSplit
, So new pendingSplit
= allSplit
- assignedSplit
. It's more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I have changed the field name and commited it. Thanks for your review
@@ -0,0 +1,94 @@ | |||
package org.apache.seatunnel.connectors.seatunnel.file.split; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new file need add license header.
private static int getSplitOwner(String tp, int numReaders) { | ||
return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; | ||
private static int getSplitOwner(int assignCount, int numReaders) { | ||
return assignCount % numReaders; | ||
} | ||
|
||
@Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also should update currentUnassignedSplitSize()
method. the result should be pendingSplit - assignedSplit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I have updated the code as required, thanks for your review
19e589b
to
23818c5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. cc @hailin0
int splitOwner = | ||
getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism()); | ||
getSplitOwner(assignCount.getAndIncrement(), context.currentParallelism()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For ParallelSource this change may cause duplicate reading of files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to consult you about the circumstances in which repetitive file reading is caused by ParallelSource. Could you please explain in detail? I will try to solve it by reproducing. So far I've written test cases and found nothing like this.
In addition, I would like to ask you if you have any modification opinions, thanks for your review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For ParallelSource, the object relationship created is as follows:
Parallelism=5
FileSourceSplitEnumerator-1 -> Reader-1
FileSourceSplitEnumerator-2 -> Reader-2
FileSourceSplitEnumerator-3 -> Reader-3
FileSourceSplitEnumerator-4 -> Reader-4
FileSourceSplitEnumerator-5 -> Reader-5
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hailin0 I have found the reason for the duplication of file allocation under ParallelSource: FileSourceSplitEnumerator
allSplit
is a HashSet type attributes, lead to when different ParallelSource
objects initialize allSplit
with the open
method, the order of the files in the allSplit
collection is inconsistent. As a result, files allocated in different ParallelSource
readers will be duplicated under the above algorithm. I have reproduced this situation in unit test, you are right.
To solve this problem, I tried to change the type of allSplit
to TreeSet and define the comparator to be based on splitid values. This ensures the same order of allSplit
collection contents under different Parallelsources, thus solving the problem of duplicate file allocation.
I have created a ParallelSource
unit test under the Seatunnel-Translatation-Base project. You can see if there are still problems with the testing process.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if the number of file count read multiple times is inconsistent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
cc @Hisoka-X
…n under ParallelSource
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks @JeremyXin , waiting #8485 to fix ci.
Please rebase from dev to retrigger ci. |
|
Thanks @JeremyXin for update, please fix ci. |
6349e3c
to
9c0437a
Compare
@Hisoka-X Ok, I've added the license header to fix ci |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @JeremyXin
I find many connectors has same problem. Such as Jdbc, could you help to improve them too? @JeremyXin |
@Hisoka-X Ok, I am very glad to do this. I will study the principle of other connectors, such as jdbc. If there is any progress, I will put forward pr or communicate with you in the future. Thanks! |
Purpose of this pull request
This pull request is to solve issue #8451
In order to try to solve the above problems, I try to use a polling algorithm to allocate files for subtasks (instead of the current random allocation based on file hash), to ensure the load balance of the allocation, so as to improve performance. When using seatunnel to synchronize hdfs files, I set the number of concurrent files to 10, and there are five files in the path. The following screenshots show the file allocation results of using the original random file allocation algorithm in the source code and the improved polling file allocation algorithm:
The original file allocation algorithm based on file hashing, when the degree of parallelism is greater than the number of files, a SubTask needs to process multiple files.
Optimized file allocation algorithm based on polling, when the degree of parallelism is greater than the number of files, a SubTask only needs to process one file.
Next, the processing performance of the two allocation algorithm are compared. The following task runtime information shows the processing performance of the origin file allocation algorithm and the polling file allocation algorithm:
As you can see, using the original file allocation algorithm, the task processing performance per second is 4520, and the total task time is 929 seconds
It can be seen that using the polling file allocation algorithm, the task's processing performance per second is 10719, and the total task time is 518 seconds
To sum up, it can be seen that the optimized poll-based file allocation algorithm can make the file allocation of subtasks more balanced and effectively improve the task processing performance, which is a direction worthy of consideration for optimization
Does this PR introduce any user-facing change?
How was this patch tested?
The preceding case is based on the fact that I use seatunnel to synchronize external hdfs files to local hdfs files. In this scenario, I set the task concurrency to 10, source to HdfsFile, sink to HdfsFile, and five files in the upstream Hdfs path. The performance of two different file allocation algorithms is compared by actual synchronization task.
My unit test in FileSourceSplitEnumeratorTest class.
If you have any questions, please contact me in time. Thanks.
Check list
New License Guide
release-note
.