feat: add DataFrame.to_pandas_batches() to download large DataFrame objects #136

Merged: 23 commits, Oct 26, 2023

Commits
c35523c  refactor: make `to_pandas()` call `to_arrow()` and use local dtypes i… (tswast, Oct 23, 2023)
9f5865c  Merge branch 'main' into b280662868-to_arrow (tswast, Oct 24, 2023)
c2f9d72  use integer_object_nulls=True to preserve NA/NaN distinction (tswast, Oct 24, 2023)
7ccb61d  allow NUMERIC/BIGNUMERIC to cast to FLOAT64 (tswast, Oct 24, 2023)
829cf99  better workaround for Float64Dtype NaNs (tswast, Oct 25, 2023)
c158780  Merge remote-tracking branch 'origin/main' into b280662868-to_arrow (tswast, Oct 25, 2023)
3a90214  fix type error (tswast, Oct 25, 2023)
8bdfd79  add unit tests for extreme values (tswast, Oct 25, 2023)
db81a1c  fix tests on latest pandas (tswast, Oct 25, 2023)
b25112b  mypy fixes (tswast, Oct 25, 2023)
a3705f9  fix mod tests (tswast, Oct 26, 2023)
33a9d9f  Merge remote-tracking branch 'origin/main' into b280662868-to_arrow (tswast, Oct 26, 2023)
1a7b2d7  feat: add `DataFrame.to_pandas_batches()` to download large `DataFram… (tswast, Oct 24, 2023)
0c23388  Merge branch 'main' into b280662868-to_pandas_batches (tswast, Oct 26, 2023)
c4a8b15  allow copies (tswast, Oct 26, 2023)
239e5ef  allow copies only for contiguous arrays (tswast, Oct 26, 2023)
ea4d9df  test with chunked_array (tswast, Oct 26, 2023)
38dba2a  Merge remote-tracking branch 'origin/main' into b280662868-to_pandas_… (tswast, Oct 26, 2023)
2f225dd  Merge branch 'main' into b280662868-to_pandas_batches (tswast, Oct 26, 2023)
e1e291d  explain type: ignore (tswast, Oct 26, 2023)
3166a65  Merge remote-tracking branch 'origin/main' into b280662868-to_pandas_… (tswast, Oct 26, 2023)
2e218e9  Merge branch 'b280662868-to_pandas_batches' of github.com:googleapis/… (tswast, Oct 26, 2023)
6dd5aae  Merge branch 'main' into b280662868-to_pandas_batches (gcf-merge-on-green[bot], Oct 26, 2023)
29 changes: 25 additions & 4 deletions bigframes/core/blocks.py
@@ -416,6 +416,30 @@ def to_pandas(
)
return df, query_job

def to_pandas_batches(self):
"""Download results one message at a time."""
dtypes = dict(zip(self.index_columns, self.index_dtypes))
dtypes.update(zip(self.value_columns, self.dtypes))
results_iterator, _ = self._expr.start_query()
for arrow_table in results_iterator.to_arrow_iterable(
bqstorage_client=self._expr._session.bqstoragereadclient
):
df = bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)
self._copy_index_to_pandas(df)
yield df

def _copy_index_to_pandas(self, df: pd.DataFrame):
"""Set the index on pandas DataFrame to match this block.

Warning: This method modifies ``df`` inplace.
"""
if self.index_columns:
df.set_index(list(self.index_columns), inplace=True)
# Pandas names is annotated as list[str] rather than the more
# general Sequence[Label] that BigQuery DataFrames has.
# See: https://github.com/pandas-dev/pandas-stubs/issues/804
df.index.names = self.index.names # type: ignore
Contributor:
Just curious, what's the error here?

Collaborator (author):
bigframes/core/blocks.py:434: error: Incompatible types in assignment (expression has type "Sequence[Hashable]", variable has type "list[str]") [assignment]

Collaborator (author):
Even though list[str] is a sequence of hashable objects, I think type covariance can't kick in here. I'll try some casts.

Collaborator (author):
Oh, I had it the other way around: pandas says "names" is list[str], but I don't think that's actually true.

Collaborator (author):
Filed pandas-dev/pandas-stubs#804 and commented with an explanation.
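
For reference, a minimal standalone repro of the mypy complaint (the variable names here are invented): assigning a general Sequence[Hashable] to something annotated as list[str] is rejected, which is why the # type: ignore on df.index.names stays.

from typing import Hashable, Sequence

labels: Sequence[Hashable] = ("rowindex", None)  # BigQuery DataFrames labels may be any hashable, including None
names: list[str] = ["rowindex"]  # pandas-stubs annotates Index.names as list[str]
names = labels  # mypy: Incompatible types in assignment (expression has type "Sequence[Hashable]", variable has type "list[str]")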


def _compute_and_count(
self,
value_keys: Optional[Iterable[str]] = None,
@@ -489,10 +513,7 @@ def _compute_and_count(
else:
total_rows = results_iterator.total_rows
df = self._to_dataframe(results_iterator)

if self.index_columns:
df.set_index(list(self.index_columns), inplace=True)
df.index.names = self.index.names # type: ignore
self._copy_index_to_pandas(df)

return df, total_rows, query_job

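The generator above streams Arrow tables from BigQuery (via the Storage Read API when available) and converts each one to pandas as it arrives, rather than materializing the whole result set. A rough standalone sketch of that pattern, using plain pyarrow and pandas' ArrowDtype mapper with made-up in-memory tables (the PR itself converts via bigframes.session._io.pandas.arrow_to_pandas with the block's dtypes):

from typing import Iterator

import pandas
import pyarrow

def arrow_tables_to_pandas(tables: Iterator[pyarrow.Table]) -> Iterator[pandas.DataFrame]:
    """Yield one pandas DataFrame per Arrow table so peak memory stays bounded."""
    for table in tables:
        # types_mapper keeps nullable pandas dtypes instead of object/float64.
        yield table.to_pandas(types_mapper=pandas.ArrowDtype)

# Two small in-memory tables stand in for streamed batches.
batches = iter(
    [
        pyarrow.table({"rowindex": [0, 1], "float_col": [1.0, None]}),
        pyarrow.table({"rowindex": [2], "float_col": [2.5]}),
    ]
)
for chunk in arrow_tables_to_pandas(batches):
    print(chunk.dtypes)
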
4 changes: 4 additions & 0 deletions bigframes/dataframe.py
@@ -893,6 +893,10 @@ def to_pandas(
self._set_internal_query_job(query_job)
return df.set_axis(self._block.column_labels, axis=1, copy=False)

def to_pandas_batches(self) -> Iterable[pandas.DataFrame]:
"""Stream DataFrame results to an iterable of pandas DataFrame"""
return self._block.to_pandas_batches()

def _compute_dry_run(self) -> bigquery.QueryJob:
return self._block._compute_dry_run()

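A usage sketch of the new public method (the query is illustrative and assumes a configured BigQuery DataFrames session; per the system test later in this PR, each yielded batch carries the same dtypes as to_pandas() would produce):

import bigframes.pandas as bpd

df = bpd.read_gbq("SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013`")

row_count = 0
for batch in df.to_pandas_batches():
    # Each batch is an ordinary pandas DataFrame, so any local tooling works here.
    row_count += len(batch)
print(row_count)
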
20 changes: 16 additions & 4 deletions bigframes/session/_io/pandas.py
@@ -46,20 +46,32 @@ def arrow_to_pandas(
# Preserve NA/NaN distinction. Note: This is currently needed, even if we use
# nullable Float64Dtype in the types_mapper. See:
# https://github.com/pandas-dev/pandas/issues/55668
mask = pyarrow.compute.is_null(column)
nonnull = pyarrow.compute.fill_null(column, float("nan"))
# Regarding type: ignore, this class has been public at this
# location since pandas 1.2.0. See:
# https://pandas.pydata.org/docs/dev/reference/api/pandas.arrays.FloatingArray.html
pd_array = pandas.arrays.FloatingArray( # type: ignore
column.to_numpy(),
pyarrow.compute.is_null(column).to_numpy(),
nonnull.to_numpy()
if isinstance(nonnull, pyarrow.ChunkedArray)
else nonnull.to_numpy(zero_copy_only=False),
mask.to_numpy()
if isinstance(mask, pyarrow.ChunkedArray)
else mask.to_numpy(zero_copy_only=False),
)
series = pandas.Series(pd_array, dtype=dtype)
elif dtype == pandas.Int64Dtype():
# Avoid out-of-bounds errors in Pandas 1.5.x, which incorrectly
# casts to float64 in an intermediate step.
mask = pyarrow.compute.is_null(column)
nonnull = pyarrow.compute.fill_null(column, 0)
pd_array = pandas.arrays.IntegerArray(
pyarrow.compute.fill_null(column, 0).to_numpy(),
pyarrow.compute.is_null(column).to_numpy(),
nonnull.to_numpy()
if isinstance(nonnull, pyarrow.ChunkedArray)
else nonnull.to_numpy(zero_copy_only=False),
mask.to_numpy()
if isinstance(mask, pyarrow.ChunkedArray)
else mask.to_numpy(zero_copy_only=False),
)
series = pandas.Series(pd_array, dtype=dtype)
elif isinstance(dtype, pandas.ArrowDtype):
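
To make the Float64 branch concrete, here is a small self-contained sketch (sample data invented) of how building a FloatingArray from an explicit mask keeps Arrow nulls as pandas.NA while leaving a genuine NaN alone; the isinstance branching mirrors the workaround above for the differing to_numpy() signatures of Array and ChunkedArray.

import pandas
import pyarrow
import pyarrow.compute

column = pyarrow.chunked_array([[1.0, None], [float("nan"), -1.0]], type=pyarrow.float64())
mask = pyarrow.compute.is_null(column)  # True only for the Arrow null, not for the NaN
nonnull = pyarrow.compute.fill_null(column, float("nan"))
values = (
    nonnull.to_numpy()
    if isinstance(nonnull, pyarrow.ChunkedArray)
    else nonnull.to_numpy(zero_copy_only=False)
)
null_mask = (
    mask.to_numpy()
    if isinstance(mask, pyarrow.ChunkedArray)
    else mask.to_numpy(zero_copy_only=False)
)
pd_array = pandas.arrays.FloatingArray(values, null_mask)  # type: ignore
series = pandas.Series(pd_array, dtype=pandas.Float64Dtype())
# Expected contents: 1.0, <NA>, nan, -1.0 (the null stays <NA>, the NaN stays NaN).
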
8 changes: 8 additions & 0 deletions tests/system/small/test_dataframe_io.py
@@ -83,6 +83,14 @@ def test_to_pandas_array_struct_correct_result(session):
)


def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index):
"""Verify to_pandas_batches() APIs returns the expected dtypes."""
expected = scalars_df_default_index.dtypes
for df in scalars_df_default_index.to_pandas_batches():
actual = df.dtypes
pd.testing.assert_series_equal(actual, expected)


@pytest.mark.parametrize(
("index"),
[True, False],
56 changes: 56 additions & 0 deletions tests/unit/session/test_io_pandas.py
@@ -231,6 +231,62 @@
),
id="scalar-dtypes",
),
pytest.param(
pyarrow.Table.from_pydict(
{
"bool": pyarrow.chunked_array(
[[True, None], [True, False]],
type=pyarrow.bool_(),
),
"bytes": pyarrow.chunked_array(
[[b"123", None], [b"abc", b"xyz"]],
type=pyarrow.binary(),
),
"float": pyarrow.chunked_array(
[[1.0, None], [float("nan"), -1.0]],
type=pyarrow.float64(),
),
"int": pyarrow.chunked_array(
[[1, None], [-1, 2**63 - 1]],
type=pyarrow.int64(),
),
"string": pyarrow.chunked_array(
[["123", None], ["abc", "xyz"]],
type=pyarrow.string(),
),
}
),
{
"bool": "boolean",
"bytes": "object",
"float": pandas.Float64Dtype(),
"int": pandas.Int64Dtype(),
"string": "string[pyarrow]",
},
pandas.DataFrame(
{
"bool": pandas.Series([True, None, True, False], dtype="boolean"),
"bytes": [b"123", None, b"abc", b"xyz"],
"float": pandas.Series(
pandas.arrays.FloatingArray( # type: ignore
numpy.array(
[1.0, float("nan"), float("nan"), -1.0], dtype="float64"
),
numpy.array([False, True, False, False], dtype="bool"),
),
dtype=pandas.Float64Dtype(),
),
"int": pandas.Series(
[1, None, -1, 2**63 - 1],
dtype=pandas.Int64Dtype(),
),
"string": pandas.Series(
["123", None, "abc", "xyz"], dtype="string[pyarrow]"
),
}
),
id="scalar-dtypes-chunked_array",
),
pytest.param(
pyarrow.Table.from_pydict(
{