Fix column indices in EnforceDistribution optimizer in Partial AggregateMode #4878

Merged: 2 commits into apache:master from jonmmease/GH4873 on Jan 14, 2023

jonmmease

Which issue does this PR close?

Closes #4873.

Rationale for this change

See #4873 for a failing example and some preliminary investigation.

The issue is in the EnforceDistribution physical optimizer (the query from the issue works without it). After looking at this for a while, I think I see what's going on. Normally, the column expressions for a group by have indices that correspond to the schema of the input operation. But for the case of a partial aggregation, these indices need to be updated to correspond to the schema of the partial aggregation instead of the input.
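To make the index mismatch concrete, here is a minimal sketch in plain Rust. It does not use DataFusion's types: `Column`, `resolve`, and `reindex` are hypothetical stand-ins, schemas are modelled as slices of field names, and the assumption that a partial aggregation emits its group columns first is part of the illustration.

```rust
/// Simplified stand-in for a physical column expression: a name plus the
/// position of that column in the schema the expression is evaluated against.
#[derive(Debug, Clone, PartialEq)]
pub struct Column {
    pub name: String,
    pub index: usize,
}

impl Column {
    pub fn new(name: &str, index: usize) -> Self {
        Self { name: name.to_string(), index }
    }
}

/// Resolve a column against a schema (a list of field names) using only its
/// index, the way an index-based lookup behaves at execution time.
pub fn resolve<'a>(col: &Column, schema: &[&'a str]) -> Option<&'a str> {
    schema.get(col.index).copied()
}

/// Rebuild group-by columns so that their indices refer to `output_schema`
/// (here: the partial aggregation's output) instead of the input schema.
/// Returns `None` if any column name is missing from `output_schema`.
pub fn reindex(group_by: &[Column], output_schema: &[&str]) -> Option<Vec<Column>> {
    group_by
        .iter()
        .map(|col| {
            output_schema
                .iter()
                .position(|field| *field == col.name.as_str())
                .map(|idx| Column::new(&col.name, idx))
        })
        .collect()
}
```

With an input schema of `["a", "b", "x"]` and `GROUP BY b, a`, the group columns are `b@1` and `a@0`. If the partial aggregation outputs `["b", "a", "count_state"]`, then `b@1` resolved against that output lands on `a`, which is the kind of mismatch described above; `reindex` yields `b@0` and `a@1` instead.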

This makes sense to me, and it fixes the query, but this is the first time I've looked at the physical optimizer, so someone who's more familiar with that side of things should definitely take a look before merging this.

Are these changes tested?

I added a test case based on the query reported in the issue.

…ateMode

Column expressions need to be updated to correspond with the partial aggregation schema rather than the input schema.
@github-actions github-actions bot added the core Core DataFusion crate label Jan 11, 2023
jonmmease added a commit to vega/vegafusion that referenced this pull request Jan 11, 2023
jonmmease added a commit to vega/vegafusion that referenced this pull request Jan 12, 2023
* Add VegaFusionTable::with_ordering method

* Use explicit ordering column instead of relying on the row_number window function in each transform

* Support impute null

* Add impute tests and fix serialization of null value

* Add ordering to inline tables

* Remove test case workaround to get consistent ordering

* Remove order column when inline dataset has no transforms

* Update DataFusion branch to include fix (apache/datafusion#4878)

* Add area streamgraph spec that used to trigger error

* Remove row number from joinaggregate transform

@alamb alamb left a comment

Thank you @jonmmease -- reading the 🕵️ story on #4873 (comment) is fascinating

The fact that this code fixes a bug and has test coverage is quite compelling to me

@mingmwang and @yahoNanJing, who I think are familiar with (or authors of) this code, can you please take a look if you have time? Also cc @metesynnada

group_expr.as_any().downcast_ref::<Column>()
{
Arc::new(Column::new(group_col.name(), idx))
as Arc<dyn PhysicalExpr>

I think you can do something like this to get the rust compiler to do the right cast for you:

Suggested change
- as Arc<dyn PhysicalExpr>
+ as _

However, the rest of this module uses the same `as Arc<...>` style, so there is no need to change it in this PR
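For readers unfamiliar with the `as _` form, a small self-contained sketch follows. The `PhysicalExpr` trait and `Column` struct here are hypothetical stand-ins, not DataFusion's definitions; the point is only that the compiler can infer the cast target from the surrounding type.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a physical-expression trait.
pub trait PhysicalExpr {
    fn describe(&self) -> String;
}

/// Hypothetical stand-in for a column expression.
pub struct Column {
    name: String,
    index: usize,
}

impl PhysicalExpr for Column {
    fn describe(&self) -> String {
        format!("{}@{}", self.name, self.index)
    }
}

/// Explicit cast: the target trait-object type is spelled out.
pub fn explicit(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
    Arc::new(Column { name: name.to_string(), index }) as Arc<dyn PhysicalExpr>
}

/// Inferred cast: `as _` lets the compiler take the target type from the
/// function's declared return type.
pub fn inferred(name: &str, index: usize) -> Arc<dyn PhysicalExpr> {
    Arc::new(Column { name: name.to_string(), index }) as _
}
```

Both functions return the same trait object; the second is simply shorter to write when the expected type is already known from context.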

@mingmwang mingmwang Jan 13, 2023

Good catch!! It is a bug.

@@ -2810,3 +2810,61 @@ async fn type_coercion_join_with_filter_and_equi_expr() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn test_cross_join_to_groupby_with_different_key_ordering() -> Result<()> {

I verified that this test does indeed panic without the code change

---- sql::joins::test_cross_join_to_groupby_with_different_key_ordering stdout ----
thread 'sql::joins::test_cross_join_to_groupby_with_different_key_ordering' panicked at 'called `Option::unwrap()` on a `None` value', datafusion/core/src/physical_plan/joins/hash_join.rs:923:17
stack backtrace:

@mingmwang

The column index in the physical plan is confusing and error-prone. I will take a closer look at this PR.

@mingmwang

Regarding the fix, I think you can call the methods below to simplify the logic. I tested it locally and it works.

// Build new group expressions that correspond to the output of partial_agg
let new_final_group: Vec<Arc<dyn PhysicalExpr>> =
    partial_agg.output_group_expr();
let new_group_by = PhysicalGroupBy::new_single(
    new_final_group
        .iter()
        .enumerate()
        .map(|(i, expr)| (expr.clone(), partial_agg.group_expr().expr()[i].1.clone()))
        .collect(),
);
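The shape of this suggestion can be sketched without DataFusion. In the model below, `PartialAgg`, its `group_expr` field, and `final_group_by` are hypothetical, and the behaviour of `output_group_expr` (one column per group expression, named by its alias and indexed by its position in the output) is an assumption made for illustration rather than a statement about the real method.

```rust
/// Simplified stand-in for a physical column expression.
#[derive(Debug, Clone, PartialEq)]
pub struct Column {
    pub name: String,
    pub index: usize,
}

/// Hypothetical model of a partial aggregation: its group-by is a list of
/// (expression, output name) pairs, with expressions indexed against the input.
pub struct PartialAgg {
    pub group_expr: Vec<(Column, String)>,
}

impl PartialAgg {
    /// Assumed behaviour: one output column per group expression, named by
    /// its alias and indexed by its position in the aggregation's output.
    pub fn output_group_expr(&self) -> Vec<Column> {
        self.group_expr
            .iter()
            .enumerate()
            .map(|(i, (_, alias))| Column { name: alias.clone(), index: i })
            .collect()
    }
}

/// Build the group-by for the final aggregation: each output column of the
/// partial aggregation is paired with the alias of the matching group
/// expression, mirroring the enumerate-and-index pattern in the suggestion.
pub fn final_group_by(partial: &PartialAgg) -> Vec<(Column, String)> {
    partial
        .output_group_expr()
        .into_iter()
        .enumerate()
        .map(|(i, expr)| (expr, partial.group_expr[i].1.clone()))
        .collect()
}
```

Under these assumptions, a partial group-by of `b@1, a@0` produces a final group-by of `b@0, a@1`, so no per-column name lookup is needed.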

@metesynnada metesynnada left a comment

LGTM, thanks for the fix! I will look closer in a couple of hours.

@jonmmease

Thanks for taking a look @alamb @mingmwang @metesynnada! I made the simplification @mingmwang suggested in f32a8d9.

@metesynnada

LGTM, good work 😀

@alamb alamb merged commit dee0dd8 into apache:master Jan 14, 2023
@alamb
alamb commented Jan 14, 2023

Thank you everyone for your help!

@ursabot
ursabot commented Jan 14, 2023

Benchmark runs are scheduled for baseline = a9ddcd3 and contender = dee0dd8. dee0dd8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@jonmmease jonmmease deleted the jonmmease/GH4873 branch January 14, 2023 12:39
andygrove pushed a commit to andygrove/datafusion that referenced this pull request Jan 17, 2023
…ateMode (apache#4878)

* Fix column indices in EnforceDistribution optimizer in Partial AggregateMode

Column expressions need to be updated to correspond with the partial aggregation schema rather than the input schema.

* Simplify new_group_by calculation
jonmmease added a commit to jonmmease/arrow-datafusion that referenced this pull request Jan 18, 2023
(cherry picked from commit dee0dd8)
jonmmease added a commit to vega/vegafusion that referenced this pull request Jan 20, 2023
Successfully merging this pull request may close these issues.

panic when GROUP BY column order doesn't match USING column order