Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add all, any and null_count Spark Expressions #1724

Merged
merged 19 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions narwhals/_spark_like/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,22 @@ def _alias(df: SparkLikeLazyFrame) -> list[Column]:
kwargs={**self._kwargs, "name": name},
)

def all(self) -> Self:
def _all(_input: Column) -> Column:
from pyspark.sql import functions as F # noqa: N812

return F.bool_and(_input)

return self._from_call(_all, "all", returns_scalar=True)

def any(self) -> Self:
def _any(_input: Column) -> Column:
from pyspark.sql import functions as F # noqa: N812

return F.bool_or(_input)

return self._from_call(_any, "any", returns_scalar=True)

def count(self) -> Self:
def _count(_input: Column) -> Column:
from pyspark.sql import functions as F # noqa: N812
Expand Down Expand Up @@ -233,6 +249,14 @@ def _min(_input: Column) -> Column:

return self._from_call(_min, "min", returns_scalar=True)

def null_count(self) -> Self:
def _null_count(_input: Column) -> Column:
from pyspark.sql import functions as F # noqa: N812

return F.count_if(F.isnull(_input))

return self._from_call(_null_count, "null_count", returns_scalar=True)

def sum(self) -> Self:
def _sum(_input: Column) -> Column:
from pyspark.sql import functions as F # noqa: N812
Expand Down
20 changes: 20 additions & 0 deletions narwhals/_spark_like/namespace.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,26 @@ def func(df: SparkLikeLazyFrame) -> list[Column]:
kwargs={"exprs": exprs},
)

def any_horizontal(self, *exprs: IntoSparkLikeExpr) -> SparkLikeExpr:
parsed_exprs = parse_into_exprs(*exprs, namespace=self)

def func(df: SparkLikeLazyFrame) -> list[Column]:
cols = [c for _expr in parsed_exprs for c in _expr(df)]
col_name = get_column_name(df, cols[0])
return [reduce(operator.or_, cols).alias(col_name)]

return SparkLikeExpr( # type: ignore[abstract]
call=func,
depth=max(x._depth for x in parsed_exprs) + 1,
function_name="any_horizontal",
root_names=combine_root_names(parsed_exprs),
output_names=reduce_output_names(parsed_exprs),
returns_scalar=False,
backend_version=self._backend_version,
version=self._version,
kwargs={"exprs": exprs},
)

def col(self, *column_names: str) -> SparkLikeExpr:
return SparkLikeExpr.from_column_names(
*column_names, backend_version=self._backend_version, version=self._version
Expand Down
90 changes: 90 additions & 0 deletions tests/spark_like_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import narwhals.stable.v1 as nw
from narwhals.exceptions import ColumnNotFoundError
from tests.utils import ConstructorEager
from tests.utils import assert_equal_data

if TYPE_CHECKING:
Expand Down Expand Up @@ -285,6 +286,35 @@ def test_allh_all(pyspark_constructor: Constructor) -> None:
assert_equal_data(result, expected)


# copied from tests/expr_and_series/any_horizontal_test.py
@pytest.mark.parametrize("expr1", ["a", nw.col("a")])
@pytest.mark.parametrize("expr2", ["b", nw.col("b")])
def test_anyh(constructor: Constructor, expr1: Any, expr2: Any) -> None:
data = {
"a": [False, False, True],
"b": [False, True, True],
}
df = nw.from_native(constructor(data))
result = df.select(any=nw.any_horizontal(expr1, expr2))

expected = {"any": [False, True, True]}
assert_equal_data(result, expected)


def test_anyh_all(constructor: Constructor) -> None:
data = {
"a": [False, False, True],
"b": [False, True, True],
}
df = nw.from_native(constructor(data))
result = df.select(any=nw.any_horizontal(nw.all()))
expected = {"any": [False, True, True]}
assert_equal_data(result, expected)
result = df.select(nw.any_horizontal(nw.all()))
expected = {"a": [False, True, True]}
assert_equal_data(result, expected)


# copied from tests/expr_and_series/sum_horizontal_test.py
@pytest.mark.parametrize("col_expr", [nw.col("a"), "a"])
def test_sumh(pyspark_constructor: Constructor, col_expr: Any) -> None:
Expand Down Expand Up @@ -324,6 +354,44 @@ def test_sumh_all(pyspark_constructor: Constructor) -> None:
assert_equal_data(result, expected)


# copied from tests/expr_and_series/any_all_test.py
def test_any_all(pyspark_constructor: Constructor) -> None:
df = nw.from_native(
pyspark_constructor(
{
"a": [True, False, True],
"b": [True, True, True],
"c": [False, False, False],
}
)
)
result = df.select(nw.col("a", "b", "c").all())
expected = {"a": [False], "b": [True], "c": [False]}
assert_equal_data(result, expected)
result = df.select(nw.all().any())
expected = {"a": [True], "b": [True], "c": [False]}
assert_equal_data(result, expected)


def test_any_all_series(constructor_eager: ConstructorEager) -> None:
df = nw.from_native(
constructor_eager(
{
"a": [True, False, True],
"b": [True, True, True],
"c": [False, False, False],
}
),
eager_only=True,
)
result = {"a": [df["a"].all()], "b": [df["b"].all()], "c": [df["c"].all()]}
expected = {"a": [False], "b": [True], "c": [False]}
assert_equal_data(result, expected)
result = {"a": [df["a"].any()], "b": [df["b"].any()], "c": [df["c"].any()]}
expected = {"a": [True], "b": [True], "c": [False]}
assert_equal_data(result, expected)


# copied from tests/expr_and_series/count_test.py
def test_count(pyspark_constructor: Constructor) -> None:
data = {"a": [1, 2, 3], "b": [4, None, 6], "z": [7.0, None, None]}
Expand Down Expand Up @@ -374,6 +442,28 @@ def test_expr_min_expr(pyspark_constructor: Constructor) -> None:
assert_equal_data(result, expected)


# copied from tests/expr_and_series/null_count_test.py
def test_null_count_expr(constructor: Constructor) -> None:
data = {
"a": [1.0, None, None, 3.0],
"b": [1.0, None, 4, 5.0],
}
df = nw.from_native(constructor(data))
result = df.select(nw.all().null_count())
expected = {
"a": [2],
"b": [1],
}
assert_equal_data(result, expected)


def test_null_count_series(constructor_eager: ConstructorEager) -> None:
data = [1, 2, None]
series = nw.from_native(constructor_eager({"a": data}), eager_only=True)["a"]
result = series.null_count()
assert result == 1


# copied from tests/expr_and_series/min_test.py
@pytest.mark.parametrize("expr", [nw.col("a", "b", "z").sum(), nw.sum("a", "b", "z")])
def test_expr_sum_expr(pyspark_constructor: Constructor, expr: nw.Expr) -> None:
Expand Down
Loading