diff --git a/smoke-test/tests/restli/test_restli_batch_ingestion.py b/smoke-test/tests/restli/test_restli_batch_ingestion.py index 0e92988ed6470..524f269afe4a8 100644 --- a/smoke-test/tests/restli/test_restli_batch_ingestion.py +++ b/smoke-test/tests/restli/test_restli_batch_ingestion.py @@ -1,14 +1,18 @@ import time -from typing import List +import uuid +from typing import List, Set import pytest -from datahub.emitter.mce_builder import make_dashboard_urn +from datahub.emitter.mce_builder import make_dashboard_urn, make_tag_urn from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.metadata.com.linkedin.pegasus2avro.mxe import SystemMetadata from datahub.emitter.serialization_helper import pre_json_transform from datahub.metadata._schema_classes import MetadataChangeProposalClass from datahub.metadata.schema_classes import ( AuditStampClass, + GlobalTagsClass, + TagAssociationClass, ChangeAuditStampsClass, DashboardInfoClass, ) @@ -16,13 +20,13 @@ from tests.restli.restli_test import MetadataChangeProposalInvalidWrapper from tests.utils import delete_urns -generated_urns: List[str] = [] +generated_urns: Set[str] = set() -@pytest.fixture(scope="module") +@pytest.fixture(scope="module", autouse=True) def ingest_cleanup_data(auth_session, graph_client, request): yield - delete_urns(graph_client, generated_urns) + delete_urns(graph_client, list(generated_urns)) def _create_valid_dashboard_mcps() -> List[MetadataChangeProposalClass]: @@ -53,7 +57,7 @@ def _create_valid_dashboard_mcps() -> List[MetadataChangeProposalClass]: aspect=valid_dashboard_info, ) mcps.append(mcp_valid.make_mcp()) - generated_urns.extend([mcp.entityUrn for mcp in mcps if mcp.entityUrn]) + generated_urns.update([mcp.entityUrn for mcp in mcps if mcp.entityUrn]) return mcps @@ -80,10 +84,40 @@ def _create_invalid_dashboard_mcp() -> MetadataChangeProposalClass: aspectName="dashboardInfo", aspect=invalid_dashboard_info, ) - generated_urns.append(mcp_invalid.entityUrn) if mcp_invalid.entityUrn else None + generated_urns.update(mcp_invalid.entityUrn) if mcp_invalid.entityUrn else None return mcp_invalid.make_mcp() +def _create_valid_duplicate_mcps() -> List[MetadataChangeProposalClass]: + mcps = [] + num_valid_mcp = 20 + + dashboard_urn = make_dashboard_urn( + platform="looker", name=f"dummy-test-invalid-duplicate-{uuid.uuid4()}" + ) + + tag_urn = make_tag_urn(tag=f"testTag-{uuid.uuid4()}") + tag_association = TagAssociationClass(tag=tag_urn, context="test") + global_tags = GlobalTagsClass(tags=[tag_association]) + + for i in range(num_valid_mcp): + mcp_valid = MetadataChangeProposalWrapper(entityUrn=dashboard_urn, aspect=global_tags) + mcps.append(mcp_valid.make_mcp()) + generated_urns.update([mcp.entityUrn for mcp in mcps if mcp.entityUrn]) + + return mcps + + +def test_restli_batch_ingestion_duplicate_tags_sync(graph_client): + mcps = _create_valid_duplicate_mcps() + ret = graph_client.emit_mcps(mcps, async_flag=False) + assert ret >= 0 + + mcps = _create_valid_duplicate_mcps() + ret = graph_client.emit_mcps(mcps, async_flag=True) + assert ret >= 0 + + def test_restli_batch_ingestion_sync(graph_client): # Positive Test (all valid MetadataChangeProposal) mcps = _create_valid_dashboard_mcps()