Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 179 additions & 1 deletion metadata-ingestion/src/datahub/api/graphql/assertion.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import Any, Dict, List, Optional

from gql import gql
from gql import GraphQLRequest, gql

from datahub.api.graphql.base import BaseApi

Expand Down Expand Up @@ -51,6 +51,75 @@ class Assertion(BaseApi):
}
"""

ASSERTION_RESULT_FRAGMENT: str = """
fragment assertionResult on AssertionResult {
type
rowCount
missingCount
unexpectedCount
actualAggValue
externalUrl
nativeResults {
value
}
error {
type
properties {
value
}
}
}"""

RUN_ASSERTION_RESULT_FRAGMENT: str = """
fragment runAssertionResult on RunAssertionResult {
assertion { urn }
result { ... assertionResult }
}"""

RUN_ASSERTION_MUTATION: str = (
"""
%s
mutation runAssertion($urn: String!, $saveResult: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean) {
runAssertion(urn: $urn, saveResult: $saveResult, parameters: $parameters, async: $async) {
... assertionResult
}
}"""
% ASSERTION_RESULT_FRAGMENT
)

RUN_ASSERTIONS_MUTATION: str = """
%s
%s
mutation runAssertions($urns: [String!]!, $saveResults: Boolean, $parameters: [StringMapEntryInput!], $async: Boolean) {
runAssertions(urns: $urns, saveResults: $saveResults, parameters: $parameters, async: $async) {
passingCount
failingCount
errorCount
results { ... runAssertionResult }
}
}""" % (ASSERTION_RESULT_FRAGMENT, RUN_ASSERTION_RESULT_FRAGMENT)

RUN_ASSERTIONS_FOR_ASSET_MUTATION: str = """
%s
%s
mutation runAssertionsForAsset($urn: String!, $tagUrns: [String!], $parameters: [StringMapEntryInput!], $async: Boolean) {
runAssertionsForAsset(urn: $urn, tagUrns: $tagUrns, parameters: $parameters, async: $async) {
passingCount
failingCount
errorCount
results { ... runAssertionResult }
}
}""" % (ASSERTION_RESULT_FRAGMENT, RUN_ASSERTION_RESULT_FRAGMENT)

@staticmethod
def _build_string_map_entries(
params: Optional[Dict[str, str]],
) -> Optional[List[Dict[str, str]]]:
"""Convert a dictionary to a list of StringMapEntryInput objects for GraphQL."""
if not params:
return None
return [{"key": k, "value": v} for k, v in params.items()]

def query_assertion(
self,
urn: str,
Expand Down Expand Up @@ -88,3 +157,112 @@ def query_assertion(
assertions = result["dataset"]["assertions"]["assertions"]

return assertions

def run_assertion(
self,
urn: str,
save_result: Optional[bool] = None,
parameters: Optional[Dict[str, str]] = None,
async_flag: Optional[bool] = None,
) -> Dict[str, Any]:
r"""
Run a single native assertion by URN.

:param urn: The DataHub assertion unique identifier.
:param save_result: If True, the result is stored for later viewing in the UI.
:param parameters: Key/value pairs for injecting runtime parameters into SQL fragments.
:param async_flag: If True, returns immediately with null result; poll run events later.
"""
variable_values: Dict[str, Any] = {"urn": urn}

if save_result is not None:
variable_values["saveResult"] = save_result

if parameters is not None:
variable_values["parameters"] = Assertion._build_string_map_entries(
parameters
)

if async_flag is not None:
variable_values["async"] = async_flag

request = GraphQLRequest(
Assertion.RUN_ASSERTION_MUTATION, variable_values=variable_values
)

result = self.client.execute(request)

return result["runAssertion"]

def run_assertions(
self,
urns: List[str],
save_results: Optional[bool] = None,
parameters: Optional[Dict[str, str]] = None,
async_flag: Optional[bool] = None,
) -> Dict[str, Any]:
r"""
Run multiple native assertions by URN.

:param urns: List of DataHub assertion unique identifiers.
:param save_results: If True, the results are stored for later viewing in the UI.
:param parameters: Key/value pairs for injecting runtime parameters into SQL fragments.
:param async_flag: If True, returns immediately with null results; poll run events later.
"""
variable_values: Dict[str, Any] = {"urns": urns}

if save_results is not None:
variable_values["saveResults"] = save_results

if parameters is not None:
variable_values["parameters"] = Assertion._build_string_map_entries(
parameters
)

if async_flag is not None:
variable_values["async"] = async_flag

request = GraphQLRequest(
Assertion.RUN_ASSERTIONS_MUTATION, variable_values=variable_values
)

result = self.client.execute(request)

return result["runAssertions"]

def run_assertions_for_asset(
self,
urn: str,
tag_urns: Optional[List[str]] = None,
parameters: Optional[Dict[str, str]] = None,
async_flag: Optional[bool] = None,
) -> Dict[str, Any]:
r"""
Run all native assertions attached to an asset, optionally filtered by tags.

:param urn: The DataHub dataset unique identifier.
:param tag_urns: Optional list of tag URNs to filter which assertions to run.
:param parameters: Key/value pairs for injecting runtime parameters into SQL fragments.
:param async_flag: If True, returns immediately with null results; poll run events later.
"""
variable_values: Dict[str, Any] = {"urn": urn}

if tag_urns is not None:
variable_values["tagUrns"] = tag_urns

if parameters is not None:
variable_values["parameters"] = Assertion._build_string_map_entries(
parameters
)

if async_flag is not None:
variable_values["async"] = async_flag

request = GraphQLRequest(
Assertion.RUN_ASSERTIONS_FOR_ASSET_MUTATION,
variable_values=variable_values,
)

result = self.client.execute(request)

return result["runAssertionsForAsset"]
Loading