-
Notifications
You must be signed in to change notification settings - Fork 51
feat: df.join lsuffix and rsuffix support #1857
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
8e85a0b
515c985
481a6bb
e66a0a1
798d3d5
14a1c54
8c6630b
69fa715
9748b35
53ef0cc
8b09d10
4e80220
052e090
d661ea6
1ba81a4
014bb73
cd4d962
12464f2
6892d84
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3485,70 +3485,138 @@ def join( | |
*, | ||
on: Optional[str] = None, | ||
how: str = "left", | ||
lsuffix: str = "", | ||
rsuffix: str = "", | ||
) -> DataFrame: | ||
if isinstance(other, bigframes.series.Series): | ||
other = other.to_frame() | ||
|
||
left, right = self, other | ||
|
||
if not left.columns.intersection(right.columns).empty: | ||
raise NotImplementedError( | ||
f"Deduping column names is not implemented. {constants.FEEDBACK_LINK}" | ||
) | ||
col_intersection = left.columns.intersection(right.columns) | ||
|
||
if not col_intersection.empty: | ||
if lsuffix == rsuffix == "": | ||
raise ValueError( | ||
f"columns overlap but no suffix specified: {col_intersection}" | ||
) | ||
|
||
if how == "cross": | ||
if on is not None: | ||
raise ValueError("'on' is not supported for cross join.") | ||
result_block = left._block.merge( | ||
right._block, | ||
left_join_ids=[], | ||
right_join_ids=[], | ||
suffixes=("", ""), | ||
suffixes=(lsuffix, rsuffix), | ||
how="cross", | ||
sort=True, | ||
) | ||
return DataFrame(result_block) | ||
|
||
# Join left columns with right index | ||
if on is not None: | ||
if left._has_index and (on in left.index.names): | ||
if on in left.columns: | ||
raise ValueError( | ||
f"'{on}' is both an index level and a column label, which is ambiguous." | ||
) | ||
else: | ||
raise NotImplementedError( | ||
f"Joining on index level '{on}' is not yet supported. {constants.FEEDBACK_LINK}" | ||
) | ||
if (left.columns == on).sum() > 1: | ||
raise ValueError(f"The column label '{on}' is not unique.") | ||
|
||
if other._block.index.nlevels != 1: | ||
raise ValueError( | ||
"Join on columns must match the index level of the other DataFrame. Join on column with multi-index haven't been supported." | ||
) | ||
# Switch left index with on column | ||
left_columns = left.columns | ||
left_idx_original_names = left.index.names if left._has_index else () | ||
left_idx_names_in_cols = [ | ||
f"bigframes_left_idx_name_{i}" | ||
for i in range(len(left_idx_original_names)) | ||
] | ||
if left._has_index: | ||
left.index.names = left_idx_names_in_cols | ||
left = left.reset_index(drop=False) | ||
left = left.set_index(on) | ||
|
||
# Join on index and switch back | ||
combined_df = left._perform_join_by_index(right, how=how) | ||
combined_df.index.name = on | ||
combined_df = combined_df.reset_index(drop=False) | ||
combined_df = combined_df.set_index(left_idx_names_in_cols) | ||
|
||
# To be consistent with Pandas | ||
if combined_df._has_index: | ||
combined_df.index.names = ( | ||
left_idx_original_names | ||
if how in ("inner", "left") | ||
else ([None] * len(combined_df.index.names)) | ||
) | ||
|
||
# Reorder columns | ||
combined_df = combined_df[list(left_columns) + list(right.columns)] | ||
return combined_df | ||
return self._join_on_key( | ||
other, | ||
on=on, | ||
how=how, | ||
lsuffix=lsuffix, | ||
rsuffix=rsuffix, | ||
should_duplicate_on_key=(on in col_intersection), | ||
) | ||
|
||
# Join left index with right index | ||
if left._block.index.nlevels != right._block.index.nlevels: | ||
raise ValueError("Index to join on must have the same number of levels.") | ||
|
||
return left._perform_join_by_index(right, how=how) | ||
return left._perform_join_by_index(right, how=how)._add_join_suffix( | ||
left.columns, right.columns, lsuffix=lsuffix, rsuffix=rsuffix | ||
) | ||
|
||
def _join_on_key( | ||
self, | ||
other: DataFrame, | ||
on: str, | ||
how: str, | ||
lsuffix: str, | ||
rsuffix: str, | ||
should_duplicate_on_key: bool, | ||
) -> DataFrame: | ||
left, right = self, other | ||
# Replace all columns names with unique names for reordering. | ||
left_col_original_names = left.columns | ||
on_col_name = "bigframes_left_col_on" | ||
dup_on_col_name = "bigframes_left_col_on_dup" | ||
left_col_temp_names = [ | ||
f"bigframes_left_col_name_{i}" if col_name != on else on_col_name | ||
for i, col_name in enumerate(left_col_original_names) | ||
] | ||
left.columns = pandas.Index(left_col_temp_names) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems dangerous. We haven't made a copy of self, so I'm uncomfortable with mutating it. If we must do this, then please either:
I prefer (1) since it's less likely to have problems if we're in a multi-threaded environment. |
||
# if on column is also in right df, we need to duplicate the column | ||
# and set it to be the first column | ||
if should_duplicate_on_key: | ||
left[dup_on_col_name] = left[on_col_name] | ||
on_col_name = dup_on_col_name | ||
left_col_temp_names = [on_col_name] + left_col_temp_names | ||
left = left[left_col_temp_names] | ||
|
||
# Switch left index with on column | ||
left_idx_original_names = left.index.names if left._has_index else () | ||
left_idx_names_in_cols = [ | ||
f"bigframes_left_idx_name_{i}" for i in range(len(left_idx_original_names)) | ||
] | ||
if left._has_index: | ||
left.index.names = left_idx_names_in_cols | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here. Mutating the index is dangerous. Can we avoid this? |
||
left = left.reset_index(drop=False) | ||
left = left.set_index(on_col_name) | ||
|
||
right_col_original_names = right.columns | ||
right_col_temp_names = [ | ||
f"bigframes_right_col_name_{i}" | ||
for i in range(len(right_col_original_names)) | ||
] | ||
right.columns = pandas.Index(right_col_temp_names) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here. |
||
|
||
# Join on index and switch back | ||
combined_df = left._perform_join_by_index(right, how=how) | ||
combined_df.index.name = on_col_name | ||
combined_df = combined_df.reset_index(drop=False) | ||
combined_df = combined_df.set_index(left_idx_names_in_cols) | ||
|
||
# To be consistent with Pandas | ||
if combined_df._has_index: | ||
combined_df.index.names = ( | ||
left_idx_original_names | ||
if how in ("inner", "left") | ||
else ([None] * len(combined_df.index.names)) | ||
) | ||
|
||
# Reorder columns | ||
combined_df = combined_df[left_col_temp_names + right_col_temp_names] | ||
return combined_df._add_join_suffix( | ||
left_col_original_names, | ||
right_col_original_names, | ||
lsuffix=lsuffix, | ||
rsuffix=rsuffix, | ||
extra_col=on if on_col_name == dup_on_col_name else None, | ||
) | ||
|
||
def _perform_join_by_index( | ||
self, | ||
|
@@ -3562,6 +3630,30 @@ def _perform_join_by_index( | |
) | ||
return DataFrame(block) | ||
|
||
def _add_join_suffix( | ||
self, | ||
left_columns, | ||
right_columns, | ||
lsuffix: str = "", | ||
rsuffix: str = "", | ||
extra_col: typing.Optional[str] = None, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a docstring explaining this |
||
): | ||
col_intersection = left_columns.intersection(right_columns) | ||
final_col_names = [] if extra_col is None else [extra_col] | ||
for col_name in left_columns: | ||
if col_name in col_intersection: | ||
final_col_names.append(f"{col_name}{lsuffix}") | ||
else: | ||
final_col_names.append(col_name) | ||
|
||
for col_name in right_columns: | ||
if col_name in col_intersection: | ||
final_col_names.append(f"{col_name}{rsuffix}") | ||
else: | ||
final_col_names.append(col_name) | ||
self.columns = pandas.Index(final_col_names) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should only be modifying |
||
return self | ||
|
||
@validations.requires_ordering() | ||
def rolling( | ||
self, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2899,12 +2899,99 @@ def test_join_different_table( | |
assert_pandas_df_equal(bf_result, pd_result, ignore_order=True) | ||
|
||
|
||
def test_join_duplicate_columns_raises_not_implemented(scalars_dfs): | ||
scalars_df, _ = scalars_dfs | ||
df_a = scalars_df[["string_col", "float64_col"]] | ||
df_b = scalars_df[["float64_col"]] | ||
with pytest.raises(NotImplementedError): | ||
df_a.join(df_b, how="outer").to_pandas() | ||
@all_joins | ||
def test_join_different_table_with_duplicate_column_name( | ||
scalars_df_index, scalars_pandas_df_index, how | ||
): | ||
bf_df_a = scalars_df_index[["string_col", "int64_col", "int64_too"]].rename( | ||
columns={"int64_too": "int64_col"} | ||
) | ||
bf_df_b = scalars_df_index.dropna()[ | ||
["string_col", "int64_col", "int64_too"] | ||
].rename(columns={"int64_too": "int64_col"}) | ||
bf_result = bf_df_a.join(bf_df_b, how=how, lsuffix="_l", rsuffix="_r").to_pandas() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add some checks that |
||
pd_df_a = scalars_pandas_df_index[["string_col", "int64_col", "int64_too"]].rename( | ||
columns={"int64_too": "int64_col"} | ||
) | ||
pd_df_b = scalars_pandas_df_index.dropna()[ | ||
["string_col", "int64_col", "int64_too"] | ||
].rename(columns={"int64_too": "int64_col"}) | ||
pd_result = pd_df_a.join(pd_df_b, how=how, lsuffix="_l", rsuffix="_r") | ||
|
||
pd.testing.assert_frame_equal(bf_result, pd_result, check_index_type=False) | ||
|
||
|
||
@all_joins | ||
def test_join_param_on_with_duplicate_column_name_not_on_col( | ||
scalars_df_index, scalars_pandas_df_index, how | ||
): | ||
# This test is for duplicate column names, but the 'on' column is not duplicated. | ||
if how == "cross": | ||
return | ||
bf_df_a = scalars_df_index[ | ||
["string_col", "datetime_col", "timestamp_col", "int64_too"] | ||
].rename(columns={"timestamp_col": "datetime_col"}) | ||
bf_df_b = scalars_df_index.dropna()[ | ||
["string_col", "datetime_col", "timestamp_col"] | ||
].rename(columns={"timestamp_col": "datetime_col"}) | ||
bf_result = bf_df_a.join( | ||
bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" | ||
).to_pandas() | ||
pd_df_a = scalars_pandas_df_index[ | ||
["string_col", "datetime_col", "timestamp_col", "int64_too"] | ||
].rename(columns={"timestamp_col": "datetime_col"}) | ||
pd_df_b = scalars_pandas_df_index.dropna()[ | ||
["string_col", "datetime_col", "timestamp_col"] | ||
].rename(columns={"timestamp_col": "datetime_col"}) | ||
pd_result = pd_df_a.join( | ||
pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" | ||
) | ||
pd.testing.assert_frame_equal( | ||
bf_result.sort_index(), | ||
pd_result.sort_index(), | ||
check_like=True, | ||
check_index_type=False, | ||
check_names=False, | ||
) | ||
pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) | ||
|
||
|
||
@pytest.mark.skipif( | ||
pandas.__version__.startswith("1."), reason="bad left join in pandas 1.x" | ||
) | ||
@all_joins | ||
def test_join_param_on_with_duplicate_column_name_on_col( | ||
scalars_df_index, scalars_pandas_df_index, how | ||
): | ||
# This test is for duplicate column names, and the 'on' column is duplicated. | ||
if how == "cross": | ||
return | ||
bf_df_a = scalars_df_index[ | ||
["string_col", "datetime_col", "timestamp_col", "int64_too"] | ||
].rename(columns={"timestamp_col": "datetime_col"}) | ||
bf_df_b = scalars_df_index.dropna()[ | ||
["string_col", "datetime_col", "timestamp_col", "int64_too"] | ||
].rename(columns={"timestamp_col": "datetime_col"}) | ||
bf_result = bf_df_a.join( | ||
bf_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" | ||
).to_pandas() | ||
pd_df_a = scalars_pandas_df_index[ | ||
["string_col", "datetime_col", "timestamp_col", "int64_too"] | ||
].rename(columns={"timestamp_col": "datetime_col"}) | ||
pd_df_b = scalars_pandas_df_index.dropna()[ | ||
["string_col", "datetime_col", "timestamp_col", "int64_too"] | ||
].rename(columns={"timestamp_col": "datetime_col"}) | ||
pd_result = pd_df_a.join( | ||
pd_df_b, on="int64_too", how=how, lsuffix="_l", rsuffix="_r" | ||
) | ||
pd.testing.assert_frame_equal( | ||
bf_result.sort_index(), | ||
pd_result.sort_index(), | ||
check_like=True, | ||
check_index_type=False, | ||
check_names=False, | ||
) | ||
pd.testing.assert_index_equal(bf_result.columns, pd_result.columns) | ||
|
||
|
||
@all_joins | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2444,12 +2444,40 @@ def test_join_different_table( | |
assert_pandas_df_equal(bf_result, pd_result, ignore_order=True) | ||
|
||
|
||
def test_join_duplicate_columns_raises_not_implemented(scalars_dfs): | ||
@all_joins | ||
def test_join_raise_when_param_on_duplicate_with_column(scalars_df_index, how): | ||
if how == "cross": | ||
return | ||
bf_df_a = scalars_df_index[["string_col", "int64_col"]].rename( | ||
columns={"int64_col": "string_col"} | ||
) | ||
bf_df_b = scalars_df_index.dropna()["string_col"] | ||
with pytest.raises( | ||
ValueError, match="The column label 'string_col' is not unique." | ||
): | ||
bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") | ||
|
||
|
||
def test_join_duplicate_columns_raises_value_error(scalars_dfs): | ||
scalars_df, _ = scalars_dfs | ||
df_a = scalars_df[["string_col", "float64_col"]] | ||
df_b = scalars_df[["float64_col"]] | ||
with pytest.raises(NotImplementedError): | ||
df_a.join(df_b, how="outer").to_pandas() | ||
with pytest.raises(ValueError, match="columns overlap but no suffix specified"): | ||
df_a.join(df_b, how="outer") | ||
|
||
|
||
@all_joins | ||
def test_join_param_on_duplicate_with_index_raises_value_error(scalars_df_index, how): | ||
if how == "cross": | ||
return | ||
Comment on lines
+2471
to
+2472
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it'd be worth adding a test that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cross join actually raises another error; match added. |
||
bf_df_a = scalars_df_index[["string_col"]] | ||
bf_df_a.index.name = "string_col" | ||
bf_df_b = scalars_df_index.dropna()["string_col"] | ||
with pytest.raises( | ||
ValueError, | ||
match="'string_col' is both an index level and a column label, which is ambiguous.", | ||
): | ||
bf_df_a.join(bf_df_b, on="string_col", how=how, lsuffix="_l", rsuffix="_r") | ||
|
||
|
||
@all_joins | ||
|
@@ -2461,7 +2489,7 @@ def test_join_param_on(scalars_dfs, how): | |
bf_df_b = bf_df[["float64_col"]] | ||
|
||
if how == "cross": | ||
with pytest.raises(ValueError): | ||
with pytest.raises(ValueError, match="'on' is not supported for cross join."): | ||
bf_df_a.join(bf_df_b, on="rowindex_2", how=how) | ||
else: | ||
bf_result = bf_df_a.join(bf_df_b, on="rowindex_2", how=how).to_pandas() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This
if
block is getting pretty long. Might be time for a helper function.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added _join_on_key function.