-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RFC: User Defined Window Functions #6617
Conversation
c7119e6
to
1bc2d6e
Compare
"SELECT car, \ | ||
speed, \ | ||
lag(speed, 1) OVER (PARTITION BY car ORDER BY time),\ | ||
my_average(speed) OVER (PARTITION BY car ORDER BY time),\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shows calling the user defined window function via SQL
} | ||
|
||
/// These different evaluation methods are called depending on the various settings of WindowUDF | ||
impl PartitionEvaluator for MyPartitionEvaluator { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the proposal of how a user would specify specify the window calculation -- by impl PartitionEvaluator
// TODO make a helper funciton like `crate_udf` that helps to make these signatures | ||
|
||
fn my_average() -> WindowUDF { | ||
WindowUDF { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here is the structure that provides metadata about the window function
@@ -17,14 +17,24 @@ | |||
|
|||
//! Partition evaluation module | |||
|
|||
use crate::window::window_expr::BuiltinWindowState; | |||
use crate::window::WindowAggState; | |||
use crate::window_frame_state::WindowAggState; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file is moved into datafusion_expr
and I had to make a few small changes related to state management -- but I think I may be able to avoid that.
|
||
/// Logical representation of a user-defined window function (UDWF) | ||
/// A UDAF is different from a UDF in that it is stateful across batches. | ||
#[derive(Clone)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here is the proposed interface for defining a WindowUDF -- it is very similar to ScalarUDF and AggregateUDF, on purpose
use std::cmp::min; | ||
use std::collections::VecDeque; | ||
use std::fmt::Debug; | ||
use std::ops::Range; | ||
use std::sync::Arc; | ||
|
||
/// State for each unique partition determined according to PARTITION BY column(s) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this stuff is related to state management and was just moved (not modified)
Thanks for putting this together. @mustafasrepo will take a look and we will discuss in our next meeting, then we will circle back to you with our thoughts. |
@stuartcarnie / @ozankabak / @mustafasrepo / @doki23 as you have expressed interest in the user defined window functionality I wonder if you have some time to review the proposed API for doing so |
Thanks @alamb for this proposal. I have examined this PR. Overall it is very well written, and showcases the usage of the new API. However, I have some concerns about flexibility of
Hence I think, we should either support for user to create custom I am working on a new design to solve this problem. @metesynnada and I will discuss it tomorrow, and let you know about the final result. |
thank you for the response @mustafasrepo -- this makes sense to me. I agree it would be better if the type system could help enforce functions that needed to be implemented Are you thinking something like this?
And then adding a separate function to AggregateUDF: enum AccumulatorImpl {. // <---- add this enum
NonRetractable(Box<dyn Accumulator + 'static>
/// returned if the accumulator supports retractable windows
Retractable(Box<dyn RetractableAccumulator + 'static>),
}
pub struct AggregateUDF {
pub name: ...,
pub signature: ...,
pub return_type: ...
// change this signature ----+v
pub accumulator: Arc<dyn Fn(&DataType) -> Result<AccumulatorImpl>, DataFusionError> + Sync + Send + 'static>,
pub state_type: ...
} ? Also related is #6611 we found (which is one of the features of the |
fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> { | ||
Ok(Box::new(MyPartitionEvaluator::new())) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible we could support passing scalar arguments when creating an instance of the function, similar to the built-in functions?
For example, the lag
function takes an optional scalar value for the second argument, which is the shift offset:
I would use this for functions such as moving_average
, which requires a scalar for specifying the minimum number of rows to average.
Note
This would be a welcomed feature for UDAFs too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will try and figure out how to do this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stuartcarnie I looked into adding the arguments. The primary issue I encountered is that a WindowUDF
is specified in terms of structures in datafusion-expr
( aka it doesn't have access to PhysicalExpr
s as those are defined in a different crate.
Here are some possible signatures we could provide. Do you have any feedback on these possibilities?
Pass in the Expr
s from the logical plan
This is non ideal in my mind as the PartitionEvaluator is created during execution (where the Expr
s are normally not around anymore)
/// Factory that creates a PartitionEvaluator for the given window function.
///
/// This function is passed its input arguments so that cases such as
/// constants can be correctly handled.
pub type PartitionEvaluatorFunctionFactory =
Arc<dyn Fn(&[Expr]) -> Result<Box<dyn PartitionEvaluator>> + Send + Sync>;
Pass in a ArgType
enum
This is also non ideal in my mind as it seemingly artificially limits what the user defined window function can special case (why not Column's for example??)
enum ArgType {
/// The argument was a single value
Scalar(ScalarValue),
/// the argument is something other than a single value
Array
}
/// Factory that creates a PartitionEvaluator for the given window function.
///
/// This function is passed its input arguments so that cases such as
/// constants can be specially handled if desired.
pub type PartitionEvaluatorFunctionFactory =
Arc<dyn Fn(args: Vec<ArgType>) -> Result<Box<dyn PartitionEvaluator>> + Send + Sync>;
Others?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of passing the PhysicalExpr
trait objects? Example:
/// Factory that creates a PartitionEvaluator for the given window function.
///
/// This function is passed its input arguments and schema so that cases such as
/// constants can be correctly handled.
pub type PartitionEvaluatorFunctionFactory =
Arc<dyn Fn(&[Arc<dyn PhysicalExpr>], &Schema) -> Result<Box<dyn PartitionEvaluator>> + Send + Sync>;
Note
I've also included the
input_schema
, as this would be necessary to evaluate types for the arguments.
This would be similar to the create_built_in_window_expr
:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of passing the PhysicalExpr trait objects? Example:
I think this would be ideal, the problem is that PhysicalExpr
is defined in datafusion-physical-expr
which is not a dependency of datafusion-expr
(the dependency goes the other way): https://github.com/apache/arrow-datafusion/blob/6194d588d5c3e9f202a31a0c524f63e6fb08d040/datafusion/physical-expr/Cargo.toml#L54
Thus, since WindowUDF
is defined in datafusion-expr
it can't depend on PhysicalExpr
datafusion_expr: https://github.com/apache/arrow-datafusion/blob/6194d588d5c3e9f202a31a0c524f63e6fb08d040/datafusion/expr/Cargo.toml#L37
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes, of course!
I'd suggest we don't hold up this work and move this problem to another PR to solve it for both user-defined aggregate and window functions.
It works today, just that the update_batch
feels a bit awkward, as the scalar argument is passed as an ArrayRef
. We might be able to engineer it so that it isn't a breaking change in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The actual runtime passes ColumnarValue
: https://docs.rs/datafusion/latest/datafusion/physical_plan/enum.ColumnarValue.html
Which is either a scalar or an array
We could potentially update the signatures to accept that instead maybe (though we would have to move it to the datafusion_expr crate)
I pushed d531d50 that refines the API somewhat and makes it possible to call My plan for this PR is to
Once we have settled on the final API (pending the feedback from @mustafasrepo and @metesynnada, on #6617 (comment)) I'll then make one or more PRs to master with that API and the examples / tests. |
We had a meeting on this today. @mustafasrepo and @metesynnada will update you with their progress soon (tomorrow possibly). |
How about moving |
In the PR I have implemented my proposals. As a summary, I think on top of current approach, if we combine evaluate and evaluate_stateful fields. User can accomplish desired behavior by setting appropriate flags and implementing corresponding evaluation (either I will open the PR that unify |
I think this is a good idea @doki23, and I think it is broadly speaking what @mustafasrepo has proposed in alamb#12 |
TLDR I really like the proposal. I left my comments here: alamb#12 (review) |
Update here is that the final PR (that has all the learnings from this PR and various cleanups we did along the way) is ready for review: #6703 |
Which issue does this PR close?
#5781
Rationale for this change
We would like to allow users to take full advantage of the power of DataFusion's window functions (largely contributed by @ozankabak and @mustafasrepo 👏 )
This PR contains a potential implementation of User Defined Window Functions: (the "Use existing APIs" approach described on #5781 (comment))
I don't intend to merge this specific PR. Instead, if the community likes this basic approach I will break this PR up into pieces and incrementally merge it
What changes are included in this PR?
The new example in this PR shows how this works. Run
Which produces the following output (where
my_average
's implementation is defined insimple_udwf.rs
as a user defined window function):Here are the major changes in this PR
PartitionEvaluator
definition into datafusion_expr (much like theAccumulator
trait for AggregateUDFs)WindowAggState
,WindwFrameContext
and some related structures todatafusion_expr
(so the UDWF did not depend ondatafusion-physical-expr
Traiti
fy the built in state soWindowUDF
did not depend ondatafusion-physical-expr
Open questions:
I think it may be possible to simplify the
PartitionEvaluator
to remove the state management which would make the needed changes (the amount of code that needs to be moved todatafusion_expr
) smaller. I will try to do this as a separate PROutstanding cleanups
I found a place where the optimizer special cases a particular window function which I think I can remove (and I will try to do so as separate PR (#6619)
https://github.com/apache/arrow-datafusion/blob/1af846bd8de387ce7a6e61a2008917a7610b9a7b/datafusion/core/src/physical_plan/windows/mod.rs#L254-L257