From 5a6266f8aea1219a0819e233f44036d917cbb2b6 Mon Sep 17 00:00:00 2001 From: Brock Griffey <52086127+brock-acryl@users.noreply.github.com> Date: Thu, 30 Jan 2025 13:57:37 -0500 Subject: [PATCH 1/6] Update teradata.py Teradata sqlalchemy connector does not support max_overflow. replaced with poolclass=QueuePool. --- .../datahub/ingestion/source/sql/teradata.py | 30 ++++++++----------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index e6319f668ecb8..692da5db731bf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -3,6 +3,7 @@ from dataclasses import dataclass from datetime import datetime from functools import lru_cache +from itertools import groupby from typing import ( Any, Dict, @@ -22,6 +23,7 @@ from sqlalchemy.engine import Engine from sqlalchemy.engine.base import Connection from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.pool import QueuePool from sqlalchemy.sql.expression import text from teradatasqlalchemy.dialect import TeradataDialect from teradatasqlalchemy.options import configure @@ -58,7 +60,6 @@ from datahub.metadata.schema_classes import SchemaMetadataClass from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage -from datahub.utilities.groupby import groupby_unsorted logger: logging.Logger = logging.getLogger(__name__) @@ -286,7 +287,7 @@ def grouper(fk_row): # TODO: Check if there's a better way fk_dicts = list() - for constraint_info, constraint_cols in groupby_unsorted(res, grouper): + for constraint_info, constraint_cols in groupby(res, grouper): fk_dict = { "name": str(constraint_info["name"]), "constrained_columns": list(), @@ -599,12 +600,7 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): 
setattr( # noqa: B010 TeradataDialect, "get_columns", - lambda self, - connection, - table_name, - schema=None, - use_qvci=self.config.use_qvci, - **kw: optimized_get_columns( + lambda self, connection, table_name, schema=None, use_qvci=self.config.use_qvci, **kw: optimized_get_columns( self, connection, table_name, @@ -618,11 +614,7 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): setattr( # noqa: B010 TeradataDialect, "get_pk_constraint", - lambda self, - connection, - table_name, - schema=None, - **kw: optimized_get_pk_constraint( + lambda self, connection, table_name, schema=None, **kw: optimized_get_pk_constraint( self, connection, table_name, schema, **kw ), ) @@ -630,11 +622,7 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): setattr( # noqa: B010 TeradataDialect, "get_foreign_keys", - lambda self, - connection, - table_name, - schema=None, - **kw: optimized_get_foreign_keys( + lambda self, connection, table_name, schema=None, **kw: optimized_get_foreign_keys( self, connection, table_name, schema, **kw ), ) @@ -705,6 +693,12 @@ def get_inspectors(self): # This method can be overridden in the case that you want to dynamically # run on multiple databases. 
url = self.config.get_sql_alchemy_url() + + # replace pooling with pooling for teradata + if "max_overflow" in self.config.options: + self.config.options.pop("max_overflow") + self.config.options["poolclass"] = QueuePool + logger.debug(f"sql_alchemy_url={url}") engine = create_engine(url, **self.config.options) with engine.connect() as conn: From ea6e07af2f6e2862b2dd700a0bba3d48c8778048 Mon Sep 17 00:00:00 2001 From: Brock Griffey <52086127+brock-acryl@users.noreply.github.com> Date: Mon, 3 Feb 2025 09:49:14 -0500 Subject: [PATCH 2/6] Update teradata.py updated pooling comments, removed unwanted changes --- .../datahub/ingestion/source/sql/teradata.py | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index 692da5db731bf..71160c3bf7cc5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -3,7 +3,6 @@ from dataclasses import dataclass from datetime import datetime from functools import lru_cache -from itertools import groupby from typing import ( Any, Dict, @@ -60,6 +59,7 @@ from datahub.metadata.schema_classes import SchemaMetadataClass from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage +from datahub.utilities.groupby import groupby_unsorted logger: logging.Logger = logging.getLogger(__name__) @@ -287,7 +287,7 @@ def grouper(fk_row): # TODO: Check if there's a better way fk_dicts = list() - for constraint_info, constraint_cols in groupby(res, grouper): + for constraint_info, constraint_cols in groupby_unsorted(res, grouper): fk_dict = { "name": str(constraint_info["name"]), "constrained_columns": list(), @@ -600,7 +600,12 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): setattr( # noqa: B010 TeradataDialect, 
"get_columns", - lambda self, connection, table_name, schema=None, use_qvci=self.config.use_qvci, **kw: optimized_get_columns( + lambda self, + connection, + table_name, + schema=None, + use_qvci=self.config.use_qvci, + **kw: optimized_get_columns( self, connection, table_name, @@ -614,7 +619,11 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): setattr( # noqa: B010 TeradataDialect, "get_pk_constraint", - lambda self, connection, table_name, schema=None, **kw: optimized_get_pk_constraint( + lambda self, + connection, + table_name, + schema=None, + **kw: optimized_get_pk_constraint( self, connection, table_name, schema, **kw ), ) @@ -622,7 +631,11 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): setattr( # noqa: B010 TeradataDialect, "get_foreign_keys", - lambda self, connection, table_name, schema=None, **kw: optimized_get_foreign_keys( + lambda self, + connection, + table_name, + schema=None, + **kw: optimized_get_foreign_keys( self, connection, table_name, schema, **kw ), ) @@ -694,7 +707,7 @@ def get_inspectors(self): # run on multiple databases. 
url = self.config.get_sql_alchemy_url() - # replace pooling with pooling for teradata + # Teradata does not support max_overflow, instead we use QueuePool when profiling data if "max_overflow" in self.config.options: self.config.options.pop("max_overflow") self.config.options["poolclass"] = QueuePool From d6ea6e6da5012102f1a06c349834a6a569365907 Mon Sep 17 00:00:00 2001 From: Brock Griffey <52086127+brock-acryl@users.noreply.github.com> Date: Tue, 4 Feb 2025 14:01:04 -0500 Subject: [PATCH 3/6] created _add_default_options --- .../src/datahub/ingestion/source/sql/sql_common.py | 12 ++++++++---- .../src/datahub/ingestion/source/sql/teradata.py | 11 ++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index a0bd9ce0760bd..1c07deed411b3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -352,6 +352,13 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str) ) self.report.sql_aggregator = self.aggregator.report + def _add_default_options(self, sql_config: SQLCommonConfig) -> None: + """Add default SQLAlchemy options. Can be overridden by subclasses to add additional defaults.""" + if sql_config.is_profiling_enabled(): + sql_config.options.setdefault( + "max_overflow", sql_config.profiling.max_workers + ) + @classmethod def test_connection(cls, config_dict: dict) -> TestConnectionReport: test_report = TestConnectionReport() @@ -521,10 +528,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit # Extra default SQLAlchemy option for better connection pooling and threading. 
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow - if sql_config.is_profiling_enabled(): - sql_config.options.setdefault( - "max_overflow", sql_config.profiling.max_workers - ) + self._add_default_options(sql_config) for inspector in self.get_inspectors(): profiler = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index 71160c3bf7cc5..a21442096e913 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -679,6 +679,12 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): if self.config.stateful_ingestion: self.config.stateful_ingestion.remove_stale_metadata = False + def _add_default_options(self, sql_config: SQLCommonConfig) -> None: + """Add Teradata-specific default options""" + # Teradata does not support max_overflow, instead we use QueuePool when profiling + if sql_config.is_profiling_enabled(): + sql_config.options.setdefault("poolclass", QueuePool) + @classmethod def create(cls, config_dict, ctx): config = TeradataConfig.parse_obj(config_dict) @@ -707,11 +713,6 @@ def get_inspectors(self): # run on multiple databases. 
url = self.config.get_sql_alchemy_url() - # Teradata does not support max_overflow, instead we use QueuePool when profiling data - if "max_overflow" in self.config.options: - self.config.options.pop("max_overflow") - self.config.options["poolclass"] = QueuePool - logger.debug(f"sql_alchemy_url={url}") engine = create_engine(url, **self.config.options) with engine.connect() as conn: From 4760bac6e1eb7e9282226a60d3268dc5c2b509b1 Mon Sep 17 00:00:00 2001 From: Brock Griffey <52086127+brock-acryl@users.noreply.github.com> Date: Wed, 5 Feb 2025 14:53:48 -0500 Subject: [PATCH 4/6] comment changes moved comments and added new comments --- .../src/datahub/ingestion/source/sql/sql_common.py | 4 ++-- .../src/datahub/ingestion/source/sql/teradata.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 1c07deed411b3..5b1b9b1c2952c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -354,6 +354,8 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str) def _add_default_options(self, sql_config: SQLCommonConfig) -> None: """Add default SQLAlchemy options. Can be overridden by subclasses to add additional defaults.""" + # Extra default SQLAlchemy option for better connection pooling and threading. 
+ # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow if sql_config.is_profiling_enabled(): sql_config.options.setdefault( "max_overflow", sql_config.profiling.max_workers @@ -526,8 +528,6 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit # Known issue with sqlalchemy https://stackoverflow.com/questions/60804288/pycharm-duplicated-log-for-sqlalchemy-echo-true sqlalchemy_log._add_default_handler = lambda x: None # type: ignore - # Extra default SQLAlchemy option for better connection pooling and threading. - # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow self._add_default_options(sql_config) for inspector in self.get_inspectors(): diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index a21442096e913..d76f0136539eb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -681,8 +681,10 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext): def _add_default_options(self, sql_config: SQLCommonConfig) -> None: """Add Teradata-specific default options""" - # Teradata does not support max_overflow, instead we use QueuePool when profiling + super()._add_default_options(sql_config) if sql_config.is_profiling_enabled(): + # By default, Teradata uses SingletonThreadPool, which is not supported by sqlalchemy + # QueuePool used for parallel connections when profiling is enabled sql_config.options.setdefault("poolclass", QueuePool) @classmethod From 40499e01a03113d489f5d9a15a46e86634225661 Mon Sep 17 00:00:00 2001 From: Brock Griffey <52086127+brock-acryl@users.noreply.github.com> Date: Thu, 6 Feb 2025 22:23:56 -0500 Subject: [PATCH 5/6] Update teradata.py Updated comments. Adjusted to only change poolclass. 
--- .../src/datahub/ingestion/source/sql/teradata.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index d76f0136539eb..96bf7c567e5a8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -683,8 +683,10 @@ def _add_default_options(self, sql_config: SQLCommonConfig) -> None: """Add Teradata-specific default options""" super()._add_default_options(sql_config) if sql_config.is_profiling_enabled(): - # By default, Teradata uses SingletonThreadPool, which is not supported by sqlalchemy - # QueuePool used for parallel connections when profiling is enabled + # Sqlalchemy uses QueuePool by default however Teradata uses SingletonThreadPool. + # SingletonThreadPool does not support parellel connections. + # https://docs.sqlalchemy.org/en/20/core/pooling.html#connection-pool-configuration + # https://github.com/Teradata/sqlalchemy-teradata/issues/96 sql_config.options.setdefault("poolclass", QueuePool) @classmethod From db2deed98297a32a66d6905e1f1dd2ac7308f71b Mon Sep 17 00:00:00 2001 From: Brock Griffey <52086127+brock-acryl@users.noreply.github.com> Date: Thu, 6 Feb 2025 22:53:08 -0500 Subject: [PATCH 6/6] Update teradata.py --- metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index 96bf7c567e5a8..c52eceb726955 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -684,7 +684,7 @@ def _add_default_options(self, sql_config: SQLCommonConfig) -> None: super()._add_default_options(sql_config) if sql_config.is_profiling_enabled(): 
# Sqlalchemy uses QueuePool by default however Teradata uses SingletonThreadPool. - # SingletonThreadPool does not support parellel connections. + # SingletonThreadPool does not support parallel connections. When profiling is enabled, we need to use QueuePool. # https://docs.sqlalchemy.org/en/20/core/pooling.html#connection-pool-configuration # https://github.com/Teradata/sqlalchemy-teradata/issues/96 sql_config.options.setdefault("poolclass", QueuePool)