Skip to content

Commit

Permalink
fix(ingestion/teradata): teradata profiling fix for pooling (#12507)
Browse files Browse the repository at this point in the history
  • Loading branch information
brock-acryl authored Feb 7, 2025
1 parent 390a672 commit 45c8123
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
16 changes: 10 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,15 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str)
)
self.report.sql_aggregator = self.aggregator.report

def _add_default_options(self, sql_config: SQLCommonConfig) -> None:
"""Add default SQLAlchemy options. Can be overridden by subclasses to add additional defaults."""
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if sql_config.is_profiling_enabled():
sql_config.options.setdefault(
"max_overflow", sql_config.profiling.max_workers
)

@classmethod
def test_connection(cls, config_dict: dict) -> TestConnectionReport:
test_report = TestConnectionReport()
Expand Down Expand Up @@ -519,12 +528,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
# Known issue with sqlalchemy https://stackoverflow.com/questions/60804288/pycharm-duplicated-log-for-sqlalchemy-echo-true
sqlalchemy_log._add_default_handler = lambda x: None # type: ignore

# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if sql_config.is_profiling_enabled():
sql_config.options.setdefault(
"max_overflow", sql_config.profiling.max_workers
)
self._add_default_options(sql_config)

for inspector in self.get_inspectors():
profiler = None
Expand Down
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from sqlalchemy.engine import Engine
from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.pool import QueuePool
from sqlalchemy.sql.expression import text
from teradatasqlalchemy.dialect import TeradataDialect
from teradatasqlalchemy.options import configure
Expand Down Expand Up @@ -678,6 +679,16 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
if self.config.stateful_ingestion:
self.config.stateful_ingestion.remove_stale_metadata = False

def _add_default_options(self, sql_config: SQLCommonConfig) -> None:
"""Add Teradata-specific default options"""
super()._add_default_options(sql_config)
if sql_config.is_profiling_enabled():
# Sqlalchemy uses QueuePool by default however Teradata uses SingletonThreadPool.
# SingletonThreadPool does not support parellel connections. For using profiling, we need to use QueuePool.
# https://docs.sqlalchemy.org/en/20/core/pooling.html#connection-pool-configuration
# https://github.com/Teradata/sqlalchemy-teradata/issues/96
sql_config.options.setdefault("poolclass", QueuePool)

@classmethod
def create(cls, config_dict, ctx):
config = TeradataConfig.parse_obj(config_dict)
Expand Down Expand Up @@ -705,6 +716,7 @@ def get_inspectors(self):
# This method can be overridden in the case that you want to dynamically
# run on multiple databases.
url = self.config.get_sql_alchemy_url()

logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
with engine.connect() as conn:
Expand Down

0 comments on commit 45c8123

Please sign in to comment.