-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add fetch to SortPreservingMergeExec
and SortPreservingMergeStream
#6811
Conversation
SortPreservingMergeExec
SortPreservingMergeExec
SortPreservingMergeExec
and SortPreservingMergeStream
…serving_merge_exec
streaming_merge( | ||
streams, | ||
self.schema.clone(), | ||
&self.expr, | ||
metrics, | ||
self.batch_size, | ||
self.fetch, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was removed a few months ago to use streaming_merge
in SortExec
causing SortExec
to spill even with a (low) fetch
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reviewed this carefully -- thank you @Dandandan .
I think it is worth considering adding fetch
to the fuzz test cases in https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/fuzz_cases/merge_fuzz.rs to add some additional test coverage
@tustvold could you give this quick review for any issues?
@@ -285,14 +286,13 @@ impl ExternalSorter { | |||
}) | |||
.collect::<Result<_>>()?; | |||
|
|||
// TODO: Pushdown fetch to streaming merge (#6000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉
@@ -67,7 +67,7 @@ Limit: skip=0, fetch=10 | |||
------------------TableScan: supplier projection=[s_suppkey, s_comment], partial_filters=[supplier.s_comment LIKE Utf8("%Customer%Complaints%")] | |||
physical_plan | |||
GlobalLimitExec: skip=0, fetch=10 | |||
--SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST] | |||
--SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST], fetch=10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
Outdated
Show resolved
Hide resolved
@@ -227,11 +239,21 @@ impl<C: Cursor> SortPreservingMergeStream<C> { | |||
if self.advance(stream_idx) { | |||
self.loser_tree_adjusted = false; | |||
self.in_progress.push_row(stream_idx); | |||
if self.in_progress.len() < self.batch_size { | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the hot path for SortPreservingMerge -- I suspect adding an extra check for fetch won't impact performance in any measurable way, but it might be worth while checking the benchmarks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, otherwise we could maybe need two separate implementations (with/without fetch)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As benchmarks showed no significant difference, I kept this check in the hot path.
Co-authored-by: Andrew Lamb <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a small comment, otherwise LGTM.
SortPreservingMergeExec
and SortPreservingMergeStream
SortPreservingMergeExec
and SortPreservingMergeStream
…serving_merge_exec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have left some inline comments, other than these, this PR is LGTM!. Thanks @Dandandan for this PR.
Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mustafa Akur <[email protected]>
Thanks everyone -- this looks like it was a great team effort 👍 |
apache#6811) * Add fetch to sortpreservingmergeexec * Add fetch to sortpreservingmergeexec * fmt * Deserialize * Fmt * Fix test * Fix test * Fix test * Fix plan output * Doc * Update datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs Co-authored-by: Andrew Lamb <[email protected]> * Extract into method * Remove from sort enforcement * Update datafusion/core/src/physical_plan/sorts/merge.rs Co-authored-by: Mustafa Akur <[email protected]> * Update datafusion/proto/src/physical_plan/mod.rs Co-authored-by: Mustafa Akur <[email protected]> --------- Co-authored-by: Daniël Heres <[email protected]> Co-authored-by: Andrew Lamb <[email protected]> Co-authored-by: Mustafa Akur <[email protected]>
Which issue does this PR close?
Closes #6000
Rationale for this change
Without fetch in
SortPreserveMergeExec
and instreaming_merge
(which is used inSortExec
) we effectively don't use the fetch anymore in sort besides limiting the amount of rows in batches.This used to work before, as the
fetch
was also used to exit early whenever the limit has been reached.No regressions in the
sort
bench:What changes are included in this PR?
Are these changes tested?
Covered by existing tests
Are there any user-facing changes?