Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Changelog

## Unreleased
- Removed `SystemColumnWorkaround` for `_`-prefixed column names.
The package now requires CrateDB 6.2 or higher.
Comment on lines 3 to +5
Copy link
Member Author

@amotl amotl Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I see on the release notes the CrateDB 6.2 release might be far away (expecting 6.1 first?), this patch is probably not ready for releasing, and should be made a draft again for the time being?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6.1 will be released soon, 6.2 jan/feb 2026 though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. Because this package has 4k downloads/month already, we will probably hold this update back until CrateDB 6.2 has been released. In this spirit, I am toggling it into draft mode again.


## 2025/07/07 v0.0.2
- ingestr: Fixed importing from Kafka per `SystemColumnWorkaround`
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
The [dlt-cratedb] package is temporary for shipping the code until
[DLT-2733] is ready for upstreaming into main [dlt].

Operating the package successfully needs CrateDB 6.2 or higher.

## Documentation

Please refer to the [handbook].
Expand All @@ -33,8 +35,7 @@ Please refer to the [handbook].

- The `cratedb` adapter is heavily based on the `postgres` adapter.
- The `CrateDbSqlClient` deviates from the original `Psycopg2SqlClient` by
accounting for [CRATEDB-15161] per `SystemColumnWorkaround`.
- A few more other patches.
adding a few CrateDB-specific adjustments.

## Backlog

