From 4e4936bd328158d7182afe5935d41c2cf93c72f2 Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Mon, 8 Sep 2025 01:50:39 +0200 Subject: [PATCH 01/13] Added necesary parameters to 'to_iceberg' method to expand functionality --- awswrangler/athena/_utils.py | 6 +++++- awswrangler/athena/_write_iceberg.py | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/awswrangler/athena/_utils.py b/awswrangler/athena/_utils.py index 7a0e74aac..b26097841 100644 --- a/awswrangler/athena/_utils.py +++ b/awswrangler/athena/_utils.py @@ -43,6 +43,11 @@ _logger: logging.Logger = logging.getLogger(__name__) +class _MergeClause(TypedDict, total=False): + when: Literal["MATCHED", "NOT_MATCHED", "NOT_MATCHED_BY_SOURCE"] + condition: str | None + action: Literal["UPDATE", "DELETE", "INSERT"] + columns: list[str] | None class _QueryMetadata(NamedTuple): execution_id: str @@ -63,7 +68,6 @@ class _WorkGroupConfig(NamedTuple): encryption: str | None kms_key: str | None - def _get_s3_output( s3_output: str | None, wg_config: _WorkGroupConfig, boto3_session: boto3.Session | None = None ) -> str: diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index fabc4d032..473986f9b 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -6,7 +6,7 @@ import re import typing import uuid -from typing import Any, Dict, Literal, TypedDict +from typing import Any, Dict, Literal, TypedDict, Union import boto3 import pandas as pd @@ -18,6 +18,7 @@ _get_workgroup_config, _start_query_execution, _WorkGroupConfig, + _MergeClause ) from awswrangler.typing import GlueTableSettings @@ -361,7 +362,9 @@ def to_iceberg( # noqa: PLR0913 table_location: str | None = None, partition_cols: list[str] | None = None, merge_cols: list[str] | None = None, - merge_condition: Literal["update", "ignore"] = "update", + merge_on_condition: str | None = None, + merge_condition: Literal["update", "ignore", "conditional_merge"] = "update", + merge_conditional_clauses: list[_MergeClause] | None = None, merge_match_nulls: bool = False, keep_files: bool = True, data_source: str | None = None, From 1f56c727e80ad334cce611b928339d0f4ead5289 Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Mon, 8 Sep 2025 01:52:00 +0200 Subject: [PATCH 02/13] Removed unused import --- awswrangler/athena/_write_iceberg.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 473986f9b..cdb81855c 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -6,7 +6,7 @@ import re import typing import uuid -from typing import Any, Dict, Literal, TypedDict, Union +from typing import Any, Dict, Literal, TypedDict import boto3 import pandas as pd From 14f319494ebca8337b5b8c19d984d726d5327723 Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Mon, 8 Sep 2025 02:19:33 +0200 Subject: [PATCH 03/13] Added validation for new parameters complying with new logic --- awswrangler/athena/_write_iceberg.py | 51 ++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 7 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index cdb81855c..2d7076aa4 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -220,7 +220,9 @@ def _validate_args( mode: Literal["append", "overwrite", "overwrite_partitions"], partition_cols: list[str] | None, merge_cols: list[str] | None, - merge_condition: Literal["update", "ignore"], + merge_on_condition: str | None, + merge_condition: Literal["update", "ignore", "conditional_merge"], + merge_conditional_clauses: list[_MergeClause] | None, ) -> None: if df.empty is True: raise exceptions.EmptyDataFrame("DataFrame cannot be empty.") @@ -229,6 +231,45 @@ def _validate_args( raise exceptions.InvalidArgumentCombination( "Either path or workgroup path must be specified to store the temporary results." ) + + if merge_cols and merge_on_condition: + raise exceptions.InvalidArgumentCombination( + "Cannot specify both merge_cols and merge_on_condition. Use either merge_cols for simple equality matching or merge_on_condition for custom logic." + ) + + if merge_conditional_clauses and merge_condition != "conditional_merge": + raise exceptions.InvalidArgumentCombination( + "merge_conditional_clauses can only be used when merge_condition is 'conditional_merge'." + ) + + if (merge_cols or merge_on_condition) and merge_condition not in ["update", "ignore", "conditional_merge"]: + raise exceptions.InvalidArgumentValue( + f"Invalid merge_condition: {merge_condition}. Valid values: ['update', 'ignore', 'conditional_merge']" + ) + + if merge_condition == "conditional_merge": + if not merge_conditional_clauses: + raise exceptions.InvalidArgumentCombination( + "merge_conditional_clauses must be provided when merge_condition is 'conditional_merge'." + ) + + for i, clause in enumerate(merge_conditional_clauses): + if "when" not in clause: + raise exceptions.InvalidArgumentValue( + f"merge_conditional_clauses[{i}] must contain 'when' field." + ) + if "action" not in clause: + raise exceptions.InvalidArgumentValue( + f"merge_conditional_clauses[{i}] must contain 'action' field." + ) + if clause["when"] not in ['MATCHED', 'NOT_MATCHED', 'NOT_MATCHED_BY_SOURCE']: + raise exceptions.InvalidArgumentValue( + f"merge_conditional_clauses[{i}]['when'] must be one of ['MATCHED', 'NOT_MATCHED', 'NOT_MATCHED_BY_SOURCE']." + ) + if clause["action"] not in ["UPDATE", "DELETE", "INSERT", "IGNORE"]: + raise exceptions.InvalidArgumentValue( + f"merge_conditional_clauses[{i}]['action'] must be one of ['UPDATE', 'DELETE', 'INSERT', 'IGNORE']." + ) if mode == "overwrite_partitions": if not partition_cols: @@ -240,12 +281,6 @@ def _validate_args( "When mode is 'overwrite_partitions' merge_cols must not be specified." ) - if merge_cols and merge_condition not in ["update", "ignore"]: - raise exceptions.InvalidArgumentValue( - f"Invalid merge_condition: {merge_condition}. Valid values: ['update', 'ignore']" - ) - - def _merge_iceberg( df: pd.DataFrame, database: str, @@ -501,7 +536,9 @@ def to_iceberg( # noqa: PLR0913 mode=mode, partition_cols=partition_cols, merge_cols=merge_cols, + merge_on_condition=merge_on_condition, merge_condition=merge_condition, + merge_conditional_clauses=merge_conditional_clauses, ) glue_table_settings = glue_table_settings if glue_table_settings else {} From 12802ade4d7c9bf0e0ee1f233acb756728f07d18 Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Mon, 8 Sep 2025 03:14:35 +0200 Subject: [PATCH 04/13] Draft implementation done. Pending tests and verify optimal approach --- awswrangler/athena/_utils.py | 2 +- awswrangler/athena/_write_iceberg.py | 102 ++++++++++++++++++++------- 2 files changed, 77 insertions(+), 27 deletions(-) diff --git a/awswrangler/athena/_utils.py b/awswrangler/athena/_utils.py index b26097841..36f0f81c1 100644 --- a/awswrangler/athena/_utils.py +++ b/awswrangler/athena/_utils.py @@ -44,7 +44,7 @@ _logger: logging.Logger = logging.getLogger(__name__) class _MergeClause(TypedDict, total=False): - when: Literal["MATCHED", "NOT_MATCHED", "NOT_MATCHED_BY_SOURCE"] + when: Literal["MATCHED", "NOT MATCHED", "NOT MATCHED BY SOURCE"] condition: str | None action: Literal["UPDATE", "DELETE", "INSERT"] columns: list[str] | None diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 2d7076aa4..508094b3c 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -220,9 +220,10 @@ def _validate_args( mode: Literal["append", "overwrite", "overwrite_partitions"], partition_cols: list[str] | None, merge_cols: list[str] | None, - merge_on_condition: str | None, + merge_on_clause: str | None, merge_condition: Literal["update", "ignore", "conditional_merge"], merge_conditional_clauses: list[_MergeClause] | None, + merge_match_nulls: bool, ) -> None: if df.empty is True: raise exceptions.EmptyDataFrame("DataFrame cannot be empty.") @@ -232,9 +233,14 @@ def _validate_args( "Either path or workgroup path must be specified to store the temporary results." ) - if merge_cols and merge_on_condition: + if merge_cols and merge_on_clause: raise exceptions.InvalidArgumentCombination( - "Cannot specify both merge_cols and merge_on_condition. Use either merge_cols for simple equality matching or merge_on_condition for custom logic." + "Cannot specify both merge_cols and merge_on_clause. Use either merge_cols for simple equality matching or merge_on_clause for custom logic." + ) + + if merge_on_clause and merge_match_nulls: + raise exceptions.InvalidArgumentCombination( + "merge_match_nulls can only be used together with merge_cols." ) if merge_conditional_clauses and merge_condition != "conditional_merge": @@ -242,7 +248,7 @@ def _validate_args( "merge_conditional_clauses can only be used when merge_condition is 'conditional_merge'." ) - if (merge_cols or merge_on_condition) and merge_condition not in ["update", "ignore", "conditional_merge"]: + if (merge_cols or merge_on_clause) and merge_condition not in ["update", "ignore", "conditional_merge"]: raise exceptions.InvalidArgumentValue( f"Invalid merge_condition: {merge_condition}. Valid values: ['update', 'ignore', 'conditional_merge']" ) @@ -262,9 +268,9 @@ def _validate_args( raise exceptions.InvalidArgumentValue( f"merge_conditional_clauses[{i}] must contain 'action' field." ) - if clause["when"] not in ['MATCHED', 'NOT_MATCHED', 'NOT_MATCHED_BY_SOURCE']: + if clause["when"] not in ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE']: raise exceptions.InvalidArgumentValue( - f"merge_conditional_clauses[{i}]['when'] must be one of ['MATCHED', 'NOT_MATCHED', 'NOT_MATCHED_BY_SOURCE']." + f"merge_conditional_clauses[{i}]['when'] must be one of ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE']." ) if clause["action"] not in ["UPDATE", "DELETE", "INSERT", "IGNORE"]: raise exceptions.InvalidArgumentValue( @@ -287,7 +293,9 @@ def _merge_iceberg( table: str, source_table: str, merge_cols: list[str] | None = None, - merge_condition: Literal["update", "ignore"] = "update", + merge_on_clause: str | None = None, + merge_condition: Literal["update", "ignore", "conditional_merge"] = "update", + merge_conditional_clauses: list[_MergeClause] | None = None, merge_match_nulls: bool = False, kms_key: str | None = None, boto3_session: boto3.Session | None = None, @@ -342,27 +350,66 @@ def _merge_iceberg( wg_config: _WorkGroupConfig = _get_workgroup_config(session=boto3_session, workgroup=workgroup) sql_statement: str - if merge_cols: - if merge_condition == "update": - match_condition = f"""WHEN MATCHED THEN - UPDATE SET {", ".join([f'"{x}" = source."{x}"' for x in df.columns])}""" - else: - match_condition = "" - - if merge_match_nulls: - merge_conditions = [f'(target."{x}" IS NOT DISTINCT FROM source."{x}")' for x in merge_cols] + if merge_cols or merge_on_clause: + if merge_on_clause: + on_condition = merge_on_clause else: - merge_conditions = [f'(target."{x}" = source."{x}")' for x in merge_cols] - + if merge_match_nulls: + merge_conditions = [f'(target."{x}" IS NOT DISTINCT FROM source."{x}")' for x in merge_cols] + else: + merge_conditions = [f'(target."{x}" = source."{x}")' for x in merge_cols] + on_condition = " AND ".join(merge_conditions) + + # Build WHEN clauses based on merge_condition + when_clauses = [] + + if merge_condition == "update": + when_clauses.append(f"""WHEN MATCHED THEN + UPDATE SET {", ".join([f'"{x}" = source."{x}"' for x in df.columns])}""") + when_clauses.append(f"""WHEN NOT MATCHED THEN + INSERT ({", ".join([f'"{x}"' for x in df.columns])}) + VALUES ({", ".join([f'source."{x}"' for x in df.columns])})""") + + elif merge_condition == "ignore": + when_clauses.append(f"""WHEN NOT MATCHED THEN + INSERT ({", ".join([f'"{x}"' for x in df.columns])}) + VALUES ({", ".join([f'source."{x}"' for x in df.columns])})""") + + elif merge_condition == "conditional_merge": + for clause in merge_conditional_clauses: + when_type = clause["when"] + action = clause["action"] + condition = clause.get("condition") + columns = clause.get("columns") + + # Build WHEN clause + when_part = f"WHEN {when_type}" + if condition: + when_part += f" AND {condition}" + + # Build action + if action == "UPDATE": + update_columns = columns or df.columns.tolist() + update_sets = [f'"{col}" = source."{col}"' for col in update_columns] + when_part += f" THEN UPDATE SET {', '.join(update_sets)}" + + elif action == "DELETE": + when_part += " THEN DELETE" + + elif action == "INSERT": + insert_columns = columns or df.columns.tolist() + column_list = ", ".join([f'"{col}"' for col in insert_columns]) + values_list = ", ".join([f'source."{col}"' for col in insert_columns]) + when_part += f" THEN INSERT ({column_list}) VALUES ({values_list})" + + when_clauses.append(when_part) + sql_statement = f""" MERGE INTO "{database}"."{table}" target USING "{database}"."{source_table}" source - ON {" AND ".join(merge_conditions)} - {match_condition} - WHEN NOT MATCHED THEN - INSERT ({", ".join([f'"{x}"' for x in df.columns])}) - VALUES ({", ".join([f'source."{x}"' for x in df.columns])}) - """ + ON {on_condition} + {"\n ".join(when_clauses)} + """ else: sql_statement = f""" INSERT INTO "{database}"."{table}" ({", ".join([f'"{x}"' for x in df.columns])}) @@ -397,7 +444,7 @@ def to_iceberg( # noqa: PLR0913 table_location: str | None = None, partition_cols: list[str] | None = None, merge_cols: list[str] | None = None, - merge_on_condition: str | None = None, + merge_on_clause: str | None = None, merge_condition: Literal["update", "ignore", "conditional_merge"] = "update", merge_conditional_clauses: list[_MergeClause] | None = None, merge_match_nulls: bool = False, @@ -536,9 +583,10 @@ def to_iceberg( # noqa: PLR0913 mode=mode, partition_cols=partition_cols, merge_cols=merge_cols, - merge_on_condition=merge_on_condition, + merge_on_clause=merge_on_clause, merge_condition=merge_condition, merge_conditional_clauses=merge_conditional_clauses, + merge_match_nulls=merge_match_nulls, ) glue_table_settings = glue_table_settings if glue_table_settings else {} @@ -661,7 +709,9 @@ def to_iceberg( # noqa: PLR0913 table=table, source_table=temp_table, merge_cols=merge_cols, + merge_on_clause=merge_on_clause, merge_condition=merge_condition, + merge_conditional_clauses=merge_conditional_clauses, merge_match_nulls=merge_match_nulls, kms_key=kms_key, boto3_session=boto3_session, From 991f610ba26800e2104c1c1ba65ab38b1a237d73 Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Wed, 10 Sep 2025 18:38:50 +0200 Subject: [PATCH 05/13] Added additional validation and removed unhandled action. --- awswrangler/athena/_write_iceberg.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 508094b3c..e13fb290a 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -259,6 +259,7 @@ def _validate_args( "merge_conditional_clauses must be provided when merge_condition is 'conditional_merge'." ) + seen_not_matched = False for i, clause in enumerate(merge_conditional_clauses): if "when" not in clause: raise exceptions.InvalidArgumentValue( @@ -272,9 +273,17 @@ def _validate_args( raise exceptions.InvalidArgumentValue( f"merge_conditional_clauses[{i}]['when'] must be one of ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE']." ) - if clause["action"] not in ["UPDATE", "DELETE", "INSERT", "IGNORE"]: + if clause["action"] not in ["UPDATE", "DELETE", "INSERT"]: raise exceptions.InvalidArgumentValue( - f"merge_conditional_clauses[{i}]['action'] must be one of ['UPDATE', 'DELETE', 'INSERT', 'IGNORE']." + f"merge_conditional_clauses[{i}]['action'] must be one of ['UPDATE', 'DELETE', 'INSERT']." + ) + + if clause["when"] in ["NOT MATCHED", "NOT MATCHED BY SOURCE"]: + seen_not_matched = True + elif clause["when"] == "MATCHED" and seen_not_matched: + raise exceptions.InvalidArgumentValue( + f"merge_conditional_clauses[{i}]['when'] is MATCHED but appears after a NOT MATCHED clause. " + "WHEN MATCHED must come before WHEN NOT MATCHED or WHEN NOT MATCHED BY SOURCE." ) if mode == "overwrite_partitions": From 9b8cbf4394516dced34a27287bc89d011d4a3dcb Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Wed, 10 Sep 2025 19:11:25 +0200 Subject: [PATCH 06/13] Added new params to python docstrings --- awswrangler/athena/_write_iceberg.py | 33 +++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index e13fb290a..79bbf3f62 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -331,11 +331,26 @@ def _merge_iceberg( source_table: str AWS Glue/Athena source table name. merge_cols: List[str], optional - List of column names that will be used for conditional inserts and updates. + List of column names that will be used for conditional inserts and updates. Cannot be used together with ``merge_on_clause``. https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html + merge_on_clause: str, optional + Custom ON clause for the MERGE statement. If specified, this string will be used as the ON condition + between the target and source tables, allowing for complex join logic beyond simple equality on columns. + Cannot be used together with ``merge_cols``. merge_condition: str, optional - The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore']. + The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore', 'conditional_merge']. + - 'update': Update matched rows and insert non-matched rows. + - 'ignore': Only insert non-matched rows. + - 'conditional_merge': Use custom conditional clauses for merge actions. + merge_conditional_clauses : List[dict], optional + List of dictionaries specifying custom conditional clauses for the MERGE statement. + Each dictionary should have: + - 'when': One of ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE'] + - 'condition': (optional) Additional SQL condition for the clause + - 'action': One of ['UPDATE', 'DELETE', 'INSERT'] + - 'columns': (optional) List of columns to update or insert + Used only when merge_condition is 'conditional_merge'. merge_match_nulls: bool, optional Instruct whether to have nulls in the merge condition match other nulls kms_key : str, optional @@ -504,8 +519,20 @@ def to_iceberg( # noqa: PLR0913 List of column names that will be used for conditional inserts and updates. https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html + merge_on_clause + Custom ON clause for the MERGE statement. If specified, this string will be used as the ON condition + between the target and source tables, allowing for complex join logic beyond simple equality on columns. + Cannot be used together with ``merge_cols``. merge_condition - The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore']. + The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore', 'conditional_merge']. + merge_conditional_clauses + List of dictionaries specifying custom conditional clauses for the MERGE statement. + Each dictionary should have: + - 'when': One of ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE'] + - 'action': One of ['UPDATE', 'DELETE', 'INSERT'] + - 'condition': (optional) Additional SQL condition for the clause + - 'columns': (optional) List of columns to update or insert + Used only when merge_condition is 'conditional_merge'. merge_match_nulls Instruct whether to have nulls in the merge condition match other nulls keep_files From 31d1a1d2213ffbc1f796698e4d307cb3fa6d5816 Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Wed, 10 Sep 2025 19:39:22 +0200 Subject: [PATCH 07/13] Fixed backslash in f-string --- awswrangler/athena/_write_iceberg.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 79bbf3f62..81eeb23b5 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -428,11 +428,12 @@ def _merge_iceberg( when_clauses.append(when_part) + joined_clauses = "\n ".join(when_clauses) sql_statement = f""" MERGE INTO "{database}"."{table}" target USING "{database}"."{source_table}" source ON {on_condition} - {"\n ".join(when_clauses)} + {joined_clauses} """ else: sql_statement = f""" From 7685bc1462288d4a3add958b1560cfac8b0af636 Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Wed, 10 Sep 2025 19:39:31 +0200 Subject: [PATCH 08/13] Added tests --- tests/unit/test_athena_iceberg.py | 113 ++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py index 6233e8425..fb9c3facd 100644 --- a/tests/unit/test_athena_iceberg.py +++ b/tests/unit/test_athena_iceberg.py @@ -999,6 +999,119 @@ def test_athena_delete_from_iceberg_empty_df_error( keep_files=False, ) +def test_to_iceberg_merge_cols_and_merge_on_clause_error( + path: str, path2: str, glue_database: str, glue_table: str +) -> None: + df = pd.DataFrame({"id": [1], "val": ["a"]}) + with pytest.raises(wr.exceptions.InvalidArgumentCombination): + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + merge_cols=["id"], + merge_on_clause="id = source.id", + ) + +def test_to_iceberg_merge_match_nulls_with_merge_on_clause_error( + path: str, path2: str, glue_database: str, glue_table: str +) -> None: + df = pd.DataFrame({"id": [1], "val": ["a"]}) + with pytest.raises(wr.exceptions.InvalidArgumentCombination): + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + merge_on_clause="id = source.id", + merge_match_nulls=True, + ) + +def test_to_iceberg_merge_conditional_clauses_without_conditional_merge_error( + path: str, path2: str, glue_database: str, glue_table: str +) -> None: + df = pd.DataFrame({"id": [1], "val": ["a"]}) + with pytest.raises(wr.exceptions.InvalidArgumentCombination): + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + merge_cols=["id"], + merge_conditional_clauses=[{"when": "MATCHED", "action": "UPDATE"}], + merge_condition="update", + ) + +def test_to_iceberg_conditional_merge_without_clauses_error( + path: str, path2: str, glue_database: str, glue_table: str +) -> None: + df = pd.DataFrame({"id": [1], "val": ["a"]}) + with pytest.raises(wr.exceptions.InvalidArgumentCombination): + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + merge_cols=["id"], + merge_condition="conditional_merge", + ) + +def test_to_iceberg_invalid_merge_condition_error( + path: str, path2: str, glue_database: str, glue_table: str +) -> None: + df = pd.DataFrame({"id": [1], "val": ["a"]}) + with pytest.raises(wr.exceptions.InvalidArgumentValue): + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + merge_cols=["id"], + merge_condition="not_a_valid_condition", + ) + +def test_to_iceberg_conditional_merge_happy_path( + path: str, path2: str, glue_database: str, glue_table: str +) -> None: + df = pd.DataFrame({"id": [1, 2], "val": ["a", "b"]}) + wr.athena.to_iceberg( + df=df, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + keep_files=False, + ) + df2 = pd.DataFrame({"id": [1, 3], "val": ["c", "d"]}) + clauses = [ + {"when": "MATCHED", "action": "UPDATE", "columns": ["val"]}, + {"when": "NOT MATCHED", "action": "INSERT"}, + ] + wr.athena.to_iceberg( + df=df2, + database=glue_database, + table=glue_table, + table_location=path, + temp_path=path2, + merge_cols=["id"], + merge_condition="conditional_merge", + merge_conditional_clauses=clauses, + keep_files=False, + ) + df_out = wr.athena.read_sql_query( + sql=f'SELECT * FROM "{glue_table}" ORDER BY id', + database=glue_database, + ctas_approach=False, + unload_approach=False, + ) + # id=1 should be updated, id=2 should remain, id=3 should be inserted + expected = pd.DataFrame({"id": [1, 2, 3], "val": ["c", "b", "d"]}) + assert_pandas_equals(expected, df_out.reset_index(drop=True)) def test_athena_iceberg_use_partition_function( path: str, From eb25d2a50b872b93e54e669f2145c625ecd5c804 Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Wed, 10 Sep 2025 20:36:27 +0200 Subject: [PATCH 09/13] Updated stringdocs --- awswrangler/athena/_write_iceberg.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 81eeb23b5..82917d0a4 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -338,6 +338,7 @@ def _merge_iceberg( Custom ON clause for the MERGE statement. If specified, this string will be used as the ON condition between the target and source tables, allowing for complex join logic beyond simple equality on columns. Cannot be used together with ``merge_cols``. + It must produce at most one match per target row. Using OR conditions may result in merge failures. merge_condition: str, optional The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore', 'conditional_merge']. - 'update': Update matched rows and insert non-matched rows. @@ -491,9 +492,9 @@ def to_iceberg( # noqa: PLR0913 glue_table_settings: GlueTableSettings | None = None, ) -> None: """ - Insert into Athena Iceberg table using INSERT INTO ... SELECT. Will create Iceberg table if it does not exist. + Write a Pandas DataFrame to an Athena Iceberg table, supporting table creation, schema evolution, and advanced merge operations. - Creates temporary external table, writes staged files and inserts via INSERT INTO ... SELECT. + This function inserts data into an Athena Iceberg table, creating the table if it does not exist. It supports multiple write modes (append, overwrite, overwrite_partitions), schema evolution, and conditional merge logic using Athena's MERGE INTO statement. Advanced options allow for custom merge conditions, partitioning, and table properties. Parameters ---------- @@ -524,6 +525,7 @@ def to_iceberg( # noqa: PLR0913 Custom ON clause for the MERGE statement. If specified, this string will be used as the ON condition between the target and source tables, allowing for complex join logic beyond simple equality on columns. Cannot be used together with ``merge_cols``. + It must produce at most one match per target row. Using OR conditions may result in merge failures. merge_condition The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore', 'conditional_merge']. merge_conditional_clauses From 262bfc620535b75d668af3ee56edf86d6c2345fb Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Wed, 10 Sep 2025 20:50:00 +0200 Subject: [PATCH 10/13] fix: ruff reformat --- awswrangler/athena/_utils.py | 3 + awswrangler/athena/_write_iceberg.py | 112 ++++++++++++--------------- tests/unit/test_athena_iceberg.py | 15 ++-- 3 files changed, 63 insertions(+), 67 deletions(-) diff --git a/awswrangler/athena/_utils.py b/awswrangler/athena/_utils.py index 36f0f81c1..fa3ba15d0 100644 --- a/awswrangler/athena/_utils.py +++ b/awswrangler/athena/_utils.py @@ -43,12 +43,14 @@ _logger: logging.Logger = logging.getLogger(__name__) + class _MergeClause(TypedDict, total=False): when: Literal["MATCHED", "NOT MATCHED", "NOT MATCHED BY SOURCE"] condition: str | None action: Literal["UPDATE", "DELETE", "INSERT"] columns: list[str] | None + class _QueryMetadata(NamedTuple): execution_id: str dtype: dict[str, str] @@ -68,6 +70,7 @@ class _WorkGroupConfig(NamedTuple): encryption: str | None kms_key: str | None + def _get_s3_output( s3_output: str | None, wg_config: _WorkGroupConfig, boto3_session: boto3.Session | None = None ) -> str: diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 82917d0a4..d72c9eae7 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -14,12 +14,7 @@ from awswrangler import _data_types, _utils, catalog, exceptions, s3 from awswrangler._config import apply_configs from awswrangler.athena._executions import wait_query -from awswrangler.athena._utils import ( - _get_workgroup_config, - _start_query_execution, - _WorkGroupConfig, - _MergeClause -) +from awswrangler.athena._utils import _get_workgroup_config, _start_query_execution, _WorkGroupConfig, _MergeClause from awswrangler.typing import GlueTableSettings _logger: logging.Logger = logging.getLogger(__name__) @@ -232,59 +227,53 @@ def _validate_args( raise exceptions.InvalidArgumentCombination( "Either path or workgroup path must be specified to store the temporary results." ) - + if merge_cols and merge_on_clause: - raise exceptions.InvalidArgumentCombination( - "Cannot specify both merge_cols and merge_on_clause. Use either merge_cols for simple equality matching or merge_on_clause for custom logic." - ) - + raise exceptions.InvalidArgumentCombination( + "Cannot specify both merge_cols and merge_on_clause. Use either merge_cols for simple equality matching or merge_on_clause for custom logic." + ) + if merge_on_clause and merge_match_nulls: - raise exceptions.InvalidArgumentCombination( - "merge_match_nulls can only be used together with merge_cols." - ) - + raise exceptions.InvalidArgumentCombination("merge_match_nulls can only be used together with merge_cols.") + if merge_conditional_clauses and merge_condition != "conditional_merge": - raise exceptions.InvalidArgumentCombination( - "merge_conditional_clauses can only be used when merge_condition is 'conditional_merge'." - ) - + raise exceptions.InvalidArgumentCombination( + "merge_conditional_clauses can only be used when merge_condition is 'conditional_merge'." + ) + if (merge_cols or merge_on_clause) and merge_condition not in ["update", "ignore", "conditional_merge"]: raise exceptions.InvalidArgumentValue( f"Invalid merge_condition: {merge_condition}. Valid values: ['update', 'ignore', 'conditional_merge']" ) - + if merge_condition == "conditional_merge": - if not merge_conditional_clauses: - raise exceptions.InvalidArgumentCombination( - "merge_conditional_clauses must be provided when merge_condition is 'conditional_merge'." - ) - - seen_not_matched = False - for i, clause in enumerate(merge_conditional_clauses): - if "when" not in clause: - raise exceptions.InvalidArgumentValue( - f"merge_conditional_clauses[{i}] must contain 'when' field." - ) - if "action" not in clause: - raise exceptions.InvalidArgumentValue( - f"merge_conditional_clauses[{i}] must contain 'action' field." - ) - if clause["when"] not in ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE']: - raise exceptions.InvalidArgumentValue( - f"merge_conditional_clauses[{i}]['when'] must be one of ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE']." - ) - if clause["action"] not in ["UPDATE", "DELETE", "INSERT"]: - raise exceptions.InvalidArgumentValue( - f"merge_conditional_clauses[{i}]['action'] must be one of ['UPDATE', 'DELETE', 'INSERT']." - ) - - if clause["when"] in ["NOT MATCHED", "NOT MATCHED BY SOURCE"]: - seen_not_matched = True - elif clause["when"] == "MATCHED" and seen_not_matched: - raise exceptions.InvalidArgumentValue( - f"merge_conditional_clauses[{i}]['when'] is MATCHED but appears after a NOT MATCHED clause. " - "WHEN MATCHED must come before WHEN NOT MATCHED or WHEN NOT MATCHED BY SOURCE." - ) + if not merge_conditional_clauses: + raise exceptions.InvalidArgumentCombination( + "merge_conditional_clauses must be provided when merge_condition is 'conditional_merge'." + ) + + seen_not_matched = False + for i, clause in enumerate(merge_conditional_clauses): + if "when" not in clause: + raise exceptions.InvalidArgumentValue(f"merge_conditional_clauses[{i}] must contain 'when' field.") + if "action" not in clause: + raise exceptions.InvalidArgumentValue(f"merge_conditional_clauses[{i}] must contain 'action' field.") + if clause["when"] not in ["MATCHED", "NOT MATCHED", "NOT MATCHED BY SOURCE"]: + raise exceptions.InvalidArgumentValue( + f"merge_conditional_clauses[{i}]['when'] must be one of ['MATCHED', 'NOT MATCHED', 'NOT MATCHED BY SOURCE']." + ) + if clause["action"] not in ["UPDATE", "DELETE", "INSERT"]: + raise exceptions.InvalidArgumentValue( + f"merge_conditional_clauses[{i}]['action'] must be one of ['UPDATE', 'DELETE', 'INSERT']." + ) + + if clause["when"] in ["NOT MATCHED", "NOT MATCHED BY SOURCE"]: + seen_not_matched = True + elif clause["when"] == "MATCHED" and seen_not_matched: + raise exceptions.InvalidArgumentValue( + f"merge_conditional_clauses[{i}]['when'] is MATCHED but appears after a NOT MATCHED clause. " + "WHEN MATCHED must come before WHEN NOT MATCHED or WHEN NOT MATCHED BY SOURCE." + ) if mode == "overwrite_partitions": if not partition_cols: @@ -296,6 +285,7 @@ def _validate_args( "When mode is 'overwrite_partitions' merge_cols must not be specified." ) + def _merge_iceberg( df: pd.DataFrame, database: str, @@ -387,55 +377,55 @@ def _merge_iceberg( # Build WHEN clauses based on merge_condition when_clauses = [] - + if merge_condition == "update": when_clauses.append(f"""WHEN MATCHED THEN UPDATE SET {", ".join([f'"{x}" = source."{x}"' for x in df.columns])}""") when_clauses.append(f"""WHEN NOT MATCHED THEN INSERT ({", ".join([f'"{x}"' for x in df.columns])}) VALUES ({", ".join([f'source."{x}"' for x in df.columns])})""") - + elif merge_condition == "ignore": when_clauses.append(f"""WHEN NOT MATCHED THEN INSERT ({", ".join([f'"{x}"' for x in df.columns])}) VALUES ({", ".join([f'source."{x}"' for x in df.columns])})""") - + elif merge_condition == "conditional_merge": for clause in merge_conditional_clauses: when_type = clause["when"] action = clause["action"] condition = clause.get("condition") columns = clause.get("columns") - + # Build WHEN clause when_part = f"WHEN {when_type}" if condition: when_part += f" AND {condition}" - + # Build action if action == "UPDATE": update_columns = columns or df.columns.tolist() update_sets = [f'"{col}" = source."{col}"' for col in update_columns] when_part += f" THEN UPDATE SET {', '.join(update_sets)}" - + elif action == "DELETE": when_part += " THEN DELETE" - + elif action == "INSERT": insert_columns = columns or df.columns.tolist() column_list = ", ".join([f'"{col}"' for col in insert_columns]) values_list = ", ".join([f'source."{col}"' for col in insert_columns]) when_part += f" THEN INSERT ({column_list}) VALUES ({values_list})" - + when_clauses.append(when_part) - + joined_clauses = "\n ".join(when_clauses) sql_statement = f""" MERGE INTO "{database}"."{table}" target USING "{database}"."{source_table}" source ON {on_condition} {joined_clauses} - """ + """ else: sql_statement = f""" INSERT INTO "{database}"."{table}" ({", ".join([f'"{x}"' for x in df.columns])}) diff --git a/tests/unit/test_athena_iceberg.py b/tests/unit/test_athena_iceberg.py index fb9c3facd..aebd776cc 100644 --- a/tests/unit/test_athena_iceberg.py +++ b/tests/unit/test_athena_iceberg.py @@ -999,6 +999,7 @@ def test_athena_delete_from_iceberg_empty_df_error( keep_files=False, ) + def test_to_iceberg_merge_cols_and_merge_on_clause_error( path: str, path2: str, glue_database: str, glue_table: str ) -> None: @@ -1014,6 +1015,7 @@ def test_to_iceberg_merge_cols_and_merge_on_clause_error( merge_on_clause="id = source.id", ) + def test_to_iceberg_merge_match_nulls_with_merge_on_clause_error( path: str, path2: str, glue_database: str, glue_table: str ) -> None: @@ -1029,6 +1031,7 @@ def test_to_iceberg_merge_match_nulls_with_merge_on_clause_error( merge_match_nulls=True, ) + def test_to_iceberg_merge_conditional_clauses_without_conditional_merge_error( path: str, path2: str, glue_database: str, glue_table: str ) -> None: @@ -1045,6 +1048,7 @@ def test_to_iceberg_merge_conditional_clauses_without_conditional_merge_error( merge_condition="update", ) + def test_to_iceberg_conditional_merge_without_clauses_error( path: str, path2: str, glue_database: str, glue_table: str ) -> None: @@ -1060,9 +1064,8 @@ def test_to_iceberg_conditional_merge_without_clauses_error( merge_condition="conditional_merge", ) -def test_to_iceberg_invalid_merge_condition_error( - path: str, path2: str, glue_database: str, glue_table: str -) -> None: + +def test_to_iceberg_invalid_merge_condition_error(path: str, path2: str, glue_database: str, glue_table: str) -> None: df = pd.DataFrame({"id": [1], "val": ["a"]}) with pytest.raises(wr.exceptions.InvalidArgumentValue): wr.athena.to_iceberg( @@ -1075,9 +1078,8 @@ def test_to_iceberg_invalid_merge_condition_error( merge_condition="not_a_valid_condition", ) -def test_to_iceberg_conditional_merge_happy_path( - path: str, path2: str, glue_database: str, glue_table: str -) -> None: + +def test_to_iceberg_conditional_merge_happy_path(path: str, path2: str, glue_database: str, glue_table: str) -> None: df = pd.DataFrame({"id": [1, 2], "val": ["a", "b"]}) wr.athena.to_iceberg( df=df, @@ -1113,6 +1115,7 @@ def test_to_iceberg_conditional_merge_happy_path( expected = pd.DataFrame({"id": [1, 2, 3], "val": ["c", "b", "d"]}) assert_pandas_equals(expected, df_out.reset_index(drop=True)) + def test_athena_iceberg_use_partition_function( path: str, path2: str, From 3f7ca50d47af783a63838345a4c2b8ab6878fe9f Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Wed, 10 Sep 2025 21:04:12 +0200 Subject: [PATCH 11/13] refactor: _validate_args refactor to comply with ruff check --- awswrangler/athena/_write_iceberg.py | 35 +++++++++++++++++++--------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index d72c9eae7..fd2cdd3ca 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -14,7 +14,7 @@ from awswrangler import _data_types, _utils, catalog, exceptions, s3 from awswrangler._config import apply_configs from awswrangler.athena._executions import wait_query -from awswrangler.athena._utils import _get_workgroup_config, _start_query_execution, _WorkGroupConfig, _MergeClause +from awswrangler.athena._utils import _get_workgroup_config, _MergeClause, _start_query_execution, _WorkGroupConfig from awswrangler.typing import GlueTableSettings _logger: logging.Logger = logging.getLogger(__name__) @@ -228,6 +228,29 @@ def _validate_args( "Either path or workgroup path must be specified to store the temporary results." ) + _validate_merge_arguments( + merge_cols, merge_on_clause, merge_condition, + merge_conditional_clauses, merge_match_nulls + ) + + if mode == "overwrite_partitions": + if not partition_cols: + raise exceptions.InvalidArgumentCombination( + "When mode is 'overwrite_partitions' partition_cols must be specified." + ) + if merge_cols: + raise exceptions.InvalidArgumentCombination( + "When mode is 'overwrite_partitions' merge_cols must not be specified." + ) + + +def _validate_merge_arguments( + merge_cols: list[str] | None, + merge_on_clause: str | None, + merge_condition: Literal["update", "ignore", "conditional_merge"], + merge_conditional_clauses: list[_MergeClause] | None, + merge_match_nulls: bool, +) -> None: if merge_cols and merge_on_clause: raise exceptions.InvalidArgumentCombination( "Cannot specify both merge_cols and merge_on_clause. Use either merge_cols for simple equality matching or merge_on_clause for custom logic." @@ -275,16 +298,6 @@ def _validate_args( "WHEN MATCHED must come before WHEN NOT MATCHED or WHEN NOT MATCHED BY SOURCE." ) - if mode == "overwrite_partitions": - if not partition_cols: - raise exceptions.InvalidArgumentCombination( - "When mode is 'overwrite_partitions' partition_cols must be specified." - ) - if merge_cols: - raise exceptions.InvalidArgumentCombination( - "When mode is 'overwrite_partitions' merge_cols must not be specified." - ) - def _merge_iceberg( df: pd.DataFrame, From 8b694c59d9fc197f8b2b2e95e15670da77009fb7 Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Wed, 10 Sep 2025 21:08:42 +0200 Subject: [PATCH 12/13] reformat: small reformat --- awswrangler/athena/_write_iceberg.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index fd2cdd3ca..8c4d7982c 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -229,8 +229,7 @@ def _validate_args( ) _validate_merge_arguments( - merge_cols, merge_on_clause, merge_condition, - merge_conditional_clauses, merge_match_nulls + merge_cols, merge_on_clause, merge_condition, merge_conditional_clauses, merge_match_nulls ) if mode == "overwrite_partitions": From f33af547052abb93cec0807da29742202f885aff Mon Sep 17 00:00:00 2001 From: pedrorfdez Date: Wed, 10 Sep 2025 21:21:28 +0200 Subject: [PATCH 13/13] refactor: fix mypy typecheck errors --- awswrangler/athena/_write_iceberg.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awswrangler/athena/_write_iceberg.py b/awswrangler/athena/_write_iceberg.py index 8c4d7982c..9e788513a 100644 --- a/awswrangler/athena/_write_iceberg.py +++ b/awswrangler/athena/_write_iceberg.py @@ -380,7 +380,7 @@ def _merge_iceberg( if merge_cols or merge_on_clause: if merge_on_clause: on_condition = merge_on_clause - else: + elif merge_cols is not None: if merge_match_nulls: merge_conditions = [f'(target."{x}" IS NOT DISTINCT FROM source."{x}")' for x in merge_cols] else: @@ -402,7 +402,7 @@ def _merge_iceberg( INSERT ({", ".join([f'"{x}"' for x in df.columns])}) VALUES ({", ".join([f'source."{x}"' for x in df.columns])})""") - elif merge_condition == "conditional_merge": + elif merge_condition == "conditional_merge" and merge_conditional_clauses is not None: for clause in merge_conditional_clauses: when_type = clause["when"] action = clause["action"]