From 8e85a0b5bffb3238a62c6de180f01ab416c6756f Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 26 Jun 2025 00:21:52 +0000 Subject: [PATCH 01/12] feat: df.join lsuffix and rsuffix support --- bigframes/dataframe.py | 84 +++++++++++++++++++++++---- tests/system/small/test_dataframe.py | 85 ++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 11 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 495e242f43..1db97b3ccb 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3412,16 +3412,22 @@ def join( *, on: Optional[str] = None, how: str = "left", + lsuffix: str = "", + rsuffix: str = "", ) -> DataFrame: if isinstance(other, bigframes.series.Series): other = other.to_frame() left, right = self, other - if not left.columns.intersection(right.columns).empty: - raise NotImplementedError( - f"Deduping column names is not implemented. {constants.FEEDBACK_LINK}" - ) + col_intersection = left.columns.intersection(right.columns) + + if not col_intersection.empty: + if lsuffix == rsuffix == "": + raise ValueError( + f"columns overlap but no suffix specified: {col_intersection}" + ) + if how == "cross": if on is not None: raise ValueError("'on' is not supported for cross join.") @@ -3429,7 +3435,7 @@ def join( right._block, left_join_ids=[], right_join_ids=[], - suffixes=("", ""), + suffixes=(lsuffix, rsuffix), how="cross", sort=True, ) @@ -3441,8 +3447,25 @@ def join( raise ValueError( "Join on columns must match the index level of the other DataFrame. Join on column with multi-index haven't been supported." ) + + # Replace all columns names with unique names for reordering. + left_col_original_names = left.columns + on_col_name = "bigframes_left_col_on" + dup_on_col_name = "bigframes_left_col_on_dup" + left_col_temp_names = [ + f"bigframes_left_col_name_{i}" if col_name != on else on_col_name + for i, col_name in enumerate(left_col_original_names) + ] + left.columns = pandas.Index(left_col_temp_names) + # if on column is also in right df, we need to duplicate the column + # and set it to be the first column + if on in col_intersection: + left[dup_on_col_name] = left[on_col_name] + on_col_name = dup_on_col_name + left_col_temp_names = [on_col_name] + left_col_temp_names + left = left[left_col_temp_names] + # Switch left index with on column - left_columns = left.columns left_idx_original_names = left.index.names if left._has_index else () left_idx_names_in_cols = [ f"bigframes_left_idx_name_{i}" @@ -3451,11 +3474,18 @@ def join( if left._has_index: left.index.names = left_idx_names_in_cols left = left.reset_index(drop=False) - left = left.set_index(on) + left = left.set_index(on_col_name) + + right_col_original_names = right.columns + right_col_temp_names = [ + f"bigframes_right_col_name_{i}" + for i in range(len(right_col_original_names)) + ] + right.columns = pandas.Index(right_col_temp_names) # Join on index and switch back combined_df = left._perform_join_by_index(right, how=how) - combined_df.index.name = on + combined_df.index.name = on_col_name combined_df = combined_df.reset_index(drop=False) combined_df = combined_df.set_index(left_idx_names_in_cols) @@ -3468,14 +3498,22 @@ def join( ) # Reorder columns - combined_df = combined_df[list(left_columns) + list(right.columns)] - return combined_df + combined_df = combined_df[left_col_temp_names + right_col_temp_names] + return combined_df._add_suffix( + left_col_original_names, + right_col_original_names, + lsuffix=lsuffix, + rsuffix=rsuffix, + extra_col=on if on_col_name == dup_on_col_name else None, + ) # Join left index with right index if left._block.index.nlevels != right._block.index.nlevels: raise ValueError("Index to join on must have the same number of levels.") - return left._perform_join_by_index(right, how=how) + return left._perform_join_by_index(right, how=how)._add_suffix( + left.columns, right.columns, lsuffix=lsuffix, rsuffix=rsuffix + ) def _perform_join_by_index( self, @@ -3489,6 +3527,30 @@ def _perform_join_by_index( ) return DataFrame(block) + def _add_suffix( + self, + left_columns, + right_columns, + lsuffix: str = "", + rsuffix: str = "", + extra_col: typing.Optional[str] = None, + ): + col_intersection = left_columns.intersection(right_columns) + final_col_names = [] if extra_col is None else [extra_col] + for col_name in left_columns: + if col_name in col_intersection: + final_col_names.append(f"{col_name}{lsuffix}") + else: + final_col_names.append(col_name) + + for col_name in right_columns: + if col_name in col_intersection: + final_col_names.append(f"{col_name}{rsuffix}") + else: + final_col_names.append(col_name) + self.columns = pandas.Index(final_col_names) + return self + @validations.requires_ordering() def rolling( self, diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index b037c6f371..0e9d113587 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2816,6 +2816,91 @@ def test_join_different_table( assert_pandas_df_equal(bf_result, pd_result, ignore_order=True) +@all_joins +def test_join_different_table_with_duplicate_column_name( + scalars_df_index, scalars_pandas_df_index, how +): + bf_df_a = scalars_df_index[["string_col", "int64_col", "int64_too"]].rename( + columns={"int64_too": "int64_col"} + ) + bf_df_b = scalars_df_index.dropna()[ + ["string_col", "int64_col", "int64_too"] + ].rename(columns={"int64_too": "int64_col"}) + bf_result = bf_df_a.join(bf_df_b, how=how, lsuffix="_l", rsuffix="_r").to_pandas() + + pd_df_a = scalars_pandas_df_index[["string_col", "int64_col", "int64_too"]].rename( + columns={"int64_too": "int64_col"} + ) + pd_df_b = scalars_pandas_df_index.dropna()[ + ["string_col", "int64_col", "int64_too"] + ].rename(columns={"int64_too": "int64_col"}) + pd_result = pd_df_a.join(pd_df_b, how=how, lsuffix="_l", rsuffix="_r") + + pd.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False) + + +@all_joins +def test_join_param_on_with_duplicate_column_name_not_on_col( + scalars_df_index, scalars_pandas_df_index, how +): + # This test is for duplicate column names, but the 'on' column is not duplicated. + if how == "cross": + return + bf_df_a = scalars_df_index[ + ["string_col", "datetime_col", "timestamp_col", "int64_too"] + ].rename(columns={"timestamp_col": "datetime_col"}) + bf_df_b = scalars_df_index.dropna()[ + ["string_col", "datetime_col", "timestamp_col"] + ].rename(columns={"timestamp_col": "datetime_col"}) + bf_result = bf_df_a.join( + bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" + ).to_pandas() + pd_df_a = scalars_pandas_df_index[ + ["string_col", "datetime_col", "timestamp_col", "int64_too"] + ].rename(columns={"timestamp_col": "datetime_col"}) + pd_df_b = scalars_pandas_df_index.dropna()[ + ["string_col", "datetime_col", "timestamp_col"] + ].rename(columns={"timestamp_col": "datetime_col"}) + pd_result = pd_df_a.join( + pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" + ) + pd.testing.assert_frame_equal( + bf_result, pd_result, check_like=True, check_index_type=False + ) + pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) + + +@all_joins +def test_join_param_on_with_duplicate_column_name_on_col( + scalars_df_index, scalars_pandas_df_index, how +): + # This test is for duplicate column names, and the 'on' column is duplicated. + if how == "cross": + return + bf_df_a = scalars_df_index[ + ["string_col", "datetime_col", "timestamp_col", "int64_too"] + ].rename(columns={"timestamp_col": "datetime_col"}) + bf_df_b = scalars_df_index.dropna()[ + ["string_col", "datetime_col", "timestamp_col", "int64_too"] + ].rename(columns={"timestamp_col": "datetime_col"}) + bf_result = bf_df_a.join( + bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" + ).to_pandas() + pd_df_a = scalars_pandas_df_index[ + ["string_col", "datetime_col", "timestamp_col", "int64_too"] + ].rename(columns={"timestamp_col": "datetime_col"}) + pd_df_b = scalars_pandas_df_index.dropna()[ + ["string_col", "datetime_col", "timestamp_col", "int64_too"] + ].rename(columns={"timestamp_col": "datetime_col"}) + pd_result = pd_df_a.join( + pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" + ) + pd.testing.assert_frame_equal( + bf_result, pd_result, check_like=True, check_index_type=False + ) + pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) + + def test_join_duplicate_columns_raises_not_implemented(scalars_dfs): scalars_df, _ = scalars_dfs df_a = scalars_df[["string_col", "float64_col"]] From 515c98518ae92ac3d40da95f4d6dff2c77a56564 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 26 Jun 2025 00:50:50 +0000 Subject: [PATCH 02/12] raise error when on is duplicated. --- bigframes/dataframe.py | 7 +++++++ tests/system/small/test_dataframe.py | 30 +++++++++++++++++++++++++--- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 1db97b3ccb..e4e12e6baf 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3443,6 +3443,13 @@ def join( # Join left columns with right index if on is not None: + if on in left.index.names: + raise ValueError( + f"'{on}' is both an index level and a column label, which is ambiguous." + ) + if on in left.columns: + raise ValueError(f"The column label '{on}' is not unique.") + if other._block.index.nlevels != 1: raise ValueError( "Join on columns must match the index level of the other DataFrame. Join on column with multi-index haven't been supported." diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 0e9d113587..bc8879f774 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2901,12 +2901,36 @@ def test_join_param_on_with_duplicate_column_name_on_col( pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) -def test_join_duplicate_columns_raises_not_implemented(scalars_dfs): +@all_joins +def test_join_raise_when_param_on_duplicate_with_index(scalars_df_index, how): + if how == "cross": + return + bf_df_a = scalars_df_index[["string_col"]] + bf_df_a.index.name = "string_col" + bf_df_b = scalars_df_index.dropna()["string_col"] + with pytest.raises(ValueError): + bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") + + +@all_joins +def test_join_raise_when_param_on_duplicate_with_column(scalars_df_index, how): + if how == "cross": + return + bf_df_a = scalars_df_index[["string_col", "int64_col"]].rename( + columns={"int64_col": "string_col"} + ) + bf_df_a.index.name = "string_col" + bf_df_b = scalars_df_index.dropna()["string_col"] + with pytest.raises(ValueError): + bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") + + +def test_join_duplicate_columns_raises_value_error(scalars_dfs): scalars_df, _ = scalars_dfs df_a = scalars_df[["string_col", "float64_col"]] df_b = scalars_df[["float64_col"]] - with pytest.raises(NotImplementedError): - df_a.join(df_b, how="outer").to_pandas() + with pytest.raises(ValueError): + df_a.join(df_b, how="outer") @all_joins From 481a6bb7d7f13551725d57c80e717692aa695ff7 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 26 Jun 2025 00:52:31 +0000 Subject: [PATCH 03/12] rename --- bigframes/dataframe.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index e4e12e6baf..d9687c9554 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3506,7 +3506,7 @@ def join( # Reorder columns combined_df = combined_df[left_col_temp_names + right_col_temp_names] - return combined_df._add_suffix( + return combined_df._add_join_suffix( left_col_original_names, right_col_original_names, lsuffix=lsuffix, @@ -3518,7 +3518,7 @@ def join( if left._block.index.nlevels != right._block.index.nlevels: raise ValueError("Index to join on must have the same number of levels.") - return left._perform_join_by_index(right, how=how)._add_suffix( + return left._perform_join_by_index(right, how=how)._add_join_suffix( left.columns, right.columns, lsuffix=lsuffix, rsuffix=rsuffix ) @@ -3534,7 +3534,7 @@ def _perform_join_by_index( ) return DataFrame(block) - def _add_suffix( + def _add_join_suffix( self, left_columns, right_columns, From 798d3d5e58533a4106cecc3c0a7ca383253b4071 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 26 Jun 2025 17:23:36 +0000 Subject: [PATCH 04/12] error update. --- bigframes/dataframe.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index d9687c9554..693f2e1b1d 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3444,10 +3444,15 @@ def join( # Join left columns with right index if on is not None: if on in left.index.names: - raise ValueError( - f"'{on}' is both an index level and a column label, which is ambiguous." - ) - if on in left.columns: + if on in left.columns: + raise ValueError( + f"'{on}' is both an index level and a column label, which is ambiguous." + ) + else: + raise NotImplementedError( + f"Joining on index level '{on}' is not yet supported. {constants.FEEDBACK_LINK}" + ) + if (left.columns == on).sum() > 1: raise ValueError(f"The column label '{on}' is not unique.") if other._block.index.nlevels != 1: From 8c6630b5a9bb07c7e16b72e1efd03c8b1276a253 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 26 Jun 2025 17:33:34 +0000 Subject: [PATCH 05/12] test fix. --- tests/system/small/test_dataframe.py | 21 --------------------- tests/unit/test_dataframe_polars.py | 19 ++++++++++++++++--- 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index bc8879f774..048848cad1 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2912,27 +2912,6 @@ def test_join_raise_when_param_on_duplicate_with_index(scalars_df_index, how): bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") -@all_joins -def test_join_raise_when_param_on_duplicate_with_column(scalars_df_index, how): - if how == "cross": - return - bf_df_a = scalars_df_index[["string_col", "int64_col"]].rename( - columns={"int64_col": "string_col"} - ) - bf_df_a.index.name = "string_col" - bf_df_b = scalars_df_index.dropna()["string_col"] - with pytest.raises(ValueError): - bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") - - -def test_join_duplicate_columns_raises_value_error(scalars_dfs): - scalars_df, _ = scalars_dfs - df_a = scalars_df[["string_col", "float64_col"]] - df_b = scalars_df[["float64_col"]] - with pytest.raises(ValueError): - df_a.join(df_b, how="outer") - - @all_joins def test_join_param_on(scalars_dfs, how): bf_df, pd_df = scalars_dfs diff --git a/tests/unit/test_dataframe_polars.py b/tests/unit/test_dataframe_polars.py index f7f0cc80bb..a3ba98325e 100644 --- a/tests/unit/test_dataframe_polars.py +++ b/tests/unit/test_dataframe_polars.py @@ -2443,12 +2443,25 @@ def test_join_different_table( assert_pandas_df_equal(bf_result, pd_result, ignore_order=True) -def test_join_duplicate_columns_raises_not_implemented(scalars_dfs): +@all_joins +def test_join_raise_when_param_on_duplicate_with_column(scalars_df_index, how): + if how == "cross": + return + bf_df_a = scalars_df_index[["string_col", "int64_col"]].rename( + columns={"int64_col": "string_col"} + ) + bf_df_a.index.name = "string_col" + bf_df_b = scalars_df_index.dropna()["string_col"] + with pytest.raises(ValueError): + bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") + + +def test_join_duplicate_columns_raises_value_error(scalars_dfs): scalars_df, _ = scalars_dfs df_a = scalars_df[["string_col", "float64_col"]] df_b = scalars_df[["float64_col"]] - with pytest.raises(NotImplementedError): - df_a.join(df_b, how="outer").to_pandas() + with pytest.raises(ValueError): + df_a.join(df_b, how="outer") @all_joins From 69fa7157f24c7f9f0adf6dc310fa0d11ce03aac0 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 26 Jun 2025 18:16:15 +0000 Subject: [PATCH 06/12] add doc and test fixes --- bigframes/dataframe.py | 2 +- tests/system/small/test_dataframe.py | 23 +++++--------- tests/unit/test_dataframe_polars.py | 11 +++++++ .../bigframes_vendored/pandas/core/frame.py | 31 ++++++++++++++++++- 4 files changed, 50 insertions(+), 17 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 693f2e1b1d..4a3c7864ad 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3443,7 +3443,7 @@ def join( # Join left columns with right index if on is not None: - if on in left.index.names: + if left._has_index and (on in left.index.names): if on in left.columns: raise ValueError( f"'{on}' is both an index level and a column label, which is ambiguous." diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 048848cad1..dcbda51d49 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2865,9 +2865,11 @@ def test_join_param_on_with_duplicate_column_name_not_on_col( pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" ) pd.testing.assert_frame_equal( - bf_result, pd_result, check_like=True, check_index_type=False + bf_result.sort_index(), + pd_result.sort_index(), + check_like=True, + check_index_type=False, ) - pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) @all_joins @@ -2896,20 +2898,11 @@ def test_join_param_on_with_duplicate_column_name_on_col( pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" ) pd.testing.assert_frame_equal( - bf_result, pd_result, check_like=True, check_index_type=False + bf_result.sort_index(), + pd_result.sort_index(), + check_like=True, + check_index_type=False, ) - pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) - - -@all_joins -def test_join_raise_when_param_on_duplicate_with_index(scalars_df_index, how): - if how == "cross": - return - bf_df_a = scalars_df_index[["string_col"]] - bf_df_a.index.name = "string_col" - bf_df_b = scalars_df_index.dropna()["string_col"] - with pytest.raises(ValueError): - bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") @all_joins diff --git a/tests/unit/test_dataframe_polars.py b/tests/unit/test_dataframe_polars.py index a3ba98325e..d5e260a905 100644 --- a/tests/unit/test_dataframe_polars.py +++ b/tests/unit/test_dataframe_polars.py @@ -2464,6 +2464,17 @@ def test_join_duplicate_columns_raises_value_error(scalars_dfs): df_a.join(df_b, how="outer") +@all_joins +def test_join_param_on_duplicate_with_index_raises_value_error(scalars_df_index, how): + if how == "cross": + return + bf_df_a = scalars_df_index[["string_col"]] + bf_df_a.index.name = "string_col" + bf_df_b = scalars_df_index.dropna()["string_col"] + with pytest.raises(ValueError): + bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") + + @all_joins def test_join_param_on(scalars_dfs, how): bf_df, pd_df = scalars_dfs diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 0606032d34..f5a505cd39 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4536,7 +4536,15 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: # ---------------------------------------------------------------------- # Merging / joining methods - def join(self, other, *, on: Optional[str] = None, how: str) -> DataFrame: + def join( + self, + other, + *, + on: Optional[str] = None, + how: str, + lsuffix: str = "", + rsuffix: str = "", + ) -> DataFrame: """Join columns of another DataFrame. Join columns with `other` DataFrame on index @@ -4609,6 +4617,19 @@ def join(self, other, *, on: Optional[str] = None, how: str) -> DataFrame: [2 rows x 4 columns] + If there are overlapping columns, `lsuffix` and `rsuffix` can be used: + + >>> df1 = bpd.DataFrame({'key': ['K0', 'K1', 'K2'], 'A': ['A0', 'A1', 'A2']}) + >>> df2 = bpd.DataFrame({'key': ['K0', 'K1', 'K2'], 'A': ['B0', 'B1', 'B2']}) + >>> df1.set_index('key').join(df2.set_index('key'), lsuffix='_left', rsuffix='_right') + A_left A_right + key + K0 A0 B0 + K1 A1 B1 + K2 A2 B2 + + [3 rows x 2 columns] + Args: other: DataFrame or Series with an Index similar to the Index of this one. @@ -4625,6 +4646,10 @@ def join(self, other, *, on: Optional[str] = None, how: str) -> DataFrame: index, preserving the order of the calling's one. ``cross``: creates the cartesian product from both frames, preserves the order of the left keys. + lsuffix(str, default ''): + Suffix to use from left frame's overlapping columns. + rsuffix(str, default ''): + Suffix to use from right frame's overlapping columns. Returns: bigframes.pandas.DataFrame: @@ -4639,6 +4664,10 @@ def join(self, other, *, on: Optional[str] = None, how: str) -> DataFrame: ValueError: If left index to join on does not have the same number of levels as the right index. + ValueError: + If columns overlap but no suffix is specified. + ValueError: + If `on` column is not unique. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 53ef0cc8c150b8556c8971daf3aa7ffc55181196 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Thu, 26 Jun 2025 19:12:48 +0000 Subject: [PATCH 07/12] skip pandas 1.x test --- tests/system/small/test_dataframe.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index dcbda51d49..c9e5c4c487 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2827,7 +2827,7 @@ def test_join_different_table_with_duplicate_column_name( ["string_col", "int64_col", "int64_too"] ].rename(columns={"int64_too": "int64_col"}) bf_result = bf_df_a.join(bf_df_b, how=how, lsuffix="_l", rsuffix="_r").to_pandas() - + print(bf_result) pd_df_a = scalars_pandas_df_index[["string_col", "int64_col", "int64_too"]].rename( columns={"int64_too": "int64_col"} ) @@ -2835,6 +2835,7 @@ def test_join_different_table_with_duplicate_column_name( ["string_col", "int64_col", "int64_too"] ].rename(columns={"int64_too": "int64_col"}) pd_result = pd_df_a.join(pd_df_b, how=how, lsuffix="_l", rsuffix="_r") + print(pd_result) pd.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False) @@ -2872,6 +2873,9 @@ def test_join_param_on_with_duplicate_column_name_not_on_col( ) +@pytest.mark.skipif( + pandas.__version__.startswith("1."), reason="bad left join in pandas 1.x" +) @all_joins def test_join_param_on_with_duplicate_column_name_on_col( scalars_df_index, scalars_pandas_df_index, how @@ -2888,6 +2892,7 @@ def test_join_param_on_with_duplicate_column_name_on_col( bf_result = bf_df_a.join( bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" ).to_pandas() + print(bf_result) pd_df_a = scalars_pandas_df_index[ ["string_col", "datetime_col", "timestamp_col", "int64_too"] ].rename(columns={"timestamp_col": "datetime_col"}) @@ -2897,6 +2902,7 @@ def test_join_param_on_with_duplicate_column_name_on_col( pd_result = pd_df_a.join( pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" ) + print(pd_result) pd.testing.assert_frame_equal( bf_result.sort_index(), pd_result.sort_index(), From 4e802202a4d409381eb2aeb2fc327fc80b5f5fd5 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Mon, 7 Jul 2025 18:02:59 +0000 Subject: [PATCH 08/12] test fixes --- tests/system/small/test_dataframe.py | 4 ---- tests/unit/test_dataframe_polars.py | 14 +++++++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index c9e5c4c487..c138f13a11 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2827,7 +2827,6 @@ def test_join_different_table_with_duplicate_column_name( ["string_col", "int64_col", "int64_too"] ].rename(columns={"int64_too": "int64_col"}) bf_result = bf_df_a.join(bf_df_b, how=how, lsuffix="_l", rsuffix="_r").to_pandas() - print(bf_result) pd_df_a = scalars_pandas_df_index[["string_col", "int64_col", "int64_too"]].rename( columns={"int64_too": "int64_col"} ) @@ -2835,7 +2834,6 @@ def test_join_different_table_with_duplicate_column_name( ["string_col", "int64_col", "int64_too"] ].rename(columns={"int64_too": "int64_col"}) pd_result = pd_df_a.join(pd_df_b, how=how, lsuffix="_l", rsuffix="_r") - print(pd_result) pd.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False) @@ -2892,7 +2890,6 @@ def test_join_param_on_with_duplicate_column_name_on_col( bf_result = bf_df_a.join( bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" ).to_pandas() - print(bf_result) pd_df_a = scalars_pandas_df_index[ ["string_col", "datetime_col", "timestamp_col", "int64_too"] ].rename(columns={"timestamp_col": "datetime_col"}) @@ -2902,7 +2899,6 @@ def test_join_param_on_with_duplicate_column_name_on_col( pd_result = pd_df_a.join( pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" ) - print(pd_result) pd.testing.assert_frame_equal( bf_result.sort_index(), pd_result.sort_index(), diff --git a/tests/unit/test_dataframe_polars.py b/tests/unit/test_dataframe_polars.py index d5e260a905..ef4f8826ab 100644 --- a/tests/unit/test_dataframe_polars.py +++ b/tests/unit/test_dataframe_polars.py @@ -2450,9 +2450,10 @@ def test_join_raise_when_param_on_duplicate_with_column(scalars_df_index, how): bf_df_a = scalars_df_index[["string_col", "int64_col"]].rename( columns={"int64_col": "string_col"} ) - bf_df_a.index.name = "string_col" bf_df_b = scalars_df_index.dropna()["string_col"] - with pytest.raises(ValueError): + with pytest.raises( + ValueError, match="The column label 'string_col' is not unique." + ): bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") @@ -2460,7 +2461,7 @@ def test_join_duplicate_columns_raises_value_error(scalars_dfs): scalars_df, _ = scalars_dfs df_a = scalars_df[["string_col", "float64_col"]] df_b = scalars_df[["float64_col"]] - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="columns overlap but no suffix specified"): df_a.join(df_b, how="outer") @@ -2471,7 +2472,10 @@ def test_join_param_on_duplicate_with_index_raises_value_error(scalars_df_index, bf_df_a = scalars_df_index[["string_col"]] bf_df_a.index.name = "string_col" bf_df_b = scalars_df_index.dropna()["string_col"] - with pytest.raises(ValueError): + with pytest.raises( + ValueError, + match="'string_col' is both an index level and a column label, which is ambiguous.", + ): bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") @@ -2484,7 +2488,7 @@ def test_join_param_on(scalars_dfs, how): bf_df_b = bf_df[["float64_col"]] if how == "cross": - with pytest.raises(ValueError): + with pytest.raises(ValueError, match="'on' is not supported for cross join."): bf_df_a.join(bf_df_b, on="rowindex_2", how=how) else: bf_result = bf_df_a.join(bf_df_b, on="rowindex_2", how=how).to_pandas() From d661ea668004e06173893cf6a1c9a975cf0f8e88 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Mon, 7 Jul 2025 18:33:49 +0000 Subject: [PATCH 09/12] create join on key helper function --- bigframes/dataframe.py | 128 +++++++++++++++++++++++------------------ 1 file changed, 73 insertions(+), 55 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 735873367a..5f467d5ed3 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3488,63 +3488,13 @@ def join( "Join on columns must match the index level of the other DataFrame. Join on column with multi-index haven't been supported." ) - # Replace all columns names with unique names for reordering. - left_col_original_names = left.columns - on_col_name = "bigframes_left_col_on" - dup_on_col_name = "bigframes_left_col_on_dup" - left_col_temp_names = [ - f"bigframes_left_col_name_{i}" if col_name != on else on_col_name - for i, col_name in enumerate(left_col_original_names) - ] - left.columns = pandas.Index(left_col_temp_names) - # if on column is also in right df, we need to duplicate the column - # and set it to be the first column - if on in col_intersection: - left[dup_on_col_name] = left[on_col_name] - on_col_name = dup_on_col_name - left_col_temp_names = [on_col_name] + left_col_temp_names - left = left[left_col_temp_names] - - # Switch left index with on column - left_idx_original_names = left.index.names if left._has_index else () - left_idx_names_in_cols = [ - f"bigframes_left_idx_name_{i}" - for i in range(len(left_idx_original_names)) - ] - if left._has_index: - left.index.names = left_idx_names_in_cols - left = left.reset_index(drop=False) - left = left.set_index(on_col_name) - - right_col_original_names = right.columns - right_col_temp_names = [ - f"bigframes_right_col_name_{i}" - for i in range(len(right_col_original_names)) - ] - right.columns = pandas.Index(right_col_temp_names) - - # Join on index and switch back - combined_df = left._perform_join_by_index(right, how=how) - combined_df.index.name = on_col_name - combined_df = combined_df.reset_index(drop=False) - combined_df = combined_df.set_index(left_idx_names_in_cols) - - # To be consistent with Pandas - if combined_df._has_index: - combined_df.index.names = ( - left_idx_original_names - if how in ("inner", "left") - else ([None] * len(combined_df.index.names)) - ) - - # Reorder columns - combined_df = combined_df[left_col_temp_names + right_col_temp_names] - return combined_df._add_join_suffix( - left_col_original_names, - right_col_original_names, + return self._join_on_key( + other, + on=on, + how=how, lsuffix=lsuffix, rsuffix=rsuffix, - extra_col=on if on_col_name == dup_on_col_name else None, + should_duplicate_on_key=(on in col_intersection), ) # Join left index with right index @@ -3555,6 +3505,74 @@ def join( left.columns, right.columns, lsuffix=lsuffix, rsuffix=rsuffix ) + def _join_on_key( + self, + other: DataFrame, + on: str, + how: str, + lsuffix: str, + rsuffix: str, + should_duplicate_on_key: bool, + ) -> DataFrame: + left, right = self, other + # Replace all columns names with unique names for reordering. + left_col_original_names = left.columns + on_col_name = "bigframes_left_col_on" + dup_on_col_name = "bigframes_left_col_on_dup" + left_col_temp_names = [ + f"bigframes_left_col_name_{i}" if col_name != on else on_col_name + for i, col_name in enumerate(left_col_original_names) + ] + left.columns = pandas.Index(left_col_temp_names) + # if on column is also in right df, we need to duplicate the column + # and set it to be the first column + if should_duplicate_on_key: + left[dup_on_col_name] = left[on_col_name] + on_col_name = dup_on_col_name + left_col_temp_names = [on_col_name] + left_col_temp_names + left = left[left_col_temp_names] + + # Switch left index with on column + left_idx_original_names = left.index.names if left._has_index else () + left_idx_names_in_cols = [ + f"bigframes_left_idx_name_{i}" for i in range(len(left_idx_original_names)) + ] + if left._has_index: + left.index.names = left_idx_names_in_cols + left = left.reset_index(drop=False) + left = left.set_index(on_col_name) + + right_col_original_names = right.columns + right_col_temp_names = [ + f"bigframes_right_col_name_{i}" + for i in range(len(right_col_original_names)) + ] + right.columns = pandas.Index(right_col_temp_names) + + # Join on index and switch back + combined_df = left._perform_join_by_index(right, how=how) + combined_df.index.name = on_col_name + combined_df = combined_df.reset_index(drop=False) + combined_df = combined_df.set_index(left_idx_names_in_cols) + + # To be consistent with Pandas + if combined_df._has_index: + combined_df.index.names = ( + left_idx_original_names + if how in ("inner", "left") + else ([None] * len(combined_df.index.names)) + ) + + # Reorder columns + combined_df = combined_df[left_col_temp_names + right_col_temp_names] + return combined_df._add_join_suffix( + left_col_original_names, + right_col_original_names, + lsuffix=lsuffix, + rsuffix=rsuffix, + extra_col=on if on_col_name == dup_on_col_name else None, + ) + def _perform_join_by_index( self, other: Union[DataFrame, indexes.Index], From 1ba81a4ccdd6a80a0a1db1e6e10a99df18eb3081 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Mon, 7 Jul 2025 19:02:38 +0000 Subject: [PATCH 10/12] test fix --- tests/system/small/test_dataframe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 362a3701ea..b830b3aa33 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2870,7 +2870,9 @@ def test_join_param_on_with_duplicate_column_name_not_on_col( pd_result.sort_index(), check_like=True, check_index_type=False, + check_names=False, ) + pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) @pytest.mark.skipif( From 014bb736df4a177c2b89ab5360be2a129ce49324 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Mon, 7 Jul 2025 19:45:18 +0000 Subject: [PATCH 11/12] test fix --- tests/system/small/test_dataframe.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index b830b3aa33..8b1044b451 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2908,7 +2908,9 @@ def test_join_param_on_with_duplicate_column_name_on_col( pd_result.sort_index(), check_like=True, check_index_type=False, + check_names=False, ) + pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) @all_joins From 2ced5af84954ba6cd896b87b4da52df1658ac541 Mon Sep 17 00:00:00 2001 From: Huan Chen Date: Mon, 28 Jul 2025 20:08:52 +0000 Subject: [PATCH 12/12] update join to avoid inplace changes. --- bigframes/dataframe.py | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index c5ae9813c6..30133f5e5c 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3559,7 +3559,7 @@ def _join_on_key( rsuffix: str, should_duplicate_on_key: bool, ) -> DataFrame: - left, right = self, other + left, right = self.copy(), other # Replace all columns names with unique names for reordering. left_col_original_names = left.columns on_col_name = "bigframes_left_col_on" @@ -3638,6 +3638,35 @@ def _add_join_suffix( rsuffix: str = "", extra_col: typing.Optional[str] = None, ): + """Applies suffixes to overlapping column names to mimic a pandas join. + + This method identifies columns that are common to both a "left" and "right" + set of columns and renames them using the provided suffixes. Columns that + are not in the intersection are kept with their original names. + + Args: + left_columns (pandas.Index): + The column labels from the left DataFrame. + right_columns (pandas.Index): + The column labels from the right DataFrame. + lsuffix (str): + The suffix to apply to overlapping column names from the left side. + rsuffix (str): + The suffix to apply to overlapping column names from the right side. + extra_col (typing.Optional[str]): + An optional column name to prepend to the final list of columns. + This argument is used specifically to match the behavior of a + pandas join. When a join key (i.e., the 'on' column) exists + in both the left and right DataFrames, pandas creates two versions + of that column: one copy keeps its original name and is placed as + the first column, while the other instances receive the normal + suffix. Passing the join key's name here replicates that behavior. + + Returns: + DataFrame: + A new DataFrame with the columns renamed to resolve overlaps. + """ + combined_df = self.copy() col_intersection = left_columns.intersection(right_columns) final_col_names = [] if extra_col is None else [extra_col] for col_name in left_columns: @@ -3651,8 +3680,8 @@ def _add_join_suffix( final_col_names.append(f"{col_name}{rsuffix}") else: final_col_names.append(col_name) - self.columns = pandas.Index(final_col_names) - return self + combined_df.columns = pandas.Index(final_col_names) + return combined_df @validations.requires_ordering() def rolling(