Add Arrow C streaming, DataFrame iteration, and OOM-safe streaming execution #1222
Conversation
…ing in DataFrame
- Add `range` method to SessionContext and iterator support to DataFrame
- Introduce `spawn_stream` utility and refactor async execution for better signal handling
- Add tests for `KeyboardInterrupt` in `__arrow_c_stream__` and incremental DataFrame streaming
- Improve memory usage tracking in tests with psutil
- Update DataFrame docs with PyArrow streaming section and enhance `__arrow_c_stream__` documentation
- Replace Tokio runtime creation with `spawn_stream` in PySessionContext
- Bump datafusion packages to 49.0.1 and update dependencies
- Remove unused imports and restore main Cargo.toml
…andling
- Refactor record batch streaming to use `poll_next_batch` for clearer error handling
- Improve `spawn_future`/`spawn_stream` functions for better Python exception integration and code reuse
- Update `datafusion` and `datafusion-ffi` dependencies to 49.0.2
- Fix PyArrow `RecordBatchReader` import to use `_import_from_c_capsule` for safer memory handling
- Refactor `ArrowArrayStream` handling to use `PyCapsule` with destructor for improved memory management
- Refactor projection initialization in `PyDataFrame` for clarity
- Move `range` functionality into `_testing.py` helper
- Rename test column in `test_table_from_batches_stream` for accuracy
- Add tests for `RecordBatchReader` and enhance DataFrame stream handling
…docs
- Preserve partition order in DataFrame streaming and update related tests
- Add tests for record batch ordering and DataFrame batch iteration
- Improve `drop_stream` to correctly handle PyArrow ownership transfer and null pointers
- Replace `assert` with `debug_assert` for safer ArrowArrayStream validation
- Add documentation for `poll_next_batch` in PyRecordBatchStream
- Refactor tests to use `fail_collect` fixture for DataFrame collect
- Refactor `range_table` return type to `DataFrame` for clearer type hints
- Minor cleanup in SessionContext (remove extra blank line)
…dBatchReader validation
I'm invested in this and plan to review it this afternoon!
I would strongly advocate for less direct integration with pyarrow, not more. Pyarrow is a massive dependency, while the Arrow PyCapsule Interface should allow for better decentralized sharing of Arrow data.
DataFrames are also iterable, yielding :class:`pyarrow.RecordBatch` objects
lazily so you can loop over results directly:

.. code-block:: python

    for batch in df:
        ...  # process each batch as it is produced
Because the user can iterate over the stream accessed by the target library, I don't think we should define our own custom integration here, and if we do, then the yielded object should not be a pyarrow `RecordBatch`, but rather an opaque, minimal Python class that just exposes `__arrow_c_array__` so that the user can choose what Arrow library they want to use to work with the batch.
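For illustration, a hypothetical sketch of the kind of thin wrapper being described; the class name and attribute are made up, and the only assumption is that the wrapped object itself implements `__arrow_c_array__`:

```python
class ExportedBatch:
    """Opaque wrapper that only exposes the Arrow C Data Interface."""

    def __init__(self, inner):
        # `inner` is any object implementing __arrow_c_array__
        # (e.g. an internal record batch handle).
        self._inner = inner

    def __arrow_c_array__(self, requested_schema=None):
        # Delegate to the wrapped batch; the consumer (pyarrow, polars,
        # arro3, ...) decides how to import the returned capsules.
        return self._inner.__arrow_c_array__(requested_schema)
```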
We already have our own `RecordBatch` class: https://datafusion.apache.org/python/autoapi/datafusion/record_batch/index.html#datafusion.record_batch.RecordBatch

Also, we should ensure that the dunder methods are rendered in the docs. It doesn't look like they are currently. (Or maybe the dunder methods on that `RecordBatch` aren't documented?)
python/datafusion/dataframe.py
Outdated
        return self.df.__arrow_c_stream__(requested_schema)

    def __iter__(self) -> Iterator[pa.RecordBatch]:
I don't really think there's a good rationale for having this method, especially as it reuses the exact same mechanism as the PyCapsule Interface. If anything, we might want to have an `__aiter__` method that has a custom async connection to the DataFusion context.
`RecordBatchStream` already has `__iter__` and `__aiter__` methods: https://datafusion.apache.org/python/autoapi/datafusion/record_batch/index.html#datafusion.record_batch.RecordBatchStream

Can we just have a method that converts a `DataFrame` into a `RecordBatchStream`? Then an `__iter__` on `DataFrame` would just convert to a `RecordBatchStream` under the hood.
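A rough sketch of what that delegation could look like, assuming `execute_stream()` returns datafusion's `RecordBatchStream` (the method bodies are illustrative, not the actual implementation):

```python
from typing import Iterator

from datafusion.record_batch import RecordBatch


class DataFrame:
    ...

    def __iter__(self) -> Iterator[RecordBatch]:
        # RecordBatchStream already implements __iter__, so DataFrame
        # iteration can simply defer to the stream it produces.
        return iter(self.execute_stream())

    def __aiter__(self):
        # Same idea for async iteration; calling the dunder directly
        # avoids the aiter() builtin, which requires Python >= 3.10.
        return self.execute_stream().__aiter__()
```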
src/dataframe.rs
Outdated
#[allow(clippy::manual_c_str_literals)]
static ARROW_STREAM_NAME: &CStr =
    unsafe { CStr::from_bytes_with_nul_unchecked(b"arrow_array_stream\0") };
As suggested by the linter, can we just use `c"arrow_array_stream"`?
src/dataframe.rs
Outdated
unsafe extern "C" fn drop_stream(capsule: *mut ffi::PyObject) {
    if capsule.is_null() {
        return;
    }

    // When PyArrow imports this capsule it steals the raw stream pointer and
    // sets the capsule's internal pointer to NULL. In that case
    // `PyCapsule_IsValid` returns 0 and this destructor must not drop the
    // stream as ownership has been transferred to PyArrow. If the capsule was
    // never imported, the pointer remains valid and we are responsible for
    // freeing the stream here.
    if ffi::PyCapsule_IsValid(capsule, ARROW_STREAM_NAME.as_ptr()) == 1 {
        let stream_ptr = ffi::PyCapsule_GetPointer(capsule, ARROW_STREAM_NAME.as_ptr())
            as *mut FFI_ArrowArrayStream;
        if !stream_ptr.is_null() {
            drop(Box::from_raw(stream_ptr));
        }
    }

    // `PyCapsule_GetPointer` sets a Python error on failure. Clear it only
    // after the stream has been released (or determined to be owned
    // elsewhere).
    ffi::PyErr_Clear();
}
We shouldn't need to do any of this, according to upstream discussion apache/arrow-rs#5070 (comment)
        self.schema.clone()
    }
}

#[pymethods]
impl PyDataFrame {
Essentially this changes the `DataFrame` construct to always be "lazy"? Previously a `DataFrame` was always materialized in memory, whereas now it's just a representation of future batches?
src/dataframe.rs
Outdated
let ffi_stream = FFI_ArrowArrayStream::new(reader);
let stream_capsule_name = CString::new("arrow_array_stream").unwrap();
PyCapsule::new(py, ffi_stream, Some(stream_capsule_name)).map_err(PyDataFusionError::from)
let stream = Box::new(FFI_ArrowArrayStream::new(reader));
If you have an `FFI_ArrowArrayStream` you should be able to just pass that to `PyCapsule::new` without touching any unsafe: https://github.com/kylebarron/arro3/blob/cb2453bf022d0d8704e56e81a324ab5a772e0247/pyo3-arrow/src/ffi/to_python/utils.rs#L93-L94
In #1227 I explicitly suggested removing the pyarrow dependency altogether. I thought I had created an issue before, but apparently not.
This would also close #1011
Co-authored-by: Kyle Barron <[email protected]>
…port in DataFrame
I'm sorry I've not been following this one closely, but I hope to look into this tomorrow.
Cargo.toml
Outdated
Are most of the changes here just formatting? Did you just add `cstr`?
Some(Ok(batch)) => Ok(batch.into()),
Some(Err(e)) => Err(PyDataFusionError::from(e))?,
None => {
match poll_next_batch(&mut stream).await {
I'm not a tokio expert; how does this materially change?

Did you have to make a new function here? Could you have just used `match stream.next().await.transpose()`?
`poll_next_batch` was created to encapsulate the recurring asynchronous polling pattern used when consuming the stream. Centralizing this behavior improves readability, prevents duplicated error-mapping logic, facilitates unit testing, and enables future enhancements without modifying multiple call sites.
python/tests/test_io.py
Outdated
def test_table_from_batches_stream(ctx, fail_collect):
    df = range_table(ctx, 0, 10)

    table = pa.Table.from_batches(batch.to_pyarrow() for batch in df)
Are you intentionally testing a Python iterator here instead of using `__arrow_c_stream__`?

If you call `pa.table(df)`, that will use the C Stream and materialize the data on the pyarrow side, and it'll be more efficient because it keeps everything as a C pointer instead of having to go through Python.
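For comparison, a small sketch of the two paths, reusing the `df` built in the test above and assuming a pyarrow version that understands the Arrow PyCapsule Interface:

```python
import pyarrow as pa

# C-stream path: pyarrow drives __arrow_c_stream__ directly, so batches
# never round-trip through Python objects.
table_via_capsule = pa.table(df)

# Python-iteration path: each batch is converted to a pyarrow
# RecordBatch in Python before the Table is assembled.
table_via_iter = pa.Table.from_batches(batch.to_pyarrow() for batch in df)
```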
Good point.

I also renamed the test to `test_table_from_arrow_c_stream`.
@@ -46,6 +46,26 @@ def to_pyarrow(self) -> pa.RecordBatch:
        """Convert to :py:class:`pa.RecordBatch`."""
        return self.record_batch.to_pyarrow()

    def __arrow_c_array__(
👍 👍
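As a usage note, a hedged sketch of consuming the new protocol from pyarrow; it assumes a pyarrow release whose `pa.record_batch()` accepts objects exposing `__arrow_c_array__`, and the query is just an illustrative one-row select:

```python
import pyarrow as pa

from datafusion import SessionContext

ctx = SessionContext()
# Take a single datafusion RecordBatch from a streamed query.
batch = next(iter(ctx.sql("SELECT 1 AS a").execute_stream()))

# pa.record_batch() can import an object exposing __arrow_c_array__,
# so no explicit to_pyarrow() call is needed here.
arrow_batch = pa.record_batch(batch)
```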
python/datafusion/dataframe.py
Outdated
    @deprecated("Use execute_stream() instead")
    def to_record_batch_stream(self) -> RecordBatchStream:
Is this a new method? Why are we creating a new method that's immediately deprecated?
🤦
Removed.
.. code-block:: python

    for stream in df.execute_stream_partitioned():
        for batch in stream:
Interesting. Can these streams be polled concurrently? Can you do `streams = list(df.execute_stream_partitioned())` and then concurrently iterate over all the streams, yielding whatever batch comes in first? I suppose that would just do in Python what `execute_stream` is doing in Rust?
Good question!
I added a concurrent iteration example in the same document to clarify this.
To process partitions concurrently, first collect the streams into a list
and then poll each one in a separate ``asyncio`` task:
.. code-block:: python

    import asyncio

    async def consume(stream):
        async for batch in stream:
            ...

    streams = list(df.execute_stream_partitioned())
    await asyncio.gather(*(consume(s) for s in streams))
:py:meth:`~datafusion.DataFrame.execute_stream` to obtain a
:py:class:`pyarrow.RecordBatchReader`:
`execute_stream` returns a pyarrow `RecordBatchReader`? I thought it returned our own DataFusion-specific stream wrapper, i.e. `datafusion.RecordBatchStream`?
You're right!
Corrected.
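To illustrate the distinction, a sketch reusing `df` from the surrounding example; the `from_stream` import is an assumption about recent pyarrow versions that support the Arrow PyCapsule Interface:

```python
import pyarrow as pa

# execute_stream() yields DataFusion's own wrapper, not a pyarrow type.
stream = df.execute_stream()  # datafusion.RecordBatchStream

# If a pyarrow RecordBatchReader is wanted, import the DataFrame's
# __arrow_c_stream__ capsule on the pyarrow side instead.
reader = pa.RecordBatchReader.from_stream(df)
```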
Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
table without collecting everything eagerly:

.. code-block:: python

    import pyarrow as pa

    table = pa.Table.from_batches(b.to_pyarrow() for b in df)
I think if we provide an example of collecting as a table, we should suggest `pa.table(df)` instead, which will be more efficient because it doesn't go through Python.

And regarding "without collecting everything eagerly": this isn't a good example for that, because `pa.Table` necessarily materializes everything.
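As a sketch of the lazy alternative, processing one batch at a time rather than building a Table; the column name "a" is just an example and `df` is assumed to come from the surrounding docs:

```python
import pyarrow.compute as pc

total = 0
for batch in df.execute_stream():
    pa_batch = batch.to_pyarrow()  # convert a single batch
    value = pc.sum(pa_batch.column("a")).as_py()
    total += value or 0            # only one batch is held in memory at a time
```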
corrected.
    # Count rows
    count = df.count()

PyArrow Streaming
The heading here is specific to PyArrow, but I think it would be good to provide a distinction here. Maybe "Zero-copy Streaming to other Arrow implementations"? Or something like that?
Then we can have a sub-section dedicated to pyarrow, but also explain that it works with any arrow-based python impl.
Good point!
Implemented your suggestion.
Amazing. Happy to review as well. I think it's fine to have some methods that specifically convert to a pyarrow record batch/table. But we should call …
Co-authored-by: Kyle Barron <[email protected]>
Co-authored-by: Kyle Barron <[email protected]>
…handling and limitations
…ntal batch processing and memory efficiency
Co-authored-by: Kyle Barron <[email protected]>
…eam` to `test_table_from_arrow_c_stream`
Co-authored-by: Kyle Barron <[email protected]>
…braries, clarifying the protocol and adding implementation-agnostic notes.
…tion about eager conversion, emphasizing on-demand batch processing to prevent memory exhaustion.
…ated _import_from_c_capsule method
… RecordBatchReader in test_arrow_c_stream_schema_selection
…treaming method and usage of public API
…lity with Python < 3.10
This seems like a massive improvement! I took a skim through and at a high level looks like the correct way to approach it. I will try to take more time this week to review more carefully and to test a few things out myself.
directly to remain compatible with Python < 3.10 (this project
supports Python >= 3.6).
I think our minimum supported version is 3.9 right now
Which issue does this PR close?
Rationale for this change
Exporting DataFrame results via Arrow could previously trigger eager collection of the entire result set, which risks exhausting process memory for large datasets. The project needs a zero-copy, lazy streaming path into PyArrow that:
This PR implements streaming-friendly paths in both the Rust extension and Python bindings, fixes some async/spawn patterns (improving signal handling and runtime usage), and adds tests and documentation to exercise the new behavior.
What changes are included in this PR?
High level

- `__arrow_c_stream__` using a partitioned streaming reader that drains partition streams sequentially and exposes an Arrow `ArrowArrayStream` PyCapsule.
- `DataFrame` iteration and async iteration support (Python): `__iter__` and `__aiter__` returning `RecordBatch` instances.
- `RecordBatch.__arrow_c_array__` for zero-copy export of individual record batches as Arrow C Data Interface capsules.
- `spawn_future` to run DataFusion futures on the Tokio runtime while preserving Python signal handling instead of directly creating `JoinHandle`/blocking joins.
- Tests for `KeyboardInterrupt` and memory behavior when streaming large datasets.
- `tests/utils.py::range_table` used to construct large range tables without expanding the public API.
- "PyArrow Streaming" docs section.
- `cstr` dependency and small Cargo.toml tidy / formatting changes.

Files changed (summary)

Rust

src/dataframe.rs
- `PartitionedDataFrameStreamReader` implementing `RecordBatchReader` that pulls batches from partitioned `SendableRecordBatchStream`s and applies per-batch projection if requested.
- `__arrow_c_stream__` to use `execute_stream_partitioned()` and create an `FFI_ArrowArrayStream` from the `RecordBatchReader` without materializing all batches.
- `cstr` crate.
- `spawn_future` to run async tasks on the Tokio runtime.

src/record_batch.rs
- `poll_next_batch` helper and uses it to unify stream polling logic.
- `next_stream` …

src/utils.rs
- `spawn_future` utility that spawns a future on the shared Tokio runtime and waits for it while preserving Python signal behavior and converting errors appropriately.

src/context.rs
- `spawn_future` for `execute_stream_partitioned` / execution paths.

Python

python/datafusion/dataframe.py
- `__iter__`, `__aiter__` to iterate over `RecordBatch` objects produced by `execute_stream()`.
- `to_record_batch_stream` (alias to `execute_stream`).
- `RecordBatch` …

python/datafusion/record_batch.py
- `__arrow_c_array__` to export a `RecordBatch` via Arrow C Data Interface (two capsules).
- `RecordBatchStream` …

python/tests/*
- `fail_collect` fixture, tests for iteration (`test_iter_batches`, `test_iter_returns_datafusion_recordbatch`), streaming (`test_execute_stream_b…