Skip to content

Commit 5a33d9f

Browse files
add an argument to control handling nulls in merge criteria
1 parent 2a4234a commit 5a33d9f

File tree

4 files changed

+356
-36
lines changed

4 files changed

+356
-36
lines changed

awswrangler/athena/_write_iceberg.py

+118-35
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,109 @@ def _validate_args(
239239
)
240240

241241

242+
def _merge_iceberg(
243+
df: pd.DataFrame,
244+
database: str,
245+
table: str,
246+
source_table: str,
247+
merge_cols: list[str] | None = None,
248+
merge_condition: Literal["update", "ignore"] = "update",
249+
merge_match_nulls: bool = False,
250+
kms_key: str | None = None,
251+
boto3_session: boto3.Session | None = None,
252+
s3_output: str | None = None,
253+
workgroup: str = "primary",
254+
encryption: str | None = None,
255+
data_source: str | None = None,
256+
) -> None:
257+
"""
258+
Merge iceberg.
259+
260+
Merge data from source_table and write it to an Athena iceberg table.
261+
262+
Parameters
263+
----------
264+
df : pd.DataFrame
265+
Pandas DataFrame.
266+
database : str
267+
AWS Glue/Athena database name - It is only the origin database from where the query will be launched.
268+
You can still using and mixing several databases writing the full table name within the sql
269+
(e.g. `database.table`).
270+
table : str
271+
AWS Glue/Athena destination table name.
272+
source_table: str
273+
AWS Glue/Athena source table name.
274+
merge_cols: List[str], optional
275+
List of column names that will be used for conditional inserts and updates.
276+
277+
https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html
278+
merge_condition: str, optional
279+
The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore'].
280+
merge_match_nulls: bool, optional
281+
Instruct whether to have nulls in the merge condition match other nulls
282+
kms_key : str, optional
283+
For SSE-KMS, this is the KMS key ARN or ID.
284+
boto3_session : boto3.Session(), optional
285+
Boto3 Session. The default boto3 session will be used if boto3_session receive None.
286+
s3_output : str, optional
287+
Amazon S3 path used for query execution.
288+
workgroup : str
289+
Athena workgroup. Primary by default.
290+
encryption : str, optional
291+
Valid values: [None, 'SSE_S3', 'SSE_KMS']. Notice: 'CSE_KMS' is not supported.
292+
data_source : str, optional
293+
Data Source / Catalog name. If None, 'AwsDataCatalog' will be used by default.
294+
295+
Returns
296+
-------
297+
None
298+
299+
"""
300+
wg_config: _WorkGroupConfig = _get_workgroup_config(session=boto3_session, workgroup=workgroup)
301+
302+
sql_statement: str
303+
if merge_cols:
304+
if merge_condition == "update":
305+
match_condition = f"""WHEN MATCHED THEN
306+
UPDATE SET {', '.join([f'"{x}" = source."{x}"' for x in df.columns])}"""
307+
else:
308+
match_condition = ""
309+
310+
if merge_match_nulls:
311+
merge_conditions = [f'(target."{x}" IS NOT DISTINCT FROM source."{x}")' for x in merge_cols]
312+
else:
313+
merge_conditions = [f'(target."{x}" = source."{x}")' for x in merge_cols]
314+
315+
sql_statement = f"""
316+
MERGE INTO "{database}"."{table}" target
317+
USING "{database}"."{source_table}" source
318+
ON {' AND '.join(merge_conditions)}
319+
{match_condition}
320+
WHEN NOT MATCHED THEN
321+
INSERT ({', '.join([f'"{x}"' for x in df.columns])})
322+
VALUES ({', '.join([f'source."{x}"' for x in df.columns])})
323+
"""
324+
else:
325+
sql_statement = f"""
326+
INSERT INTO "{database}"."{table}" ({', '.join([f'"{x}"' for x in df.columns])})
327+
SELECT {', '.join([f'"{x}"' for x in df.columns])}
328+
FROM "{database}"."{source_table}"
329+
"""
330+
331+
query_execution_id: str = _start_query_execution(
332+
sql=sql_statement,
333+
workgroup=workgroup,
334+
wg_config=wg_config,
335+
database=database,
336+
data_source=data_source,
337+
s3_output=s3_output,
338+
encryption=encryption,
339+
kms_key=kms_key,
340+
boto3_session=boto3_session,
341+
)
342+
wait_query(query_execution_id=query_execution_id, boto3_session=boto3_session)
343+
344+
242345
@apply_configs
243346
@_utils.validate_distributed_kwargs(
244347
unsupported_kwargs=["boto3_session", "s3_additional_kwargs"],
@@ -253,6 +356,7 @@ def to_iceberg(
253356
partition_cols: list[str] | None = None,
254357
merge_cols: list[str] | None = None,
255358
merge_condition: Literal["update", "ignore"] = "update",
359+
merge_match_nulls: bool = False,
256360
keep_files: bool = True,
257361
data_source: str | None = None,
258362
s3_output: str | None = None,
@@ -267,7 +371,7 @@ def to_iceberg(
267371
catalog_id: str | None = None,
268372
schema_evolution: bool = False,
269373
fill_missing_columns_in_df: bool = True,
270-
glue_table_settings: GlueTableSettings | None = None,
374+
glue_table_settings: GlueTableSettings | None = None
271375
) -> None:
272376
"""
273377
Insert into Athena Iceberg table using INSERT INTO ... SELECT. Will create Iceberg table if it does not exist.
@@ -301,6 +405,8 @@ def to_iceberg(
301405
https://docs.aws.amazon.com/athena/latest/ug/merge-into-statement.html
302406
merge_condition: str, optional
303407
The condition to be used in the MERGE INTO statement. Valid values: ['update', 'ignore'].
408+
merge_match_nulls: bool, optional
409+
Instruct whether to have nulls in the merge condition match other nulls
304410
keep_files : bool
305411
Whether staging files produced by Athena are retained. 'True' by default.
306412
data_source : str, optional
@@ -504,44 +610,21 @@ def to_iceberg(
504610
glue_table_settings=glue_table_settings,
505611
)
506612

507-
# Insert or merge into Iceberg table
508-
sql_statement: str
509-
if merge_cols:
510-
if merge_condition == "update":
511-
match_condition = f"""WHEN MATCHED THEN
512-
UPDATE SET {', '.join([f'"{x}" = source."{x}"' for x in df.columns])}"""
513-
else:
514-
match_condition = ""
515-
sql_statement = f"""
516-
MERGE INTO "{database}"."{table}" target
517-
USING "{database}"."{temp_table}" source
518-
ON {' AND '.join([
519-
f'(target."{x}" = source."{x}" OR (target."{x}" IS NULL AND source."{x}" IS NULL))'
520-
for x in merge_cols])}
521-
{match_condition}
522-
WHEN NOT MATCHED THEN
523-
INSERT ({', '.join([f'"{x}"' for x in df.columns])})
524-
VALUES ({', '.join([f'source."{x}"' for x in df.columns])})
525-
"""
526-
else:
527-
sql_statement = f"""
528-
INSERT INTO "{database}"."{table}" ({', '.join([f'"{x}"' for x in df.columns])})
529-
SELECT {', '.join([f'"{x}"' for x in df.columns])}
530-
FROM "{database}"."{temp_table}"
531-
"""
532-
533-
query_execution_id: str = _start_query_execution(
534-
sql=sql_statement,
535-
workgroup=workgroup,
536-
wg_config=wg_config,
613+
_merge_iceberg(
614+
df=df,
537615
database=database,
538-
data_source=data_source,
539-
s3_output=s3_output,
540-
encryption=encryption,
616+
table=table,
617+
source_table=temp_table,
618+
merge_cols=merge_cols,
619+
merge_condition=merge_condition,
620+
merge_match_nulls=merge_match_nulls,
541621
kms_key=kms_key,
542622
boto3_session=boto3_session,
623+
s3_output=s3_output,
624+
workgroup=workgroup,
625+
encryption=encryption,
626+
data_source=data_source,
543627
)
544-
wait_query(query_execution_id=query_execution_id, boto3_session=boto3_session)
545628

546629
except Exception as ex:
547630
_logger.error(ex)

0 commit comments

Comments
 (0)