Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/teradata): teradata profiling fix for pooling #12507

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from dataclasses import dataclass
from datetime import datetime
from functools import lru_cache
from itertools import groupby

Check warning on line 6 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L6

Added line #L6 was not covered by tests
from typing import (
Any,
Dict,
Expand All @@ -22,6 +23,7 @@
from sqlalchemy.engine import Engine
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.pool import QueuePool

Check warning on line 26 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L26

Added line #L26 was not covered by tests
from sqlalchemy.sql.expression import text
from teradatasqlalchemy.dialect import TeradataDialect
from teradatasqlalchemy.options import configure
Expand Down Expand Up @@ -58,7 +60,6 @@
from datahub.metadata.schema_classes import SchemaMetadataClass
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
from datahub.utilities.groupby import groupby_unsorted

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -286,7 +287,7 @@

# TODO: Check if there's a better way
fk_dicts = list()
for constraint_info, constraint_cols in groupby_unsorted(res, grouper):
for constraint_info, constraint_cols in groupby(res, grouper):

Check warning on line 290 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L290

Added line #L290 was not covered by tests
fk_dict = {
"name": str(constraint_info["name"]),
"constrained_columns": list(),
Expand Down Expand Up @@ -599,12 +600,7 @@
setattr( # noqa: B010
TeradataDialect,
"get_columns",
lambda self,
connection,
table_name,
schema=None,
use_qvci=self.config.use_qvci,
**kw: optimized_get_columns(
lambda self, connection, table_name, schema=None, use_qvci=self.config.use_qvci, **kw: optimized_get_columns(
self,
connection,
table_name,
Expand All @@ -618,23 +614,15 @@
setattr( # noqa: B010
TeradataDialect,
"get_pk_constraint",
lambda self,
connection,
table_name,
schema=None,
**kw: optimized_get_pk_constraint(
lambda self, connection, table_name, schema=None, **kw: optimized_get_pk_constraint(
self, connection, table_name, schema, **kw
),
)

setattr( # noqa: B010
TeradataDialect,
"get_foreign_keys",
lambda self,
connection,
table_name,
schema=None,
**kw: optimized_get_foreign_keys(
lambda self, connection, table_name, schema=None, **kw: optimized_get_foreign_keys(
self, connection, table_name, schema, **kw
),
)
Expand Down Expand Up @@ -705,6 +693,12 @@
# This method can be overridden in the case that you want to dynamically
# run on multiple databases.
url = self.config.get_sql_alchemy_url()

# replace pooling with pooling for teradata
if "max_overflow" in self.config.options:
self.config.options.pop("max_overflow")
self.config.options["poolclass"] = QueuePool

Check warning on line 700 in metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py#L698-L700

Added lines #L698 - L700 were not covered by tests

logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
with engine.connect() as conn:
Expand Down
Loading