Skip to content

Commit

Permalink
fix: allow any name in expression parsing (#1871)
Browse files Browse the repository at this point in the history
fix: allow any name in expression parsing
  • Loading branch information
FBruzzesi authored Jan 26, 2025
1 parent 5f45151 commit a851b13
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 87 deletions.
4 changes: 2 additions & 2 deletions narwhals/_arrow/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def simple_select(self, *column_names: str) -> Self:
return self._from_native_frame(self._native_frame.select(list(column_names)))

def select(self: Self, *exprs: IntoArrowExpr, **named_exprs: IntoArrowExpr) -> Self:
new_series = evaluate_into_exprs(self, *exprs, **named_exprs)
new_series: list[ArrowSeries] = evaluate_into_exprs(self)(*exprs, **named_exprs)
if not new_series:
# return empty dataframe, like Polars does
return self._from_native_frame(self._native_frame.__class__.from_arrays([]))
Expand All @@ -306,7 +306,7 @@ def with_columns(
self: Self, *exprs: IntoArrowExpr, **named_exprs: IntoArrowExpr
) -> Self:
native_frame = self._native_frame
new_columns = evaluate_into_exprs(self, *exprs, **named_exprs)
new_columns: list[ArrowSeries] = evaluate_into_exprs(self)(*exprs, **named_exprs)

length = len(self)
columns = self.columns
Expand Down
4 changes: 2 additions & 2 deletions narwhals/_dask/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _from_native_frame(self: Self, df: Any) -> Self:

def with_columns(self: Self, *exprs: DaskExpr, **named_exprs: DaskExpr) -> Self:
    """Return a new frame with the evaluated expressions assigned as columns.

    Positional and keyword expressions are evaluated against this frame by
    ``parse_exprs_and_named_exprs`` and the resulting name -> series mapping
    is passed to the native frame's ``assign``.
    """
    df = self._native_frame
    # Bind `self` in a separate call so that it never shares a parameter list
    # with **named_exprs; a keyword expression may then use any name.
    new_series = parse_exprs_and_named_exprs(self)(*exprs, **named_exprs)
    df = df.assign(**new_series)
    return self._from_native_frame(df)

Expand Down Expand Up @@ -115,7 +115,7 @@ def simple_select(self: Self, *column_names: str) -> Self:
)

def select(self: Self, *exprs: DaskExpr, **named_exprs: DaskExpr) -> Self:
new_series = parse_exprs_and_named_exprs(self, *exprs, **named_exprs)
new_series = parse_exprs_and_named_exprs(self)(*exprs, **named_exprs)

if not new_series:
# return empty dataframe, like Polars does
Expand Down
50 changes: 27 additions & 23 deletions narwhals/_dask/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing import TYPE_CHECKING
from typing import Any
from typing import Callable

from narwhals._expression_parsing import evaluate_output_names_and_aliases
from narwhals._pandas_like.utils import select_columns_by_name
Expand Down Expand Up @@ -44,29 +45,32 @@ def maybe_evaluate(df: DaskLazyFrame, obj: Any) -> Any:
return obj


def parse_exprs_and_named_exprs(df: DaskLazyFrame) -> Callable[..., dict[str, dx.Series]]:
    """Build an evaluator mapping output names to native results for ``df``.

    ``df`` is bound by this outer call instead of sharing a parameter list
    with ``**named_exprs``. A keyword expression may therefore use any name,
    including ``df``, without colliding with the frame argument.

    Returns:
        A function accepting ``*exprs`` and ``**named_exprs`` and returning a
        dict of output name -> native result. Later entries overwrite earlier
        ones that share a name.

    Raises (from the returned function):
        AssertionError: if a positional expression yields a different number
            of results than aliases, or a named expression does not yield
            exactly one result.
    """

    def func(*exprs: DaskExpr, **named_exprs: DaskExpr) -> dict[str, dx.Series]:
        native_results: dict[str, dx.Series] = {}
        for expr in exprs:
            native_series_list = expr._call(df)
            # Expressions flagged `_returns_scalar` contribute the first
            # element of each result rather than the result itself.
            return_scalar = getattr(expr, "_returns_scalar", False)
            _, aliases = evaluate_output_names_and_aliases(expr, df, [])
            if len(aliases) != len(native_series_list):  # pragma: no cover
                msg = f"Internal error: got aliases {aliases}, but only got {len(native_series_list)} results"
                raise AssertionError(msg)
            for native_series, alias in zip(native_series_list, aliases):
                native_results[alias] = (
                    native_series[0] if return_scalar else native_series
                )
        for name, value in named_exprs.items():
            native_series_list = value._call(df)
            if len(native_series_list) != 1:  # pragma: no cover
                msg = "Named expressions must return a single column"
                raise AssertionError(msg)
            return_scalar = getattr(value, "_returns_scalar", False)
            native_results[name] = (
                native_series_list[0][0] if return_scalar else native_series_list[0]
            )
        return native_results

    return func


