chore: Upgrade to latest DataFusion revision #909

Merged
merged 19 commits into from
Sep 5, 2024

Conversation

@andygrove andygrove commented Sep 4, 2024

Which issue does this PR close?

N/A

Rationale for this change

DataFusion 42 will be released soon, so we need to confirm before the release that none of its changes cause regressions in Comet.

What changes are included in this PR?

  • Update DataFusion revision
  • Refactor aggregate functions due to upstream API changes
  • Refactor to reduce duplicate code
  • Remove our copy of StatsType and use DataFusion's version
  • Remove our copy of down_cast_any_ref and use DataFusion's version
  • Implement group accumulator support for stddev and variance (or file follow-on issue)

How are these changes tested?

Existing tests.

Comment on lines -35 to -48
/// A utility function from DataFusion. It is not exposed by DataFusion.
pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
    if any.is::<Arc<dyn PhysicalExpr>>() {
        any.downcast_ref::<Arc<dyn PhysicalExpr>>()
            .unwrap()
            .as_any()
    } else if any.is::<Box<dyn PhysicalExpr>>() {
        any.downcast_ref::<Box<dyn PhysicalExpr>>()
            .unwrap()
            .as_any()
    } else {
        any
    }
}
Member Author

This function is now public in DataFusion, so we use that version now
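
For readers unfamiliar with the helper, here is a minimal sketch of how a function of this shape is typically used when comparing expressions through `&dyn Any`. The `PhysicalExpr` trait, the `Column` type, and `column_eq` below are hypothetical stand-ins written for this example so that it compiles with the standard library alone; they are not DataFusion's definitions.

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for DataFusion's `PhysicalExpr` trait; only the
/// `as_any` method matters for this sketch.
trait PhysicalExpr {
    fn as_any(&self) -> &dyn Any;
}

/// Hypothetical expression type used only for illustration.
#[derive(Debug, PartialEq)]
struct Column {
    name: String,
}

impl PhysicalExpr for Column {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

/// Same shape as the helper removed in this PR: unwrap an `Arc` or `Box`
/// wrapper so the caller sees the concrete expression underneath.
fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
    if let Some(expr) = any.downcast_ref::<Arc<dyn PhysicalExpr>>() {
        expr.as_any()
    } else if let Some(expr) = any.downcast_ref::<Box<dyn PhysicalExpr>>() {
        expr.as_any()
    } else {
        any
    }
}

/// Returns true when `other` is, or wraps, a `Column` equal to `col`.
fn column_eq(col: &Column, other: &dyn Any) -> bool {
    down_cast_any_ref(other)
        .downcast_ref::<Column>()
        .map(|c| c == col)
        .unwrap_or(false)
}

fn main() {
    let col = Column { name: "a".to_string() };
    let wrapped: Arc<dyn PhysicalExpr> = Arc::new(Column { name: "a".to_string() });
    // Passing `&wrapped` as `&dyn Any` hides the concrete type behind the Arc;
    // the helper peels the wrapper off before the final downcast.
    println!("{}", column_eq(&col, &wrapped));
}
```

Without the unwrapping step, the final `downcast_ref::<Column>()` would see an `Arc<dyn PhysicalExpr>` rather than a `Column` and the comparison would always report a mismatch.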

@andygrove andygrove marked this pull request as ready for review September 4, 2024 17:27
@andygrove
Member Author

@huaxingao There are quite a few changes to aggregates in this PR due to upstream API changes. Could you review when you get a chance?

Contributor

@kazuyukitanimura kazuyukitanimura left a comment

LGTM pending CI

- Ok(Arc::new(SumDecimal::new("sum", child, datatype)))
+ let func = AggregateUDF::new_from_impl(SumDecimal::new(
+     "sum",
+     Arc::clone(&child),
Contributor

Just for me to understand, what would happen if we do not do Arc::clone() here?

Member Author

@andygrove andygrove Sep 4, 2024

We need to clone because we reference child again in the next statement. If I remove the clone, the code fails to compile:

error[E0382]: use of moved value: `child`
    --> core/src/execution/datafusion/planner.rs:1357:72
     |
1347 |                 let child = self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&schema))?;
     |                     ----- move occurs because `child` has type `Arc<dyn datafusion_physical_expr::PhysicalExpr>`, which does not implement the `Copy` trait
...
1354 |                             child,
     |                             ----- value moved here
...
1357 |                         AggregateExprBuilder::new(Arc::new(func), vec![child])
     |                                                                        ^^^^^ value used here after move
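
The same move-then-reuse situation can be reproduced with the standard library alone. This is a minimal sketch, not the planner code: `SumLike`, `ExprBuilder`, and `build` are hypothetical names, and `Arc<str>` stands in for `Arc<dyn PhysicalExpr>`.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an aggregate that takes ownership of its input.
struct SumLike {
    input: Arc<str>,
}

/// Hypothetical stand-in for a builder that also needs the same input.
struct ExprBuilder {
    func: SumLike,
    args: Vec<Arc<str>>,
}

fn build(child: Arc<str>) -> ExprBuilder {
    // `Arc::clone` hands `SumLike` its own handle, so `child` is still
    // usable below. Passing `child` directly here would move it, and the
    // `vec![child]` expression would then fail with E0382.
    let func = SumLike { input: Arc::clone(&child) };
    ExprBuilder { func, args: vec![child] }
}

fn main() {
    let built = build(Arc::from("col_a"));
    // Both handles point at the same allocation; no string data was copied.
    println!("{}", Arc::ptr_eq(&built.func.input, &built.args[0]));
}
```

The last use of `child` can still be a plain move, which is why only the first use needs the clone.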

Member

Hmm, I think the second parameter is Arc<dyn PhysicalExpr>. If it is not changed, it should be child?

Member

@viirya viirya Sep 4, 2024

Member Author

Yes, we recently started using Arc::clone(&foo) instead of foo.clone() to make it easy to see when we are just cloning an Arc (cheap) rather than performing a more expensive clone. A Clippy lint checks that we use this style.
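
To illustrate the cost difference behind this convention, here is a small sketch using only the standard library. The comment above does not name the lint; `clippy::clone_on_ref_ptr` is assumed here, and `share` and `deep_copy` are hypothetical helper names.

```rust
use std::sync::Arc;

/// Returns a second handle to the same data; only a reference count changes.
#[warn(clippy::clone_on_ref_ptr)]
fn share(data: &Arc<Vec<u64>>) -> Arc<Vec<u64>> {
    // Writing `data.clone()` here compiles too, but the lint above is
    // expected to flag it because the call site hides that this is an Arc.
    Arc::clone(data)
}

/// Returns an independent copy; every element is copied into a new buffer.
fn deep_copy(data: &Arc<Vec<u64>>) -> Vec<u64> {
    (**data).clone()
}

fn main() {
    let data = Arc::new(vec![1, 2, 3]);
    let shared = share(&data);
    let copied = deep_copy(&data);
    println!("shared same allocation: {}", Arc::ptr_eq(&data, &shared));
    println!(
        "copy same buffer: {}",
        std::ptr::eq(data.as_slice().as_ptr(), copied.as_slice().as_ptr())
    );
}
```

With `foo.clone()` both calls would look identical at the call site; the explicit `Arc::clone` form makes the cheap case recognizable during review.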

@kazuyukitanimura
Contributor

Oops, some test failures

@andygrove
Member Author

failure:

2024-09-04T18:07:57.8583417Z - var_pop and var_samp *** FAILED *** (532 milliseconds)
2024-09-04T18:07:57.8588646Z   org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 34999.0 failed 1 times, most recent failure: Lost task 1.0 in stage 34999.0 (TID 19829) (e5b532f86b88 executor driver): org.apache.comet.CometNativeException: Invalid argument error: column types must match schema types, expected Float64 but found UInt64 at column index 1

I rolled back implementing the group accumulators.
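
For context on the error message, the sketch below shows the kind of check that produces it: each column's type is compared against the type declared at the same position in the schema. This is a simplified illustration with a hypothetical `DataType` enum and `validate` function, not Arrow's implementation, and the column layout in `main` is assumed for the example rather than taken from the failing test.

```rust
/// Hypothetical, simplified stand-in for Arrow's `DataType`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DataType {
    Float64,
    UInt64,
}

/// Checks that each column's type matches the declared schema, mirroring the
/// wording of the type-mismatch error in the log above.
fn validate(schema: &[DataType], columns: &[DataType]) -> Result<(), String> {
    if schema.len() != columns.len() {
        // Wording of this message is made up for the sketch.
        return Err(format!(
            "expected {} columns but found {}",
            schema.len(),
            columns.len()
        ));
    }
    for (i, (expected, found)) in schema.iter().zip(columns).enumerate() {
        if expected != found {
            return Err(format!(
                "column types must match schema types, expected {:?} but found {:?} at column index {}",
                expected, found, i
            ));
        }
    }
    Ok(())
}

fn main() {
    // Assumed layout for illustration only: the schema declares three
    // Float64 columns, but the second emitted column is UInt64.
    let declared = [DataType::Float64, DataType::Float64, DataType::Float64];
    let emitted = [DataType::Float64, DataType::UInt64, DataType::Float64];
    println!("{:?}", validate(&declared, &emitted));
}
```

A mismatch like this means the declared types and the arrays actually produced disagree, so one side has to change to match the other.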

@andygrove andygrove merged commit 00eaa8e into apache:main Sep 5, 2024
75 checks passed