Expand Down
2 changes: 1 addition & 1 deletion docs/backlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
```
cr> select * from users;
+----+-------+-------------------+----------------+
| id | name | __dlt_load_id | __dlt_id |
| id | name | _dlt_load_id | _dlt_id |
+----+-------+-------------------+----------------+
| 2 | Bob | 1749406972.57687 | NFPmX5Pw4gZ7fA |
| 1 | Alice | 1749406985.809837 | 6/Xoe8jhlBUbAQ |
Expand Down
53 changes: 2 additions & 51 deletions src/dlt_cratedb/impl/cratedb/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,15 @@
from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient
from dlt.destinations.typing import DBTransaction

from dlt_cratedb.impl.cratedb.utils import SystemColumnWorkaround

if platform.python_implementation() == "PyPy":
import psycopg2cffi as psycopg2 # type: ignore[import-not-found]
else:
import psycopg2

from contextlib import contextmanager
from typing import Any, AnyStr, Iterator, List, Optional, Sequence, cast

from dlt.common.destination.dataset import DBApiCursor
from dlt.destinations.sql_client import (
DBApiCursorImpl,
raise_database_error,
)


class CrateDbApiCursorImpl(DBApiCursorImpl):
"""
Compensate for patches by `SystemColumnWorkaround`.
"""
from typing import Iterator

def _get_columns(self) -> List[str]:
if self.native_cursor.description:
return [SystemColumnWorkaround.unquirk(c[0]) for c in self.native_cursor.description]
return []
from dlt.destinations.sql_client import raise_database_error


class CrateDbSqlClient(Psycopg2SqlClient):
Expand All @@ -41,7 +24,6 @@ class CrateDbSqlClient(Psycopg2SqlClient):

- Use `doc` as a default search path.
- Disable transactions.
- Apply I/O patches provided by `SystemColumnWorkaround`.
"""

def open_connection(self) -> "psycopg2.connection":
Expand Down Expand Up @@ -83,37 +65,6 @@ def rollback_transaction(self) -> None:
"""
raise NotImplementedError("CrateDB statements can not be rolled back")

# @raise_database_error
def execute_sql(
self, sql: AnyStr, *args: Any, **kwargs: Any
) -> Optional[Sequence[Sequence[Any]]]:
"""
Need to patch the result returned from the database.
"""
result = super().execute_sql(sql, *args, **kwargs)
return SystemColumnWorkaround.patch_result(result)

@contextmanager
@raise_database_error
def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
"""
Need to patch the SQL statement and use the custom `CrateDbApiCursorImpl`.
"""
query = cast(AnyStr, SystemColumnWorkaround.patch_sql(query))
curr: DBApiCursor
db_args = args if args else kwargs if kwargs else None
with self._conn.cursor() as curr:
try:
curr.execute(query, db_args)
yield CrateDbApiCursorImpl(curr) # type: ignore[abstract]
except psycopg2.Error as outer:
try:
self._reset_connection()
except psycopg2.Error:
self.close_connection()
self.open_connection()
raise outer

@staticmethod
def _is_error_schema_unknown(exception: Exception) -> bool:
"""
Expand Down
84 changes: 1 addition & 83 deletions src/dlt_cratedb/impl/cratedb/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import json
import platform
from datetime import date, datetime, time
from typing import Any, Optional, Sequence, Union
from typing import Any

from dlt.common.data_writers.escape import _escape_extended, _make_sql_escape_re

if platform.python_implementation() == "PyPy":
import psycopg2cffi as psycopg2 # type: ignore[import-not-found]
import psycopg2.sql

# CrateDB does not accept the original `{"'": "''", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}`?
SQL_ESCAPE_DICT = {"'": "''"}
SQL_ESCAPE_RE = _make_sql_escape_re(SQL_ESCAPE_DICT)
Expand Down Expand Up @@ -49,80 +44,3 @@ def escape_cratedb_literal(v: Any) -> Any:
return "NULL"

return str(v)


class SystemColumnWorkaround:
"""
CrateDB reserves `_`-prefixed column names for internal purposes.

When approaching CrateDB with such private columns which are common in
ETL frameworks, a corresponding error will be raised.

InvalidColumnNameException["_dlt_load_id" conflicts with system column pattern]

This class provides utility methods to work around the problem, brutally.

See also: https://github.com/crate/crate/issues/15161

FIXME: Please get rid of this code by resolving the issue in CrateDB.
"""

quirked_labels = [
# dlt core
"__dlt_id",
"__dlt_load_id",
"__dlt_parent_id",
"__dlt_list_idx",
"__dlt_root_id",
# ingestr kafka
"_kafka__msg_id",
# ingestr mongodb
# "__id",
]

@staticmethod
def quirk(thing: str) -> str:
thing = (
thing.replace("_dlt_load_id", "__dlt_load_id")
.replace("_dlt_id", "__dlt_id")
.replace("_dlt_parent_id", "__dlt_parent_id")
.replace("_dlt_list_idx", "__dlt_list_idx")
.replace("_dlt_root_id", "__dlt_root_id")
.replace("_kafka_msg_id", "_kafka__msg_id")
)
return thing

@staticmethod
def unquirk(thing: str) -> str:
thing = (
thing.replace("__dlt_load_id", "_dlt_load_id")
.replace("__dlt_id", "_dlt_id")
.replace("__dlt_parent_id", "_dlt_parent_id")
.replace("__dlt_list_idx", "_dlt_list_idx")
.replace("__dlt_root_id", "_dlt_root_id")
.replace("_kafka__msg_id", "_kafka_msg_id")
)
return thing

@classmethod
def patch_sql(cls, sql: Union[str, psycopg2.sql.Composed]) -> Union[str, psycopg2.sql.Composed]:
if isinstance(sql, str):
sql = cls.quirk(sql)
elif isinstance(sql, psycopg2.sql.Composed):
sql.seq[0]._wrapped = cls.quirk(sql.seq[0]._wrapped)
else:
raise NotImplementedError(f"Unsupported type '{type(sql)}' for augmenting SQL: {sql}")
return sql

@classmethod
def patch_result(cls, result: Sequence[Sequence[Any]]) -> Optional[Sequence[Sequence[Any]]]:
if result is None:
return None
row_new = []
for row in result:
if len(row) >= 2 and row[1] in cls.quirked_labels:
r = list(row)
r[1] = cls.unquirk(r[1])
row = tuple(r)
row_new.append(row)
return row_new