Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 0.6.4 (2025-12-18)
- fix col.map() Null-type handling and string operations
- fix print display of short tables

## 0.6.3 (2025-12-17)
- implement `tbl1 >> union(tbl2)` and `union(tbl1,tbl2)`
Expand Down
1 change: 1 addition & 0 deletions pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pixi-pycharm = ">=0.0.6"
pytest = ">=7.1.2"
pytest-xdist = ">=2.5.0"
structlog = ">=25.4.0,<26"
filelock = ">=3.20.0"

[feature.release.dependencies]
hatch = ">=1.12.0"
Expand Down
6 changes: 4 additions & 2 deletions src/pydiverse/transform/_internal/pipe/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ def __repr__(self) -> str:
query = "\n\nQuery:\n" + query
except Exception:
query = ""
return res + str(df).split("\n", 1)[1]
return res + str(df).split("\n", 1)[1] + query

def __dir__(self) -> list[str]:
return [name for name in self._cache.name_to_uuid.keys()]
Expand Down Expand Up @@ -314,7 +314,9 @@ def get_head_tail(tbl: Table) -> tuple[pl.DataFrame, int]:

height = tbl >> summarize(num_rows=pdt.count()) >> export(pdt.Scalar)
tbl_rows = int(pl.Config.state().get("POLARS_FMT_MAX_ROWS") or 10)
head_tail_len = tbl_rows // 2 + 1
if height <= tbl_rows:
return tbl >> export(pdt.Polars), height
head_tail_len = tbl_rows // 2

# Only export the first and last few rows.
head: pl.DataFrame = tbl >> slice_head(head_tail_len) >> export(pdt.Polars)
Expand Down
46 changes: 7 additions & 39 deletions src/pydiverse/transform/_internal/pipe/verbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1360,42 +1360,6 @@ def _union_impl(
*,
distinct: bool = False,
) -> Table:
"""
Unions two tables by stacking rows vertically.

The left table in the union comes through the pipe `>>` operator from the
left.

:param right:
The right table to union with.

:param distinct:
If ``True``, performs UNION (removes duplicates). If ``False``,
performs UNION ALL (keeps duplicates).

Note
----
Both tables must have the same number of columns with compatible types.
Column names must match between the two tables.

Examples
--------
>>> t1 = pdt.Table({"a": [1, 2, 3], "b": [4, 5, 6]}, name="t1")
>>> t2 = pdt.Table({"a": [7, 8], "b": [9, 10]}, name="t2")
>>> t1 >> union(t2) >> show()
shape: (5, 2)
┌─────┬─────┐
│ a ┆ b │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞═════╪═════╡
│ 1 ┆ 4 │
│ 2 ┆ 5 │
│ 3 ┆ 6 │
│ 7 ┆ 9 │
│ 8 ┆ 10 │
└─────┴─────┘
"""
errors.check_arg_type(Table, "union", "right", right)

