Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: support various reductions in pyspark #1870

Merged
merged 9 commits into from
Jan 27, 2025

Conversation

FBruzzesi
Copy link
Member

What type of PR is this? (check all applicable)

  • πŸ’Ύ Refactor
  • ✨ Feature
  • πŸ› Bug Fix
  • πŸ”§ Optimization
  • πŸ“ Documentation
  • βœ… Test
  • 🐳 Other

Checklist

  • Code follows style guide (ruff)
  • Tests added
  • Documented the changes

If you have comments or can explain your changes, please do so below

@FBruzzesi FBruzzesi added the fix label Jan 26, 2025
@FBruzzesi FBruzzesi added the pyspark Issue is related to pyspark backend label Jan 26, 2025
]
return self._from_native_frame(self._native_frame.select(*new_columns_list))

def with_columns(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just moved this closer to select method - it was easier to debug them while in the same screen πŸ™ˆ

returns_scalar=self._returns_scalar or returns_scalar,
returns_scalar=returns_scalar,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gave me so much headache before spotting it

@FBruzzesi FBruzzesi marked this pull request as ready for review January 26, 2025 21:01
Comment on lines +119 to +126
new_columns_list = [
col.over(Window.partitionBy(F.lit(1))).alias(col_name)
if _returns_scalar
else col.alias(col_name)
for (col_name, col), _returns_scalar in zip(
new_columns.items(), returns_scalar
)
]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this is may be too late to set over

for example, in nw.col('a') - nw.col('a').mean() - I think it's in the binary operation __sub__ that nw.col('a').mean() needs to become nw.col('a').mean().over(lit(1))

as in, we want to translate nw.col('a') - nw.col('a').mean() to F.col('a') - F.col('a').mean().over(F.lit(1)). the code, however, as far as I can tell, translates it to (F.col('a') - F.col('a').mean()).over(F.lit(1))

Copy link
Member Author

@FBruzzesi FBruzzesi Jan 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That happens in maybe_evaluate, and you can see that now the reduction_test are passing.
I know it's not ideal to have the logic for setting over in two places, but I couldn't figure out a unique place in which to handle this as maybe_evaluate is called only to evaluate other arguments

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah I see! yes this might be fine then, thanks!

Copy link
Member

@MarcoGorelli MarcoGorelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The group by simplification is wonderful 😻 thanks so much for doing this!

native_results: dict[str, list[Column]] = {}

# `returns_scalar` keeps track if an expression returns a scalar and is not lit.
# Notice that lit is quite special case, since it gets broadcasted by pyspark
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of interest, do we run into any issues if we use over anyway with lit? No objections to special casing it, just curious

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We end up with tests/expr_and_series/lit_test.py failing 3 tests due to:

pyspark.errors.exceptions.captured.AnalysisException: [UNSUPPORTED_EXPR_FOR_WINDOW] Expression "1" not supported within a window function.;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this still going to break for, say

df.with_columns(nw.lit(2)+1)

?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes correct

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test case and now it works!

Copy link
Member

@MarcoGorelli MarcoGorelli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, well done @FBruzzesi !

just one comment/question

this seems correct! or at least, I couldn't think of a way to break it

@MarcoGorelli
Copy link
Member

cool, release? πŸš€

@MarcoGorelli MarcoGorelli merged commit 702eea5 into main Jan 27, 2025
23 checks passed
@MarcoGorelli MarcoGorelli deleted the feat/support-pyspark-reductions branch January 27, 2025 09:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
fix pyspark Issue is related to pyspark backend
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants