
Hive predicate pushdown not working with multiple filters #21472

Closed
thealexcons opened this issue Feb 26, 2025 · 4 comments · Fixed by #21477
Labels
A-io (Area: reading and writing data), A-io-partitioning (Area: reading/writing (Hive) partitioned files), accepted (Ready for implementation), enhancement (New feature or an improvement of an existing feature)

Comments

@thealexcons

thealexcons commented Feb 26, 2025

Version: Polars 1.23.0

Description

I have a parquet dataset in the usual Hive format partitioned by a pl.Date field, e.g.: data.parquet/date=yyyy-mm-dd/*.parquet. If I load the data lazily by filtering on the partition column date, I can see that the query only requires reading a single file (as expected):

import polars as pl
# Data written using:
# df = pl.DataFrame(...)
# df.write_parquet("data.parquet", partition_by=["date"])

ldf = (
    pl.scan_parquet("data.parquet")
    .filter(pl.col("date") == pl.date(2025, 2, 18))
)
print(ldf.explain(optimized=True))
df = ldf.collect()

Output:

Parquet SCAN [data.parquet/date=2025-02-18/00000000.parquet]
PROJECT 20/21 COLUMNS
SELECTION: [(col("date")) == (2025-02-18)]

This is what I expect: it only needs to read a single file.

However, if I add an additional filter to the query, it seems to ignore the Hive partitioning and attempts to read all files in the dataset:

ldf = (
    pl.scan_parquet("data.parquet")
    .filter(pl.col("date") == pl.date(2025, 2, 18))
    .filter(pl.col("name").str.ends_with("abc"))
)
print(ldf.explain(optimized=True))
df = ldf.collect()

Output:

Parquet SCAN [data.parquet/date=2014-01-01/00000000.parquet, ..., 2636 other sources]
PROJECT 20/21 COLUMNS
SELECTION: [(col("date")) == (2025-02-18) & (col("name").str.ends_with([String(abc)]))]

My expectation is that it would still only need to read the single relevant file because we are filtering on the partition column, and that the second filter would then be applied to the data within that single file. Is my understanding wrong? If so, what can I do to avoid reading all files in queries like the example above?

Thanks.

@coastalwhite
Collaborator

The problem with doing this in the query plan is that things like row_index ruin this filtering. The new streaming engine now does it at runtime (so it won't show in explain). It will output the number of pruned files to the console if you use POLARS_VERBOSE=1. My guess is that this will become the standard over time. At the moment, the in-memory engine still fully relies on this query-plan pruning.

@thealexcons
Author

thealexcons commented Feb 26, 2025

So is this row_index issue related to certain operations (such as ends_with)? If I use the equality operator in the second filter instead, it seems to only read the single file.

Regarding POLARS_VERBOSE=1, I can see it log hive partitioning: skipped 2000 files, first file: ... after logging that it was able to skip each parquet file (the statistics were sufficient to apply the predicate). With the second example, it is able to skip certain parquet row groups, but not the full file.
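
For reference, a minimal sketch of enabling this logging from Python (assuming the environment variable is set before the query is collected):

```python
import os

os.environ["POLARS_VERBOSE"] = "1"  # enable verbose logging before running the query

import polars as pl

ldf = (
    pl.scan_parquet("data.parquet")
    .filter(pl.col("date") == pl.date(2025, 2, 18))
    .filter(pl.col("name").str.ends_with("abc"))
)
df = ldf.collect()  # file/row-group skipping messages are printed to the console
```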

@coastalwhite
Collaborator

coastalwhite commented Feb 26, 2025

Oh! My bad! The issue I was describing is a more forward-looking technical concern. The reason we cannot skip files here is that we don't have str.ends_with and str.starts_with implemented in SkipBatchPredicate.

Basically, we need to map (see the sketch after this list):

  • col(X).str.starts_with(E) to col(X_min) == col(X_max) & col(X_null_count) == 0 & ~col(X_min).str.starts_with(E).
  • col(X).str.ends_with(E) to col(X_min) == col(X_max) & col(X_null_count) == 0 & ~col(X_min).str.ends_with(E).
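
A minimal Python sketch of what that mapping evaluates to over hypothetical per-file statistics columns (the actual SkipBatchPredicate lives in the Rust engine; the names name_min, name_max, and name_null_count are illustrative only):

```python
import polars as pl

# Hypothetical per-file statistics; in reality these come from Parquet
# metadata and Hive partition values.
stats = pl.DataFrame(
    {
        "file": ["a.parquet", "b.parquet"],
        "name_min": ["foo", "xyzabc"],
        "name_max": ["foo", "xyzabc"],
        "name_null_count": [0, 0],
    }
)

# Skip condition for col("name").str.ends_with("abc"): a file can be skipped
# when the column is constant (min == max, no nulls) and that single value
# does not end with "abc".
can_skip = (
    (pl.col("name_min") == pl.col("name_max"))
    & (pl.col("name_null_count") == 0)
    & ~pl.col("name_min").str.ends_with("abc")
)

print(stats.with_columns(can_skip.alias("can_skip")))
# a.parquet -> true (constant "foo" cannot satisfy the filter), b.parquet -> false
```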

@coastalwhite coastalwhite added the enhancement, accepted, A-io, and A-io-partitioning labels on Feb 26, 2025
coastalwhite added a commit to coastalwhite/polars that referenced this issue Feb 26, 2025
This PR adds a fallback for the skip batch predicate if we don't have a more
specialized implementation. Namely, any expression that does not have a
specialized implementation now gets lowered to:

```
E -> all(col(A_min) == col(A_max) & col(A_nc) == 0 for A in LIVE(E)) & ~(E)
```

This basically means that if the predicate columns are constant for a batch, we
can now always accurately predict whether we can skip it.

Specifically, this makes pruning hive partitions much more consistent and
potent.

Fixes pola-rs#21472.
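
For illustration, a minimal Python sketch of the fallback lowering described above, applied to the predicate from this issue (the helper skip_batch_fallback and the _min/_max/_null_count statistics columns are hypothetical; the real lowering happens inside the Rust engine, which uses this form only when no specialized implementation exists):

```python
import polars as pl

def skip_batch_fallback(predicate_on_min: pl.Expr, live_columns: list[str]) -> pl.Expr:
    # A batch can be skipped when every live column of the predicate is
    # constant in the batch (min == max, no nulls) and the predicate is
    # false for that constant value.
    all_constant = pl.all_horizontal(
        (pl.col(f"{c}_min") == pl.col(f"{c}_max")) & (pl.col(f"{c}_null_count") == 0)
        for c in live_columns
    )
    return all_constant & ~predicate_on_min

# The issue's predicate, rewritten over the `_min` statistics columns
# (the engine performs this substitution internally).
pred = (pl.col("date_min") == pl.date(2025, 2, 18)) & pl.col("name_min").str.ends_with("abc")
print(skip_batch_fallback(pred, ["date", "name"]))
```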
@thealexcons
Author

Thanks!
