Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix root names in pyspark reduction with nw.all() #1787

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

camriddell
Copy link
Contributor

@camriddell camriddell commented Jan 10, 2025

What type of PR is this? (check all applicable)

  • πŸ’Ύ Refactor
  • ✨ Feature
  • πŸ› Bug Fix
  • πŸ”§ Optimization
  • πŸ“ Documentation
  • βœ… Test
  • 🐳 Other

Related issues

Checklist

  • Code follows style guide (ruff)
  • Tests added
  • Documented the changes

If you have comments or can explain your changes, please do so below

Checking if the parent expression is the root and if that expression is "all" to also alias the result seems to alleviate this problem in this particular test.

This may need to be revisited when the .name namespace is created for pyspark as I am unsure whether or not this approach play nicely with multi columnar renaming functions like prefix, suffix or map.

@@ -102,7 +102,9 @@ def func(df: SparkLikeLazyFrame) -> list[Column]:
for _input in inputs:
input_col_name = get_column_name(df, _input)
column_result = call(_input, **_kwargs)
if not returns_scalar:
if not returns_scalar or (
(self._depth == 0) and (self._function_name == "all")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for looking into this!

I think this might be too specific and that we'll need a more generic solution - will look into it and let you know

@MarcoGorelli
Copy link
Member

MarcoGorelli commented Jan 21, 2025

Rough plan of attack could be:

  • aggregations should just be carried out as-is. For example, expr.sum() just becomes F.sum(expr)
  • in binary operations between multiple expressions, if one of them aggregates but the other one doesn't, then the one that aggregates gets broadcasted via .over(Window(F.lit(1))) and the resulting expression gets returns_scalar=False
  • in select, if everything aggregates, just use .agg. Else, those which aggregate get an extra .over(Window(F.lit(1)))
  • in group-by aggregations, everything is expected to aggregate

So then, we can support:

  • everything aggregates (e.g. df.select(nw.col('a').min(), nw.col('b').max()))
  • some aggregate, and some don't (e.g. df.select('a', nw.col('b').max())):
   ...: select a, max(b) over ()
   ...: from df
  • binary operations between some expressions which aggregate and others which don't, e.g. df.select((nw.col('a') - nw.col('a').mean()).round(2)):
   ...: select round(a - (mean(a) over ()), 2)
   ...: from rel
  • group-by, e.g. `df.group_by('a').agg(nw.col('b').mean())
    ...: select a, mean(b)
    ...: from rel
    ...: group by a

This would require some refactoring, but it would be well-worth it


Limitations - some examples of operations which I don't think we can support simply are:

  • df.select((nw.col('a') - nw.col('a').mean()).sum()). If we did
    ...: select sum(a - (mean(a) over ()))
    ...: from df

we'd get aggregate function calls cannot contain window function calls

  • df.group_by('a').agg((nw.col('a') - nw.col('a').mean()).sum())

But, I think that's OK

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug]: Pyspark reduction does not preserve names root names with nw.all()
2 participants