From c9c481ed1a6138cd7c99a3750a0a6cd5e4398484 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 10 Nov 2025 18:21:13 +0100 Subject: [PATCH] Remove workaround for `_`-prefixed column names --- CHANGES.md | 2 + README.md | 5 +- docs/backlog.md | 2 +- src/dlt_cratedb/impl/cratedb/sql_client.py | 53 +------------- src/dlt_cratedb/impl/cratedb/utils.py | 84 +--------------------- 5 files changed, 9 insertions(+), 137 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0606612..ccff524 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,8 @@ # Changelog ## Unreleased +- Removed `SystemColumnWorkaround` for `_`-prefixed column names. + The package now requires CrateDB 6.2 or higher. ## 2025/07/07 v0.0.2 - ingestr: Fixed importing from Kafka per `SystemColumnWorkaround` diff --git a/README.md b/README.md index 8a342c3..15af97c 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,8 @@ The [dlt-cratedb] package is temporary for shipping the code until [DLT-2733] is ready for upstreaming into main [dlt]. +Operating the package successfully needs CrateDB 6.2 or higher. + ## Documentation Please refer to the [handbook]. @@ -33,8 +35,7 @@ Please refer to the [handbook]. - The `cratedb` adapter is heavily based on the `postgres` adapter. - The `CrateDbSqlClient` deviates from the original `Psycopg2SqlClient` by - accounting for [CRATEDB-15161] per `SystemColumnWorkaround`. -- A few more other patches. + adding a few CrateDB-specific adjustments. ## Backlog diff --git a/docs/backlog.md b/docs/backlog.md index 10bc0f6..43af042 100644 --- a/docs/backlog.md +++ b/docs/backlog.md @@ -57,7 +57,7 @@ ``` cr> select * from users; +----+-------+-------------------+----------------+ - | id | name | __dlt_load_id | __dlt_id | + | id | name | _dlt_load_id | _dlt_id | +----+-------+-------------------+----------------+ | 2 | Bob | 1749406972.57687 | NFPmX5Pw4gZ7fA | | 1 | Alice | 1749406985.809837 | 6/Xoe8jhlBUbAQ | diff --git a/src/dlt_cratedb/impl/cratedb/sql_client.py b/src/dlt_cratedb/impl/cratedb/sql_client.py index 7a88019..39a09b6 100644 --- a/src/dlt_cratedb/impl/cratedb/sql_client.py +++ b/src/dlt_cratedb/impl/cratedb/sql_client.py @@ -7,32 +7,15 @@ from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient from dlt.destinations.typing import DBTransaction -from dlt_cratedb.impl.cratedb.utils import SystemColumnWorkaround - if platform.python_implementation() == "PyPy": import psycopg2cffi as psycopg2 # type: ignore[import-not-found] else: import psycopg2 from contextlib import contextmanager -from typing import Any, AnyStr, Iterator, List, Optional, Sequence, cast - -from dlt.common.destination.dataset import DBApiCursor -from dlt.destinations.sql_client import ( - DBApiCursorImpl, - raise_database_error, -) - - -class CrateDbApiCursorImpl(DBApiCursorImpl): - """ - Compensate for patches by `SystemColumnWorkaround`. - """ +from typing import Iterator - def _get_columns(self) -> List[str]: - if self.native_cursor.description: - return [SystemColumnWorkaround.unquirk(c[0]) for c in self.native_cursor.description] - return [] +from dlt.destinations.sql_client import raise_database_error class CrateDbSqlClient(Psycopg2SqlClient): @@ -41,7 +24,6 @@ class CrateDbSqlClient(Psycopg2SqlClient): - Use `doc` as a default search path. - Disable transactions. - - Apply I/O patches provided by `SystemColumnWorkaround`. """ def open_connection(self) -> "psycopg2.connection": @@ -83,37 +65,6 @@ def rollback_transaction(self) -> None: """ raise NotImplementedError("CrateDB statements can not be rolled back") - # @raise_database_error - def execute_sql( - self, sql: AnyStr, *args: Any, **kwargs: Any - ) -> Optional[Sequence[Sequence[Any]]]: - """ - Need to patch the result returned from the database. - """ - result = super().execute_sql(sql, *args, **kwargs) - return SystemColumnWorkaround.patch_result(result) - - @contextmanager - @raise_database_error - def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]: - """ - Need to patch the SQL statement and use the custom `CrateDbApiCursorImpl`. - """ - query = cast(AnyStr, SystemColumnWorkaround.patch_sql(query)) - curr: DBApiCursor - db_args = args if args else kwargs if kwargs else None - with self._conn.cursor() as curr: - try: - curr.execute(query, db_args) - yield CrateDbApiCursorImpl(curr) # type: ignore[abstract] - except psycopg2.Error as outer: - try: - self._reset_connection() - except psycopg2.Error: - self.close_connection() - self.open_connection() - raise outer - @staticmethod def _is_error_schema_unknown(exception: Exception) -> bool: """ diff --git a/src/dlt_cratedb/impl/cratedb/utils.py b/src/dlt_cratedb/impl/cratedb/utils.py index f3cda6a..02437b4 100644 --- a/src/dlt_cratedb/impl/cratedb/utils.py +++ b/src/dlt_cratedb/impl/cratedb/utils.py @@ -1,14 +1,9 @@ import json -import platform from datetime import date, datetime, time -from typing import Any, Optional, Sequence, Union +from typing import Any from dlt.common.data_writers.escape import _escape_extended, _make_sql_escape_re -if platform.python_implementation() == "PyPy": - import psycopg2cffi as psycopg2 # type: ignore[import-not-found] -import psycopg2.sql - # CrateDB does not accept the original `{"'": "''", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}`? SQL_ESCAPE_DICT = {"'": "''"} SQL_ESCAPE_RE = _make_sql_escape_re(SQL_ESCAPE_DICT) @@ -49,80 +44,3 @@ def escape_cratedb_literal(v: Any) -> Any: return "NULL" return str(v) - - -class SystemColumnWorkaround: - """ - CrateDB reserves `_`-prefixed column names for internal purposes. - - When approaching CrateDB with such private columns which are common in - ETL frameworks, a corresponding error will be raised. - - InvalidColumnNameException["_dlt_load_id" conflicts with system column pattern] - - This class provides utility methods to work around the problem, brutally. - - See also: https://github.com/crate/crate/issues/15161 - - FIXME: Please get rid of this code by resolving the issue in CrateDB. - """ - - quirked_labels = [ - # dlt core - "__dlt_id", - "__dlt_load_id", - "__dlt_parent_id", - "__dlt_list_idx", - "__dlt_root_id", - # ingestr kafka - "_kafka__msg_id", - # ingestr mongodb - # "__id", - ] - - @staticmethod - def quirk(thing: str) -> str: - thing = ( - thing.replace("_dlt_load_id", "__dlt_load_id") - .replace("_dlt_id", "__dlt_id") - .replace("_dlt_parent_id", "__dlt_parent_id") - .replace("_dlt_list_idx", "__dlt_list_idx") - .replace("_dlt_root_id", "__dlt_root_id") - .replace("_kafka_msg_id", "_kafka__msg_id") - ) - return thing - - @staticmethod - def unquirk(thing: str) -> str: - thing = ( - thing.replace("__dlt_load_id", "_dlt_load_id") - .replace("__dlt_id", "_dlt_id") - .replace("__dlt_parent_id", "_dlt_parent_id") - .replace("__dlt_list_idx", "_dlt_list_idx") - .replace("__dlt_root_id", "_dlt_root_id") - .replace("_kafka__msg_id", "_kafka_msg_id") - ) - return thing - - @classmethod - def patch_sql(cls, sql: Union[str, psycopg2.sql.Composed]) -> Union[str, psycopg2.sql.Composed]: - if isinstance(sql, str): - sql = cls.quirk(sql) - elif isinstance(sql, psycopg2.sql.Composed): - sql.seq[0]._wrapped = cls.quirk(sql.seq[0]._wrapped) - else: - raise NotImplementedError(f"Unsupported type '{type(sql)}' for augmenting SQL: {sql}") - return sql - - @classmethod - def patch_result(cls, result: Sequence[Sequence[Any]]) -> Optional[Sequence[Sequence[Any]]]: - if result is None: - return None - row_new = [] - for row in result: - if len(row) >= 2 and row[1] in cls.quirked_labels: - r = list(row) - r[1] = cls.unquirk(r[1]) - row = tuple(r) - row_new.append(row) - return row_new