if left._cache.backend != right._cache.backend:
Expand Down Expand Up @@ -1457,11 +1421,15 @@ def union(
"""
Unions two tables by stacking rows vertically.

The left table in the union comes through the pipe `>>` operator from the
The left table in the union may come through the pipe `>>` operator from the
left.

:param left_or_right:
In case of tbl1 >> union(tbl2) this is the right table (tbl2).
In case of union(tbl1, tbl2) this is the left table (tbl1).

:param right:
The right table to union with.
The right table to union with in case of union(tbl1, tbl2).

:param distinct:
If ``True``, performs UNION (removes duplicates). If ``False``,
Expand Down Expand Up @@ -1490,7 +1458,7 @@ def union(
│ 8 ┆ 10 │
└─────┴─────┘

You can also call it directly:
You can also call it in this form:
>>> union(t1, t2) >> show()
"""
# If called with two arguments directly (union(tbl1, tbl2)), return Table directly
Expand Down
12 changes: 10 additions & 2 deletions tests/test_backend_equivalence/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from tests.fixtures.backend import BACKEND_MARKS, flatten
from tests.util.backend import BACKEND_TABLES
from tests.util.filelock import lock

dataframes = {
"df1": pl.DataFrame(
Expand Down Expand Up @@ -310,16 +311,23 @@ def pytest_generate_tests(metafunc: pytest.Metafunc):
return metafunc.parametrize(param_names, params, indirect=True)


@pytest.fixture(scope="session")
def locked():
with lock("/tmp/transform/lock"):
yield True


def generate_df_fixtures():
def gen_fixture(table_name):
@pytest.fixture(scope="function")
def table_fixture(request):
def table_fixture(request, locked):
_ = locked
df = dataframes[table_name]
name = table_name

table_x = BACKEND_TABLES[request.param[0]](df, name)
table_y = BACKEND_TABLES[request.param[1]](df, name)
return table_x, table_y
yield table_x, table_y

return table_fixture

Expand Down
2 changes: 1 addition & 1 deletion tests/test_backend_equivalence/test_print.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

def test_preview_print(df3, df4, df_strings):
def data_part(p: str):
return p.split("\n", 1)[1]
return p.split("\n", 1)[1].split("\n\n", 1)[0]

assert data_part(str(df3[0])) == data_part(str(df3[1]))
assert data_part(str(df_strings[0])) == data_part(str(df_strings[1]))
Expand Down
17 changes: 10 additions & 7 deletions tests/test_backend_equivalence/test_union.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pydiverse.transform.extended import *
from tests.fixtures.backend import skip_backends
from tests.util import assert_result_equal
from tests.util.filelock import lock


def test_union_basic(df3, df4):
Expand Down Expand Up @@ -189,13 +190,15 @@ def test_union_error_different_backends():

# Create tables with different backends
polars_tbl = pdt.Table(pl.DataFrame({"a": [1, 2]}))
engine = sqa.create_engine("sqlite:///:memory:")
pl.DataFrame({"a": [1, 2]}).write_database("test", engine, if_table_exists="replace")
sql_tbl = pdt.Table("test", pdt.SqlAlchemy(engine))

# Should raise TypeError
with pytest.raises(TypeError, match="cannot union two tables with different backends"):
polars_tbl >> union(sql_tbl)
file = "/tmp/transform/test2.sqlite"
with lock(file):
engine = sqa.create_engine("sqlite:///" + file)
pl.DataFrame({"a": [1, 2]}).write_database("test", engine, if_table_exists="replace")
sql_tbl = pdt.Table("test", pdt.SqlAlchemy(engine))

# Should raise TypeError
with pytest.raises(TypeError, match="cannot union two tables with different backends"):
polars_tbl >> union(sql_tbl)


def test_union_error_grouped_table(df3, df4):
Expand Down
33 changes: 18 additions & 15 deletions tests/test_sql_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pydiverse.transform as pdt
from pydiverse.transform.extended import *
from tests.util import assert_equal
from tests.util.filelock import lock

df1 = pl.DataFrame(
{
Expand Down Expand Up @@ -60,21 +61,23 @@

@pytest.fixture
def engine():
engine = sqa.create_engine("sqlite:///:memory:")
# engine = sqa.create_engine("postgresql://sqa:[email protected]:6543")
# engine = sqa.create_engine(
# "mssql+pyodbc://sqa:[email protected]:1433"
# "/master?driver=ODBC+Driver+18+for+SQL+Server&encrypt=no"
# )
# engine = sqa.create_engine("db2+ibm_db://db2inst1:password@localhost:50000/testdb")

df1.write_database("df1", engine, if_table_exists="replace")
df2.write_database("df2", engine, if_table_exists="replace")
df3.write_database("df3", engine, if_table_exists="replace")
df4.write_database("df4", engine, if_table_exists="replace")
df_left.write_database("df_left", engine, if_table_exists="replace")
df_right.write_database("df_right", engine, if_table_exists="replace")
return engine
file = "/tmp/transform/test_pdt.sqlite"
with lock(file):
engine = sqa.create_engine("sqlite:///" + file)
# engine = sqa.create_engine("postgresql://sqa:[email protected]:6543")
# engine = sqa.create_engine(
# "mssql+pyodbc://sqa:[email protected]:1433"
# "/master?driver=ODBC+Driver+18+for+SQL+Server&encrypt=no"
# )
# engine = sqa.create_engine("db2+ibm_db://db2inst1:password@localhost:50000/testdb")

df1.write_database("df1", engine, if_table_exists="replace")
df2.write_database("df2", engine, if_table_exists="replace")
df3.write_database("df3", engine, if_table_exists="replace")
df4.write_database("df4", engine, if_table_exists="replace")
df_left.write_database("df_left", engine, if_table_exists="replace")
df_right.write_database("df_right", engine, if_table_exists="replace")
yield engine


@pytest.fixture
Expand Down
8 changes: 5 additions & 3 deletions tests/util/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,12 @@ def sql_table(

@_cached_table
def sqlite_table(df: pl.DataFrame, name: str):
return sql_table(df, name, "sqlite:///:memory:")
return sql_table(df, name, "sqlite:////tmp/transform/test.sqlite")


@_cached_table
def duckdb_table(df: pl.DataFrame, name: str):
return sql_table(df, name, "duckdb:///:memory:")
return sql_table(df, name, "duckdb:////tmp/transform/test.duckdb")


@_cached_table
Expand All @@ -96,7 +96,9 @@ def duckdb_parquet_table(df: pl.DataFrame, name: str):
if "duckdb_parquet" in _sql_engine_cache:
engine = _sql_engine_cache["duckdb_parquet"]
else:
engine = sqa.create_engine("duckdb:///:memory:")
file = "/tmp/transform/test_parquet.duckdb"
Path(file).parent.mkdir(parents=True, exist_ok=True)
engine = sqa.create_engine("duckdb:///" + file)
_sql_engine_cache["duckdb_parquet"] = engine
path = Path(tempfile.gettempdir()) / "pdtransform" / "tests"
file = path / f"{name}.parquet"
Expand Down
16 changes: 16 additions & 0 deletions tests/util/filelock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright (c) QuantCo and pydiverse contributors 2025-2025
# SPDX-License-Identifier: BSD-3-Clause

from contextlib import contextmanager
from pathlib import Path


@contextmanager
def lock(lock_path: Path | str):
if isinstance(lock_path, str):
lock_path = Path(lock_path)
import filelock

lock_path.parent.mkdir(parents=True, exist_ok=True)
with filelock.FileLock(lock_path.with_suffix(".lock")):
yield
Loading