Skip to content

Commit 359a90c

Browse files
committed
feat: add DataFrame.to_pandas_batches() to download large DataFrame objects
1 parent 33a9d9f commit 359a90c

File tree

4 files changed

+49
-4
lines changed

4 files changed

+49
-4
lines changed

bigframes/core/blocks.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,27 @@ def to_pandas(
412412
)
413413
return df, query_job
414414

415+
def to_pandas_batches(self):
    """Yield the query results as a stream of pandas DataFrames.

    The query is started once. Each Arrow table produced by the results
    iterator is then converted to pandas using this block's index and
    value dtypes, has the block's index applied, and is yielded
    before the next table is converted.
    """
    # Column id -> dtype, index columns first; a value column with the same
    # id overrides the index entry, exactly as dict.update() would.
    column_dtypes = {
        **dict(zip(self.index_columns, self.index_dtypes)),
        **dict(zip(self.value_columns, self.dtypes)),
    }
    row_iterator, _ = self._expr.start_query()
    arrow_tables = row_iterator.to_arrow_iterable(
        bqstorage_client=self._expr._session.bqstoragereadclient
    )
    for table in arrow_tables:
        batch = bigframes.session._io.pandas.arrow_to_pandas(table, column_dtypes)
        # Mutates ``batch`` in place so it carries the block's index.
        self._copy_index_to_pandas(batch)
        yield batch
426+
427+
def _copy_index_to_pandas(self, df: pd.DataFrame):
    """Give ``df`` the same index as this block.

    When the block has index columns, those columns of ``df`` become its
    index and the index levels are renamed to the block's index names.
    When the block has no index columns, ``df`` is left untouched.

    Warning: ``df`` is modified in place and nothing is returned.
    """
    if not self.index_columns:
        return

    index_column_ids = list(self.index_columns)
    df.set_index(index_column_ids, inplace=True)
    df.index.names = self.index.names  # type: ignore
435+
415436
def _compute_and_count(
416437
self,
417438
value_keys: Optional[Iterable[str]] = None,
@@ -485,10 +506,7 @@ def _compute_and_count(
485506
else:
486507
total_rows = results_iterator.total_rows
487508
df = self._to_dataframe(results_iterator)
488-
489-
if self.index_columns:
490-
df.set_index(list(self.index_columns), inplace=True)
491-
df.index.names = self.index.names # type: ignore
509+
self._copy_index_to_pandas(df)
492510

493511
return df, total_rows, query_job
494512

bigframes/dataframe.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -893,6 +893,10 @@ def to_pandas(
893893
self._set_internal_query_job(query_job)
894894
return df.set_axis(self._block.column_labels, axis=1, copy=False)
895895

896+
def to_pandas_batches(self) -> Iterable[pandas.DataFrame]:
    """Stream the DataFrame's results as an iterable of pandas DataFrames.

    Batches are produced lazily by the underlying block. Each batch's
    columns are relabeled with the block's column labels, the same
    relabeling ``to_pandas()`` applies to its single result, so both
    download paths expose identical column names.

    Returns:
        An iterable yielding one pandas DataFrame per downloaded batch.
    """
    block = self._block
    column_labels = block.column_labels
    # The block yields frames keyed by its own column ids; swap in the
    # user-facing labels per batch without copying the underlying data.
    return (
        batch.set_axis(column_labels, axis=1, copy=False)
        for batch in block.to_pandas_batches()
    )
899+
896900
def _compute_dry_run(self) -> bigquery.QueryJob:
897901
return self._block._compute_dry_run()
898902

tests/system/small/test_dataframe_io.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,14 @@ def test_to_pandas_array_struct_correct_result(session):
8383
)
8484

8585

86+
def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index):
    """Verify every batch from to_pandas_batches() has the expected dtypes."""
    expected_dtypes = scalars_df_default_index.dtypes
    for batch in scalars_df_default_index.to_pandas_batches():
        # Each batch is checked on its own, so a single bad batch fails the test.
        pd.testing.assert_series_equal(batch.dtypes, expected_dtypes)
92+
93+
8694
@pytest.mark.parametrize(
8795
("index"),
8896
[True, False],

tests/unit/session/test_io_pandas.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@
131131
{
132132
"date": pandas.ArrowDtype(pyarrow.date32()),
133133
"datetime": pandas.ArrowDtype(pyarrow.timestamp("us")),
134+
"float": pandas.Float64Dtype(),
135+
"int": pandas.Int64Dtype(),
134136
"string": "string[pyarrow]",
135137
"time": pandas.ArrowDtype(pyarrow.time64("us")),
136138
"timestamp": pandas.ArrowDtype(
@@ -157,6 +159,19 @@
157159
],
158160
dtype=pandas.ArrowDtype(pyarrow.timestamp("us")),
159161
),
162+
"float": pandas.Series(
163+
pandas.arrays.FloatingArray(
164+
numpy.array(
165+
[1.0, float("nan"), float("nan"), -1.0], dtype="float64"
166+
),
167+
numpy.array([False, True, False, False], dtype="bool"),
168+
),
169+
dtype=pandas.Float64Dtype(),
170+
),
171+
"int": pandas.Series(
172+
[1, None, -1, 2**63 - 1],
173+
dtype=pandas.Int64Dtype(),
174+
),
160175
"string": pandas.Series(
161176
["123", None, "abc", "xyz"], dtype="string[pyarrow]"
162177
),

0 commit comments

Comments
 (0)