Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

misc(batch-proposal): misc batch proprosal tests #12455

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions smoke-test/tests/restli/test_restli_batch_ingestion.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
import time
from typing import List
import uuid
from typing import List, Set

import pytest

from datahub.emitter.mce_builder import make_dashboard_urn
from datahub.emitter.mce_builder import make_dashboard_urn, make_tag_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.com.linkedin.pegasus2avro.mxe import SystemMetadata
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata._schema_classes import MetadataChangeProposalClass
from datahub.metadata.schema_classes import (
AuditStampClass,
GlobalTagsClass,
TagAssociationClass,
ChangeAuditStampsClass,
DashboardInfoClass,
)
from tests.consistency_utils import wait_for_writes_to_sync
from tests.restli.restli_test import MetadataChangeProposalInvalidWrapper
from tests.utils import delete_urns

generated_urns: List[str] = []
generated_urns: Set[str] = set()


@pytest.fixture(scope="module")
@pytest.fixture(scope="module", autouse=True)
def ingest_cleanup_data(auth_session, graph_client, request):
yield
delete_urns(graph_client, generated_urns)
delete_urns(graph_client, list(generated_urns))


def _create_valid_dashboard_mcps() -> List[MetadataChangeProposalClass]:
Expand Down Expand Up @@ -53,7 +57,7 @@ def _create_valid_dashboard_mcps() -> List[MetadataChangeProposalClass]:
aspect=valid_dashboard_info,
)
mcps.append(mcp_valid.make_mcp())
generated_urns.extend([mcp.entityUrn for mcp in mcps if mcp.entityUrn])
generated_urns.update([mcp.entityUrn for mcp in mcps if mcp.entityUrn])

return mcps

Expand All @@ -80,10 +84,40 @@ def _create_invalid_dashboard_mcp() -> MetadataChangeProposalClass:
aspectName="dashboardInfo",
aspect=invalid_dashboard_info,
)
generated_urns.append(mcp_invalid.entityUrn) if mcp_invalid.entityUrn else None
generated_urns.update(mcp_invalid.entityUrn) if mcp_invalid.entityUrn else None
return mcp_invalid.make_mcp()


def _create_valid_duplicate_mcps() -> List[MetadataChangeProposalClass]:
mcps = []
num_valid_mcp = 20

dashboard_urn = make_dashboard_urn(
platform="looker", name=f"dummy-test-invalid-duplicate-{uuid.uuid4()}"
)

tag_urn = make_tag_urn(tag=f"testTag-{uuid.uuid4()}")
tag_association = TagAssociationClass(tag=tag_urn, context="test")
global_tags = GlobalTagsClass(tags=[tag_association])

for i in range(num_valid_mcp):
mcp_valid = MetadataChangeProposalWrapper(entityUrn=dashboard_urn, aspect=global_tags)
mcps.append(mcp_valid.make_mcp())
generated_urns.update([mcp.entityUrn for mcp in mcps if mcp.entityUrn])

return mcps


def test_restli_batch_ingestion_duplicate_tags_sync(graph_client):
mcps = _create_valid_duplicate_mcps()
ret = graph_client.emit_mcps(mcps, async_flag=False)
assert ret >= 0

mcps = _create_valid_duplicate_mcps()
ret = graph_client.emit_mcps(mcps, async_flag=True)
assert ret >= 0


def test_restli_batch_ingestion_sync(graph_client):
# Positive Test (all valid MetadataChangeProposal)
mcps = _create_valid_dashboard_mcps()
Expand Down
Loading