Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - Sampling and suggestion #67

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ build_dev_images:
docker build -t swiple-api ./backend/
docker build -t swiple-ui ./frontend/

swiple_api_dev_install:
cd ./backend && poetry install --with postgres,redshift,mysql,trino,athena,snowflake,aws-secrets,gcp,azure-secrets,dev && cd ..

swiple_ui_install:
	npm install --prefix ./frontend/

Expand All @@ -21,6 +24,13 @@ swiple_ui_dev:
swiple_api_dev:
python3 ./backend/main.py

swiple_api_test:
cd ./backend && PRODUCTION='False' \
SECRET_KEY=DphzRvbm3ICHH2t1_Xj5NTUVEpqjz5KOHxuF77udndQ= \
[email protected] \
ADMIN_PASSWORD=admin \
poetry run pytest --cov app/ --cov-report=xml && cd ..

demo:
docker compose run demo

Expand Down
17 changes: 14 additions & 3 deletions backend/app/api/api_v1/endpoints/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from app.core.runner import create_dataset_suggestions, run_dataset_validation
from opensearchpy import OpenSearch, RequestError
import requests
from app.utils import json_schema_to_single_doc
from app.logger import get_logger

logger = get_logger(__file__)

router = APIRouter(
dependencies=[Depends(current_active_user)]
Expand All @@ -28,7 +32,7 @@

@router.get("/json-schema")
def get_json_schema():
schema = Dataset.schema()
schema = json_schema_to_single_doc(Dataset.schema())
return JSONResponse(status_code=status.HTTP_200_OK, content=schema)


Expand Down Expand Up @@ -210,7 +214,13 @@ def create_suggestions(
dataset = get_by_key_or_404(key, repository)

results = create_dataset_suggestions(key, client)
expectations = [expectation_repository._get_object_from_dict(e) for e in results]
expectations = []
for expectation in results:
try:
expectations.append(expectation_repository._get_object_from_dict(expectation))
except ValueError as e:
logger.warning(expectation)
logger.warning(e)

expectation_repository.delete_by_filter(dataset_id=dataset.key, suggested=True, enabled=False)
expectation_repository.bulk_create(expectations)
Expand All @@ -223,7 +233,8 @@ def should_update_sample(dataset: Dataset, dataset_update: DatasetUpdate):
# if the new and the old datasets use a query and the query has not changed, don't update sample
return not (
(dataset.dataset_name == dataset_update.dataset_name and (
dataset.runtime_parameters == dataset_update.runtime_parameters is None)) or
dataset.runtime_parameters == dataset_update.runtime_parameters is None) and (
dataset.sampling == dataset_update.sampling)) or
(dataset.runtime_parameters and
dataset_update.runtime_parameters and
dataset.runtime_parameters.query == dataset_update.runtime_parameters.query)
Expand Down
138 changes: 67 additions & 71 deletions backend/app/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,27 @@
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig, AnonymizedUsageStatisticsConfig
from great_expectations.data_context.types.base import InMemoryStoreBackendDefaults
from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler
from great_expectations.rule_based_profiler.data_assistant_result.onboarding_data_assistant_result import OnboardingDataAssistantResult
from opensearchpy import OpenSearch
from pandas import isnull


from app import constants as c
from app import utils
from app.core.actions import action_dispatcher
from app.core.expectations import supported_unsupported_expectations
from app.db.client import client as os_client
from app.models.dataset import BaseDataset
from app.models.datasource import Engine
from app.models.validation import Validation
from app.repositories.dataset import DatasetRepository
from app.repositories.datasource import DatasourceRepository
from app.repositories.expectation import ExpectationRepository
from app.settings import settings
from app.logger import get_logger

logger = get_logger(__file__)


class Runner:
def __init__(self, datasource, batch, meta, dataset_id=None, datasource_id=None, expectations=None,
def __init__(self, datasource, batch: BaseDataset, meta, dataset_id=None, datasource_id=None, expectations=None,
identifiers=None, excluded_expectations=[]):
self.identifiers = identifiers
self.datasource = datasource
Expand All @@ -44,21 +45,13 @@ def profile(self):

data_context_config = self.get_data_context_config()
context = BaseDataContext(project_config=data_context_config)
suite: ExpectationSuite = context.create_expectation_suite("default", overwrite_existing=True)

batch_request = self.get_batch_request(is_profile=True)

validator = context.get_validator(
batch_request = self.get_batch_request()
data_assistant_result: OnboardingDataAssistantResult = context.assistants.onboarding.run(
batch_request=batch_request,
expectation_suite=suite,
cardinality_limit_mode="few",
)

profiler = UserConfigurableProfiler(
validator,
excluded_expectations=self.excluded_expectations,
value_set_threshold="few",
)
expectations = profiler.build_suite().to_json_dict()['expectations']
expectations = data_assistant_result.to_json_dict()['expectation_configurations']

for expectation in expectations:
expectation["kwargs"].update({"result_format": "SUMMARY", "include_config": True, "catch_exceptions": True})
Expand All @@ -76,26 +69,15 @@ def profile(self):

return expectations

def sample(self):
def sample(self) -> dict:
data_context_config = self.get_data_context_config()
context = BaseDataContext(project_config=data_context_config)

batch_request = self.get_batch_request()

suite: ExpectationSuite = context.create_expectation_suite("default", overwrite_existing=True)

try:
validator = context.get_validator(
batch_request=batch_request, expectation_suite=suite,
)
head = validator.head()
except KeyError as ex:
if self.batch.runtime_parameters:
return {"exception": f"Syntax error in query."}
else:
print(str(ex))
return {"exception": f"{self.batch.dataset_name} is not recognized."}

validator = context.get_validator(
batch_request=batch_request, expectation_suite=suite,
)
head = validator.head(n_rows=10)
rows = head.to_dict(orient='records')
columns = head.columns

Expand All @@ -106,7 +88,10 @@ def sample(self):

if isnull(record[column]):
record[column] = None
return {'columns': list(columns), 'rows': rows}
return {
"columns": list(columns),
"rows": rows,
}

def validate(self) -> Validation:
data_context_config = self.get_data_context_config()
Expand Down Expand Up @@ -166,7 +151,7 @@ def validate(self) -> Validation:
return Validation(**validation)

def get_data_context_config(self):
connection_string = self._get_connection_string()
connection_string = self.get_connection_string()

context = DataContextConfig(
datasources={
Expand All @@ -177,26 +162,7 @@ def get_data_context_config(self):
"class_name": "SqlAlchemyExecutionEngine",
"connection_string": connection_string,
},
"data_connectors": {
"default_runtime_data_connector": {
"class_name": "RuntimeDataConnector",

# Is there a use-case where this is needed?
# If we require users to add all details for runs in the app then
# using SDK, it pulls values into SDK? This would require batch_identifiers
# for datasources like spark/airflow TODO do this when spark/airflow is added
#
# Alternative is to let users push to ES runs without
# values in app. (Don't like the sound of that...)
"batch_identifiers": [
self.batch.dataset_name
],
},
"default_inferred_data_connector_name": {
"class_name": "InferredAssetSqlDataConnector",
"include_schema_name": True,
},
}
"data_connectors": self._get_data_connectors()
}
},
store_backend_defaults=InMemoryStoreBackendDefaults(),
Expand All @@ -205,35 +171,35 @@ def get_data_context_config(self):
},
anonymous_usage_statistics=AnonymizedUsageStatisticsConfig(enabled=False)
)

