Skip to content

Commit

Permalink
feat: Allow for more RG skipping by rewriting expr in planner (#20828)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jan 30, 2025
1 parent a823c23 commit 2eaee18
Show file tree
Hide file tree
Showing 61 changed files with 1,288 additions and 553 deletions.
7 changes: 7 additions & 0 deletions crates/polars-core/src/frame/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ impl Column {
_ => None,
}
}
#[inline]
pub fn as_scalar_column_mut(&mut self) -> Option<&mut ScalarColumn> {
match self {
Column::Scalar(s) => Some(s),
_ => None,
}
}

// # Try to Chunked Arrays
pub fn try_bool(&self) -> Option<&BooleanChunked> {
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-core/src/frame/column/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ impl ScalarColumn {
self.scalar = map_scalar(std::mem::take(&mut self.scalar));
self.materialized.take();
}
pub fn with_value(&mut self, value: AnyValue<'static>) -> &mut Self {
self.scalar.update(value);
self.materialized.take();
self
}
}

impl IntoColumn for ScalarColumn {
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-core/src/scalar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ impl Scalar {
Self { dtype, value }
}

pub fn null(dtype: DataType) -> Self {
Self::new(dtype, AnyValue::Null)
}

#[inline(always)]
pub fn is_null(&self) -> bool {
self.value.is_null()
Expand Down Expand Up @@ -74,4 +78,10 @@ impl Scalar {
pub fn update(&mut self, value: AnyValue<'static>) {
self.value = value;
}

#[inline(always)]
pub fn with_value(mut self, value: AnyValue<'static>) -> Self {
self.update(value);
self
}
}
1 change: 0 additions & 1 deletion crates/polars-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,3 @@ round_series = ["polars-plan/round_series", "polars-ops/round_series"]
is_between = ["polars-plan/is_between"]
dynamic_group_by = ["polars-plan/dynamic_group_by", "polars-time", "temporal"]
propagate_nans = ["polars-plan/propagate_nans", "polars-ops/propagate_nans"]
panic_on_schema = ["polars-plan/panic_on_schema"]
9 changes: 0 additions & 9 deletions crates/polars-expr/src/expressions/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,10 +442,6 @@ impl PhysicalExpr for AggregationExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down Expand Up @@ -745,11 +741,6 @@ impl PhysicalExpr for AggQuantileExpr {
))
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
self.quantile.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
5 changes: 0 additions & 5 deletions crates/polars-expr/src/expressions/alias.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,6 @@ impl PhysicalExpr for AliasExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.physical_expr.collect_live_columns(lv);
lv.insert(self.name.clone());
}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
44 changes: 0 additions & 44 deletions crates/polars-expr/src/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,6 @@ impl PhysicalExpr for ApplyExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
for i in &self.inputs {
i.collect_live_columns(lv);
}
}

fn isolate_column_expr(
&self,
_name: &str,
Expand All @@ -441,44 +435,6 @@ impl PhysicalExpr for ApplyExpr {
None
}

fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
if self.collect_groups == ApplyOptions::ElementWise {
let mut new_inputs = Vec::new();
for i in 0..self.inputs.len() {
match self.inputs[i].replace_elementwise_const_columns(const_columns) {
None => continue,
Some(new) => {
new_inputs.reserve(self.inputs.len());
new_inputs.extend(self.inputs[..i].iter().cloned());
new_inputs.push(new);
break;
},
}
}

// Only copy inputs if it is actually needed
if new_inputs.is_empty() {
return None;
}

new_inputs.extend(self.inputs[new_inputs.len()..].iter().map(|i| {
match i.replace_elementwise_const_columns(const_columns) {
None => i.clone(),
Some(new) => new,
}
}));

let mut slf = self.clone();
slf.inputs = new_inputs;
return Some(Arc::new(slf));
}

None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.expr.to_field(input_schema, Context::Default)
}
Expand Down
26 changes: 0 additions & 26 deletions crates/polars-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,6 @@ impl PhysicalExpr for BinaryExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.left.collect_live_columns(lv);
self.right.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
name: &str,
Expand Down Expand Up @@ -301,27 +296,6 @@ impl PhysicalExpr for BinaryExpr {
Some((Arc::new(self.clone()) as _, specialized))
}

fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
let rcc_left = self.left.replace_elementwise_const_columns(const_columns);
let rcc_right = self.right.replace_elementwise_const_columns(const_columns);

if rcc_left.is_some() || rcc_right.is_some() {
let mut slf = self.clone();
if let Some(left) = rcc_left {
slf.left = left;
}
if let Some(right) = rcc_right {
slf.right = right;
}
return Some(Arc::new(slf));
}

None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
self.expr.to_field(input_schema, Context::Default)
}
Expand Down
4 changes: 0 additions & 4 deletions crates/polars-expr/src/expressions/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ impl PhysicalExpr for CastExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
49 changes: 1 addition & 48 deletions crates/polars-expr/src/expressions/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,26 +57,7 @@ impl ColumnExpr {
}
}

