Changes from 10 commits (64 commits total)

Commits
91ccd1e
feat: add streaming utilities, range support, and improve async handl…
kosiew Sep 1, 2025
f78e90b
refactor: improve DataFrame streaming, memory management, and error h…
kosiew Sep 1, 2025
e322521
feat: enhance DataFrame streaming and improve robustness, tests, and …
kosiew Sep 2, 2025
31e8ed1
feat: add testing utilities for DataFrame range generation
kosiew Sep 2, 2025
0130a72
feat: ensure proper resource management in DataFrame streaming
kosiew Sep 2, 2025
03e530c
refactor: replace spawn_stream and spawn_streams with spawn_future fo…
kosiew Sep 2, 2025
4a3f17d
feat: add test for Arrow C stream schema selection in DataFrame
kosiew Sep 2, 2025
f7a2407
test: rename and extend test_arrow_c_stream_to_table to include Recor…
kosiew Sep 2, 2025
b1d18a8
test: add validation for schema mismatch in Arrow C stream
kosiew Sep 2, 2025
eeb2a37
fix Ruff errors
kosiew Sep 2, 2025
748b7e2
Update docs/source/user-guide/dataframe/index.rst
kosiew Sep 7, 2025
5e650aa
test: add batch iteration test for DataFrame
kosiew Sep 8, 2025
ebd2191
refactor: simplify stream capsule creation in PyDataFrame
kosiew Sep 8, 2025
6bae74b
refactor: enhance stream capsule management in PyDataFrame
kosiew Sep 8, 2025
f0cbe06
refactor: enhance DataFrame and RecordBatchStream iteration support
kosiew Sep 8, 2025
295d04a
refactor: improve docstrings for DataFrame and RecordBatchStream methods
kosiew Sep 8, 2025
475c031
refactor: add to_record_batch_stream method and improve iteration sup…
kosiew Sep 8, 2025
06c9fc7
test: update test_iter_batches_dataframe to assert RecordBatch type a…
kosiew Sep 8, 2025
94432b5
fix: update table creation from batches to use to_pyarrow conversion
kosiew Sep 8, 2025
31ed8e7
test: add test_iter_returns_datafusion_recordbatch to verify RecordBa…
kosiew Sep 8, 2025
610aed3
docs: clarify RecordBatch reference and add PyArrow conversion example
kosiew Sep 8, 2025
1ebd3c1
test: improve test_iter_batches_dataframe to validate RecordBatch con…
kosiew Sep 8, 2025
2e4b963
test: enhance test_arrow_c_stream_to_table_and_reader for batch equal…
kosiew Sep 9, 2025
d0ee865
Shelve unrelated changes
kosiew Sep 9, 2025
16a249c
Fix documentation to reference datafusion.RecordBatch instead of pyar…
kosiew Sep 9, 2025
d91ecfa
Remove redundant to_record_batch_stream method from DataFrame class
kosiew Sep 9, 2025
21f286a
Refactor Arrow stream creation in PyDataFrame to use PyCapsule directly
kosiew Sep 10, 2025
831f56f
Add `once_cell` dependency and refactor Arrow array stream capsule na…
kosiew Sep 10, 2025
7b5e461
Add `cstr` dependency and refactor Arrow array stream capsule name ha…
kosiew Sep 10, 2025
d6e8132
Refactor test_iter_returns_datafusion_recordbatch to use RecordBatch …
kosiew Sep 10, 2025
8a250a4
Add streaming execution examples to DataFrame documentation
kosiew Sep 10, 2025
7789322
Rename `to_record_batch_stream` to `execute_stream` and update refere…
kosiew Sep 10, 2025
07a8169
Clean up formatting in Cargo.toml for improved readability
kosiew Sep 10, 2025
9e27cc6
Refactor Cargo.toml for improved formatting and readability
kosiew Sep 10, 2025
9dc3fb2
Merge branch 'main' into oom-1206
kosiew Sep 10, 2025
d3c68cc
Update python/tests/test_io.py
kosiew Sep 13, 2025
33f9024
Update python/datafusion/dataframe.py
kosiew Sep 13, 2025
7553b32
Refactor test_table_from_batches_stream to use pa.table for improved …
kosiew Sep 13, 2025
b6909a5
Remove deprecated to_record_batch_stream method; use execute_stream i…
kosiew Sep 13, 2025
f4e76ea
Add example for concurrent processing of partitioned streams using as…
kosiew Sep 13, 2025
b66b441
Update documentation to reflect changes in execute_stream return type…
kosiew Sep 13, 2025
2794c88
Update PyArrow streaming example to use pa.table for eager collection
kosiew Sep 13, 2025
17c4c2c
Enhance documentation for DataFrame streaming API, clarifying schema …
kosiew Sep 13, 2025
0ff4c0d
Clarify behavior of __arrow_c_stream__ execution, emphasizing increme…
kosiew Sep 13, 2025
f450e1d
Add note on limitations of `arrow::compute::cast` for schema transfor…
kosiew Sep 13, 2025
5dc5cfa
Update python/tests/test_io.py
kosiew Sep 13, 2025
fd08dc4
Rename test function for clarity: update `test_table_from_batches_str…
kosiew Sep 13, 2025
9baa49e
Update python/datafusion/dataframe.py
kosiew Sep 13, 2025
78f6c8a
Add documentation note for Arrow C Data Interface PyCapsule in DataFr…
kosiew Sep 13, 2025
5a53633
Enhance documentation on zero-copy streaming to Arrow-based Python li…
kosiew Sep 13, 2025
ccc8633
Fix formatting of section header for zero-copy streaming in DataFrame…
kosiew Sep 13, 2025
98ac3a1
Refine zero-copy streaming documentation by removing outdated informa…
kosiew Sep 13, 2025
759fb86
Add alternative method for creating RecordBatchReader from Arrow C st…
kosiew Sep 13, 2025
57d4162
Refactor tests to use RecordBatchReader.from_stream instead of deprec…
kosiew Sep 13, 2025
d66d496
Replace deprecated _import_from_c_capsule method with from_stream for…
kosiew Sep 13, 2025
d76a509
Update test description for arrow_c_stream_large_dataset to clarify s…
kosiew Sep 13, 2025
7433234
Add comments to clarify RSS measurement in test_arrow_c_stream_large_…
kosiew Sep 13, 2025
848665e
Fix ruff errors
kosiew Sep 13, 2025
13ebaf9
Update async iterator implementation in DataFrame to ensure compatibi…
kosiew Sep 13, 2025
dae501d
Fix async iterator implementation in DataFrame for compatibility with…
kosiew Sep 15, 2025
c36aa9a
fix typo
kosiew Sep 15, 2025
914f17e
Fix formatting in DataFrame documentation and add example usage for A…
kosiew Sep 15, 2025
d120b2e
Merge branch 'main' into oom-1206
kosiew Sep 20, 2025
e7a7ec9
fix: correct formatting in documentation for RecordBatchStream
kosiew Sep 20, 2025
31 changes: 30 additions & 1 deletion docs/source/user-guide/dataframe/index.rst
@@ -145,10 +145,39 @@ To materialize the results of your DataFrame operations:

# Display results
df.show() # Print tabular format to console

# Count rows
count = df.count()

PyArrow Streaming
Contributor:

The heading here is specific to PyArrow, but I think it would be good to provide a distinction here. Maybe "Zero-copy Streaming to other Arrow implementations"? Or something like that?

Then we can have a sub-section dedicated to PyArrow, but also explain that it works with any Arrow-based Python implementation.

Contributor Author:

Good point!

Implemented your suggestion.

-----------------

DataFusion DataFrames implement the ``__arrow_c_stream__`` protocol, enabling
zero-copy streaming into libraries like `PyArrow <https://arrow.apache.org/>`_.
Earlier versions eagerly converted the entire DataFrame when exporting to
PyArrow, which could exhaust memory on large datasets. With streaming, batches
are produced lazily so you can process arbitrarily large results without
out-of-memory errors.

.. code-block:: python

import pyarrow as pa

# Create a PyArrow RecordBatchReader without materializing all batches
reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
for batch in reader:
... # process each batch as it is produced
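
Later commits in this PR replace the private ``_import_from_c_capsule`` helper with ``pa.RecordBatchReader.from_stream``, which accepts any object implementing ``__arrow_c_stream__``. A sketch of that alternative, assuming a PyArrow release that ships ``from_stream``:

.. code-block:: python

    import pyarrow as pa

    # from_stream consumes the DataFrame's __arrow_c_stream__ directly,
    # so no private PyArrow APIs are needed
    reader = pa.RecordBatchReader.from_stream(df)
    for batch in reader:
        ...  # process each batch as it is produced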

DataFrames are also iterable, yielding :class:`pyarrow.RecordBatch` objects
lazily so you can loop over results directly:

.. code-block:: python

for batch in df:
... # process each batch as it is produced
Contributor:

Because the user can iterate over the stream via the target library, I don't think we should define our own custom integration here. If we do, the yielded object should not be a pyarrow RecordBatch, but rather an opaque, minimal Python class that just exposes __arrow_c_array__, so the user can choose which Arrow library they want to use to work with the batch.
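
A minimal sketch of the opaque wrapper described above (the class name and constructor arguments are illustrative, not something this PR adds): the yielded object exposes only ``__arrow_c_array__``, so consumers can hand each batch to whichever Arrow library they prefer.

class MinimalBatch:
    """Illustrative wrapper exposing only the Arrow PyCapsule array protocol."""

    def __init__(self, schema_capsule, array_capsule):
        self._schema_capsule = schema_capsule
        self._array_capsule = array_capsule

    def __arrow_c_array__(self, requested_schema=None):
        # Per the Arrow PyCapsule interface, return (schema, array) capsules;
        # the consumer (pyarrow, nanoarrow, ...) imports them itself.
        return self._schema_capsule, self._array_capsule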

Contributor (@kylebarron, Sep 3, 2025):

We already have our own RecordBatch class: https://datafusion.apache.org/python/autoapi/datafusion/record_batch/index.html#datafusion.record_batch.RecordBatch

Also, we should ensure that the dunder methods are rendered in the docs. It doesn't look like they are currently. (Or maybe the dunder methods on that RecordBatch aren't documented?)
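
A hedged sketch of one way to surface those dunder methods, assuming the docs use ``sphinx.ext.autodoc`` (the published API pages appear to come from AutoAPI, which exposes an equivalent ``special-members`` option):

.. autoclass:: datafusion.record_batch.RecordBatch
   :members:
   :special-members: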


See :doc:`../io/arrow` for additional details on the Arrow interface.

HTML Rendering
--------------

37 changes: 31 additions & 6 deletions python/datafusion/dataframe.py
@@ -26,6 +26,7 @@
TYPE_CHECKING,
Any,
Iterable,
Iterator,
Literal,
Optional,
Union,
@@ -289,6 +290,9 @@ def __init__(
class DataFrame:
"""Two dimensional table representation of data.

DataFrame objects are iterable; iterating over a DataFrame yields
:class:`pyarrow.RecordBatch` instances lazily.

See :ref:`user_guide_concepts` in the online documentation for more information.
"""

@@ -1098,21 +1102,42 @@ def unnest_columns(self, *columns: str, preserve_nulls: bool = True) -> DataFrame
return DataFrame(self.df.unnest_columns(columns, preserve_nulls=preserve_nulls))

def __arrow_c_stream__(self, requested_schema: object | None = None) -> object:
"""Export an Arrow PyCapsule Stream.
"""Export the DataFrame as an Arrow C Stream.

This will execute and collect the DataFrame. We will attempt to respect the
requested schema, but only trivial transformations will be applied such as only
returning the fields listed in the requested schema if their data types match
those in the DataFrame.
The DataFrame is executed using DataFusion's streaming APIs and exposed via
Contributor:

Might be good to have a link somewhere in the docstring to https://arrow.apache.org/docs/format/CDataInterface/PyCapsuleInterface.html

Contributor Author:

Added

Arrow's C Stream interface. Record batches are produced incrementally, so the
full result set is never materialized in memory. When ``requested_schema`` is
provided, only straightforward projections such as column selection or
reordering are applied.

Args:
requested_schema: Attempt to provide the DataFrame using this schema.

Returns:
Arrow PyCapsule object.
Arrow PyCapsule object representing an ``ArrowArrayStream``.
"""
# ``DataFrame.__arrow_c_stream__`` in the Rust extension leverages
# ``execute_stream_partitioned`` under the hood to stream batches while
# preserving the original partition order.
return self.df.__arrow_c_stream__(requested_schema)

def __iter__(self) -> Iterator[pa.RecordBatch]:
Contributor:

I don't really think there's a good rationale for having this method, especially as it reuses the exact same mechanism as the PyCapsule Interface. If anything, we might want to have an __aiter__ method that has a custom async connection to the DataFusion context.
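
A minimal sketch of that ``__aiter__`` idea (illustrative only; it assumes the ``execute_stream`` method that later commits in this PR settle on, which returns a ``RecordBatchStream`` supporting async iteration):

def __aiter__(self):
    # Illustrative sketch: delegate async iteration to the stream returned
    # by execute_stream(); batches arrive as DataFusion produces them,
    # without collecting the full result set.
    return self.execute_stream().__aiter__()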

Contributor:

RecordBatchStream already has __iter__ and __aiter__ methods https://datafusion.apache.org/python/autoapi/datafusion/record_batch/index.html#datafusion.record_batch.RecordBatchStream

Can we just have a method that converts a DataFrame into a RecordBatchStream? Then an __iter__ on DataFrame would just convert to a RecordBatchStream under the hood.
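
A sketch of that suggestion (illustrative; the revision shown below still goes through the PyCapsule interface): ``__iter__`` would simply delegate to the ``RecordBatchStream`` obtained from ``execute_stream``.

def __iter__(self):
    # Illustrative sketch: convert to a RecordBatchStream and let it drive
    # iteration, yielding RecordBatch objects lazily.
    return iter(self.execute_stream())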

"""Yield record batches from the DataFrame without materializing results.

This implementation streams record batches via the Arrow C Stream
interface, allowing callers such as :func:`pyarrow.Table.from_batches` to
consume results lazily. The DataFrame is executed using DataFusion's
partitioned streaming APIs so ``collect`` is never invoked and batch
order across partitions is preserved.
"""
from contextlib import closing

import pyarrow as pa

reader = pa.RecordBatchReader._import_from_c_capsule(self.__arrow_c_stream__())
with closing(reader):
yield from reader

def transform(self, func: Callable[..., DataFrame], *args: Any) -> DataFrame:
"""Apply a function to the current DataFrame which returns another DataFrame.

11 changes: 10 additions & 1 deletion python/tests/conftest.py
@@ -17,7 +17,7 @@

import pyarrow as pa
import pytest
from datafusion import SessionContext
from datafusion import DataFrame, SessionContext
from pyarrow.csv import write_csv


@@ -49,3 +49,12 @@ def database(ctx, tmp_path):
delimiter=",",
schema_infer_max_records=10,
)


@pytest.fixture
def fail_collect(monkeypatch):
def _fail_collect(self, *args, **kwargs): # pragma: no cover - failure path
msg = "collect should not be called"
raise AssertionError(msg)

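# Patch DataFrame.collect so any accidental eager collection fails the test,
# proving that the streaming code paths under test never call it.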
monkeypatch.setattr(DataFrame, "collect", _fail_collect)
220 changes: 220 additions & 0 deletions python/tests/test_dataframe.py
@@ -46,6 +46,8 @@
from datafusion.expr import Window
from pyarrow.csv import write_csv

pa_cffi = pytest.importorskip("pyarrow.cffi")

MB = 1024 * 1024


@@ -1582,6 +1584,120 @@ def test_empty_to_arrow_table(df):
assert set(pyarrow_table.column_names) == {"a", "b", "c"}


def test_iter_batches_dataframe(fail_collect):
ctx = SessionContext()

batch1 = pa.record_batch([pa.array([1])], names=["a"])
batch2 = pa.record_batch([pa.array([2])], names=["a"])
df = ctx.create_dataframe([[batch1], [batch2]])

expected = [batch1, batch2]
for got, exp in zip(df, expected):
assert got.equals(exp)


def test_arrow_c_stream_to_table_and_reader(fail_collect):
ctx = SessionContext()

# Create a DataFrame with two separate record batches
batch1 = pa.record_batch([pa.array([1])], names=["a"])
batch2 = pa.record_batch([pa.array([2])], names=["a"])
df = ctx.create_dataframe([[batch1], [batch2]])

table = pa.Table.from_batches(df)
batches = table.to_batches()

assert len(batches) == 2
assert batches[0].equals(batch1)
assert batches[1].equals(batch2)
assert table.schema == df.schema()
assert table.column("a").num_chunks == 2

reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())
assert isinstance(reader, pa.RecordBatchReader)
reader_table = pa.Table.from_batches(reader)
expected = pa.Table.from_batches([batch1, batch2])
assert reader_table.equals(expected)


def test_arrow_c_stream_order():
ctx = SessionContext()

batch1 = pa.record_batch([pa.array([1])], names=["a"])
batch2 = pa.record_batch([pa.array([2])], names=["a"])

df = ctx.create_dataframe([[batch1, batch2]])

table = pa.Table.from_batches(df)
expected = pa.Table.from_batches([batch1, batch2])

assert table.equals(expected)
col = table.column("a")
assert col.chunk(0)[0].as_py() == 1
assert col.chunk(1)[0].as_py() == 2


def test_arrow_c_stream_schema_selection(fail_collect):
ctx = SessionContext()

batch = pa.RecordBatch.from_arrays(
[
pa.array([1, 2]),
pa.array([3, 4]),
pa.array([5, 6]),
],
names=["a", "b", "c"],
)
df = ctx.create_dataframe([[batch]])

requested_schema = pa.schema([("c", pa.int64()), ("a", pa.int64())])

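# Export the requested schema via the Arrow C Data Interface and wrap it in a
# PyCapsule named "arrow_schema", the name the Arrow PyCapsule interface
# expects for schema capsules passed to __arrow_c_stream__.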
c_schema = pa_cffi.ffi.new("struct ArrowSchema*")
address = int(pa_cffi.ffi.cast("uintptr_t", c_schema))
requested_schema._export_to_c(address)
capsule_new = ctypes.pythonapi.PyCapsule_New
capsule_new.restype = ctypes.py_object
capsule_new.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]
schema_capsule = capsule_new(ctypes.c_void_p(address), b"arrow_schema", None)

reader = pa.RecordBatchReader._import_from_c_capsule(
df.__arrow_c_stream__(schema_capsule)
)

assert reader.schema == requested_schema

batches = list(reader)

assert len(batches) == 1
expected_batch = pa.record_batch(
[pa.array([5, 6]), pa.array([1, 2])], names=["c", "a"]
)
assert batches[0].equals(expected_batch)


def test_arrow_c_stream_schema_mismatch(fail_collect):
ctx = SessionContext()

batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2]), pa.array([3, 4])], names=["a", "b"]
)
df = ctx.create_dataframe([[batch]])

bad_schema = pa.schema([("a", pa.string())])

c_schema = pa_cffi.ffi.new("struct ArrowSchema*")
address = int(pa_cffi.ffi.cast("uintptr_t", c_schema))
bad_schema._export_to_c(address)

capsule_new = ctypes.pythonapi.PyCapsule_New
capsule_new.restype = ctypes.py_object
capsule_new.argtypes = [ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]
bad_capsule = capsule_new(ctypes.c_void_p(address), b"arrow_schema", None)

with pytest.raises(Exception, match="Fail to merge schema"):
df.__arrow_c_stream__(bad_capsule)


def test_to_pylist(df):
# Convert datafusion dataframe to Python list
pylist = df.to_pylist()
@@ -2666,6 +2782,110 @@ def trigger_interrupt():
interrupt_thread.join(timeout=1.0)


def test_arrow_c_stream_interrupted():
"""__arrow_c_stream__ responds to ``KeyboardInterrupt`` signals.

Similar to ``test_collect_interrupted``, this test issues a long-running
query, but consumes the results via ``__arrow_c_stream__``. It then raises
``KeyboardInterrupt`` in the main thread and verifies that the stream
iteration stops promptly with the appropriate exception.
"""

ctx = SessionContext()

batches = []
for i in range(10):
batch = pa.RecordBatch.from_arrays(
[
pa.array(list(range(i * 1000, (i + 1) * 1000))),
pa.array([f"value_{j}" for j in range(i * 1000, (i + 1) * 1000)]),
],
names=["a", "b"],
)
batches.append(batch)

ctx.register_record_batches("t1", [batches])
ctx.register_record_batches("t2", [batches])

df = ctx.sql(
"""
WITH t1_expanded AS (
SELECT
a,
b,
CAST(a AS DOUBLE) / 1.5 AS c,
CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS d
FROM t1
CROSS JOIN (SELECT 1 AS dummy FROM t1 LIMIT 5)
),
t2_expanded AS (
SELECT
a,
b,
CAST(a AS DOUBLE) * 2.5 AS e,
CAST(a AS DOUBLE) * CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS f
FROM t2
CROSS JOIN (SELECT 1 AS dummy FROM t2 LIMIT 5)
)
SELECT
t1.a, t1.b, t1.c, t1.d,
t2.a AS a2, t2.b AS b2, t2.e, t2.f
FROM t1_expanded t1
JOIN t2_expanded t2 ON t1.a % 100 = t2.a % 100
WHERE t1.a > 100 AND t2.a > 100
"""
)

reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())

interrupted = False
interrupt_error = None
query_started = threading.Event()
max_wait_time = 5.0

def trigger_interrupt():
start_time = time.time()
while not query_started.is_set():
time.sleep(0.1)
if time.time() - start_time > max_wait_time:
msg = f"Query did not start within {max_wait_time} seconds"
raise RuntimeError(msg)

thread_id = threading.main_thread().ident
if thread_id is None:
msg = "Cannot get main thread ID"
raise RuntimeError(msg)

exception = ctypes.py_object(KeyboardInterrupt)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread_id), exception
)
if res != 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(thread_id), ctypes.py_object(0)
)
msg = "Failed to raise KeyboardInterrupt in main thread"
raise RuntimeError(msg)

interrupt_thread = threading.Thread(target=trigger_interrupt)
interrupt_thread.daemon = True
interrupt_thread.start()

try:
query_started.set()
# consume the reader which should block and be interrupted
reader.read_all()
except KeyboardInterrupt:
interrupted = True
except Exception as e: # pragma: no cover - unexpected errors
interrupt_error = e

if not interrupted:
pytest.fail(f"Stream was not interrupted; got error: {interrupt_error}")

interrupt_thread.join(timeout=1.0)


def test_show_select_where_no_rows(capsys) -> None:
ctx = SessionContext()
df = ctx.sql("SELECT 1 WHERE 1=0")