return context

def get_batch_request(self, is_profile=True):
def get_batch_request(self):
batch_spec_passthrough = {"create_temp_table": False}
if self.batch.runtime_parameters:
batch_spec_passthrough = None
if is_profile:
# Bug when profiling. Requires physical/temp table to get column types.
# set back to False when https://github.com/great-expectations/great_expectations/issues/4832
# is fixed
batch_spec_passthrough = {"create_temp_table": True}

runtime_parameters = self.batch.runtime_parameters.dict(by_alias=True)
return RuntimeBatchRequest(
datasource_name=self.datasource.datasource_name,
data_connector_name="default_runtime_data_connector",
data_asset_name=self.batch.dataset_name,
runtime_parameters=runtime_parameters,
batch_identifiers={self.batch.dataset_name: self.batch.dataset_name},
batch_spec_passthrough=batch_spec_passthrough,
batch_spec_passthrough={"create_temp_table": True},
)
else:
if self.batch.sampling:
batch_spec_passthrough.update(self.batch.sampling.dict(exclude_none=True))
return BatchRequest(
datasource_name=self.datasource.datasource_name,
data_connector_name="default_inferred_data_connector_name",
data_connector_name="default_configured_asset_data_connector_name",
data_asset_name=self.batch.dataset_name,
batch_spec_passthrough={"create_temp_table": False},
batch_spec_passthrough={
"create_temp_table": False,
**batch_spec_passthrough,
},
)

