Changes from all commits
62 commits
91ccd1e
feat: add streaming utilities, range support, and improve async handl…
kosiew Sep 1, 2025
f78e90b
refactor: improve DataFrame streaming, memory management, and error h…
kosiew Sep 1, 2025
e322521
feat: enhance DataFrame streaming and improve robustness, tests, and …
kosiew Sep 2, 2025
31e8ed1
feat: add testing utilities for DataFrame range generation
kosiew Sep 2, 2025
0130a72
feat: ensure proper resource management in DataFrame streaming
kosiew Sep 2, 2025
03e530c
refactor: replace spawn_stream and spawn_streams with spawn_future fo…
kosiew Sep 2, 2025
4a3f17d
feat: add test for Arrow C stream schema selection in DataFrame
kosiew Sep 2, 2025
f7a2407
test: rename and extend test_arrow_c_stream_to_table to include Recor…
kosiew Sep 2, 2025
b1d18a8
test: add validation for schema mismatch in Arrow C stream
kosiew Sep 2, 2025
eeb2a37
fix Ruff errors
kosiew Sep 2, 2025
748b7e2
Update docs/source/user-guide/dataframe/index.rst
kosiew Sep 7, 2025
5e650aa
test: add batch iteration test for DataFrame
kosiew Sep 8, 2025
ebd2191
refactor: simplify stream capsule creation in PyDataFrame
kosiew Sep 8, 2025
6bae74b
refactor: enhance stream capsule management in PyDataFrame
kosiew Sep 8, 2025
f0cbe06
refactor: enhance DataFrame and RecordBatchStream iteration support
kosiew Sep 8, 2025
295d04a
refactor: improve docstrings for DataFrame and RecordBatchStream methods
kosiew Sep 8, 2025
475c031
refactor: add to_record_batch_stream method and improve iteration sup…
kosiew Sep 8, 2025
06c9fc7
test: update test_iter_batches_dataframe to assert RecordBatch type a…
kosiew Sep 8, 2025
94432b5
fix: update table creation from batches to use to_pyarrow conversion
kosiew Sep 8, 2025
31ed8e7
test: add test_iter_returns_datafusion_recordbatch to verify RecordBa…
kosiew Sep 8, 2025
610aed3
docs: clarify RecordBatch reference and add PyArrow conversion example
kosiew Sep 8, 2025
1ebd3c1
test: improve test_iter_batches_dataframe to validate RecordBatch con…
kosiew Sep 8, 2025
2e4b963
test: enhance test_arrow_c_stream_to_table_and_reader for batch equal…
kosiew Sep 9, 2025
d0ee865
Shelve unrelated changes
kosiew Sep 9, 2025
16a249c
Fix documentation to reference datafusion.RecordBatch instead of pyar…
kosiew Sep 9, 2025
d91ecfa
Remove redundant to_record_batch_stream method from DataFrame class
kosiew Sep 9, 2025
21f286a
Refactor Arrow stream creation in PyDataFrame to use PyCapsule directly
kosiew Sep 10, 2025
831f56f
Add `once_cell` dependency and refactor Arrow array stream capsule na…
kosiew Sep 10, 2025
7b5e461
Add `cstr` dependency and refactor Arrow array stream capsule name ha…
kosiew Sep 10, 2025
d6e8132
Refactor test_iter_returns_datafusion_recordbatch to use RecordBatch …
kosiew Sep 10, 2025
8a250a4
Add streaming execution examples to DataFrame documentation
kosiew Sep 10, 2025
7789322
Rename `to_record_batch_stream` to `execute_stream` and update refere…
kosiew Sep 10, 2025
07a8169
Clean up formatting in Cargo.toml for improved readability
kosiew Sep 10, 2025
9e27cc6
Refactor Cargo.toml for improved formatting and readability
kosiew Sep 10, 2025
9dc3fb2
Merge branch 'main' into oom-1206
kosiew Sep 10, 2025
d3c68cc
Update python/tests/test_io.py
kosiew Sep 13, 2025
33f9024
Update python/datafusion/dataframe.py
kosiew Sep 13, 2025
7553b32
Refactor test_table_from_batches_stream to use pa.table for improved …
kosiew Sep 13, 2025
b6909a5
Remove deprecated to_record_batch_stream method; use execute_stream i…
kosiew Sep 13, 2025
f4e76ea
Add example for concurrent processing of partitioned streams using as…
kosiew Sep 13, 2025
b66b441
Update documentation to reflect changes in execute_stream return type…
kosiew Sep 13, 2025
2794c88
Update PyArrow streaming example to use pa.table for eager collection
kosiew Sep 13, 2025
17c4c2c
Enhance documentation for DataFrame streaming API, clarifying schema …
kosiew Sep 13, 2025
0ff4c0d
Clarify behavior of __arrow_c_stream__ execution, emphasizing increme…
kosiew Sep 13, 2025
f450e1d
Add note on limitations of `arrow::compute::cast` for schema transfor…
kosiew Sep 13, 2025
5dc5cfa
Update python/tests/test_io.py
kosiew Sep 13, 2025
fd08dc4
Rename test function for clarity: update `test_table_from_batches_str…
kosiew Sep 13, 2025
9baa49e
Update python/datafusion/dataframe.py
kosiew Sep 13, 2025
78f6c8a
Add documentation note for Arrow C Data Interface PyCapsule in DataFr…
kosiew Sep 13, 2025
5a53633
Enhance documentation on zero-copy streaming to Arrow-based Python li…
kosiew Sep 13, 2025
ccc8633
Fix formatting of section header for zero-copy streaming in DataFrame…
kosiew Sep 13, 2025
98ac3a1
Refine zero-copy streaming documentation by removing outdated informa…
kosiew Sep 13, 2025
759fb86
Add alternative method for creating RecordBatchReader from Arrow C st…
kosiew Sep 13, 2025
57d4162
Refactor tests to use RecordBatchReader.from_stream instead of deprec…
kosiew Sep 13, 2025
d66d496
Replace deprecated _import_from_c_capsule method with from_stream for…
kosiew Sep 13, 2025
d76a509
Update test description for arrow_c_stream_large_dataset to clarify s…
kosiew Sep 13, 2025
7433234
Add comments to clarify RSS measurement in test_arrow_c_stream_large_…
kosiew Sep 13, 2025
848665e
Fix ruff errors
kosiew Sep 13, 2025
13ebaf9
Update async iterator implementation in DataFrame to ensure compatibi…
kosiew Sep 13, 2025
dae501d
Fix async iterator implementation in DataFrame for compatibility with…
kosiew Sep 15, 2025
c36aa9a
fix typo
kosiew Sep 15, 2025
914f17e
Fix formatting in DataFrame documentation and add example usage for A…
kosiew Sep 15, 2025
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

41 changes: 33 additions & 8 deletions Cargo.toml
@@ -26,17 +26,34 @@ readme = "README.md"
license = "Apache-2.0"
edition = "2021"
rust-version = "1.78"
include = ["/src", "/datafusion", "/LICENSE.txt", "build.rs", "pyproject.toml", "Cargo.toml", "Cargo.lock"]
include = [
"/src",
"/datafusion",
"/LICENSE.txt",
"build.rs",
"pyproject.toml",
"Cargo.toml",
"Cargo.lock",
]

[features]
default = ["mimalloc"]
protoc = [ "datafusion-substrait/protoc" ]
protoc = ["datafusion-substrait/protoc"]
substrait = ["dep:datafusion-substrait"]

[dependencies]
tokio = { version = "1.45", features = ["macros", "rt", "rt-multi-thread", "sync"] }
pyo3 = { version = "0.24", features = ["extension-module", "abi3", "abi3-py39"] }
pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"]}
tokio = { version = "1.45", features = [
"macros",
"rt",
"rt-multi-thread",
"sync",
] }
pyo3 = { version = "0.24", features = [
"extension-module",
"abi3",
"abi3-py39",
] }
pyo3-async-runtimes = { version = "0.24", features = ["tokio-runtime"] }
pyo3-log = "0.12.4"
arrow = { version = "55.1.0", features = ["pyarrow"] }
datafusion = { version = "49.0.2", features = ["avro", "unicode_expressions"] }
@@ -45,15 +62,23 @@ datafusion-proto = { version = "49.0.2" }
datafusion-ffi = { version = "49.0.2" }
prost = "0.13.1" # keep in line with `datafusion-substrait`
uuid = { version = "1.18", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = [
"local_dynamic_tls",
] }
async-trait = "0.1.89"
futures = "0.3"
object_store = { version = "0.12.3", features = ["aws", "gcp", "azure", "http"] }
cstr = "0.2"
object_store = { version = "0.12.3", features = [
"aws",
"gcp",
"azure",
"http",
] }
url = "2"
log = "0.4.27"

[build-dependencies]
prost-types = "0.13.1" # keep in line with `datafusion-substrait`
prost-types = "0.13.1" # keep in line with `datafusion-substrait`
pyo3-build-config = "0.24"

[lib]
110 changes: 109 additions & 1 deletion docs/source/user-guide/dataframe/index.rst
@@ -145,10 +145,118 @@ To materialize the results of your DataFrame operations:

    # Display results
    df.show() # Print tabular format to console

    # Count rows
    count = df.count()

Zero-copy streaming to Arrow-based Python libraries
---------------------------------------------------

DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
zero-copy, lazy streaming into Arrow-based Python libraries. With the streaming
protocol, batches are produced on demand so you can process arbitrarily large
results without out-of-memory errors.

.. note::

    The protocol is implementation-agnostic and works with any Python library
    that understands the Arrow C streaming interface (for example, PyArrow
    or other Arrow-compatible implementations). The sections below provide a
    short PyArrow-specific example and general guidance for other
    implementations.

PyArrow
-------

.. code-block:: python

    import pyarrow as pa

    # Create a PyArrow RecordBatchReader without materializing all batches
    reader = pa.RecordBatchReader.from_stream(df)
    for batch in reader:
        ... # process each batch as it is produced

DataFrames are also iterable, yielding :class:`datafusion.RecordBatch`
objects lazily so you can loop over results directly without importing
PyArrow:

.. code-block:: python

    for batch in df:
        ... # each batch is a ``datafusion.RecordBatch``

Each batch exposes ``to_pyarrow()``, which converts it to a PyArrow
``RecordBatch``. To collect the entire DataFrame eagerly into a single
PyArrow table, use ``pa.table(df)``:

.. code-block:: python

    import pyarrow as pa

    table = pa.table(df)

Asynchronous iteration is supported as well, allowing integration with
``asyncio`` event loops:

.. code-block:: python

    async for batch in df:
        ... # process each batch as it is produced

To work with the stream directly, use ``execute_stream()``, which returns a
:class:`~datafusion.RecordBatchStream`:

.. code-block:: python

    stream = df.execute_stream()
    for batch in stream:
        ...

Execute as Stream
^^^^^^^^^^^^^^^^^

For finer control over streaming execution, use
:py:meth:`~datafusion.DataFrame.execute_stream` to obtain a
:py:class:`datafusion.RecordBatchStream`:

.. code-block:: python

    stream = df.execute_stream()
    for batch in stream:
        ... # process each batch as it is produced

.. tip::

    To get a PyArrow reader instead, call
    ``pa.RecordBatchReader.from_stream(df)``.

When partition boundaries are important,
:py:meth:`~datafusion.DataFrame.execute_stream_partitioned`
returns an iterable of :py:class:`datafusion.RecordBatchStream` objects, one per
partition:

.. code-block:: python

    for stream in df.execute_stream_partitioned():
        for batch in stream:
            ... # each stream yields RecordBatches

Review comment (Contributor):

    Interesting. Can these streams be polled concurrently? Can you do

        streams = list(df.execute_stream_partitioned())

    and then concurrently iterate over all the streams, yielding whatever batch comes in first? I suppose that would just do in Python what execute_stream is doing in Rust?

Reply (Contributor Author):

    Good question!
    I added a concurrent iteration example in the same document to clarify this.

To process partitions concurrently, first collect the streams into a list
and then poll each one in a separate ``asyncio`` task:

.. code-block:: python

    import asyncio

    async def consume(stream):
        async for batch in stream:
            ...

    streams = list(df.execute_stream_partitioned())
    await asyncio.gather(*(consume(s) for s in streams))

See :doc:`../io/arrow` for additional details on the Arrow interface.

HTML Rendering
--------------

16 changes: 12 additions & 4 deletions docs/source/user-guide/io/arrow.rst
@@ -60,14 +60,22 @@ Exporting from DataFusion
DataFusion DataFrames implement ``__arrow_c_stream__`` PyCapsule interface, so any
Python library that accepts these can import a DataFusion DataFrame directly.

.. warning::
    It is important to note that this will cause the DataFrame execution to happen, which may be
    a time consuming task. That is, you will cause a
    :py:func:`datafusion.dataframe.DataFrame.collect` operation call to occur.
.. note::
    Invoking ``__arrow_c_stream__`` still triggers execution of the underlying
    query, but batches are yielded incrementally rather than materialized all at
    once in memory. Consumers can process the stream as it arrives, avoiding the
    memory overhead of a full
    :py:func:`datafusion.dataframe.DataFrame.collect`.

For an example of this streamed execution and its memory safety, see the
``test_arrow_c_stream_large_dataset`` unit test in
:mod:`python.tests.test_io`.
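
For illustration (an editorial sketch, not part of the patch), the stream can be
consumed incrementally instead of being collected all at once; this assumes ``df``
is a DataFusion DataFrame as in the example below and a PyArrow release that
provides ``RecordBatchReader.from_stream``:

    import pyarrow as pa

    # The reader pulls record batches through __arrow_c_stream__ one at a
    # time, so only the current batch needs to be resident in memory.
    reader = pa.RecordBatchReader.from_stream(df)
    for batch in reader:
        ... # process each batch as it arrives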


.. ipython:: python

    from datafusion import col, lit

    df = df.select((col("a") * lit(1.5)).alias("c"), lit("df").alias("d"))
    pa.table(df)

56 changes: 47 additions & 9 deletions python/datafusion/dataframe.py
@@ -25,7 +25,9 @@
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Iterable,
    Iterator,
    Literal,
    Optional,
    Union,
@@ -42,7 +44,7 @@
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
from datafusion.expr import Expr, SortExpr, sort_or_default
from datafusion.plan import ExecutionPlan, LogicalPlan
from datafusion.record_batch import RecordBatchStream
from datafusion.record_batch import RecordBatch, RecordBatchStream

if TYPE_CHECKING:
    import pathlib
@@ -296,6 +298,9 @@ def __init__(
class DataFrame:
"""Two dimensional table representation of data.

DataFrame objects are iterable; iterating over a DataFrame yields
:class:`datafusion.RecordBatch` instances lazily.

See :ref:`user_guide_concepts` in the online documentation for more information.
"""

@@ -312,7 +317,7 @@ def into_view(self) -> pa.Table:
        return self.df.into_view()

    def __getitem__(self, key: str | list[str]) -> DataFrame:
        """Return a new :py:class`DataFrame` with the specified column or columns.
        """Return a new :py:class:`DataFrame` with the specified column or columns.

        Args:
            key: Column name or list of column names to select.

@@ -1105,21 +1110,54 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFram
        return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))

    def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
        """Export an Arrow PyCapsule Stream.
        """Export the DataFrame as an Arrow C Stream.

        The DataFrame is executed using DataFusion's streaming APIs and exposed via
        Arrow's C Stream interface. Record batches are produced incrementally, so the
        full result set is never materialized in memory.

Review comment (Contributor):

    Might be good to have a link somewhere in the docstring to
    https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

Reply (Contributor Author):

    Added

        This will execute and collect the DataFrame. We will attempt to respect the
        requested schema, but only trivial transformations will be applied such as only
        returning the fields listed in the requested schema if their data types match
        those in the DataFrame.
        When ``requested_schema`` is provided, DataFusion applies only simple
        projections such as selecting a subset of existing columns or reordering
        them. Column renaming, computed expressions, or type coercion are not
        supported through this interface.

        Args:
            requested_schema: Attempt to provide the DataFrame using this schema.
            requested_schema: Either a :py:class:`pyarrow.Schema` or an Arrow C
                Schema capsule (``PyCapsule``) produced by
                ``schema._export_to_c_capsule()``. The DataFrame will attempt to
                align its output with the fields and order specified by this schema.

        Returns:
            Arrow PyCapsule object.
            Arrow ``PyCapsule`` object representing an ``ArrowArrayStream``.

        Examples:
            >>> schema = df.schema()
            >>> stream = df.__arrow_c_stream__(schema)
            >>> capsule = schema._export_to_c_capsule()
            >>> stream = df.__arrow_c_stream__(capsule)

        Notes:
            The Arrow C Data Interface PyCapsule details are documented by Apache
            Arrow and can be found at:
            https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html
        """
        # ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
        # ``execute_stream_partitioned`` under the hood to stream batches while
        # preserving the original partition order.
        return self.df.__arrow_c_stream__(requested_schema)

    def __iter__(self) -> Iterator[RecordBatch]:
        """Return an iterator over this DataFrame's record batches."""
        return iter(self.execute_stream())

    def __aiter__(self) -> AsyncIterator[RecordBatch]:
        """Return an async iterator over this DataFrame's record batches.

        We're using __aiter__ because we support Python < 3.10 where aiter() is not
        available.
        """
        return self.execute_stream().__aiter__()

    def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
        """Apply a function to the current DataFrame which returns another DataFrame.

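For reference, a minimal end-to-end sketch of the iteration and ``requested_schema``
behavior added above. This is illustrative only and not taken from the patch: the query
and column names are hypothetical, and it assumes a PyArrow release whose
``RecordBatchReader.from_stream`` forwards its ``schema`` argument to
``__arrow_c_stream__`` as an Arrow C schema capsule.

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 1 AS a, 'x' AS b")  # hypothetical illustration query

    # Plain iteration yields datafusion.RecordBatch objects lazily.
    for batch in df:
        print(batch.to_pyarrow().num_rows)

    # Request a schema that keeps only column "a" (an existing column with a
    # matching type); only such trivial projections are applied.
    subset = pa.schema([pa.field("a", pa.int64())])
    reader = pa.RecordBatchReader.from_stream(df, schema=subset)
    for batch in reader:
        assert batch.schema.names == ["a"]
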
28 changes: 24 additions & 4 deletions python/datafusion/record_batch.py
@@ -46,6 +46,26 @@ def to_pyarrow(self) -> pa.RecordBatch:
"""Convert to :py:class:`pa.RecordBatch`."""
return self.record_batch.to_pyarrow()

    def __arrow_c_array__(
        self, requested_schema: object | None = None
    ) -> tuple[object, object]:
        """Export the record batch via the Arrow C Data Interface.

        This allows zero-copy interchange with libraries that support the
        `Arrow PyCapsule interface <https://arrow.apache.org/docs/format/
        CDataInterface/PyCapsuleInterface.html>`_.

        Args:
            requested_schema: Attempt to provide the record batch using this
                schema. Only straightforward projections such as column
                selection or reordering are applied.

        Returns:
            Two Arrow PyCapsule objects representing the ``ArrowArray`` and
            ``ArrowSchema``.
        """
        return self.record_batch.__arrow_c_array__(requested_schema)

Review comment (Contributor):

    👍 👍


class RecordBatchStream:
"""This class represents a stream of record batches.
@@ -63,19 +83,19 @@ def next(self) -> RecordBatch:
        return next(self)

    async def __anext__(self) -> RecordBatch:
        """Async iterator function."""
        """Return the next :py:class:`RecordBatch` in the stream asynchronously."""
        next_batch = await self.rbs.__anext__()
        return RecordBatch(next_batch)

    def __next__(self) -> RecordBatch:
        """Iterator function."""
        """Return the next :py:class:`RecordBatch` in the stream."""
        next_batch = next(self.rbs)
        return RecordBatch(next_batch)

    def __aiter__(self) -> typing_extensions.Self:
        """Async iterator function."""
        """Return an asynchronous iterator over record batches."""
        return self

    def __iter__(self) -> typing_extensions.Self:
        """Iterator function."""
        """Return an iterator over record batches."""
        return self
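
A brief sketch of consuming the new ``RecordBatch.__arrow_c_array__`` hook. This is
illustrative only: the query is hypothetical, and it assumes a recent PyArrow whose
``pa.record_batch()`` accepts objects implementing the Arrow PyCapsule array protocol.

    import pyarrow as pa
    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.sql("SELECT 42 AS answer")  # hypothetical illustration query

    # Take one datafusion.RecordBatch from the DataFrame's stream.
    df_batch = next(iter(df))

    # pa.record_batch() detects __arrow_c_array__ and imports the data
    # zero-copy through the Arrow C Data Interface.
    pa_batch = pa.record_batch(df_batch)
    assert pa_batch.column("answer")[0].as_py() == 42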