Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine evaluate_stateful and evaluate_inside_range #6665

Merged
merged 6 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions datafusion/physical-expr/src/window/partition_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,20 @@ pub trait PartitionEvaluator: Debug + Send {
Ok(())
}

/// Gets the range where the window function result is calculated.
///
/// `idx`: is the index of last row for which result is calculated.
/// `n_rows`: is the number of rows of the input record batch (Used during bounds check)
fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<Range<usize>> {
Err(DataFusionError::NotImplemented(
"get_range is not implemented for this window function".to_string(),
))
/// If `uses_window_frame` flag is `false`. This method is used to calculate required range for the window function
/// Generally there is no required range, hence by default this returns smallest range(current row). e.g seeing current row
/// is enough to calculate window result (such as row_number, rank, etc)
fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
if self.uses_window_frame() {
Err(DataFusionError::Execution(
"Range should be calculated from window frame".to_string(),
))
} else {
Ok(Range {
start: idx,
end: idx + 1,
})
}
}

/// Called for window functions that *do not use* values from the
Expand All @@ -150,7 +156,7 @@ pub trait PartitionEvaluator: Debug + Send {
// Default implementation may behave suboptimally (For instance `NumRowEvaluator` overwrites it)
if !self.uses_window_frame() && self.supports_bounded_execution() {
let res = (0..num_rows)
.map(|_idx| self.evaluate(values, &Range { start: 0, end: 1 }))
.map(|idx| self.evaluate(values, &self.get_range(idx, num_rows)?))
.collect::<Result<Vec<_>>>()?;
ScalarValue::iter_to_array(res.into_iter())
} else {
Expand Down
6 changes: 0 additions & 6 deletions datafusion/physical-expr/src/window/rank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,6 @@ pub(crate) struct RankEvaluator {
}

impl PartitionEvaluator for RankEvaluator {
fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

let start = idx;
let end = idx + 1;
Ok(Range { start, end })
}

fn update_state(
&mut self,
state: &WindowAggState,
Expand Down
6 changes: 0 additions & 6 deletions datafusion/physical-expr/src/window/row_number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ pub(crate) struct NumRowsEvaluator {
}

impl PartitionEvaluator for NumRowsEvaluator {
fn get_range(&self, idx: usize, _n_rows: usize) -> Result<Range<usize>> {
let start = idx;
let end = idx + 1;
Ok(Range { start, end })
}

/// evaluate window function result inside given range
fn evaluate(
&mut self,
Expand Down