Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: workaround for aggregates in datafusion #1120

Closed

Conversation

tokoko
Copy link
Collaborator

@tokoko tokoko commented Aug 29, 2024

Currently plans generated by ibis-substrait containing an aggregate can't be run on datafusion. After a bit of a trial and error, I used datafusion's internal sql-to-substrait converter to see what kind of plans datafusion itself generates. Seems like datafusion always expects a projection node to follow after an aggregation node.

This PR introduces a dummy projection node that basically remaps aggregate output columns and does nothing. Targeting a specific consumer in a compiler sounds bad, but since it's an easy "fix" and still generates a valid plan for other consumers, I thought a temporary workaround was worth it.

@gforsyth
Copy link
Member

gforsyth commented Sep 3, 2024

Hey @tokoko -- is it clear whether or not Datafusion considers their behavior a bug or not?

I'm willing to merge this as a temporary fix if:

  1. There's an upstream issue that we're tracking and can link to this PR
  2. We have a test that doesn't add the dummy projection that currently xfails on Datafusion so we know when we can remove this behavior.

@tokoko
Copy link
Collaborator Author

tokoko commented Sep 3, 2024

There's an upstream issue that we're tracking and can link to this PR

I haven't looked on datafusion side yet, tbh. The closest which I think is related is this one in substrait. @EpsilonPrime probably can shed more light..

We have a test that doesn't add the dummy projection that currently xfails on Datafusion so we know when we can remove this behavior.

Makes sense, but how would one do that? The compiler itself either adds the projection or it doesn't, right? We will have to somehow pass some configuration to translate function to enable/disable the workaround if we want to do that. Are you fine with that approach?

@gforsyth
Copy link
Member

gforsyth commented Sep 3, 2024

Are you fine with that approach?

Maybe instead we just have a hand-crafted (simple) substrait plan that aggs and doesn't project that we execute against datafusion?

@EpsilonPrime
Copy link

There is a bug filed for Datafusion's project behavior. I'm not aware of one for the aggregate behavior. It's odd that remapping would be required given that aggregates only output the new fields. I suppose there could be an issue with multiple grouping sets as that does add an additional column.

@EpsilonPrime
Copy link

EpsilonPrime commented Sep 3, 2024

Oh, I know what this is. Datafusion only allows references to be aggregated. If you have an expression with a calculation such as X+1 that needs to be projected first and then aggregated (I refer to this an interior calculation in the tests). If you have an expression such as sum(X)/sum(Y) the two fields need to be aggregated first and then projected and divided later (I call this an exterior calculation). Any you can mix the two to make it even more fun. Here are my tests:

https://github.com/voltrondata/spark-substrait-gateway/blob/main/src/gateway/tests/test_dataframe_api.py#L2578

And here is the implementation (bound up in the Spark to Substrait conversion):

voltrondata/spark-substrait-gateway#75

@tokoko
Copy link
Collaborator Author

tokoko commented Sep 4, 2024

Datafusion only allows references to be aggregated. If you have an expression with a calculation such as X+1 that needs to be projected first and then aggregated

Sounds similar, but not entirely sure this is the cause. Looking into datafusion-produced plans, I saw that it needed an extra project node after an aggregate, not before. plus, if an aggregate itself was a last node in the query, everything seemed to work fine, but once you add another actual projection after it, things seemed to break down.

I'll take another look and test this out.

@EpsilonPrime
Copy link

EpsilonPrime commented Sep 4, 2024

A known bug in both Datafusion and DuckDB is the way that projects output fields. They don't include the input fields (only the new proejcts). That's described in this issue.

There are a few other cases where Datafusion doesn't appear to honor emits (such as on join relations). I've heard there is a desire to add the validator to Datafusion so the generated plans can be validated.

@tokoko
Copy link
Collaborator Author

tokoko commented Sep 4, 2024

I looked closer and seems like this is a separate issue. Datafusion simply can't execute plans that have an aggregate as the last node. Doing any sort of noop after an aggregate fixes the issue. For example a substrait plan generated from this ibis expression: table.group_by("city").aggregate(sales=filter_table["order_id"].count()) throws an error, while both of the following work:

  • Adding a noop filter node:

table.group_by("city").aggregate(sales=filter_table["order_id"].count()).filter(ibis.literal(1) == ibis.literal(1))

  • Adding a noop projection:

table.group_by("city").aggregate(sales=filter_table["order_id"].count()).select("city", "sales")

@tokoko
Copy link
Collaborator Author

tokoko commented Sep 5, 2024

since this is a simple "fix" from ibis expression side, I'll simply amend the existing tests with the workaround (.filter(ibis.literal(True)) instead of changing the compiler and add a failing test to alert us whenever it's fixed on the datafusion side. I'll also open a new ticket on their side. Closing this. thanks for the discussion.

@tokoko tokoko closed this Sep 5, 2024
gforsyth added a commit that referenced this pull request Sep 19, 2024
per #1120
- adds failing test for the bug in datafusion in which a plan with
aggregate as the last node can't be executed.
- adds `.filter(ibis.literal(True)` after aggregations in other tests to
work around a bug in datafusion.

Co-authored-by: Gil Forsyth <[email protected]>
@tokoko tokoko deleted the datafusion-aggregate-workaround branch September 24, 2024 20:24
@tokoko
Copy link
Collaborator Author

tokoko commented Oct 16, 2024

FYI Fixed by 12875 in datafusion. I'll get rid of the empty filter workaround after the fix is incorporated in the next release.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants