
[window function] Support min/max with a self-defined sliding window, optimized with a segment tree. #4616

Closed
wants to merge 6 commits

Conversation

Ted-Jiang
Member

@Ted-Jiang Ted-Jiang commented Dec 14, 2022

Which issue does this PR close?

Closes #4603.

Rationale for this change

There are three kinds of window frames:

  1. RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: compute once for the whole partition.
  2. RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: use the Aggregator to compute a cumulative state.
  3. ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING: needs the Aggregator to support removing sub-state; currently the min/max aggregate functions do not support this remove operation.

From the paper: "For aggregate functions like min, max, count, and sum, the Segment Tree uses the obvious corresponding aggregate function."

So I try to use a segment tree to support this.
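To make the idea concrete, here is a minimal, self-contained sketch of a segment tree supporting range-min queries over a fixed batch of values. This is an illustrative stand-in, not the PR's actual code: the names (`SegmentTree`, `range_min`) and the plain `i64` element type are assumptions; the real implementation works over Arrow arrays and generic aggregates.

```rust
// Minimal segment tree over i64 supporting range-min queries.
// Illustrative sketch only; names and types are hypothetical.
struct SegmentTree {
    n: usize,
    tree: Vec<i64>, // 2*n slots; leaves live in tree[n..2n)
}

impl SegmentTree {
    fn new(values: &[i64]) -> Self {
        let n = values.len();
        let mut tree = vec![i64::MAX; 2 * n];
        tree[n..].copy_from_slice(values);
        // Each internal node holds the min of its two children.
        for i in (1..n).rev() {
            tree[i] = tree[2 * i].min(tree[2 * i + 1]);
        }
        Self { n, tree }
    }

    /// Minimum over the half-open range [l, r) in O(log n).
    fn range_min(&self, l: usize, r: usize) -> i64 {
        let (mut l, mut r) = (l + self.n, r + self.n);
        let mut acc = i64::MAX;
        while l < r {
            if l % 2 == 1 { acc = acc.min(self.tree[l]); l += 1; }
            if r % 2 == 1 { r -= 1; acc = acc.min(self.tree[r]); }
            l /= 2;
            r /= 2;
        }
        acc
    }
}

fn main() {
    let t = SegmentTree::new(&[5, 2, 7, 1, 9, 3]);
    // Frame "ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING" at row 3
    // covers rows 2..5, i.e. values [7, 1, 9]:
    assert_eq!(t.range_min(2, 5), 1);
    assert_eq!(t.range_min(0, 6), 1);
}
```

Each window frame then becomes one O(log n) query instead of re-scanning the frame, which is the advantage the paper describes for sliding frames.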

What changes are included in this PR?

Are these changes tested?

Yes; the results were also verified against spark-sql.

Are there any user-facing changes?

Signed-off-by: yangjiang <[email protected]>
@github-actions github-actions bot added core Core DataFusion crate logical-expr Logical plan and expressions physical-expr Physical Expressions labels Dec 14, 2022
@Ted-Jiang Ted-Jiang marked this pull request as draft December 14, 2022 11:57
@Ted-Jiang Ted-Jiang marked this pull request as ready for review December 14, 2022 13:01
@Ted-Jiang Ted-Jiang closed this Dec 14, 2022
@Ted-Jiang Ted-Jiang reopened this Dec 14, 2022
@Ted-Jiang
Member Author

@alamb @mustafasrepo PTAL

@mustafasrepo
Contributor

Thanks @Ted-Jiang for the contribution. I think that, in the long run, breaking the update_batch and retract_batch API makes the code hard to maintain. I wonder whether there is any way to hide the SegmentTree implementation behind the update_batch and retract_batch calls (I will experiment with it). Also, your implementation stores all batches at the beginning and queries the result according to the range. There is also a data structure in the discussion that supports push, pop, and get (update, retract, and evaluate in our case) with amortized O(1) complexity. I think that data structure is better for our use case in terms of both complexity and support for an incremental implementation. I previously opened an issue discussing these points, #4402. I am putting it here in case it helps.
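For readers unfamiliar with the amortized-O(1) structure mentioned above, one classic realization is a queue built from two stacks, each carrying its own running minimum. This is a hedged sketch of that general technique (the name `MinQueue` and the `i64` element type are hypothetical, not from the linked issue): `push` plays the role of update, `pop` of retract, and `min` of evaluate.

```rust
// Sliding-window minimum with amortized O(1) push/pop/min:
// a queue made of two stacks, each storing (value, running min).
struct MinQueue {
    front: Vec<(i64, i64)>, // pop side
    back: Vec<(i64, i64)>,  // push side
}

impl MinQueue {
    fn new() -> Self {
        Self { front: Vec::new(), back: Vec::new() }
    }

    /// Add a value entering the window (update).
    fn push(&mut self, v: i64) {
        let m = self.back.last().map_or(v, |&(_, m)| m.min(v));
        self.back.push((v, m));
    }

    /// Remove the oldest value leaving the window (retract).
    fn pop(&mut self) -> Option<i64> {
        if self.front.is_empty() {
            // Move everything over, rebuilding running minimums.
            // Each element moves at most once, hence amortized O(1).
            while let Some((v, _)) = self.back.pop() {
                let m = self.front.last().map_or(v, |&(_, m)| m.min(v));
                self.front.push((v, m));
            }
        }
        self.front.pop().map(|(v, _)| v)
    }

    /// Current window minimum (evaluate), in true O(1).
    fn min(&self) -> Option<i64> {
        match (self.front.last(), self.back.last()) {
            (Some(&(_, a)), Some(&(_, b))) => Some(a.min(b)),
            (Some(&(_, a)), None) => Some(a),
            (None, Some(&(_, b))) => Some(b),
            (None, None) => None,
        }
    }
}

fn main() {
    let mut q = MinQueue::new();
    for v in [5, 2, 7] { q.push(v); }
    assert_eq!(q.min(), Some(2));
    assert_eq!(q.pop(), Some(5)); // window slides: 5 leaves
    q.push(1);                    // 1 enters
    assert_eq!(q.min(), Some(1));
}
```

Unlike a segment tree over the whole partition, this uses memory proportional to the current frame only, which matches the incremental, bounded-memory goal described above.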

@alamb
Contributor

alamb commented Dec 14, 2022

I plan to review this carefully tomorrow

@Ted-Jiang
Member Author

Ted-Jiang commented Dec 15, 2022

I think in the long run breaking the update_batch and retract_batch API makes code hard to maintain.

Yes, I agree with this. Right now, window functions with aggregates reuse the AggregateExpr; if we try to implement retract_batch for min/max, I think it will also break a lot of APIs in order to avoid hurting the efficiency of the non-custom-window-frame case. So I prefer to keep this separate; is there another solution? 🤔

I think this data structure is better for our use case

Yes, you are right. From the paper, the reason for storing all batches at the beginning is, I think, that it enables inter-partition parallelism (but not in this PR 😂; I will experiment with it).

@mustafasrepo
Contributor

@berkaycpp is working on a PR to support MIN and MAX with custom window frames. Thinking about how to utilize the existing API led us to the design in the discussion. I think that PR will be ready in a couple of days (the main components are already there; it needs minor refactoring). I think the implementation in that PR is the better design for the future, both in terms of algorithmic complexity (it is an amortized-constant algorithm and uses finite memory) and readability. @alamb, what do you think about this discussion?

@ozankabak
Contributor

ozankabak commented Dec 15, 2022

We can factor in many dimensions when deciding on the optimal data structure to maintain the aggregation state; that would be a nice discussion to have (and I'm looking forward to it!).

However, I do think we should conform to the API and not special-case just for this. As the PR @mustafasrepo linked shows, it is not hard at all to do (the PR is very small). I think this is a case where there is no good reason to accumulate technical debt, since the cost of doing this "the right way" is small.

@alamb alamb left a comment

This is a good start @Ted-Jiang -- thank you

Thanks @Ted-Jiang for the contribution. I think that, in the long run, breaking the update_batch and retract_batch API makes the code hard to maintain. I wonder whether there is any way to hide the SegmentTree implementation behind the update_batch and retract_batch calls (I will experiment with it).

I agree with @mustafasrepo here that it seems like the SegmentTree or whatever structure we eventually go with, would ideally be something inside the accumulator and hidden behind retract_batch

Supporting retract_batch on the existing min/max aggregators in https://github.com/apache/arrow-datafusion/blob/master/datafusion/physical-expr/src/aggregate/min_max.rs is probably a bad idea, as it will slow down the non-window version.

I wonder if it would be possible to make new accumulators that do support retract_batch, and use them instead of MinAccumulator and MaxAccumulator when operating on bounded window functions.

Something like

/// An accumulator that tracks the minimum value across a window of values
#[derive(Debug)]
pub struct RetractableMinAccumulator {
    tree: SegmentTree,
}

impl RetractableMinAccumulator {
    /// new min accumulator
    pub fn try_new(datatype: &DataType) -> Result<Self> {
...
    }
}

impl Accumulator for RetractableMinAccumulator {
    fn state(&self) -> Result<Vec<ScalarValue>> {
        // report the current minimum, derived from the SegmentTree
        Ok(vec![self.tree.min()])
    }

    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
      // add values to SegmentTree
    }

    fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
      // remove values from SegmentTree
    }
...
}
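One concrete way such a retractable accumulator could work internally, independent of the eventual data structure choice, is to keep a multiset of the values currently inside the frame. The toy sketch below (plain `i64` instead of Arrow arrays; the name `RetractableMin` and method shapes are hypothetical, mirroring the trait above rather than implementing it) uses a `BTreeMap` as a counted multiset, so update/retract are O(log n) and evaluate reads the smallest key.

```rust
use std::collections::BTreeMap;

// Toy retractable min accumulator over i64 (stand-in for Arrow arrays).
// The BTreeMap acts as a multiset: keys are values, entries count
// occurrences, and the minimum is always the first key.
#[derive(Debug, Default)]
struct RetractableMin {
    counts: BTreeMap<i64, usize>,
}

impl RetractableMin {
    /// Values entering the frame.
    fn update_batch(&mut self, values: &[i64]) {
        for &v in values {
            *self.counts.entry(v).or_insert(0) += 1;
        }
    }

    /// Values leaving the frame.
    fn retract_batch(&mut self, values: &[i64]) {
        for &v in values {
            if let Some(c) = self.counts.get_mut(&v) {
                *c -= 1;
                if *c == 0 {
                    self.counts.remove(&v);
                }
            }
        }
    }

    /// Current frame minimum, if the frame is non-empty.
    fn evaluate(&self) -> Option<i64> {
        self.counts.keys().next().copied()
    }
}

fn main() {
    let mut acc = RetractableMin::default();
    acc.update_batch(&[5, 2, 7]); // frame covers rows 0..3
    assert_eq!(acc.evaluate(), Some(2));
    acc.retract_batch(&[5]);      // frame slides: row 0 leaves
    acc.update_batch(&[9]);       // row 3 enters
    assert_eq!(acc.evaluate(), Some(2));
    acc.retract_batch(&[2]);
    assert_eq!(acc.evaluate(), Some(7));
}
```

The point is that whatever structure sits inside (multiset, segment tree, or the amortized-O(1) queue), the window executor only ever sees the update/retract/evaluate surface.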

Comment on lines +154 to +156
WindowFrameBound::Preceding(x) => x.eq(&ScalarValue::Null),
WindowFrameBound::CurrentRow => false,
WindowFrameBound::Following(x) => x.eq(&ScalarValue::Null),

Suggested change
WindowFrameBound::Preceding(x) => x.eq(&ScalarValue::Null),
WindowFrameBound::CurrentRow => false,
WindowFrameBound::Following(x) => x.eq(&ScalarValue::Null),
WindowFrameBound::Preceding(x) => x.is_null(),
WindowFrameBound::CurrentRow => false,
WindowFrameBound::Following(x) => x.is_null(),


I recommend using x.is_null() so that it also catches typed nulls (like ScalarValue::UInt64(None))

@ozankabak
Contributor

Supporting retract_batch on the existing min/max aggregators in https://github.com/apache/arrow-datafusion/blob/master/datafusion/physical-expr/src/aggregate/min_max.rs is probably a bad idea, as it will slow down the non-window version.

I wonder if it would be possible to make new accumulators that do support retract_batch, and use them instead of MinAccumulator and MaxAccumulator when operating on bounded window functions.

Agreed, this makes sense. The planner can choose which version to use depending on the query.

@alamb
Contributor

alamb commented Dec 15, 2022

I think the implementation in https://github.com/synnada-ai/arrow-datafusion/pull/29 is the better design for the future (both in terms of algorithmic complexity [it is an amortized-constant algorithm and uses finite memory] and readability). @alamb What do you think about this discussion?

I don't have any strong opinion on the specific data structure used. My ideal workflow is:

  1. Whoever contributes the code initially gets to decide on the initial algorithm.
  2. The initial PR includes sufficient tests to ensure subsequent algorithmic improvements remain correct.
  3. We accept follow-on PRs that improve the algorithms, provided they show improved performance in benchmarks.

Does that make sense?

@ozankabak
Contributor

ozankabak commented Dec 15, 2022

It makes sense to me. I will check with @mustafasrepo tomorrow, but I suspect he will agree 🙂 Good testing plus a reasonable initial algorithm is sufficient for a first implementation. We can go with whichever implementation satisfies these baselines; improvements will always follow later.

On the other hand, I also think one should avoid breaking a design pattern to implement incremental new functionality unless there is good justification for doing so. In this case, I do not see a compelling reason to break the design, especially given that there are easy ways to do this within the current design. What do you think, @alamb?

Edit:

I agree with @mustafasrepo here that it seems like the SegmentTree or whatever structure we eventually go with, would ideally be something inside the accumulator and hidden behind retract_batch

Didn't see this before; it seems like everybody is on the same page. Consider my question answered.

@Ted-Jiang, would you like to collaborate with @mustafasrepo to explore how we can add a new MIN/MAX accumulator that conforms to the update/retract API? I would love to help you guys in the effort too.

@Ted-Jiang
Member Author

@Ted-Jiang, would you like to collaborate with @mustafasrepo to explore how we can add a new MIN/MAX accumulator that conforms to the update/retract API? I would love to help you guys in the effort too.

@mustafasrepo Sure, your suggestions all make sense to me. Since there has already been a lot of work in https://github.com/synnada-ai/arrow-datafusion/pull/29, I will close this PR, wait for that one, and will be glad to review it 😄.

@Ted-Jiang Ted-Jiang closed this Dec 16, 2022