Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine evaluate_stateful and evaluate_inside_range #6665

Merged
merged 6 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions datafusion/physical-expr/src/window/built_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ impl WindowExpr for BuiltInWindowExpr {
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let evaluator = self.expr.create_evaluator()?;
let mut evaluator = self.expr.create_evaluator()?;
let num_rows = batch.num_rows();
if self.expr.uses_window_frame() {
if evaluator.uses_window_frame() {
let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
let mut row_wise_results = vec![];
Expand All @@ -114,18 +114,18 @@ impl WindowExpr for BuiltInWindowExpr {
num_rows,
idx,
)?;
let value = evaluator.evaluate_inside_range(&values, &range)?;
let value = evaluator.evaluate(&values, &range)?;
row_wise_results.push(value);
last_range = range;
}
ScalarValue::iter_to_array(row_wise_results.into_iter())
} else if self.expr.include_rank() {
} else if evaluator.include_rank() {
let columns = self.sort_columns(batch)?;
let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?;
evaluator.evaluate_with_rank(num_rows, &sort_partition_points)
evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points)
} else {
let (values, _) = self.get_values_orderbys(batch)?;
evaluator.evaluate(&values, num_rows)
evaluator.evaluate_all(&values, num_rows)
}
}

Expand Down Expand Up @@ -164,15 +164,15 @@ impl WindowExpr for BuiltInWindowExpr {
// We iterate on each row to perform a running calculation.
let record_batch = &partition_batch_state.record_batch;
let num_rows = record_batch.num_rows();
let sort_partition_points = if self.expr.include_rank() {
let sort_partition_points = if evaluator.include_rank() {
let columns = self.sort_columns(record_batch)?;
evaluate_partition_ranges(num_rows, &columns)?
} else {
vec![]
};
let mut row_wise_results: Vec<ScalarValue> = vec![];
for idx in state.last_calculated_index..num_rows {
let frame_range = if self.expr.uses_window_frame() {
let frame_range = if evaluator.uses_window_frame() {
state
.window_frame_ctx
.get_or_insert_with(|| {
Expand All @@ -199,7 +199,8 @@ impl WindowExpr for BuiltInWindowExpr {
// Update last range
state.window_frame_range = frame_range;
evaluator.update_state(state, idx, &order_bys, &sort_partition_points)?;
row_wise_results.push(evaluator.evaluate_stateful(&values)?);
row_wise_results
.push(evaluator.evaluate(&values, &state.window_frame_range)?);
}
let out_col = if row_wise_results.is_empty() {
new_empty_array(out_type)
Expand Down Expand Up @@ -231,8 +232,12 @@ impl WindowExpr for BuiltInWindowExpr {
}

fn uses_bounded_memory(&self) -> bool {
self.expr.supports_bounded_execution()
&& (!self.expr.uses_window_frame()
|| !self.window_frame.end_bound.is_unbounded())
if let Ok(evaluator) = self.expr.create_evaluator() {
evaluator.supports_bounded_execution()
&& (!evaluator.uses_window_frame()
|| !self.window_frame.end_bound.is_unbounded())
} else {
false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,29 +85,4 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
///
/// The default implementation does nothing
fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {}

/// Can the window function be incrementally computed using
/// bounded memory?
///
/// If this function returns true, [`Self::create_evaluator`] must
/// implement [`PartitionEvaluator::evaluate_stateful`]
fn supports_bounded_execution(&self) -> bool {
false
}

/// Does the window function use the values from its window frame?
///
/// If this function returns true, [`Self::create_evaluator`] must
/// implement [`PartitionEvaluator::evaluate_inside_range`]
fn uses_window_frame(&self) -> bool {
false
}

/// Can this function be evaluated with (only) rank
///
/// If `include_rank` is true, then [`Self::create_evaluator`] must
/// implement [`PartitionEvaluator::evaluate_with_rank`]
fn include_rank(&self) -> bool {
false
}
}
12 changes: 6 additions & 6 deletions datafusion/physical-expr/src/window/cume_dist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,13 @@ impl BuiltInWindowFunctionExpr for CumeDist {
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CumeDistEvaluator {}))
}

fn include_rank(&self) -> bool {
true
}
}

#[derive(Debug)]
pub(crate) struct CumeDistEvaluator;

impl PartitionEvaluator for CumeDistEvaluator {
fn evaluate_with_rank(
fn evaluate_with_rank_all(
&self,
num_rows: usize,
ranks_in_partition: &[Range<usize>],
Expand All @@ -94,6 +90,10 @@ impl PartitionEvaluator for CumeDistEvaluator {
);
Ok(Arc::new(result))
}

fn include_rank(&self) -> bool {
true
}
}

#[cfg(test)]
Expand All @@ -109,7 +109,7 @@ mod tests {
) -> Result<()> {
let result = expr
.create_evaluator()?
.evaluate_with_rank(num_rows, &ranks)?;
.evaluate_with_rank_all(num_rows, &ranks)?;
let result = as_float64_array(&result)?;
let result = result.values();
assert_eq!(expected, *result);
Expand Down
22 changes: 15 additions & 7 deletions datafusion/physical-expr/src/window/lead_lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ impl BuiltInWindowFunctionExpr for WindowShift {
}))
}

fn supports_bounded_execution(&self) -> bool {
true
}

fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
Some(Arc::new(Self {
name: self.name.clone(),
Expand Down Expand Up @@ -206,7 +202,11 @@ impl PartitionEvaluator for WindowShiftEvaluator {
}
}

fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result<ScalarValue> {
fn evaluate(
&mut self,
values: &[ArrayRef],
_range: &Range<usize>,
) -> Result<ScalarValue> {
let array = &values[0];
let dtype = array.data_type();
let idx = self.state.idx as i64 - self.shift_offset;
Expand All @@ -217,11 +217,19 @@ impl PartitionEvaluator for WindowShiftEvaluator {
}
}

fn evaluate(&self, values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
fn evaluate_all(
&mut self,
values: &[ArrayRef],
_num_rows: usize,
) -> Result<ArrayRef> {
// LEAD, LAG window functions take single column, values will have size 1
let value = &values[0];
shift_with_default_value(value, self.shift_offset, self.default_value.as_ref())
}

fn supports_bounded_execution(&self) -> bool {
true
}
}

fn get_default_value(
Expand Down Expand Up @@ -258,7 +266,7 @@ mod tests {
let values = expr.evaluate_args(&batch)?;
let result = expr
.create_evaluator()?
.evaluate(&values, batch.num_rows())?;
.evaluate_all(&values, batch.num_rows())?;
let result = as_int32_array(&result)?;
assert_eq!(expected, *result);
Ok(())
Expand Down
70 changes: 33 additions & 37 deletions datafusion/physical-expr/src/window/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,6 @@ impl BuiltInWindowFunctionExpr for NthValue {
Ok(Box::new(NthValueEvaluator { state }))
}

fn supports_bounded_execution(&self) -> bool {
true
}

fn uses_window_frame(&self) -> bool {
true
}

fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
let reversed_kind = match self.kind {
NthValueKind::First => NthValueKind::Last,
Expand Down Expand Up @@ -197,40 +189,44 @@ impl PartitionEvaluator for NthValueEvaluator {
Ok(())
}

fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result<ScalarValue> {
if let Some(ref result) = self.state.finalized_result {
Ok(result.clone())
} else {
self.evaluate_inside_range(values, &self.state.range)
}
}

fn evaluate_inside_range(
&self,
fn evaluate(
&mut self,
values: &[ArrayRef],
range: &Range<usize>,
) -> Result<ScalarValue> {
// FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
let arr = &values[0];
let n_range = range.end - range.start;
if n_range == 0 {
// We produce None if the window is empty.
return ScalarValue::try_from(arr.data_type());
}
match self.state.kind {
NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1),
NthValueKind::Nth(n) => {
// We are certain that n > 0.
let index = (n as usize) - 1;
if index >= n_range {
ScalarValue::try_from(arr.data_type())
} else {
ScalarValue::try_from_array(arr, range.start + index)
if let Some(ref result) = self.state.finalized_result {
Ok(result.clone())
} else {
// FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1.
let arr = &values[0];
let n_range = range.end - range.start;
if n_range == 0 {
// We produce None if the window is empty.
return ScalarValue::try_from(arr.data_type());
}
match self.state.kind {
NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1),
NthValueKind::Nth(n) => {
// We are certain that n > 0.
let index = (n as usize) - 1;
if index >= n_range {
ScalarValue::try_from(arr.data_type())
} else {
ScalarValue::try_from_array(arr, range.start + index)
}
}
}
}
}

fn supports_bounded_execution(&self) -> bool {
true
}

fn uses_window_frame(&self) -> bool {
true
}
}

#[cfg(test)]
Expand All @@ -254,11 +250,11 @@ mod tests {
end: i + 1,
})
}
let evaluator = expr.create_evaluator()?;
let mut evaluator = expr.create_evaluator()?;
let values = expr.evaluate_args(&batch)?;
let result = ranges
.iter()
.map(|range| evaluator.evaluate_inside_range(&values, range))
.map(|range| evaluator.evaluate(&values, range))
.collect::<Result<Vec<ScalarValue>>>()?;
let result = ScalarValue::iter_to_array(result.into_iter())?;
let result = as_int32_array(&result)?;
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-expr/src/window/ntile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ pub(crate) struct NtileEvaluator {
}

impl PartitionEvaluator for NtileEvaluator {
fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
fn evaluate_all(
&mut self,
_values: &[ArrayRef],
num_rows: usize,
) -> Result<ArrayRef> {
let num_rows = num_rows as u64;
let mut vec: Vec<u64> = Vec::new();
for i in 0..num_rows {
Expand Down
Loading