Skip to content

Commit 1e2d3a2

Browse files
authored
Merge pull request #731 from atlanhq/APP-8880
APP-8880: Ensure custom metadata is flushed before bulk `save()`
2 parents 0ee2615 + 7eb0e03 commit 1e2d3a2

File tree

4 files changed

+142
-11
lines changed

4 files changed

+142
-11
lines changed

pyatlan/client/aio/asset.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,14 +353,14 @@ async def save(
353353
:raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit
354354
"""
355355

356-
query_params, request = Save.prepare_request(
356+
query_params, request = await Save.prepare_request_async(
357357
entity=entity,
358358
replace_atlan_tags=replace_atlan_tags,
359359
replace_custom_metadata=replace_custom_metadata,
360360
overwrite_custom_metadata=overwrite_custom_metadata,
361361
append_atlan_tags=append_atlan_tags,
362+
client=self._client, # type: ignore[arg-type]
362363
)
363-
Save.validate_and_flush_entities(request.entities, self._client)
364364
raw_json = await self._client._call_api(BULK_UPDATE, query_params, request)
365365
response = Save.process_response(raw_json)
366366
if connections_created := response.assets_created(Connection):

pyatlan/client/asset.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,8 @@ def save(
447447
replace_custom_metadata=replace_custom_metadata,
448448
overwrite_custom_metadata=overwrite_custom_metadata,
449449
append_atlan_tags=append_atlan_tags,
450+
client=self._client, # type: ignore[arg-type]
450451
)
451-
Save.validate_and_flush_entities(request.entities, self._client)
452452
raw_json = self._client._call_api(BULK_UPDATE, query_params, request)
453453
response = Save.process_response(raw_json)
454454
if connections_created := response.assets_created(Connection):
@@ -564,7 +564,7 @@ def save_replacing_cm(
564564
query_params, request = Save.prepare_request_replacing_cm(
565565
entity=entity,
566566
replace_atlan_tags=replace_atlan_tags,
567-
client=self._client,
567+
client=self._client, # type: ignore[arg-type]
568568
)
569569
raw_json = self._client._call_api(BULK_UPDATE, query_params, request)
570570
return Save.process_response_replacing_cm(raw_json)

pyatlan/client/common/asset.py

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
from pyatlan.utils import unflatten_custom_metadata_for_entity
6262

6363
if TYPE_CHECKING:
64+
from pyatlan.client.aio import AsyncAtlanClient
65+
from pyatlan.client.atlan import AtlanClient
6466
from pyatlan.model.fluent_search import FluentSearch
6567

6668
LOGGER = logging.getLogger(__name__)
@@ -676,6 +678,7 @@ def prepare_request(
676678
replace_custom_metadata: bool = False,
677679
overwrite_custom_metadata: bool = False,
678680
append_atlan_tags: bool = False,
681+
client: Optional[AtlanClient] = None,
679682
) -> tuple[Dict[str, Any], BulkRequest[Asset]]:
680683
"""
681684
Prepare the request for saving assets.
@@ -685,6 +688,48 @@ def prepare_request(
685688
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
686689
:param overwrite_custom_metadata: overwrites any custom metadata, even with empty values
687690
:param append_atlan_tags: whether to add/update/remove AtlanTags during an update
691+
:param client: the Atlan client instance for flushing custom metadata
692+
:returns: tuple of (query_params, bulk_request)
693+
"""
694+
query_params = {
695+
"replaceTags": replace_atlan_tags,
696+
"appendTags": append_atlan_tags,
697+
"replaceBusinessAttributes": replace_custom_metadata,
698+
"overwriteBusinessAttributes": overwrite_custom_metadata,
699+
}
700+
701+
entities: List[Asset] = []
702+
if isinstance(entity, list):
703+
entities.extend(entity)
704+
else:
705+
entities.append(entity)
706+
707+
if not client:
708+
raise ValueError(
709+
"AtlanClient instance must be provided to validate and flush cm for assets."
710+
)
711+
# Validate and flush entities BEFORE creating the BulkRequest
712+
Save.validate_and_flush_entities(entities, client)
713+
return query_params, BulkRequest[Asset](entities=entities)
714+
715+
@staticmethod
716+
async def prepare_request_async(
717+
entity: Union[Asset, List[Asset]],
718+
replace_atlan_tags: bool = False,
719+
replace_custom_metadata: bool = False,
720+
overwrite_custom_metadata: bool = False,
721+
append_atlan_tags: bool = False,
722+
client: Optional[AsyncAtlanClient] = None,
723+
) -> tuple[Dict[str, Any], BulkRequest[Asset]]:
724+
"""
725+
Prepare the request for saving assets asynchronously, validating and flushing custom metadata first.
726+
727+
:param entity: one or more assets to save
728+
:param replace_atlan_tags: whether to replace AtlanTags during an update
729+
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
730+
:param overwrite_custom_metadata: overwrites any custom metadata, even with empty values
731+
:param append_atlan_tags: whether to add/update/remove AtlanTags during an update
732+
:param client: the async Atlan client instance for flushing custom metadata
688733
:returns: tuple of (query_params, bulk_request)
689734
"""
690735
query_params = {
@@ -700,10 +745,16 @@ def prepare_request(
700745
else:
701746
entities.append(entity)
702747

748+
if not client:
749+
raise ValueError(
750+
"AsyncAtlanClient instance must be provided to validate and flush cm for assets."
751+
)
752+
# Validate and flush entities BEFORE creating the BulkRequest
753+
await Save.validate_and_flush_entities_async(entities, client)
703754
return query_params, BulkRequest[Asset](entities=entities)
704755

705756
@staticmethod
706-
def validate_and_flush_entities(entities: List[Asset], client) -> None:
757+
def validate_and_flush_entities(entities: List[Asset], client: AtlanClient) -> None:
707758
"""
708759
Validate required fields and flush custom metadata for each asset.
709760
@@ -714,6 +765,20 @@ def validate_and_flush_entities(entities: List[Asset], client) -> None:
714765
asset.validate_required()
715766
asset.flush_custom_metadata(client=client)
716767

768+
@staticmethod
769+
async def validate_and_flush_entities_async(
770+
entities: List[Asset], client: AsyncAtlanClient
771+
) -> None:
772+
"""
773+
Validate required fields and flush custom metadata for each asset.
774+
775+
:param entities: list of assets to validate and flush
776+
:param client: the async Atlan client instance
777+
"""
778+
for asset in entities:
779+
asset.validate_required()
780+
await asset.flush_custom_metadata_async(client=client)
781+
717782
@staticmethod
718783
def process_response(raw_json: Dict[str, Any]) -> AssetMutationResponse:
719784
"""
@@ -750,7 +815,7 @@ def log_connections_finished():
750815
def prepare_request_replacing_cm(
751816
entity: Union[Asset, List[Asset]],
752817
replace_atlan_tags: bool = False,
753-
client=None,
818+
client: Optional[AtlanClient] = None,
754819
) -> tuple[Dict[str, Any], BulkRequest[Asset]]:
755820
"""
756821
Prepare the request for saving assets with replacing custom metadata.
@@ -773,12 +838,12 @@ def prepare_request_replacing_cm(
773838
else:
774839
entities.append(entity)
775840

841+
if not client:
842+
raise ValueError(
843+
"AtlanClient instance must be provided to validate and flush cm for assets."
844+
)
776845
# Validate and flush entities BEFORE creating the BulkRequest
777-
if client:
778-
for asset in entities:
779-
asset.validate_required()
780-
asset.flush_custom_metadata(client=client)
781-
846+
Save.validate_and_flush_entities(entities, client)
782847
return query_params, BulkRequest[Asset](entities=entities)
783848

784849
@staticmethod

tests/integration/custom_metadata_test.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161

6262

6363
CM_QUALITY = f"{MODULE_NAME}_DQ"
64+
CM_SQL = f"{MODULE_NAME}_SQL"
6465
CM_ATTR_QUALITY_COUNT = "Count"
6566
CM_ATTR_QUALITY_SQL = "SQL"
6667
CM_ATTR_QUALITY_TYPE = "Type"
@@ -432,6 +433,29 @@ def cm_dq(
432433
wait_for_successful_custometadatadef_purge(CM_QUALITY, client=client)
433434

434435

436+
@pytest.fixture(scope="module")
437+
def cm_sql(
438+
client: AtlanClient,
439+
) -> Generator[CustomMetadataDef, None, None]:
440+
attribute_defs = [
441+
AttributeDef.create(
442+
client=client,
443+
display_name=f"{MODULE_NAME}_{CM_ATTR_QUALITY_SQL}",
444+
attribute_type=AtlanCustomAttributePrimitiveType.SQL,
445+
),
446+
]
447+
cm = create_custom_metadata(
448+
client,
449+
name=CM_SQL,
450+
attribute_defs=attribute_defs,
451+
logo="https://github.com/great-expectations/great_expectations/raw/develop/docs/docusaurus/static/img/"
452+
"gx-mark-160.png",
453+
locked=False,
454+
)
455+
yield cm
456+
wait_for_successful_custometadatadef_purge(CM_SQL, client=client)
457+
458+
435459
def test_cm_dq(
436460
cm_dq: CustomMetadataDef,
437461
):
@@ -1145,3 +1169,45 @@ def test_add_badge_cm_dq(
11451169
assert (badges := response.assets_created(asset_type=Badge))
11461170
assert len(badges) == 1
11471171
client.asset.purge_by_guid(badges[0].guid)
1172+
1173+
1174+
@pytest.mark.order()
1175+
def test_save_merging_cm(
1176+
term: AtlasGlossaryTerm,
1177+
glossary: AtlasGlossary,
1178+
cm_sql: CustomMetadataDef,
1179+
client: AtlanClient,
1180+
):
1181+
cm_sql_dict = CustomMetadataDict(client=client, name=CM_SQL)
1182+
cm_sql_dict[f"{MODULE_NAME}_{CM_ATTR_QUALITY_SQL}"] = "SELECT * FROM batman;"
1183+
assert term.qualified_name
1184+
assert term.name
1185+
1186+
to_update = AtlasGlossaryTerm.create_for_modification(
1187+
qualified_name=term.qualified_name, name=term.name, glossary_guid=glossary.guid
1188+
)
1189+
to_update.set_custom_metadata(custom_metadata=cm_sql_dict, client=client)
1190+
response = client.asset.save_merging_cm(to_update, replace_atlan_tags=False)
1191+
assert response
1192+
assert len(response.assets_deleted(asset_type=AtlasGlossaryTerm)) == 0
1193+
assert len(response.assets_created(asset_type=AtlasGlossaryTerm)) == 0
1194+
assert len(response.assets_updated(asset_type=AtlasGlossaryTerm)) == 1
1195+
1196+
t = response.assets_updated(asset_type=AtlasGlossaryTerm)[0]
1197+
assert isinstance(t, AtlasGlossaryTerm)
1198+
assert t.guid == term.guid
1199+
assert t.qualified_name == term.qualified_name
1200+
assert term.qualified_name
1201+
1202+
x = client.asset.get_by_qualified_name(
1203+
qualified_name=term.qualified_name,
1204+
asset_type=AtlasGlossaryTerm,
1205+
ignore_relationships=False,
1206+
)
1207+
assert x
1208+
assert not x.is_incomplete
1209+
assert x.qualified_name == term.qualified_name
1210+
retrieved_cm_sql = x.get_custom_metadata(client=client, name=CM_SQL)
1211+
assert retrieved_cm_sql
1212+
sql_attr_value = retrieved_cm_sql[f"{MODULE_NAME}_{CM_ATTR_QUALITY_SQL}"]
1213+
assert sql_attr_value == "SELECT * FROM batman;"

0 commit comments

Comments
 (0)