[HUDI-9205] Introduce a representative file containing the estimated total size of file slice #13070

Open
wants to merge 6 commits into master from optimize_slice_estimation

Conversation

TheR1sing3un
Member

close to: #12139

When dealing with `HadoopFsRelation`, Spark merges `PartitionedFile`s based on data such as file size. At present, we directly use the base file or a random log file as the `PartitionedFile` of the `FileSlice`, so Spark cannot use representative data accurately when merging. Therefore, I estimate the size the entire `FileSlice` would have if it were converted into a parquet file. Using this size to represent the file slice gives Spark more accurate data to optimize with.
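
For illustration, here is a minimal sketch of the estimation idea (not the exact PR code; `estimateFileSliceSize` is a hypothetical name, and `logFileFraction` stands for the configured log-to-parquet compression ratio):

```scala
import org.apache.hudi.common.model.FileSlice

// Hedged sketch: estimate the parquet-equivalent size of a file slice.
// Log file bytes are scaled by a configurable fraction, since log blocks
// usually compress differently from the base parquet file.
def estimateFileSliceSize(fileSlice: FileSlice, logFileFraction: Double): Long = {
  // Total log file bytes, scaled to an expected parquet size.
  val scaledLogSize =
    Math.ceil(fileSlice.getLogFiles.mapToLong(_.getFileSize).sum() * logFileFraction).toLong
  // A base file's real size is used as-is; the slice may also have no base file at all.
  if (fileSlice.getBaseFile.isPresent) fileSlice.getBaseFile.get().getFileSize + scaledLogSize
  else scaledLogSize
}
```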

Change Logs

  1. Introduce a representative file containing the estimated total size of file slice

Impact

Reduces task skew when reading

Risk level (write none, low medium or high below)

low

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:M PR with lines of changes in (100, 300] label Apr 2, 2025
.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
fileSliceMapping.getSlice(filegroupName) match {
case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) =>
Contributor

why remove the requiredSchema.nonEmpty check.

Member Author

> why remove the requiredSchema.nonEmpty check.

This check has not been removed (see the attached screenshot).

Member Author

> why remove the requiredSchema.nonEmpty check.

Hi, I have rolled back these changes. I kept the original logic as much as possible and only changed the logic for creating representative files.

@TheR1sing3un TheR1sing3un requested a review from danny0405 April 2, 2025 09:29
@TheR1sing3un
Member Author

@hudi-bot run azure

@@ -178,8 +183,8 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
internalSchemaOpt,
metaClient,
props,
file.start,
file.length,
0,
Contributor

can you elaborate this change?

Member Author

> can you elaborate this change?

These two arguments are passed to the file group reader to tell it the start location and length of the base file. They used to be taken directly from the `PartitionedFile`, because when the file slice has a base file, the actual base file size is used as the length of the `PartitionedFile`. Now the `PartitionedFile` is a representative file of the file slice, so its length no longer equals the actual base file length, and we need to read the actual length from the base file itself (see the attached screenshot).
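
A minimal, hedged sketch of that lookup (`fileSlice` is assumed to be the resolved `FileSlice`; the PR wires this into the file group reader construction):

```scala
// When the PartitionedFile only carries the representative (estimated) size,
// the real base file length has to come from the FileSlice, not from file.length.
val baseFileLength: Long =
  if (fileSlice.getBaseFile.isPresent) fileSlice.getBaseFile.get().getFileSize else 0L
```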

Contributor

then why the start is constant 0?

Member Author

> then why the start is constant 0?

Please see the latest commit; `start` is still `file.start`.

…size of file slice

1. Introduce a representative file containing the estimated total size of file slice

NOTE:
When dealing with `HadoopFsRelation`, Spark merges `PartitionedFile` based on data such as file size.
At present, we directly use the base file or a random log file as the `PartitionedFile` of the `FileSlice`.
As a result, Spark cannot use representative data accurately when merging.
Therefore, I estimate the size the entire `FileSlice` would have if it were converted into a parquet file.
Using this size to represent the file slice provides more accurate data for Spark to optimize with.

Signed-off-by: TheR1sing3un <[email protected]>
1. fix scala check style

Signed-off-by: TheR1sing3un <[email protected]>
…nd any log files

1. only read base file if file slice only has base file but not found any log files

Signed-off-by: TheR1sing3un <[email protected]>
…the representative file size calculation logic

1. Retain the original logic to the maximum extent and modify only the representative file size calculation logic

Signed-off-by: TheR1sing3un <[email protected]>
…he estimated proportions

1. Use the configuration in HoodieStorageConfig to calculate the estimated proportions

Signed-off-by: TheR1sing3un <[email protected]>
@TheR1sing3un TheR1sing3un force-pushed the optimize_slice_estimation branch from e522af2 to ad42cec Compare April 3, 2025 02:24
: logFileSize;
}

private static long convertLogFilesSizeToExpectedParquetSize(List<HoodieLogFile> hoodieLogFiles, double logFileFraction) {
Contributor

Log format can be Avro, Parquet, or HFile. Could this be generalized instead of being fixated on Parquet?

Member Author

> Log format can be Avro, Parquet, or HFile. Could this be generalized instead of being fixated on Parquet?

Now it is not fixed: `logFileFraction` comes from `HoodieStorageConfig#LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION`, so users can change this value according to their own log format and the actual data pattern.
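
For example (hedged: `spark` and `basePath` are placeholders, and 0.35 is only an illustrative value), the fraction can be overridden as a read option, keyed off the config constant rather than a hardcoded string:

```scala
import org.apache.hudi.common.config.HoodieStorageConfig

// Override the log-to-parquet size ratio used when estimating file slice sizes.
val df = spark.read.format("hudi")
  .option(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(), "0.35")
  .load(basePath)
```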

* Get the total file size of a file slice in parquet format.
* For the log file, we need to convert its size to the estimated size in the parquet format in a certain proportion
*/
public static long getTotalFileSizeAsParquetFormat(FileSlice fileSlice, double logFileFraction) {
Contributor

Similar on the base file.

Member Author

> Similar on the base file.

done~

}).filter(slice => slice != null)
.map(fileInfo => new FileStatus(fileInfo.getLength, fileInfo.isDirectory, 0, fileInfo.getBlockSize,
fileInfo.getModificationTime, new Path(fileInfo.getPath.toUri)))
new FileStatus(estimationFileSize, fileInfo.isDirectory, 0, fileInfo.getBlockSize, fileInfo.getModificationTime, new Path(fileInfo.getPath.toUri))
Contributor

Does this affect reading the file because the file status is manipulated? Is there a different way of letting Spark know the file / partitioned file size estimation?

Member Author

> Does this affect reading the file because the file status is manipulated? Is there a different way of letting Spark know the file / partitioned file size estimation?

The actual read process is still controlled by Hudi code; Spark only uses this file status for parallelism-related optimization.
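
To make that parallelism effect concrete, here is a hedged, simplified illustration of the size-based packing Spark applies to `PartitionedFile`s (not Spark's actual `FilePartition` code, just the idea that sizes plus an open cost are binned up to a maximum partition size, so an underestimated slice size leads to over-packed, skewed tasks):

```scala
// Bin-pack (estimated) file sizes into read tasks, roughly the way Spark's
// size-based coalescing works: each file costs its size plus a fixed open cost,
// and a new task starts when the current one would overflow maxPartitionBytes.
def packBySize(fileSizes: Seq[Long], maxPartitionBytes: Long, openCostInBytes: Long): Seq[Seq[Long]] = {
  val tasks = scala.collection.mutable.ArrayBuffer(scala.collection.mutable.ArrayBuffer.empty[Long])
  var currentBytes = 0L
  fileSizes.sortBy(-_).foreach { size =>
    val cost = size + openCostInBytes
    if (tasks.last.nonEmpty && currentBytes + cost > maxPartitionBytes) {
      tasks += scala.collection.mutable.ArrayBuffer.empty[Long]
      currentBytes = 0L
    }
    tasks.last += size
    currentBytes += cost
  }
  tasks.map(_.toSeq).toSeq
}
```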

@@ -73,13 +74,13 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {
if (smallFileSlice.getBaseFile().isPresent()) {
HoodieBaseFile baseFile = smallFileSlice.getBaseFile().get();
sf.location = new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId());
sf.sizeBytes = getTotalFileSize(smallFileSlice);
sf.sizeBytes = FileSliceUtils.getTotalFileSizeAsParquetFormat(smallFileSlice, config.getLogFileToParquetCompressionRatio());
Contributor

Can be moved to class FileSlice, and keep the comment of method: convertLogFilesSizeToExpectedParquetSize.

Member Author

> Can be moved to class FileSlice, and keep the comment of method: convertLogFilesSizeToExpectedParquetSize.

done~

…FileSlice::getTotalFileSizeSimilarIOnBaseFile`

1. move `FileSliceUtils::getTotalFileSizeAsParquetFormat` to `FileSlice::getTotalFileSizeSimilarIOnBaseFile`

Signed-off-by: TheR1sing3un <[email protected]>
@TheR1sing3un
Member Author

@hudi-bot run azure

1 similar comment
@TheR1sing3un
Member Author

@hudi-bot run azure

@hudi-bot

hudi-bot commented Apr 7, 2025

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@TheR1sing3un TheR1sing3un requested review from yihua and danny0405 April 8, 2025 07:56
@TheR1sing3un
Member Author

@danny0405 @yihua Ready for review again~

if (slice.getBaseFile.isPresent) {
val logFileEstimationFraction = options.getOrElse(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(),
HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.defaultValue()).toDouble
// 1. Generate a disguised representative file for each file slice, which spark uses to optimize rdd partition parallelism based on data such as file size
Contributor

disguised -> delegate?

// 1. Generate a disguised representative file for each file slice, which spark uses to optimize rdd partition parallelism based on data such as file size
// For file slice only has base file, we directly use the base file size as representative file size
// For file slice has log file, we estimate the representative file size based on the log file size and option(base file) size
val representFiles = fileSlices.map(slice => {
Contributor

delegateFiles

return getBaseFile().isPresent() ? getBaseFile().get().getFileSize() + logFileSize : logFileSize;
}

private long convertLogFilesSizeSimilarOnBaseFile(double logFileFraction) {
Contributor

still think the original method name is better.

Contributor

@danny0405 danny0405 left a comment


@TheR1sing3un Thanks for the contribution. Do we have a diagram to illustrate the file balance changes before/after the patch?

Labels
size:M PR with lines of changes in (100, 300]

4 participants