def _get_connection_string(self):
def get_connection_string(self):
# Snowflake SQLAlchemy connector requires the schema in the connection string in order to create TEMP tables.
if self.datasource.engine == Engine.SNOWFLAKE and self.batch.runtime_parameters:
schema = self.batch.runtime_parameters.schema_name
Expand All @@ -250,6 +216,39 @@ def _get_connection_string(self):

return connection_string

def _get_data_connectors(self):
if self.batch.runtime_parameters:
return {
"default_runtime_data_connector": {
"class_name": "RuntimeDataConnector",
# Is there a use-case where this is needed?
# If we require users to add all details for runs in the app then
# using SDK, it pulls values into SDK? This would require batch_identifiers
# for datasources like spark/airflow TODO do this when spark/airflow is added
#
# Alternative is to let users push to ES runs without
# values in app. (Don't like the sound of that...)
"batch_identifiers": [
self.batch.dataset_name
],
},
}
else:
# Physical table, schema.table
schema_name, table_name = self.batch.dataset_name.split(".")
return {
"default_configured_asset_data_connector_name": {
"class_name": "ConfiguredAssetSqlDataConnector",
"assets": {
table_name: {
"include_schema_name": True,
"schema_name": schema_name,
"table_name": table_name,
}
}
},
}

@staticmethod
def _get_status(success: bool) -> Literal["success", "failure"]:
action_status: Literal["failure"] = "failure"
Expand Down Expand Up @@ -311,17 +310,14 @@ def create_dataset_suggestions(dataset_id: str, client: OpenSearch = os_client):
"dataset_name": dataset.dataset_name,
}

excluded_expectations = supported_unsupported_expectations()["unsupported_expectations"]
excluded_expectations.append(c.EXPECT_COLUMN_VALUES_TO_BE_BETWEEN)

results = Runner(
datasource=datasource,
batch=dataset,
meta=meta,
identifiers=identifiers,
datasource_id=dataset.datasource_id,
dataset_id=dataset.key,
excluded_expectations=excluded_expectations,
excluded_expectations=[],
).profile()

return results
58 changes: 33 additions & 25 deletions backend/app/core/sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

from app.models.dataset import BaseDataset, Sample
from app.models.datasource import Datasource
from app.utils import add_limit_clause
from app.utils import add_limit_clause, error_msg_from_exception
from app.core.runner import Runner


class GetSampleException(Exception):
Expand All @@ -16,16 +17,37 @@ def __init__(self, error: Any, *args: object) -> None:


def get_dataset_sample(dataset: BaseDataset, datasource: Datasource) -> Sample:
if dataset.runtime_parameters:
return get_sample_query_results(
query=dataset.runtime_parameters.query,
url=datasource.connection_string()
)

return get_sample_query_results(
query=f"select * from {dataset.dataset_name}",
url=datasource.connection_string(),
)
try:
# can be removed if/when second comment is addressed here
# https://github.com/great-expectations/great_expectations/issues/6475#issuecomment-1335682074
if dataset.runtime_parameters:
return get_sample_query_results(
query=dataset.runtime_parameters.query,
url=datasource.connection_string()
)

sample = Runner(
datasource=datasource,
batch=dataset,
meta=None,
).sample()

if len(sample["columns"]) == 0:
raise GetSampleException("No columns included in statement.")

return Sample(**sample)
except ProgrammingError as e:
raise GetSampleException(e.orig.pgerror) from e
except OperationalError as e:
raise GetSampleException(e.orig) from e
except DatabaseError as e:
raise GetSampleException(error_msg_from_exception(e)) from e
# can be removed when https://github.com/great-expectations/great_expectations/issues/6463#issuecomment-1334476484
# is addressed.
except KeyError as e:
raise GetSampleException(error_msg_from_exception(e)) from e
except Exception as e:
raise GetSampleException(error_msg_from_exception(e)) from e


def get_sample_query_results(query: str, url: str) -> Sample:
Expand Down Expand Up @@ -68,17 +90,3 @@ def get_sample_query_results(query: str, url: str) -> Sample:
def get_columns_and_rows(execution):
return list(execution.keys()), execution.all()


def error_msg_from_exception(ex: Exception) -> str:
"""Translate exception into error message
Database have different ways to handle exception. This function attempts
to make sense of the exception object and construct a human readable
sentence.
"""
msg = ""
if hasattr(ex, "message"):
if isinstance(ex.message, dict):
msg = ex.message.get("message")
elif ex.message:
msg = ex.message
return msg or str(ex)
Loading