def add_row_index(
Expand Down
4 changes: 2 additions & 2 deletions narwhals/_duckdb/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def select(
*exprs: DuckDBExpr,
**named_exprs: DuckDBExpr,
) -> Self:
new_columns_map = parse_exprs_and_named_exprs(self, *exprs, **named_exprs)
new_columns_map = parse_exprs_and_named_exprs(self)(*exprs, **named_exprs)
if not new_columns_map:
# TODO(marco): return empty relation with 0 columns?
return self._from_native_frame(self._native_frame.limit(0))
Expand Down Expand Up @@ -139,7 +139,7 @@ def with_columns(
*exprs: DuckDBExpr,
**named_exprs: DuckDBExpr,
) -> Self:
new_columns_map = parse_exprs_and_named_exprs(self, *exprs, **named_exprs)
new_columns_map = parse_exprs_and_named_exprs(self)(*exprs, **named_exprs)
result = []
for col in self._native_frame.columns:
if col in new_columns_map:
Expand Down
44 changes: 25 additions & 19 deletions narwhals/_duckdb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from functools import lru_cache
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable

import duckdb

Expand Down Expand Up @@ -36,25 +37,30 @@ def maybe_evaluate(df: DuckDBLazyFrame, obj: Any) -> Any:


def parse_exprs_and_named_exprs(
    df: DuckDBLazyFrame,
) -> Callable[..., dict[str, duckdb.Expression]]:
    """Build an evaluator mapping output names to native expressions for ``df``.

    ``df`` is bound by this outer call instead of sharing a parameter list
    with ``**named_exprs``. A keyword expression may therefore use any name,
    including ``df``, without colliding with the frame argument.

    Returns:
        A function accepting ``*exprs`` and ``**named_exprs`` and returning a
        dict of output name -> native expression. Later entries overwrite
        earlier ones that share a name.

    Raises (from the returned function):
        AssertionError: if a positional expression yields a different number
            of results than output names.
        ValueError: if a named expression does not yield exactly one result.
    """

    def func(
        *exprs: DuckDBExpr, **named_exprs: DuckDBExpr
    ) -> dict[str, duckdb.Expression]:
        # Each value is a single native expression, matching the return type.
        native_results: dict[str, duckdb.Expression] = {}
        for expr in exprs:
            native_series_list = expr._call(df)
            output_names = expr._evaluate_output_names(df)
            # An optional alias function rewrites the evaluated output names.
            if expr._alias_output_names is not None:
                output_names = expr._alias_output_names(output_names)
            if len(output_names) != len(native_series_list):  # pragma: no cover
                msg = f"Internal error: got output names {output_names}, but only got {len(native_series_list)} results"
                raise AssertionError(msg)
            native_results.update(zip(output_names, native_series_list))
        for col_alias, expr in named_exprs.items():
            native_series_list = expr._call(df)
            if len(native_series_list) != 1:  # pragma: no cover
                msg = "Named expressions must return a single column"
                raise ValueError(msg)
            native_results[col_alias] = native_series_list[0]
        return native_results

    return func


@lru_cache(maxsize=16)
Expand Down
37 changes: 21 additions & 16 deletions narwhals/_expression_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,28 @@ def evaluate_into_expr(

def evaluate_into_exprs(
    df: CompliantDataFrame,
) -> Callable[..., list[CompliantSeriesT_co]]:
    """Build an evaluator that turns expressions into a flat list of series.

    ``df`` is bound by this outer call instead of sharing a parameter list
    with ``**named_exprs``. A keyword expression may therefore use any name,
    including ``df``, without colliding with the frame argument.

    Returns:
        A function accepting ``*exprs`` and ``**named_exprs``. Results of the
        positional expressions come first, flattened in order; each named
        expression then contributes its single result aliased to the keyword.

    Raises (from the returned function):
        AssertionError: if a named expression evaluates to more than one
            series.
    """

    def func(
        *exprs: IntoCompliantExpr[CompliantSeriesT_co],
        **named_exprs: IntoCompliantExpr[CompliantSeriesT_co],
    ) -> list[CompliantSeriesT_co]:
        # Every positional expression may expand to several series; flatten.
        series = [
            item
            for sublist in (evaluate_into_expr(df, into_expr) for into_expr in exprs)
            for item in sublist
        ]
        for name, expr in named_exprs.items():
            evaluated_expr = evaluate_into_expr(df, expr)
            if len(evaluated_expr) > 1:
                msg = "Named expressions must return a single column"  # pragma: no cover
                raise AssertionError(msg)
            to_append = evaluated_expr[0].alias(name)
            series.append(to_append)
        return series

    return func


def maybe_evaluate_expr(
Expand Down
8 changes: 6 additions & 2 deletions narwhals/_pandas_like/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,9 @@ def select(
*exprs: IntoPandasLikeExpr,
**named_exprs: IntoPandasLikeExpr,
) -> Self:
new_series = evaluate_into_exprs(self, *exprs, **named_exprs)
new_series: list[PandasLikeSeries] = evaluate_into_exprs(self)(
*exprs, **named_exprs
)
if not new_series:
# return empty dataframe, like Polars does
return self._from_native_frame(self._native_frame.__class__())
Expand Down Expand Up @@ -433,7 +435,9 @@ def with_columns(
**named_exprs: IntoPandasLikeExpr,
) -> Self:
index = self._native_frame.index
new_columns = evaluate_into_exprs(self, *exprs, **named_exprs)
new_columns: list[PandasLikeSeries] = evaluate_into_exprs(self)(
*exprs, **named_exprs
)
if not new_columns and len(self) == 0:
return self

Expand Down
4 changes: 2 additions & 2 deletions narwhals/_spark_like/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def select(
*exprs: SparkLikeExpr,
**named_exprs: SparkLikeExpr,
) -> Self:
new_columns = parse_exprs_and_named_exprs(self, *exprs, **named_exprs)
new_columns = parse_exprs_and_named_exprs(self)(*exprs, **named_exprs)

if not new_columns:
# return empty dataframe, like Polars does
Expand Down Expand Up @@ -135,7 +135,7 @@ def with_columns(
*exprs: SparkLikeExpr,
**named_exprs: SparkLikeExpr,
) -> Self:
new_columns_map = parse_exprs_and_named_exprs(self, *exprs, **named_exprs)
new_columns_map = parse_exprs_and_named_exprs(self)(*exprs, **named_exprs)
return self._from_native_frame(self._native_frame.withColumns(new_columns_map))

def drop(self: Self, columns: list[str], strict: bool) -> Self: # noqa: FBT001
Expand Down
42 changes: 23 additions & 19 deletions narwhals/_spark_like/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from functools import lru_cache
from typing import TYPE_CHECKING
from typing import Any
from typing import Callable

from pyspark.sql import functions as F # noqa: N812

Expand Down Expand Up @@ -110,25 +111,28 @@ def narwhals_to_native_dtype(


def parse_exprs_and_named_exprs(
    df: SparkLikeLazyFrame,
) -> Callable[..., dict[str, Column]]:
    """Build an evaluator mapping output names to native columns for ``df``.

    ``df`` is bound by this outer call instead of sharing a parameter list
    with ``**named_exprs``. A keyword expression may therefore use any name,
    including ``df``, without colliding with the frame argument.

    Returns:
        A function accepting ``*exprs`` and ``**named_exprs`` and returning a
        dict of output name -> native column. Later entries overwrite earlier
        ones that share a name.

    Raises (from the returned function):
        AssertionError: if a positional expression yields a different number
            of results than output names.
        ValueError: if a named expression does not yield exactly one result.
    """

    def func(*exprs: SparkLikeExpr, **named_exprs: SparkLikeExpr) -> dict[str, Column]:
        # Each value is a single native column, matching the return type.
        native_results: dict[str, Column] = {}
        for expr in exprs:
            native_series_list = expr._call(df)
            output_names = expr._evaluate_output_names(df)
            # An optional alias function rewrites the evaluated output names.
            if expr._alias_output_names is not None:
                output_names = expr._alias_output_names(output_names)
            if len(output_names) != len(native_series_list):  # pragma: no cover
                msg = f"Internal error: got output names {output_names}, but only got {len(native_series_list)} results"
                raise AssertionError(msg)
            native_results.update(zip(output_names, native_series_list))
        for col_alias, expr in named_exprs.items():
            native_series_list = expr._call(df)
            if len(native_series_list) != 1:  # pragma: no cover
                msg = "Named expressions must return a single column"
                raise ValueError(msg)
            native_results[col_alias] = native_series_list[0]
        return native_results

    return func


def maybe_evaluate(df: SparkLikeLazyFrame, obj: Any) -> Any:
Expand Down

0 comments on commit a851b13

Please sign in to comment.