Skip to content

Commit

Permalink
perf: Provide a fallback skip batch predicate for constant batches (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Feb 26, 2025
1 parent 989d191 commit bf1b47f
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 2 deletions.
43 changes: 41 additions & 2 deletions crates/polars-plan/src/plans/aexpr/predicates/skip_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use polars_core::prelude::{AnyValue, DataType, Scalar};
use polars_core::schema::Schema;
use polars_utils::aliases::PlIndexMap;
use polars_utils::arena::{Arena, Node};
use polars_utils::format_pl_smallstr;
use polars_utils::pl_str::PlSmallStr;
Expand All @@ -11,7 +12,7 @@ use super::super::evaluate::{constant_evaluate, into_column};
use super::super::{AExpr, BooleanFunction, Operator, OutputName};
use crate::dsl::FunctionExpr;
use crate::plans::predicates::get_binary_expr_col_and_lv;
use crate::plans::{ExprIR, LiteralValue};
use crate::plans::{aexpr_to_leaf_names_iter, rename_columns, ExprIR, LiteralValue};
use crate::prelude::FunctionOptions;

/// Return a new boolean expression that determines whether a batch can be skipped based on min, max and
Expand Down Expand Up @@ -207,7 +208,7 @@ fn aexpr_to_skip_batch_predicate_rec(
}
}

match expr_arena.get(e) {
let specialized = match expr_arena.get(e) {
AExpr::Explode(_) => None,
AExpr::Alias(_, _) => None,
AExpr::Column(_) => None,
Expand Down Expand Up @@ -524,5 +525,43 @@ fn aexpr_to_skip_batch_predicate_rec(
AExpr::Window { .. } => None,
AExpr::Slice { .. } => None,
AExpr::Len => None,
};

if let Some(specialized) = specialized {
return Some(specialized);
}

// If we don't have a specialized implementation we can check if the whole block is constant
// and fill that value in. This is especially useful when filtering hive partitions which are
// filtered using this expression and which set their min == max.
//
// Essentially, what this does is
// E -> all(col(A_min) == col(A_max) & col(A_nc) == 0 for A in LIVE(E)) & ~(E)

let live_columns = PlIndexMap::from_iter(aexpr_to_leaf_names_iter(e, expr_arena).map(|col| {
let min_name = format_pl_smallstr!("{col}_min");
(col, min_name)
}));
// Rename all uses of column names with the min value.
let expr = rename_columns(e, expr_arena, &live_columns);
let mut expr = expr_arena.add(AExpr::Function {
input: vec![ExprIR::new(expr, OutputName::Alias(PlSmallStr::EMPTY))],
function: FunctionExpr::Boolean(BooleanFunction::Not),
options: FunctionOptions {
collect_groups: crate::plans::ApplyOptions::ElementWise,
..Default::default()
},
});
for col in live_columns.keys() {
let col_min = col!(min: col);
let col_max = col!(max: col);
let col_nc = col!(null_count: col);

let min_is_max = binexpr!(Eq, col_min, col_max); // Eq so that (None == None) == None
let idx_zero = lv!(idx: 0);
let has_no_nulls = eq_missing!(col_nc, idx_zero);

expr = and!(min_is_max, has_no_nulls, expr);
}
Some(expr)
}
33 changes: 33 additions & 0 deletions crates/polars-plan/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::iter::FlatMap;

use polars_core::prelude::*;

use self::visitor::{AexprNode, RewritingVisitor, TreeWalker};
use crate::constants::get_len_name;
use crate::prelude::*;

Expand Down Expand Up @@ -365,3 +366,35 @@ pub fn merge_schemas(schemas: &[SchemaRef]) -> PolarsResult<Schema> {

Ok(merged_schema)
}

/// Rewrite the expression rooted at `node`, renaming column references according to `map`.
///
/// Every `AExpr::Column` whose name is a key in `map` is replaced by a newly added
/// `AExpr::Column` carrying the mapped name; all other nodes are returned unchanged by the
/// visitor. The node produced by the rewrite is returned.
///
/// # Panics
///
/// Panics if the underlying rewrite reports an error. The visitor's `mutate` defined here only
/// ever returns `Ok`.
pub fn rename_columns(
    node: Node,
    expr_arena: &mut Arena<AExpr>,
    map: &PlIndexMap<PlSmallStr, PlSmallStr>,
) -> Node {
    /// Visitor that swaps column names found in `renames` for their mapped names.
    struct ColumnRenamer<'a> {
        renames: &'a PlIndexMap<PlSmallStr, PlSmallStr>,
    }

    impl RewritingVisitor for ColumnRenamer<'_> {
        type Node = AexprNode;
        type Arena = Arena<AExpr>;

        fn mutate(
            &mut self,
            node: Self::Node,
            arena: &mut Self::Arena,
        ) -> PolarsResult<Self::Node> {
            // Anything that is not a plain column reference is left untouched.
            let AExpr::Column(current) = arena.get(node.node()) else {
                return Ok(node);
            };
            // Columns without an entry in the rename map keep their name.
            let Some(target) = self.renames.get(current) else {
                return Ok(node);
            };

            // Add a fresh column node instead of mutating the existing one in place.
            let renamed = AExpr::Column(target.clone());
            Ok(AexprNode::new(arena.add(renamed)))
        }
    }

    let mut renamer = ColumnRenamer { renames: map };
    let rewritten = AexprNode::new(node)
        .rewrite(&mut renamer, expr_arena)
        .unwrap();
    rewritten.node()
}

0 comments on commit bf1b47f

Please sign in to comment.