[HUDI-9248] Unify code paths for all write operations about bulk_insert
#13066
base: master
Conversation
1. Unify code paths for all bulk_insert to improve code readability and maintainability
2. Use `bucket_rescale` instead of `insert_overwrite` to distinguish it from normal INSERT OVERWRITE

Signed-off-by: TheR1sing3un <[email protected]>
Force-pushed from fbb4d3b to 76a01e4.
  }

  @Override
  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
Isn't it better each executor has its own logic for this?
> Isn't it better each executor has its own logic for this?

This method can be retained for future use by new executors, but the logic of `bulk_insert`/`insert_overwrite`/`insert_overwrite_table` is fixed and can be consolidated into `DataSourceInternalWriterHelper`.

The execution and commit logic of the current `bulk_insert`, `insert_overwrite`, and `insert_overwrite_table` paths is not uniform, yet in theory the only difference between them is how the replaced file ids are computed. So I unified the execution and commit logic, treating only `getReplacedFileIds` differently.
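For illustration, here is a minimal sketch of the consolidation being described: one shared write-and-commit path where only the replaced-file-id computation branches on the operation type. All class and method names below are simplified stand-ins, not Hudi's actual API.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified stand-ins; the real Hudi types and signatures differ.
class UnifiedWriterHelperSketch {
  enum WriteOperationType { BULK_INSERT, INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE }

  void writeAndCommit(WriteOperationType op, List<String> writtenPartitions) {
    // 1. execute the write: identical for all three operations
    // 2. compute replaced file ids: the only step that differs
    Map<String, List<String>> replaced = getReplacedFileIds(op, writtenPartitions);
    // 3. commit with the replace metadata: identical for all three operations
  }

  Map<String, List<String>> getReplacedFileIds(WriteOperationType op, List<String> writtenPartitions) {
    switch (op) {
      case BULK_INSERT:
        return Collections.emptyMap();                   // plain bulk_insert replaces nothing
      case INSERT_OVERWRITE:
        return existingFileIdsIn(writtenPartitions);     // replace only the touched partitions
      case INSERT_OVERWRITE_TABLE:
        return existingFileIdsIn(allTablePartitions());  // replace every partition in the table
      default:
        throw new IllegalArgumentException("Unsupported operation: " + op);
    }
  }

  private Map<String, List<String>> existingFileIdsIn(List<String> partitions) {
    return Collections.emptyMap(); // placeholder: would query the file-system view
  }

  private List<String> allTablePartitions() {
    return Collections.emptyList(); // placeholder: would list the table's partitions
  }
}
```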
> Isn't it better each executor has its own logic for this?

I have retained this method, please review it again~
1. keep `getPartitionToReplacedFileIds` for scalability

Signed-off-by: TheR1sing3un <[email protected]>
@hudi-bot run azure
@@ -52,6 +52,12 @@ public class HoodieInternalConfig extends HoodieConfig {
      .markAdvanced()
      .withDocumentation("Inner configure to pass static partition paths to executors for SQL operations.");

  public static final ConfigProperty<WriteOperationType> BULK_INSERT_WRITE_OPERATION_TYPE = ConfigProperty
Can we eliminate it? It does not make sense to introduce a sub-type whose default value is itself.
> Can we eliminate it? It does not make sense to introduce a sub-type whose default value is itself.

Done~
1. eliminate HoodieInternalConfig::BULK_INSERT_WRITE_OPERATION_TYPE

Signed-off-by: TheR1sing3un <[email protected]>
    } catch (Exception ioe) {
      throw new HoodieException(ioe.getMessage(), ioe);
    } finally {
      writeClient.close();
    }
  }

  private Map<String, List<String>> getReplacedFileIds(List<WriteStatus> writeStatuses) {
Sorry, I still think keeping `getReplacedFileIds` in each executor is cleaner and more reasonable.
> Sorry, I still think keeping `getReplacedFileIds` in each executor is cleaner and more reasonable.
Your suggestion is reasonable. However, since the current executor is only used to do the bulk_insert pre-preparation, and the rest of the writes and commits are handed over to `DataSourceInternalWriterHelper`, `executor::getReplacedFileIds` is never called. Because the executor is not responsible for committing, the `getReplacedFileIds` method is not required. My original intention was just to converge the actual execution and commit code paths of `bulk_insert`/`insert_overwrite`/`insert_overwrite_table`, instead of the current split where some use `HoodieDatasetBulkInsertHelper` and some use `DataSourceInternalWriterHelper`.

So do you have any good suggestions? Looking forward to your reply~
> is only used to do the bulk_insert pre-preparation, and the rest of the writes and commits are handed over to `DataSourceInternalWriterHelper`

Can we move the writes and commits inside the executors?
> Can we move the writes and commits inside the executors?

Of course we can; then we don't need `DataSourceInternalWriterHelper` to perform writes and commits, and all writes and commits are consolidated inside the executor. I'll make changes to follow that logic.
+1 about:

> Can we move the writes and commits inside the executors?

It is better to let each executor take care of its own replace logic instead of unioning them together in a big switch or if-else.
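A contrasting sketch of what the reviewers are suggesting: the base executor owns the shared write-and-commit flow, and each concrete executor overrides only its own replace logic, so no central switch is needed. Again, the names below are illustrative, not the actual Hudi class hierarchy.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Illustrative names only; the real Hudi executors differ.
abstract class BaseCommitActionExecutorSketch {
  // Shared write + commit flow lives in the base class.
  final void execute(List<String> writtenPartitions) {
    // write ... (shared across executors)
    Map<String, List<String>> replaced = getPartitionToReplacedFileIds(writtenPartitions);
    // commit(replaced) ... (shared across executors)
  }

  // Each executor supplies its own replace semantics; no big switch/if-else.
  protected abstract Map<String, List<String>> getPartitionToReplacedFileIds(List<String> writtenPartitions);
}

class BulkInsertExecutorSketch extends BaseCommitActionExecutorSketch {
  @Override
  protected Map<String, List<String>> getPartitionToReplacedFileIds(List<String> writtenPartitions) {
    return Collections.emptyMap(); // bulk_insert replaces no existing file groups
  }
}
```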
@@ -73,10 +66,4 @@ protected void preExecute() {
    ValidationUtils.checkArgument(res);
    LOG.info("Finish to save hashing config " + hashingConfig);
  }

  @Override
  protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
We still need to override `getPartitionToReplacedFileIds` in `DatasetBucketRescaleCommitActionExecutor`, to get rid of the influence of the `String staticOverwritePartition = config.getStringOrDefault(HoodieInternalConfig.STATIC_OVERWRITE_PARTITION_PATHS);` parameter.
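A hedged sketch of what that override could look like, using simplified stand-in types (the real `WriteStatus` and file-system-view lookups in Hudi carry far more state than this): derive the replaced file ids purely from the partitions the rescale actually wrote, so `STATIC_OVERWRITE_PARTITION_PATHS` has no effect.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Simplified stand-ins; not the actual DatasetBucketRescaleCommitActionExecutor.
class BucketRescaleReplaceSketch {
  record WriteStatus(String partitionPath) {}  // stand-in for Hudi's WriteStatus

  // Stand-in for a file-system-view lookup of existing file ids per partition.
  private final Function<String, List<String>> existingFileIdsInPartition;

  BucketRescaleReplaceSketch(Function<String, List<String>> existingFileIdsInPartition) {
    this.existingFileIdsInPartition = existingFileIdsInPartition;
  }

  // Replace all existing file groups in every partition the rescale touched,
  // ignoring any statically configured overwrite partitions.
  Map<String, List<String>> getPartitionToReplacedFileIds(List<WriteStatus> writeStatuses) {
    return writeStatuses.stream()
        .map(WriteStatus::partitionPath)
        .distinct()
        .collect(Collectors.toMap(Function.identity(), existingFileIdsInPartition));
  }
}
```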
Change Logs

1. refactor: Unify code paths for all bulk_insert to improve code readability and maintainability
2. Use `bucket_rescale` instead of `insert_overwrite` to distinguish it from normal INSERT OVERWRITE

Impact

Improve code readability and maintainability.
Risk level (write none, low medium or high below)
none
Documentation Update
none
Contributor's checklist