// this path should not happen
#[cfg(feature = "panic_on_schema")]
{
if _state.ext_contexts.is_empty()
&& std::env::var("POLARS_NO_SCHEMA_CHECK").is_err()
{
panic!(
"got {} expected: {} from schema: {:?} and DataFrame: {:?}",
out.name(),
&*self.name,
_schema,
df
)
}
}
// in release we fallback to linear search
#[allow(unreachable_code)]
{
df.column(&self.name).cloned()
}
df.column(&self.name).cloned()
} else {
Ok(out.clone())
}
Expand All @@ -87,17 +68,6 @@ impl ColumnExpr {
_state: &ExecutionState,
_panic_during_test: bool,
) -> PolarsResult<Column> {
#[cfg(feature = "panic_on_schema")]
{
if _panic_during_test
&& _state.ext_contexts.is_empty()
&& std::env::var("POLARS_NO_SCHEMA_CHECK").is_err()
{
panic!("invalid schema: df {:?};\ncolumn: {}", df, &self.name)
}
}
// in release we fallback to linear search
#[allow(unreachable_code)]
df.column(&self.name).cloned()
}

Expand Down Expand Up @@ -179,10 +149,6 @@ impl PhysicalExpr for ColumnExpr {
Some(self)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
lv.insert(self.name.clone());
}

fn isolate_column_expr(
&self,
_name: &str,
Expand All @@ -197,19 +163,6 @@ impl PhysicalExpr for ColumnExpr {
Some(&self.name)
}

fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
if let Some(av) = const_columns.get(&self.name) {
let lv = LiteralValue::from(av.clone());
let le = LiteralExpr::new(lv, self.expr.clone());
return Some(Arc::new(le));
}

None
}

fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
input_schema.get_field(&self.name).ok_or_else(|| {
polars_err!(
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-expr/src/expressions/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ impl PhysicalExpr for CountExpr {
Ok(AggregationContext::new(c, Cow::Borrowed(groups), true))
}

fn collect_live_columns(&self, _lv: &mut PlIndexSet<PlSmallStr>) {}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
5 changes: 0 additions & 5 deletions crates/polars-expr/src/expressions/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,6 @@ impl PhysicalExpr for FilterExpr {
}
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
self.by.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
5 changes: 0 additions & 5 deletions crates/polars-expr/src/expressions/gather.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ impl PhysicalExpr for GatherExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.phys_expr.collect_live_columns(lv);
self.idx.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
2 changes: 0 additions & 2 deletions crates/polars-expr/src/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ impl PhysicalExpr for LiteralExpr {
Some(self)
}

fn collect_live_columns(&self, _lv: &mut PlIndexSet<PlSmallStr>) {}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
20 changes: 0 additions & 20 deletions crates/polars-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,10 +602,6 @@ pub trait PhysicalExpr: Send + Sync {
None
}

/// Get the variables that are used in the expression i.e. live variables.
/// This can contain duplicates.
fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>);

fn isolate_column_expr(
&self,
name: &str,
Expand All @@ -617,18 +613,6 @@ pub trait PhysicalExpr: Send + Sync {
None
}

/// Replace columns that are known to be a constant value with their const value.
///
/// This should not replace values that are calculated non-elementwise e.g. col.max(),
/// col.std(), etc.
fn replace_elementwise_const_columns(
&self,
const_columns: &PlHashMap<PlSmallStr, AnyValue<'static>>,
) -> Option<Arc<dyn PhysicalExpr>> {
_ = const_columns;
None
}

/// Can take &dyn Statistics and determine of a file should be
/// read -> `true`
/// or not -> `false`
Expand Down Expand Up @@ -669,10 +653,6 @@ impl PhysicalIoExpr for PhysicalIoHelper {
.map(|c| c.take_materialized_series())
}

fn collect_live_columns(&self, live_columns: &mut PlIndexSet<PlSmallStr>) {
self.expr.collect_live_columns(live_columns);
}

#[cfg(feature = "parquet")]
fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
self.expr.as_stats_evaluator()
Expand Down
4 changes: 0 additions & 4 deletions crates/polars-expr/src/expressions/rolling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ impl PhysicalExpr for RollingExpr {
polars_bail!(InvalidOperation: "rolling expression not allowed in aggregation");
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.phys_function.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
6 changes: 0 additions & 6 deletions crates/polars-expr/src/expressions/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,12 +269,6 @@ impl PhysicalExpr for SliceExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.input.collect_live_columns(lv);
self.offset.collect_live_columns(lv);
self.length.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
4 changes: 0 additions & 4 deletions crates/polars-expr/src/expressions/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,6 @@ impl PhysicalExpr for SortExpr {
Ok(ac)
}

fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
self.physical_expr.collect_live_columns(lv);
}

fn isolate_column_expr(
&self,
_name: &str,
Expand Down
Loading

0 comments on commit 2eaee18

Please sign in to comment.