-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for linear range calculation in WINDOW functions #4989
Conversation
@alamb, I think you will like this. As I was reading the segment tree paper from #4904, one of the remarks therein that stood out to me was that in RANGE frames a simple linear search was preferred to bisections due to amortization. We wanted to check if this theoretical gain shows up in practice -- and it does bigly! This PR uses cuts down on bisect usage in appropriate places and uses a linear search to exploit this amortization. Also, there seems to be an intermittent CI failure, if you re-run the failed workflows it should pass. |
Github is experiencing some serious issues today, so it's probably related to their status. I'll keep an eye on it and kick CI once in a while... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not deeply familiar with this part of the code, so I'd like a backup review. But the tests pass and the numbers are better so I'm inclined to
pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> { | ||
columns | ||
.iter() | ||
.map(|arr| ScalarValue::try_from_array(arr, idx)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What dark magic converts this from an Vec<Result<ScalarValue>>
to a Result<Vec<ScalarValue>>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🙂 collect
automagically leverages the info in the return type annotation, as if there was a let
binding or if we had used collect::<...>
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Longer term, I think there is quite a bit more performance to be gained by avoiding the use of ScalarValue
here and instead using something more optimized like the RowFormat. This is not something for this PR I am just trying to plan a seed if we need more performance, there is a path
@@ -85,6 +88,7 @@ impl<'a> WindowFrameContext<'a> { | |||
sort_options, | |||
length, | |||
idx, | |||
last_range, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The last_range being added everywhere is a performance optimization to not throw away the index which was previously calculated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, as it goes through batches, the search picks up from where it left off before. This works for fixed-boundary RANGE frames (which is what Datafusion supports).
I will plan to review this carefully tomorrow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks very nice 👍 thank you @mustafasrepo and @ozankabak -- the only thing I think that needs to be addressed prior to merging is my question about linear_search
not being used (I may be confused)
Longer term I think there are additional optimization opportunities here but this PR is a great step forward
pub fn get_row_at_idx(columns: &[ArrayRef], idx: usize) -> Result<Vec<ScalarValue>> { | ||
columns | ||
.iter() | ||
.map(|arr| ScalarValue::try_from_array(arr, idx)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Longer term, I think there is quite a bit more performance to be gained by avoiding the use of ScalarValue
here and instead using something more optimized like the RowFormat. This is not something for this PR I am just trying to plan a seed if we need more performance, there is a path
DataFusionError::Internal("Column array shouldn't be empty".to_string()) | ||
})? | ||
.len(); | ||
let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW the arrow kernels have https://docs.rs/arrow/31.0.0/arrow/compute/struct.LexicographicalComparator.html which you might be able to use and avoid having to construct ScalarValues to compare.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LexicographicalComparator
API, compares values at the two indices and returns their ordering. This useful to find change detection, or partition boundaries. However, in our case we need to search for specific value inside Array (possibly not existing in the array.). However, maybe with some kind of tweak, we maybe able to use LexicographicalComparator
for our use case. I will think about it in detail.
@@ -83,7 +83,7 @@ pub trait PartitionEvaluator: Debug + Send { | |||
fn evaluate_inside_range( | |||
&self, | |||
_values: &[ArrayRef], | |||
_range: Range<usize>, | |||
_range: &Range<usize>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
} else { | ||
last_range.end | ||
}; | ||
let compare_fn = |current: &[ScalarValue], target: &[ScalarValue]| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See the above comment about https://docs.rs/arrow/31.0.0/arrow/compute/struct.LexicographicalComparator.html possibly being another way to improve performance
where | ||
F: Fn(&[ScalarValue], &[ScalarValue]) -> Result<bool>, | ||
{ | ||
while low < high { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if this would be faster or not, but I winder if you might be able to use .iter()
and find()
to find the relevant index and possibly avoid the bounds checks 🤔
Like if you could do something like item_columns.iter().enumerate().find(compare_fn).map(|(i, _)| i).unwrap()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you mean something like this:
Ok((low..high).find(|&idx| {
let val = get_row_at_idx(item_columns, idx)?;
!compare_fn(&val, target)?
}).unwrap_or(high))
The problem is with the ?
operators, we would need to change them to unwrap
calls for this to work. The code would look nicer, but we would be incurring the downside of panicking in case something goes wrong. In general, I prefer to err on the side of being a little more verbose than necessary but retain control over errors, but I don't have a strong opinion on this specific case. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I was wondering if we could get an iter over item_columns
somehow (and thus avoid all the bounds checks) -- I realize this is not really easy w/ multiple arrays. 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly. But let's still keep this in our minds in the background, and improve this section in the future if anyone finds neat way.
/// rows (`item_columns`) via a linear scan. It assumes that `item_columns` is sorted | ||
/// according to `sort_options` and returns the insertion index of `target`. | ||
/// Template argument `SIDE` being `true`/`false` means left/right insertion. | ||
pub fn linear_search<const SIDE: bool>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't see linear_search
used anywhere in this PR other than tests -- I wonder if the intention was to call it in datafusion/physical-expr/src/window/window_frame_state.rs
? It looks like that currently has an inlined version of this function 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make it compatible with bisect
api, we add two versions; where one version takes sort_options: &[SortOptions]
, other version takes compare_fn: F
. We wanted to add both versions in case anyone need them. In terms of functionality first version is not necessary. However, since constructing comparator function from SortOptions
is a bit cumbersome, we wanted to have that version also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, so the same logic has two drivers: One with a comparison function, one with SortOptions
. We currently use the former, but also anticipate to use the latter in the near future (we plan a follow-up of this PR for GROUPS mode). As @mustafasrepo mentions, it also brings both search APIs in line, which is good too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My hesitation was that the tests are for linear_search
-- the fact that you plan to use it in the future makes sense to me
Thank you for carefully reviewing @alamb. We will consider further optimizing by leveraging |
Benchmark runs are scheduled for baseline = 92d0a05 and contender = b71cae8. b71cae8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #4979. Makes progress on #4904.
Rationale for this change
During range calculation for window frames, we can use linear search instead of bisect. Since we know that table is already sorted and we would progress only in one direction. Linear search is amortized constant (When window frame boundaries are static). Hence this version is more optimal than current bisect implementation (where search complexity is log(n)). This change decreases overall window range calculation complexity from
O(n*log(n))
toO(n)
.What changes are included in this PR?
Are these changes tested?
Existing tests verify window range calculation correctness. Unit tests for
linear_search
is also added. We also compared time elapsed during Window result calculation for the query. Time comparison between linear and bisect version for different conditions can be seen in table below.
Are there any user-facing changes?
No.