Fix column indices in EnforceDistribution optimizer in Partial AggregateMode #4878
…ateMode Column expressions need to be updated to correspond with the partial aggregation schema rather than the input schema.
* Add VegaFusionTable::with_ordering method * Use explicit ordering column instead of relying on the row_number window function in each transform * Support impute null * Add impute tests and fix serialization of null value * Add ordering to inline tables * Remove test case workaround to get consistent ordering * Remove order column when inline dataset has no transforms * Update DataFusion branch to include fix (apache/datafusion#4878) * Add area streamgraph spec that used to trigger error * Remove row number from joinaggregate transform
Thank you @jonmmease -- reading the 🕵️ story on #4873 (comment) is fascinating
The fact that this code fixes a bug and has test coverage is quite compelling to me
@mingmwang and @yahoNanJing, who I think are familiar with / authors of this code, can you please take a look if you have time? Also cc @metesynnada
group_expr.as_any().downcast_ref::<Column>()
{
    Arc::new(Column::new(group_col.name(), idx))
        as Arc<dyn PhysicalExpr>
I think you can do something like this to get the rust compiler to do the right cast for you:
- as Arc<dyn PhysicalExpr>
+ as _
However the rest of this module is in the same as Arc<...>
style so no need to change it in this PR
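For readers unfamiliar with the `as _` form, here is a minimal, self-contained sketch of the two cast styles. The `PhysicalExpr` trait and `Column` struct below are simplified stand-ins written for this illustration, not DataFusion's actual definitions, and the function names are hypothetical.

```rust
use std::sync::Arc;

// Hypothetical stand-in for DataFusion's `PhysicalExpr` trait.
trait PhysicalExpr {
    fn describe(&self) -> String;
}

// Hypothetical stand-in for DataFusion's physical `Column` expression.
struct Column {
    name: String,
    index: usize,
}

impl PhysicalExpr for Column {
    fn describe(&self) -> String {
        format!("{}@{}", self.name, self.index)
    }
}

// Explicit style: the target trait-object type is spelled out in the cast.
fn explicit_cast(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
    Arc::new(Column { name: name.to_string(), index }) as Arc<dyn PhysicalExpr>
}

// Inferred style: `as _` lets the compiler take the target type
// from the context, here the function's return type.
fn inferred_cast(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
    Arc::new(Column { name: name.to_string(), index }) as _
}
```

Both functions produce the same trait object; the choice is purely stylistic, which is why matching the surrounding module is a reasonable default.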
Good catch!! It is a bug.
@@ -2810,3 +2810,61 @@ async fn type_coercion_join_with_filter_and_equi_expr() -> Result<()> {

    Ok(())
}

#[tokio::test]
async fn test_cross_join_to_groupby_with_different_key_ordering() -> Result<()> {
I verified that this test does indeed panic without the code change in this PR:
---- sql::joins::test_cross_join_to_groupby_with_different_key_ordering stdout ----
thread 'sql::joins::test_cross_join_to_groupby_with_different_key_ordering' panicked at 'called `Option::unwrap()` on a `None` value', datafusion/core/src/physical_plan/joins/hash_join.rs:923:17
stack backtrace:
The Column Index in the physical plan is confusing and error-prone. I will take a closer look at this PR.
Regarding the fix, I think you can call the methods below to simplify the logic. I tested it locally and it works.
// Build new group expressions that correspond to the output of partial_agg
let new_final_group: Vec<Arc<dyn PhysicalExpr>> =
    partial_agg.output_group_expr();
let new_group_by = PhysicalGroupBy::new_single(
    new_final_group
        .iter()
        .enumerate()
        .map(|(i, expr)| (expr.clone(), partial_agg.group_expr().expr()[i].1.clone()))
        .collect(),
);
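The shape of that suggestion can be sketched without DataFusion. The following is a toy model only: `Column`, `output_group_columns`, and `final_group_by` are hypothetical names, and the assumption that the i-th group key becomes the i-th column of the partial aggregate's output (named by its alias) is made for illustration rather than taken from the DataFusion source.

```rust
// Hypothetical, simplified column reference: a name plus a positional index.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

// Toy analogue of asking the partial aggregate for its output group
// expressions. ASSUMPTION: group key `i` is emitted as output column `i`,
// named after the key's alias.
fn output_group_columns(group_by: &[(Column, String)]) -> Vec<Column> {
    group_by
        .iter()
        .enumerate()
        .map(|(i, (_, alias))| Column { name: alias.clone(), index: i })
        .collect()
}

// Build the group-by for the final aggregation: each output column is
// paired with the alias of the original group expression at the same position.
fn final_group_by(group_by: &[(Column, String)]) -> Vec<(Column, String)> {
    output_group_columns(group_by)
        .into_iter()
        .zip(group_by.iter())
        .map(|(col, (_, alias))| (col, alias.clone()))
        .collect()
}
```

In this sketch, a partial group-by on input columns at positions 2 and 0 yields final group-by columns at positions 0 and 1, because the final stage reads the partial stage's output rather than the original input.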
LGTM, thanks for the fix! I will look closer in a couple of hours.
Thanks for taking a look @alamb @mingmwang @metesynnada! I made the simplification @mingmwang suggested in f32a8d9.
LGTM, good work 😀
Thank you everyone for your help!
Benchmark runs are scheduled for baseline = a9ddcd3 and contender = dee0dd8. dee0dd8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
…ateMode (apache#4878) * Fix column indices in EnforceDistribution optimizer in Partial AggregateMode Column expressions need to be updated to correspond with the partial aggregation schema rather than the input schema. * Simplify new_group_by calculation
…ateMode (apache#4878) * Fix column indices in EnforceDistribution optimizer in Partial AggregateMode Column expressions need to be updated to correspond with the partial aggregation schema rather than the input schema. * Simplify new_group_by calculation (cherry picked from commit dee0dd8)
Which issue does this PR close?
Closes #4873.
Rationale for this change
See #4873 for a failing example and some preliminary investigation.
The issue is in the EnforceDistribution physical optimizer (the query from the issue works without it). After looking at this for a while, I think I see what's going on. Normally, the column expressions for a group by have indices that correspond to the schema of the input operation. But for the case of a partial aggregation, these indices need to be updated to correspond to the schema of the partial aggregation instead of the input.
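To make the index mismatch concrete, here is a small toy model. It does not use DataFusion; `Column`, `col`, `is_valid`, and `rebase` are hypothetical helpers, and the schemas are plain lists of field names invented for the example.

```rust
// Hypothetical, simplified column reference: a name plus a positional index
// into whichever schema the expression is evaluated against.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    name: String,
    index: usize,
}

fn col(name: &str, index: usize) -> Column {
    Column { name: name.to_string(), index }
}

// True when the column's index points at a field with the same name.
fn is_valid(column: &Column, schema: &[&str]) -> bool {
    schema
        .get(column.index)
        .map_or(false, |field| *field == column.name)
}

// Rebuild the column against another schema by looking up its name there.
// Returns None if that schema has no field with this name.
fn rebase(column: &Column, schema: &[&str]) -> Option<Column> {
    schema
        .iter()
        .position(|field| *field == column.name)
        .map(|index| Column { name: column.name.clone(), index })
}
```

With an input schema of `["a", "b", "c"]` and a partial aggregation output of `["c", "a", "SUM(b)"]`, the column `c` at index 2 is valid against the input but not against the partial output, where it has to be rebuilt with index 0. This is the kind of stale index the description above refers to, under the stated simplifications.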
This makes sense to me, and it fixes the query, but this is the first time I've looked at the physical optimizer, so someone who's more familiar with that side of things should definitely take a look before merging this.
Are these changes tested?
I added a test case based on the query reported in the issue.