47 changes: 47 additions & 0 deletions docs/source/user-guide/dataframe/index.rst
@@ -126,6 +126,53 @@ DataFusion's DataFrame API offers a wide range of operations:
# Drop columns
df = df.drop("temporary_column")

String Columns and Expressions
------------------------------

Review comment (Contributor): I think the title here is misleading. "String Columns" to me would mean columns that contain string values. I think maybe we should call this something like "Function arguments taking column names" or "Column names as function arguments"

Some ``DataFrame`` methods accept plain strings when an argument refers to an
existing column. These include the methods listed below.

Review comment (Contributor): recommend "plain strings" -> "column names"

Review comment (Contributor): We should probably add a note to see the full function documentation for details on any specific function.


* :py:meth:`~datafusion.DataFrame.select`
* :py:meth:`~datafusion.DataFrame.sort`
* :py:meth:`~datafusion.DataFrame.drop`
* :py:meth:`~datafusion.DataFrame.join` (``on`` argument)
* :py:meth:`~datafusion.DataFrame.aggregate` (grouping columns)

Note that :py:meth:`~datafusion.DataFrame.join_on` expects ``col()``/``column()`` expressions rather than plain strings.
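
For example, :py:meth:`~datafusion.DataFrame.join` accepts a column name for
its ``on`` argument, while the equivalent
:py:meth:`~datafusion.DataFrame.join_on` call spells the condition out as an
expression. This is a minimal sketch; ``other`` and its ``other_id`` column
are hypothetical stand-ins:

.. code-block:: python

from datafusion import col

# Join on a column name shared by both frames.
df.join(other, on='id')

# join_on takes explicit boolean expressions instead of column names.
df.join_on(other, col('id') == col('other_id'))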

For such methods, you can pass column names directly:

.. code-block:: python

from datafusion import col, functions as f

df.sort('id')
df.aggregate('id', [f.count(col('value'))])

The same operations can also be written with explicit column expressions, using either ``col()`` or ``column()``:

.. code-block:: python

from datafusion import col, column, functions as f

df.sort(col('id'))
df.aggregate(column('id'), [f.count(col('value'))])

Note that ``column()`` is an alias of ``col()``, so you can use either name; the example above shows both in action.

Whenever an argument represents an expression—such as in
:py:meth:`~datafusion.DataFrame.filter` or
:py:meth:`~datafusion.DataFrame.with_column`—use ``col()`` to reference columns
and wrap constant values with ``lit()`` (also available as ``literal()``):

.. code-block:: python

from datafusion import col, lit
df.filter(col('age') > lit(21))

Without ``lit()`` DataFusion would treat ``21`` as a column name rather than a
constant value.
Review comment (Contributor) on lines +168 to +174: Is this statement true? df.filter(col('age') > 21) would treat 21 as a column name? I think that's a change in how the comparison operator works.
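
Regardless of whether the wrapping is strictly required, an explicit ``lit()``
keeps the intent clear. A minimal :py:meth:`~datafusion.DataFrame.with_column`
sketch (the ``age`` column and threshold are hypothetical):

.. code-block:: python

from datafusion import col, lit

# Derive a new column from an expression; the constant is wrapped in lit().
df = df.with_column('is_adult', col('age') >= lit(21))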


Terminal Operations
-------------------

63 changes: 39 additions & 24 deletions python/datafusion/context.py
@@ -22,29 +22,31 @@
 import warnings
 from typing import TYPE_CHECKING, Any, Protocol

-import pyarrow as pa
-
 try:
     from warnings import deprecated  # Python 3.13+
 except ImportError:
     from typing_extensions import deprecated  # Python 3.12

+import pyarrow as pa
+
 from datafusion.catalog import Catalog, CatalogProvider, Table
 from datafusion.dataframe import DataFrame
-from datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list
+from datafusion.expr import SortKey, sort_list_to_raw_sort_list
 from datafusion.record_batch import RecordBatchStream
 from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF

 from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
 from ._internal import SessionConfig as SessionConfigInternal
 from ._internal import SessionContext as SessionContextInternal
 from ._internal import SQLOptions as SQLOptionsInternal
+from ._internal import expr as expr_internal

 if TYPE_CHECKING:
     import pathlib
     from collections.abc import Sequence

     import pandas as pd
-    import polars as pl
+    import polars as pl  # type: ignore[import]

     from datafusion.plan import ExecutionPlan, LogicalPlan

@@ -553,7 +555,7 @@ def register_listing_table(
         table_partition_cols: list[tuple[str, str | pa.DataType]] | None = None,
         file_extension: str = ".parquet",
         schema: pa.Schema | None = None,
-        file_sort_order: list[list[Expr | SortExpr]] | None = None,
+        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
     ) -> None:
         """Register multiple files as a single table.

@@ -567,23 +569,20 @@
             table_partition_cols: Partition columns.
             file_extension: File extension of the provided table.
             schema: The data source schema.
-            file_sort_order: Sort order for the file.
+            file_sort_order: Sort order for the file. Each sort key can be
+                specified as a column name (``str``), an expression
+                (``Expr``), or a ``SortExpr``.
         """
         if table_partition_cols is None:
             table_partition_cols = []
         table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
-        file_sort_order_raw = (
-            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
-            if file_sort_order is not None
-            else None
-        )
         self.ctx.register_listing_table(
             name,
             str(path),
             table_partition_cols,
             file_extension,
             schema,
-            file_sort_order_raw,
+            self._convert_file_sort_order(file_sort_order),
         )

     def sql(self, query: str, options: SQLOptions | None = None) -> DataFrame:
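
As a usage sketch of the loosened ``file_sort_order`` typing above, each sort
key may now be a plain column name, an ``Expr``, or a ``SortExpr``. The table
name, path, and column names here are hypothetical:

.. code-block:: python

from datafusion import SessionContext, col

ctx = SessionContext()

# Each inner list is one lexicographic sort ordering for the files.
ctx.register_listing_table(
    'events',
    '/data/events/',
    file_sort_order=[['ts', col('user_id'), col('seq').sort(ascending=False)]],
)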
@@ -808,7 +807,7 @@ def register_parquet(
file_extension: str = ".parquet",
skip_metadata: bool = True,
schema: pa.Schema | None = None,
file_sort_order: list[list[SortExpr]] | None = None,
file_sort_order: Sequence[Sequence[SortKey]] | None = None,
) -> None:
"""Register a Parquet file as a table.

@@ -827,7 +826,7 @@
                 that may be in the file schema. This can help avoid schema
                 conflicts due to metadata.
             schema: The data source schema.
-            file_sort_order: Sort order for the file.
+            file_sort_order: Sort order for the file. Each sort key can be
+                specified as a column name (``str``), an expression
+                (``Expr``), or a ``SortExpr``.
         """
         if table_partition_cols is None:
             table_partition_cols = []
@@ -840,9 +841,7 @@
             file_extension,
             skip_metadata,
             schema,
-            [sort_list_to_raw_sort_list(exprs) for exprs in file_sort_order]
-            if file_sort_order is not None
-            else None,
+            self._convert_file_sort_order(file_sort_order),
         )

     def register_csv(
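
Continuing the sketch above with the same hypothetical ``ctx``,
``register_parquet`` accepts the identical spellings:

.. code-block:: python

ctx.register_parquet(
    'metrics',
    '/data/metrics.parquet',
    file_sort_order=[['ts']],  # equivalent to [[col('ts')]]
)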
@@ -1099,7 +1098,7 @@ def read_parquet(
file_extension: str = ".parquet",
skip_metadata: bool = True,
schema: pa.Schema | None = None,
file_sort_order: list[list[Expr | SortExpr]] | None = None,
file_sort_order: Sequence[Sequence[SortKey]] | None = None,
) -> DataFrame:
"""Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`.

@@ -1116,19 +1115,17 @@
             schema: An optional schema representing the parquet files. If None,
                 the parquet reader will try to infer it based on data in the
                 file.
-            file_sort_order: Sort order for the file.
+            file_sort_order: Sort order for the file. Each sort key can be
+                specified as a column name (``str``), an expression
+                (``Expr``), or a ``SortExpr``.

         Returns:
             DataFrame representation of the read Parquet files
         """
         if table_partition_cols is None:
             table_partition_cols = []
         table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
-        file_sort_order = (
-            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
-            if file_sort_order is not None
-            else None
-        )
+        file_sort_order = self._convert_file_sort_order(file_sort_order)
         return DataFrame(
             self.ctx.read_parquet(
                 str(path),
@@ -1179,6 +1176,24 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
"""Execute the ``plan`` and return the results."""
return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions))

@staticmethod
def _convert_file_sort_order(
file_sort_order: Sequence[Sequence[SortKey]] | None,
) -> list[list[expr_internal.SortExpr]] | None:
"""Convert nested ``SortKey`` sequences into raw sort expressions.

Each ``SortKey`` can be a column name string, an ``Expr``, or a
``SortExpr`` and will be converted using
:func:`datafusion.expr.sort_list_to_raw_sort_list`.
"""
# Convert each ``SortKey`` in the provided sort order to the low-level
# representation expected by the Rust bindings.
return (
[sort_list_to_raw_sort_list(f) for f in file_sort_order]
if file_sort_order is not None
else None
)

@staticmethod
def _convert_table_partition_cols(
table_partition_cols: list[tuple[str, str | pa.DataType]],
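
Taken together, a hedged end-to-end sketch of the new surface (the file path
and column names are hypothetical): ``read_parquet`` accepts the same
flexible ``file_sort_order``, which ``_convert_file_sort_order`` normalizes
once for every call site.

.. code-block:: python

from datafusion import SessionContext, col

ctx = SessionContext()

# Strings, Exprs, and SortExprs are all normalized by
# _convert_file_sort_order before reaching the Rust bindings.
df = ctx.read_parquet(
    '/data/metrics.parquet',
    file_sort_order=[['ts', col('host').sort(ascending=True)]],
)
df.show()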