diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py
index eda24a74f0..39a847de84 100644
--- a/bigframes/exceptions.py
+++ b/bigframes/exceptions.py
@@ -103,6 +103,13 @@ class FunctionAxisOnePreviewWarning(PreviewWarning):
     """Remote Function and Managed UDF with axis=1 preview."""
 
 
+class FunctionPackageVersionWarning(PreviewWarning):
+    """
+    Managed UDF package versions for Numpy, Pandas, and Pyarrow may not
+    precisely match users' local environment or the exact versions specified.
+    """
+
+
 def format_message(message: str, fill: bool = True):
     """Formats a warning message with ANSI color codes for the warning color.
 
diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py
index 2c9dd0cb31..ae19dc1480 100644
--- a/bigframes/functions/_function_client.py
+++ b/bigframes/functions/_function_client.py
@@ -19,7 +19,6 @@
 import logging
 import os
 import random
-import re
 import shutil
 import string
 import tempfile
@@ -247,7 +246,7 @@ def provision_bq_managed_function(
         # Augment user package requirements with any internal package
         # requirements.
         packages = _utils._get_updated_package_requirements(
-            packages, is_row_processor, capture_references
+            packages, is_row_processor, capture_references, ignore_package_version=True
         )
         if packages:
             managed_function_options["packages"] = packages
@@ -270,28 +269,6 @@ def provision_bq_managed_function(
         )
 
         udf_name = func.__name__
-        if capture_references:
-            # This code path ensures that if the udf body contains any
-            # references to variables and/or imports outside the body, they are
-            # captured as well.
-            import cloudpickle
-
-            pickled = cloudpickle.dumps(func)
-            udf_code = textwrap.dedent(
-                f"""
-                import cloudpickle
-                {udf_name} = cloudpickle.loads({pickled})
-                """
-            )
-        else:
-            # This code path ensures that if the udf body is self contained,
-            # i.e. there are no references to variables or imports outside the
-            # body.
-            udf_code = textwrap.dedent(inspect.getsource(func))
-            match = re.search(r"^def ", udf_code, flags=re.MULTILINE)
-            if match is None:
-                raise ValueError("The UDF is not defined correctly.")
-            udf_code = udf_code[match.start() :]
 
         with_connection_clause = (
             (
@@ -301,6 +278,13 @@ def provision_bq_managed_function(
             else ""
         )
 
+        # Generate the complete Python code block for the managed Python UDF,
+        # including the user's function, necessary imports, and the BigQuery
+        # handler wrapper.
+        python_code_block = bff_template.generate_managed_function_code(
+            func, udf_name, is_row_processor, capture_references
+        )
+
         create_function_ddl = (
             textwrap.dedent(
                 f"""
@@ -311,13 +295,11 @@ def provision_bq_managed_function(
                 OPTIONS ({managed_function_options_str})
                 AS r'''
                 __UDF_PLACE_HOLDER__
-                def bigframes_handler(*args):
-                    return {udf_name}(*args)
                 '''
                 """
             )
             .strip()
-            .replace("__UDF_PLACE_HOLDER__", udf_code)
+            .replace("__UDF_PLACE_HOLDER__", python_code_block)
         )
 
         self._ensure_dataset_exists()
diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py
index 22e6981c38..371784332c 100644
--- a/bigframes/functions/_function_session.py
+++ b/bigframes/functions/_function_session.py
@@ -847,8 +847,6 @@ def wrapper(func):
             if output_type:
                 py_sig = py_sig.replace(return_annotation=output_type)
 
-            udf_sig = udf_def.UdfSignature.from_py_signature(py_sig)
-
             # The function will actually be receiving a pandas Series, but allow
             # both BigQuery DataFrames and pandas object types for compatibility.
             is_row_processor = False
@@ -856,6 +854,8 @@ def wrapper(func):
                 py_sig = new_sig
                 is_row_processor = True
 
+            udf_sig = udf_def.UdfSignature.from_py_signature(py_sig)
+
             managed_function_client = _function_client.FunctionClient(
                 dataset_ref.project,
                 bq_location,
diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py
index 69cf74ada0..66f3dfdd31 100644
--- a/bigframes/functions/_utils.py
+++ b/bigframes/functions/_utils.py
@@ -18,6 +18,7 @@
 import sys
 import typing
 from typing import cast, Optional, Set
+import warnings
 
 import cloudpickle
 import google.api_core.exceptions
@@ -26,6 +27,7 @@
 import pandas
 import pyarrow
 
+import bigframes.exceptions as bfe
 import bigframes.formatting_helpers as bf_formatting
 from bigframes.functions import function_typing
 
@@ -61,20 +63,36 @@ def get_remote_function_locations(bq_location):
 
 
 def _get_updated_package_requirements(
-    package_requirements=None, is_row_processor=False, capture_references=True
+    package_requirements=None,
+    is_row_processor=False,
+    capture_references=True,
+    ignore_package_version=False,
 ):
     requirements = []
     if capture_references:
         requirements.append(f"cloudpickle=={cloudpickle.__version__}")
 
     if is_row_processor:
-        # bigframes function will send an entire row of data as json, which
-        # would be converted to a pandas series and processed Ensure numpy
-        # versions match to avoid unpickling problems. See internal issue
-        # b/347934471.
-        requirements.append(f"numpy=={numpy.__version__}")
-        requirements.append(f"pandas=={pandas.__version__}")
-        requirements.append(f"pyarrow=={pyarrow.__version__}")
+        if ignore_package_version:
+            # TODO(jialuo): Add back the versions after b/410924784 is resolved.
+            # Due to current limitations on package versions in Python UDFs, we
+            # use `ignore_package_version` to optionally omit the versions for
+            # managed functions only.
+            msg = bfe.format_message(
+                "Numpy, Pandas, and Pyarrow versions may not precisely match your local environment."
+            )
+            warnings.warn(msg, category=bfe.FunctionPackageVersionWarning)
+            requirements.append("pandas")
+            requirements.append("pyarrow")
+            requirements.append("numpy")
+        else:
+            # A bigframes function will send an entire row of data as json,
+            # which is converted to a pandas series and processed. Ensure the
+            # package versions match to avoid unpickling problems. See
+            # internal issue b/347934471.
+            requirements.append(f"pandas=={pandas.__version__}")
+            requirements.append(f"pyarrow=={pyarrow.__version__}")
+            requirements.append(f"numpy=={numpy.__version__}")
 
     if package_requirements:
         requirements.extend(package_requirements)
diff --git a/bigframes/functions/function_template.py b/bigframes/functions/function_template.py
index 0809baf5cc..5f04fcc8e2 100644
--- a/bigframes/functions/function_template.py
+++ b/bigframes/functions/function_template.py
@@ -17,6 +17,7 @@
 import inspect
 import logging
 import os
+import re
 import textwrap
 from typing import Tuple
 
@@ -291,3 +292,55 @@ def generate_cloud_function_main_code(
     logger.debug(f"Wrote {os.path.abspath(main_py)}:\n{open(main_py).read()}")
 
     return handler_func_name
+
+
+def generate_managed_function_code(
+    def_,
+    udf_name: str,
+    is_row_processor: bool,
+    capture_references: bool,
+) -> str:
+    """Generates the Python code block for a managed Python UDF."""
+
+    if capture_references:
+        # This code path ensures that if the udf body contains any
+        # references to variables and/or imports outside the body, they are
+        # captured as well.
+        import cloudpickle
+
+        pickled = cloudpickle.dumps(def_)
+        func_code = textwrap.dedent(
+            f"""
+            import cloudpickle
+            {udf_name} = cloudpickle.loads({pickled})
+            """
+        )
+    else:
+        # This code path assumes that the udf body is self contained, i.e.
+        # there are no references to variables or imports outside the
+        # body.
+        func_code = textwrap.dedent(inspect.getsource(def_))
+        match = re.search(r"^def ", func_code, flags=re.MULTILINE)
+        if match is None:
+            raise ValueError("The UDF is not defined correctly.")
+        func_code = func_code[match.start() :]
+
+    if is_row_processor:
+        udf_code = textwrap.dedent(inspect.getsource(get_pd_series))
+        udf_code = udf_code[udf_code.index("def") :]
+        bigframes_handler_code = textwrap.dedent(
+            f"""def bigframes_handler(str_arg):
+            return {udf_name}({get_pd_series.__name__}(str_arg))"""
+        )
+    else:
+        udf_code = ""
+        bigframes_handler_code = textwrap.dedent(
+            f"""def bigframes_handler(*args):
+            return {udf_name}(*args)"""
+        )
+
+    udf_code_block = textwrap.dedent(
+        f"{udf_code}\n{func_code}\n{bigframes_handler_code}"
+    )
+
+    return udf_code_block
diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py
index c58610d1ff..c9a493e55c 100644
--- a/tests/system/large/functions/test_managed_function.py
+++ b/tests/system/large/functions/test_managed_function.py
@@ -647,3 +647,292 @@ def foo(x: int) -> int:
         container_cpu=2,
         container_memory="64Mi",
     )(foo)
+
+
+def test_managed_function_df_apply_axis_1(session, dataset_id, scalars_dfs):
+    columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"]
+    scalars_df, scalars_pandas_df = scalars_dfs
+    try:
+
+        def serialize_row(row):
+            # TODO(b/435021126): Remove explicit type conversion of the field
+            # "name" after the issue has been addressed. It is added only to
+            # accept partial pandas parity for the time being.
+            custom = {
+                "name": int(row.name),
+                "index": [idx for idx in row.index],
+                "values": [
+                    val.item() if hasattr(val, "item") else val for val in row.values
+                ],
+            }
+
+            return str(
+                {
+                    "default": row.to_json(),
+                    "split": row.to_json(orient="split"),
+                    "records": row.to_json(orient="records"),
+                    "index": row.to_json(orient="index"),
+                    "table": row.to_json(orient="table"),
+                    "custom": custom,
+                }
+            )
+
+        serialize_row_mf = session.udf(
+            input_types=bigframes.series.Series,
+            output_type=str,
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+        )(serialize_row)
+
+        assert getattr(serialize_row_mf, "is_row_processor")
+
+        bf_result = scalars_df[columns].apply(serialize_row_mf, axis=1).to_pandas()
+        pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1)
+
+        # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is
+        # 'object', so ignore this mismatch by using check_dtype=False.
+        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+
+        # Let's make sure the read_gbq_function path works for this function.
+        serialize_row_reuse = session.read_gbq_function(
+            serialize_row_mf.bigframes_bigquery_function, is_row_processor=True
+        )
+        bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas()
+        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+
+    finally:
+        # Clean up the GCP assets created for the managed function.
+        cleanup_function_assets(
+            serialize_row_mf, session.bqclient, ignore_failures=False
+        )
+
+
+def test_managed_function_df_apply_axis_1_aggregates(session, dataset_id, scalars_dfs):
+    columns = ["int64_col", "int64_too", "float64_col"]
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    try:
+
+        def analyze(row):
+            # TODO(b/435021126): Remove the explicit type conversions after
+            # the issue has been addressed. They are added only to accept
+            # partial pandas parity for the time being.
+            return str(
+                {
+                    "dtype": row.dtype,
+                    "count": int(row.count()),
+                    "min": int(row.min()),
+                    "max": int(row.max()),
+                    "mean": float(row.mean()),
+                    "std": float(row.std()),
+                    "var": float(row.var()),
+                }
+            )
+
+        with pytest.warns(
+            bfe.FunctionPackageVersionWarning,
+            match=("Numpy, Pandas, and Pyarrow versions may not precisely match."),
+        ):
+
+            analyze_mf = session.udf(
+                input_types=bigframes.series.Series,
+                output_type=str,
+                dataset=dataset_id,
+                name=prefixer.create_prefix(),
+            )(analyze)
+
+        assert getattr(analyze_mf, "is_row_processor")
+
+        bf_result = scalars_df[columns].dropna().apply(analyze_mf, axis=1).to_pandas()
+        pd_result = scalars_pandas_df[columns].dropna().apply(analyze, axis=1)
+
+        # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is
+        # 'object', so ignore this mismatch by using check_dtype=False.
+        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+
+    finally:
+        # Clean up the GCP assets created for the managed function.
+        cleanup_function_assets(analyze_mf, session.bqclient, ignore_failures=False)
+
+
+@pytest.mark.parametrize(
+    ("pd_df",),
+    [
+        pytest.param(
+            pandas.DataFrame(
+                {
+                    "2": [1, 2, 3],
+                    2: [1.5, 3.75, 5],
+                    "name, [with. special'- chars\")/\\": [10, 20, 30],
+                    (3, 4): ["pq", "rs", "tu"],
+                    (5.0, "six", 7): [8, 9, 10],
+                    'raise Exception("hacked!")': [11, 12, 13],
+                },
+                # The default pandas index has a non-numpy type, whereas
+                # bigframes always uses a numpy-based type, so let's use an
+                # index compatible with bigframes. See details in b/369689696.
+                index=pandas.Index([0, 1, 2], dtype=pandas.Int64Dtype()),
+            ),
+            id="all-kinds-of-column-names",
+        ),
+        pytest.param(
+            pandas.DataFrame(
+                {
+                    "x": [1, 2, 3],
+                    "y": [1.5, 3.75, 5],
+                    "z": ["pq", "rs", "tu"],
+                },
+                index=pandas.MultiIndex.from_frame(
+                    pandas.DataFrame(
+                        {
+                            "idx0": pandas.Series(
+                                ["a", "a", "b"], dtype=pandas.StringDtype()
+                            ),
+                            "idx1": pandas.Series(
+                                [100, 200, 300], dtype=pandas.Int64Dtype()
+                            ),
+                        }
+                    )
+                ),
+            ),
+            id="multiindex",
+            marks=pytest.mark.skip(
+                reason="TODO: revert this skip after this pandas bug is fixed: https://github.com/pandas-dev/pandas/issues/59908"
+            ),
+        ),
+        pytest.param(
+            pandas.DataFrame(
+                [
+                    [10, 1.5, "pq"],
+                    [20, 3.75, "rs"],
+                    [30, 8.0, "tu"],
+                ],
+                # The default pandas index has a non-numpy type, whereas
+                # bigframes always uses a numpy-based type, so let's use an
+                # index compatible with bigframes. See details in b/369689696.
+                index=pandas.Index([0, 1, 2], dtype=pandas.Int64Dtype()),
+                columns=pandas.MultiIndex.from_arrays(
+                    [
+                        ["first", "last_two", "last_two"],
+                        [1, 2, 3],
+                    ]
+                ),
+            ),
+            id="column-multiindex",
+        ),
+    ],
+)
+def test_managed_function_df_apply_axis_1_complex(session, dataset_id, pd_df):
+    bf_df = session.read_pandas(pd_df)
+
+    try:
+
+        def serialize_row(row):
+            # TODO(b/435021126): Remove explicit type conversion of the field
+            # "name" after the issue has been addressed. It is added only to
+            # accept partial pandas parity for the time being.
+            custom = {
+                "name": int(row.name),
+                "index": [idx for idx in row.index],
+                "values": [
+                    val.item() if hasattr(val, "item") else val for val in row.values
+                ],
+            }
+            return str(
+                {
+                    "default": row.to_json(),
+                    "split": row.to_json(orient="split"),
+                    "records": row.to_json(orient="records"),
+                    "index": row.to_json(orient="index"),
+                    "custom": custom,
+                }
+            )
+
+        serialize_row_mf = session.udf(
+            input_types=bigframes.series.Series,
+            output_type=str,
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+        )(serialize_row)
+
+        assert getattr(serialize_row_mf, "is_row_processor")
+
+        bf_result = bf_df.apply(serialize_row_mf, axis=1).to_pandas()
+        pd_result = pd_df.apply(serialize_row, axis=1)
+
+        # Ignore the known dtype difference between pandas and bigframes.
+        pandas.testing.assert_series_equal(
+            pd_result, bf_result, check_dtype=False, check_index_type=False
+        )
+
+    finally:
+        # Clean up the GCP assets created for the managed function.
+        cleanup_function_assets(
+            serialize_row_mf, session.bqclient, ignore_failures=False
+        )
+
+
+@pytest.mark.skip(reason="Revert this skip after bug b/435018880 is fixed.")
+def test_managed_function_df_apply_axis_1_na_nan_inf(dataset_id, session):
+    """This test covers special cases of float values, making sure any (nan,
+    inf, -inf) values produced by user code are honored.
+    """
+    bf_df = session.read_gbq(
+        """\
+SELECT "1" AS text, 1 AS num
+UNION ALL
+SELECT "2.5" AS text, 2.5 AS num
+UNION ALL
+SELECT "nan" AS text, IEEE_DIVIDE(0, 0) AS num
+UNION ALL
+SELECT "inf" AS text, IEEE_DIVIDE(1, 0) AS num
+UNION ALL
+SELECT "-inf" AS text, IEEE_DIVIDE(-1, 0) AS num
+UNION ALL
+SELECT "numpy nan" AS text, IEEE_DIVIDE(0, 0) AS num
+UNION ALL
+SELECT "pandas na" AS text, NULL AS num
+    """
+    )
+
+    pd_df = bf_df.to_pandas()
+
+    try:
+
+        def float_parser(row):
+            import numpy as mynp
+            import pandas as mypd
+
+            if row["text"] == "pandas na":
+                return mypd.NA
+            if row["text"] == "numpy nan":
+                return mynp.nan
+            return float(row["text"])
+
+        float_parser_mf = session.udf(
+            input_types=bigframes.series.Series,
+            output_type=float,
+            dataset=dataset_id,
+            name=prefixer.create_prefix(),
+        )(float_parser)
+
+        assert getattr(float_parser_mf, "is_row_processor")
+
+        pd_result = pd_df.apply(float_parser, axis=1)
+        bf_result = bf_df.apply(float_parser_mf, axis=1).to_pandas()
+
+        # bf_result.dtype is 'Float64' while pd_result.dtype is 'object', so
+        # ignore this mismatch by using check_dtype=False.
+        pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+
+        # Let's also assert that the data is consistent in this round trip
+        # (BQ -> BigFrames -> BQ -> managed function -> BQ -> BigFrames)
+        # w.r.t. their expected values in BQ.
+        bq_result = bf_df["num"].to_pandas()
+        bq_result.name = None
+        pandas.testing.assert_series_equal(bq_result, bf_result)
+    finally:
+        # Clean up the GCP assets created for the managed function.
+        cleanup_function_assets(
+            float_parser_mf, session.bqclient, ignore_failures=False
+        )
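
Reviewer note (not part of the patch): below is a minimal, standalone sketch of the two code-generation paths that `generate_managed_function_code` stitches together, in case it helps to see the mechanics in isolation. The `add_one` function and the final `print` are hypothetical stand-ins; only the cloudpickle/inspect/re mechanics mirror the patch.

    import inspect
    import re
    import textwrap

    import cloudpickle


    def add_one(x: int) -> int:
        return x + 1


    # Path 1 (capture_references=True): pickle the function, together with any
    # referenced globals and imports, and embed the bytes so the UDF body can
    # rebuild the callable at execution time.
    pickled = cloudpickle.dumps(add_one)
    captured_code = textwrap.dedent(
        f"""
        import cloudpickle
        add_one = cloudpickle.loads({pickled!r})
        """
    )

    # Path 2 (capture_references=False): embed the function source verbatim,
    # trimming anything before the first "def" (e.g. a decorator line).
    source_code = textwrap.dedent(inspect.getsource(add_one))
    match = re.search(r"^def ", source_code, flags=re.MULTILINE)
    if match is None:
        raise ValueError("The UDF is not defined correctly.")
    source_code = source_code[match.start() :]

    # Either block is then followed by the fixed entry point that BigQuery
    # invokes. For row processors the patch additionally prepends the source
    # of get_pd_series and routes the single JSON string argument through it.
    handler_code = "def bigframes_handler(*args):\n    return add_one(*args)"
    print(source_code + "\n" + handler_code)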