[HUDI-8990] Partition bucket index supports query pruning based on bucket id #13060

Merged: 16 commits merged into master on Apr 5, 2025

Conversation

zhangyue19921010 (Contributor):

Change Logs

Follow-up to PR #13017.
Enable Flink and Spark queries to perform bucket id pruning based on the partition bucket index.

Impact

Flink and Spark queries now perform bucket id pruning using the partition bucket index.

Risk level (write none, low, medium, or high below)

low

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
    ticket number here and follow the instruction to make
    changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Mar 31, 2025
@zhangyue19921010 (Contributor Author):

@hudi-bot run azure

@danny0405 danny0405 changed the title [HUDI-8990] Flink && Spark query adopt partition bucket index based buckid pruning [HUDI-8990] Partition bucket index supports query pruning based on bucket id Apr 1, 2025
} catch (Exception e) {
throw new HoodieException("Failed to get hashing config instant to load.", e);
}
}

private static Option<String> getHashingConfigInstantToLoadBeforeOrOn(List<String> hashingConfigInstants, String instant) {
Contributor:

We should always load the latest bucket config to comply with the latest bucket id mappings; otherwise the query would fail. We do not ensure snapshot isolation here, so the reader can be affected by the writer.

Contributor Author:

The method getHashingConfigInstantToLoadBeforeOrOn is designed to handle Time Travel scenarios. For example, suppose we have a sequence of commits and replace-commits:

C1, C2, R3 (a bucket rescale operation), C4, R5 (another bucket rescale).

If we perform a Time Travel query targeting commit C4 (i.e., specifiedQueryInstant = C4), maybe we should load the hashing configuration associated with R3 (the latest bucket rescale operation before or equal to C4).
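A minimal, hypothetical sketch of the selection logic described here (plain Java with illustrative names, not the actual Hudi method): walk the sorted hashing-config instants and keep the latest one that is not after the queried instant.

  import java.util.List;
  import java.util.Optional;

  class HashingConfigSelectionSketch {
    // Assumes hashingConfigInstants are timeline timestamps sorted in ascending order,
    // one per completed bucket rescale (for example R3 and R5 in the timeline above).
    static Optional<String> hashingConfigBeforeOrOn(List<String> hashingConfigInstants, String queryInstant) {
      Optional<String> result = Optional.empty();
      for (String candidate : hashingConfigInstants) {
        if (candidate.compareTo(queryInstant) <= 0) {
          result = Optional.of(candidate); // latest qualifying instant seen so far
        } else {
          break; // the list is sorted, so later candidates cannot qualify
        }
      }
      return result;
    }
  }

With the timeline above, a query pinned to C4 would resolve to R3's config, while a plain snapshot query would resolve to R5's.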

Contributor:

> Maybe we should load the hashing configuration associated with R3 (the latest bucket rescale operation before or equal to C4).

We cannot do that, because the data file layouts have already been changed.

Contributor Author (zhangyue19921010), Apr 4, 2025:

Yes, Danny. After a Bucket Rescale completes, the data layout changes. So consider Spark Time Travel, i.e. traveling to the snapshot view at a specific point in time (https://hudi.apache.org/docs/sql_queries#time-travel-query).

When executing a query like SELECT * FROM <table_name> TIMESTAMP AS OF <instant1> WHERE <filter_conditions>, Hudi initializes specifiedQueryTimestamp through HoodieBaseRelation:

  protected lazy val specifiedQueryTimestamp: Option[String] =
    optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key)
      .map(HoodieSqlCommonUtils.formatQueryInstant)

Hudi then resolves the schema and builds the fsView based on specifiedQueryTimestamp.

To construct the FsView, Hudi calls getLatestMergedFileSlicesBeforeOrOn(String partitionStr, String maxInstantTime), which travels the file system view to the specified version. At that point it is also necessary to load the hashing_config that was valid at that timestamp, so that the historical data layout is honored:

  protected def listLatestFileSlices(globPaths: Seq[StoragePath], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
    queryTimestamp match {
      case Some(ts) =>
        specifiedQueryTimestamp.foreach(t => validateTimestampAsOf(metaClient, t))

        val partitionDirs = if (globPaths.isEmpty) {
          fileIndex.listFiles(partitionFilters, dataFilters)
        } else {
          val inMemoryFileIndex = HoodieInMemoryFileIndex.create(sparkSession, globPaths)
          inMemoryFileIndex.listFiles(partitionFilters, dataFilters)
        }

        val fsView = new HoodieTableFileSystemView(
          metaClient, timeline, sparkAdapter.getSparkPartitionedFileUtils.toFileStatuses(partitionDirs)
            .map(fileStatus => HadoopFSUtils.convertToStoragePathInfo(fileStatus))
            .asJava)

        fsView.getPartitionPaths.asScala.flatMap { partitionPath =>
          val relativePath = getRelativePartitionPath(convertToStoragePath(basePath), partitionPath)
          fsView.getLatestMergedFileSlicesBeforeOrOn(relativePath, ts).iterator().asScala
        }.toSeq

      case _ => Seq()
    }
  }

Contributor Author (zhangyue19921010), Apr 4, 2025:

For example

DeltaCommit1 ==> Write C1_File1, C1_File2
Bucket-Rescale Commit2 ==> Write C2_File1, C2_File2, C2_File3(Replaced C1_File1, C1_File2)
DeltaCommit 3 ==> Write C3_File1_Log1
Bucket-Rescale Commit4 ==> Write C4_File1(Replaced C2_File1, C2_File2, C2_File3 and C3_File1_Log1)

For the SQL SELECT * FROM hudi_table TIMESTAMP AS OF <DeltaCommit 3>, we need to load the hashing config from Bucket-Rescale Commit2 instead of the latest one from Bucket-Rescale Commit4.
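To make that expected selection concrete, here is a small self-contained snippet; the instant names are placeholders standing in for real timeline timestamps, not Hudi API calls:

  import java.util.TreeSet;

  class TimeTravelHashingConfigExample {
    public static void main(String[] args) {
      // Hypothetical timeline timestamps: 001 = DeltaCommit1, 002 = Bucket-Rescale Commit2,
      // 003 = DeltaCommit 3, 004 = Bucket-Rescale Commit4.
      TreeSet<String> hashingConfigInstants = new TreeSet<>();
      hashingConfigInstants.add("002"); // hashing config written by Bucket-Rescale Commit2
      hashingConfigInstants.add("004"); // hashing config written by Bucket-Rescale Commit4

      // TIMESTAMP AS OF DeltaCommit 3 -> the config written at Commit2, not the latest one.
      System.out.println(hashingConfigInstants.floor("003")); // prints 002
      // A plain snapshot query (no time travel) -> the latest config, written at Commit4.
      System.out.println(hashingConfigInstants.last());       // prints 004
    }
  }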

Contributor Author:

Added a new Spark UT, test("Test BucketID Pruning With Partition Bucket Index").
Without this PR, it throws an exception:

Expected Array([1111,3333.0,3333,2021-01-05]), but got Array()
ScalaTestFailureLocation: org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase at (HoodieSparkSqlTestBase.scala:135)
org.scalatest.exceptions.TestFailedException: Expected Array([1111,3333.0,3333,2021-01-05]), but got Array()

With this PR but using the always-load-latest-hashing-config logic, it throws an exception:

Expected Array([1111,2222.0,2222,2021-01-05]), but got Array()
ScalaTestFailureLocation: org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase at (HoodieSparkSqlTestBase.scala:135)
org.scalatest.exceptions.TestFailedException: Expected Array([1111,2222.0,2222,2021-01-05]), but got Array()

this.path = path;
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
this.tableExists = StreamerUtil.tableExists(path.toString(), hadoopConf);
this.metadataConfig = StreamerUtil.metadataConfig(conf);
this.colStatsProbe = isDataSkippingFeasible(conf.get(FlinkOptions.READ_DATA_SKIPPING_ENABLED)) ? colStatsProbe : null;
this.partitionPruner = partitionPruner;
this.dataBucket = dataBucket;
Contributor:

Can we still use the bucket id here?

Contributor Author:

evolve this dataBucket to a function

@@ -45,7 +45,7 @@ public class PrimaryKeyPruners {

public static final int BUCKET_ID_NO_PRUNING = -1;

- public static int getBucketId(List<ResolvedExpression> hashKeyFilters, Configuration conf) {
+ public static int getBucketFieldHashing(List<ResolvedExpression> hashKeyFilters, Configuration conf) {
Contributor:

Can we still return the bucket id? We can add a new param for the function:

  // the input of bucketIdFunc is the partition path
  getBucketId(List<ResolvedExpression> hashKeyFilters, Configuration conf, Function<String, Integer> bucketIdFunc)

Contributor Author (zhangyue19921010), Apr 2, 2025:

Sorry Danny, I didn't get this. Is it possible to get the full partition path and use it in this new bucketIdFunc at the original code's call site?

  @Override
  public Result applyFilters(List<ResolvedExpression> filters) {
    List<ResolvedExpression> simpleFilters = filterSimpleCallExpression(filters);
    Tuple2<List<ResolvedExpression>, List<ResolvedExpression>> splitFilters = splitExprByPartitionCall(simpleFilters, this.partitionKeys, this.tableRowType);
    this.predicates = ExpressionPredicates.fromExpression(splitFilters.f0);
    this.columnStatsProbe = ColumnStatsProbe.newInstance(splitFilters.f0);
    this.partitionPruner = createPartitionPruner(splitFilters.f1, columnStatsProbe);
    this.dataBucket = getDataBucket(splitFilters.f0);
    // refuse all the filters now
    return SupportsFilterPushDown.Result.of(new ArrayList<>(splitFilters.f1), new ArrayList<>(filters));
  }

What this PR does is compute the hashing value from the filters and pass it to getFilesInPartitions, then compute numBuckets, and finally compute the final bucket id as hashing value % numBuckets.

Contributor:

Maybe we evolve this dataBucket to a function (num_buckets_per_partition) -> (int)bucketId to make it somehow more flexible.
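A hedged sketch of that suggested shape (java.util.function.Function stands in for Hudi's Functions.Function1; the names are illustrative, and Math.floorMod merely keeps the example non-negative, Hudi's actual hashing may differ): the hash of the bucket-key literal is fixed by the filters, and the bucket id is only resolved per partition once that partition's bucket number is known from the hashing config.

  import java.util.Optional;
  import java.util.function.Function;

  class BucketIdFuncSketch {
    // bucketKeyHash: hash derived from the pushed-down equality filters on the bucket key.
    // Returned mapping: numBucketsOfThePartition -> bucketId.
    static Optional<Function<Integer, Integer>> bucketIdFunc(int bucketKeyHash) {
      return Optional.of(numBuckets -> Math.floorMod(bucketKeyHash, numBuckets));
    }

    public static void main(String[] args) {
      Function<Integer, Integer> func = bucketIdFunc(1234567).get();
      System.out.println(func.apply(8));  // bucket id if the partition has 8 buckets
      System.out.println(func.apply(16)); // may differ if the partition has 16 buckets
    }
  }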

Contributor Author:

Sure, changed

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;

public class TestPartitionBucketPruning {
Contributor:

Should we move the tests into TestHoodieTableSource? Can we at least add an IT test for query result validation?

Contributor Author:

Changed, and also added an IT named tesQueryWithPartitionBucketIndexPruning to do query result validation.


@transient private lazy val bucketIndexSupport = if (isPartitionSimpleBucketIndex) {
val specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant)
new PartitionBucketIndexSupport(spark, metadataConfig, metaClient, specifiedQueryInstant)
Contributor:

Always query from the latest hash config, because there is no snapshot isolation (SI) between readers and writers.

Contributor Author:

Same as TimeTravel scenarios mentioned above.

*/
- public static String getLatestHashingConfigInstantToLoad(HoodieTableMetaClient metaClient) {
+ public static Option<String> getHashingConfigInstantToLoad(HoodieTableMetaClient metaClient, Option<String> instant) {
try {
List<String> allCommittedHashingConfig = getCommittedHashingConfigInstants(metaClient);
Contributor:

Can we assume there are always config files once the partition bucket index is enabled?

Contributor Author:

Yes, we can.
Currently there are two ways to use partition-level bucket indexing:

  1. Enabling during table creation via DDL: When enabled during table creation, this method initializes a 0000000000000.hashing_config file through the catalog.

  2. Upgrading existing table-level bucket indexes via the CALL command: For tables that already use table-level bucket indexing, invoking a CALL command triggers an upgrade process. This generates a replace-commit instant and initializes a corresponding .hashing_config file

String insertInto = "insert into " + catalogName + ".hudi.hoodie_sink select * from csv_source";
execInsertSql(tableEnv, insertInto);

List<Row> result1 = CollectionUtil.iterableToList(
Contributor:

use execSelectSql(TableEnvironment tEnv, String select)

Contributor Author:

done

@@ -69,7 +70,7 @@ public class FileIndex implements Serializable {
private final org.apache.hadoop.conf.Configuration hadoopConf;
private final PartitionPruners.PartitionPruner partitionPruner; // for partition pruning
private final ColumnStatsProbe colStatsProbe; // for probing column stats
- private final int dataBucketHashing; // for bucket pruning
+ private final Option<Functions.Function1<Integer, Integer>> dataBucket; // for bucket pruning
Contributor:

Can we use a Java function? And can we align the indentation of the comments:

  // for partition pruning
  // for probing column stats
  // for bucket pruning

Contributor:

dataBucket -> dataBucketFunc

Contributor Author:

changed

public static final int BUCKET_ID_NO_PRUNING = -1;

- public static int getBucketFieldHashing(List<ResolvedExpression> hashKeyFilters, Configuration conf) {
+ public static Option<Functions.Function1<Integer, Integer>> getBucketId(List<ResolvedExpression> hashKeyFilters, Configuration conf) {
Contributor:

getBucketId -> getBucketIdFunc

Contributor:

It looks like the return value is always non-empty; can we just return the function instead of the option?

@@ -158,7 +158,7 @@ public class HoodieTableSource implements
private List<Predicate> predicates;
private ColumnStatsProbe columnStatsProbe;
private PartitionPruners.PartitionPruner partitionPruner;
- private int dataBucketHashing;
+ private Option<Functions.Function1<Integer, Integer>> dataBucket;
Contributor:

dataBucket -> dataBucketFunc

Contributor Author:

changed

@@ -179,7 +180,7 @@ public HoodieTableSource(
@Nullable List<Predicate> predicates,
@Nullable ColumnStatsProbe columnStatsProbe,
@Nullable PartitionPruners.PartitionPruner partitionPruner,
- int dataBucketHashing,
+ Option<Functions.Function1<Integer, Integer>> dataBucket,
Contributor:

dataBucket -> dataBucketFunc

Contributor Author:

all changed

@VisibleForTesting
- public int getDataBucketHashing() {
-   return dataBucketHashing;
+ public Option<Functions.Function1<Integer, Integer>> getDataBucket() {
Contributor:

getDataBucketFunc

Contributor Author:

changed

if (!OptionsResolver.isBucketIndexType(conf) || dataFilters.isEmpty()) {
- return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
+ return Option.empty();
Contributor Author:

> It looks like the return value is always non-empty; can we just return the function instead of the option?

Here getDataBucketFunc may return Option.empty().

}
Set<String> indexKeyFields = Arrays.stream(OptionsResolver.getIndexKeys(conf)).collect(Collectors.toSet());
List<ResolvedExpression> indexKeyFilters = dataFilters.stream().filter(expr -> ExpressionUtils.isEqualsLitExpr(expr, indexKeyFields)).collect(Collectors.toList());
if (!ExpressionUtils.isFilteringByAllFields(indexKeyFilters, indexKeyFields)) {
- return PrimaryKeyPruners.BUCKET_ID_NO_PRUNING;
+ return Option.empty();
Contributor Author:

> It looks like the return value is always non-empty; can we just return the function instead of the option?

Here getDataBucketFunc may also return Option.empty().
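Putting the two empty cases together, a condensed, hypothetical sketch of the control flow (not the exact Hudi signatures): the function stays empty unless the table uses a bucket index and equality filters pin every index key; only then is the hash wrapped into a per-partition mapping.

  import java.util.List;
  import java.util.Optional;
  import java.util.function.Function;

  class DataBucketFuncSketch {
    static Optional<Function<Integer, Integer>> getDataBucketFuncSketch(boolean isBucketIndexTable,
                                                                        List<String> pinnedIndexKeys,
                                                                        int indexKeyCount,
                                                                        int bucketKeyHash) {
      if (!isBucketIndexTable || pinnedIndexKeys.isEmpty()) {
        return Optional.empty(); // first empty case: not a bucket index table, or no usable equality filters
      }
      if (pinnedIndexKeys.size() < indexKeyCount) {
        return Optional.empty(); // second empty case: the filters do not pin every index key
      }
      return Optional.of(numBuckets -> Math.floorMod(bucketKeyHash, numBuckets));
    }
  }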

@danny0405 (Contributor):

Thanks for the contribution, I have made some refactoring with the given patch:

13060.patch.zip

@zhangyue19921010 (Contributor Author):

> Thanks for the contribution, I have made some refactoring with the given patch:
>
> 13060.patch.zip

Thanks for your help Danny, all changed.

return filesInPartition.stream().filter(fileInfo -> fileInfo.getPath().getName().contains(bucketIdStr));
}).collect(Collectors.toList());
} else {
allFiles = FSUtils.getFilesInPartitions(
Contributor:

We should use partition2Files to avoid redundant file fetching.

Contributor Author:

Oops, changed this.

@danny0405 (Contributor) commented Apr 5, 2025:

I have made another patch to fix the bucket pruning logging:
fix_the_bucket_pruning_logging.patch.zip

@hudi-bot commented Apr 5, 2025:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@zhangyue19921010 (Contributor Author):

> I have made another patch to fix the bucket pruning logging: fix_the_bucket_pruning_logging.patch.zip

Done. Also CI passed.

@danny0405 danny0405 merged commit ff1cecf into master Apr 5, 2025
60 checks passed