diff --git a/dlt/destinations/__init__.py b/dlt/destinations/__init__.py index a856f574d8..f4a69b6c7f 100644 --- a/dlt/destinations/__init__.py +++ b/dlt/destinations/__init__.py @@ -1,6 +1,7 @@ from dlt.destinations.impl.postgres.factory import postgres from dlt.destinations.impl.snowflake.factory import snowflake from dlt.destinations.impl.filesystem.factory import filesystem +from dlt.destinations.impl.cratedb.factory import cratedb from dlt.destinations.impl.duckdb.factory import duckdb from dlt.destinations.impl.dummy.factory import dummy from dlt.destinations.impl.mssql.factory import mssql @@ -23,6 +24,7 @@ "postgres", "snowflake", "filesystem", + "cratedb", "duckdb", "dummy", "mssql", diff --git a/dlt/destinations/impl/cratedb/__init__.py b/dlt/destinations/impl/cratedb/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dlt/destinations/impl/cratedb/backlog.md b/dlt/destinations/impl/cratedb/backlog.md new file mode 100644 index 0000000000..10bc0f6d42 --- /dev/null +++ b/dlt/destinations/impl/cratedb/backlog.md @@ -0,0 +1,74 @@ +# CrateDB destination adapter backlog + +## Iteration +1 + +- Check if the Redshift literal escaper is the right choice for all + data types it is handling. + +- Currently only works with CrateDB's default dataset / schema `doc`, + that's why out-of-the-box `dlt init chess cratedb` will fail like + `ERROR: Schema 'chess_players_games_data' unknown`. Why!? + Thoughts: Most probably, dlt introspects the available schemas, + doesn't find it in CrateDB, and fails. It doesn't know that CrateDB + would create the schema transparently, without needing an explicit + `CREATE SCHEMA ...` operation. + +- ERROR: mismatched input 'ABORT' expecting {.... + +- Documentation: How JSONB/ARRAYs are mapped to CrateDB + +- The `merge` followup job is currently defunct with CrateDB. + It has been replaced by a `replace" [sic!] job. + +- Troubleshooting: Workloads specifying certain write dispositions currently + need to be invoked twice. On the first invocation, this happens: + + dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package 1750628689.026369 with exception: + + + Schema frankfurter content was changed - by a loader or by destination code - from the moment it was retrieved by load package. Such schema cannot reliably be updated nor saved. Current version hash: ZUJEaD/gSBvY9vbgx3urdi93mJppp7CMlRaLuJkxJCs= != stored version hash Cocqcq+4Qzo8zLOc9R3uYjhjLZr6D/QwgS03TtFG2wI=. If you are using destination client directly, without storing schema in load package, you should first save it into schema storage. You can also use schema._bump_version() in test code to remove modified flag. + +- Is it possible to use an `adapter` to better support CrateDB's special + data types? For example, full round-tripping using `sys.summits` isn't + possible just yet, due to the `coordinates` column, for example when + selecting DuckDB. + +- Is it possible to provide other direct loaders from the `sqlalchemy` + adapter? + +- Addressing per `cratedb://` will raise errors, e.g. in `ingestr`: + `Following fields are missing: ['password', 'username', 'host'] in configuration with spec CrateDbCredentials` + +- Add staging support + > CrateDB supports Amazon S3, Google Cloud Storage, and Azure Blob + > Storage as file staging destinations. + +## Verify + +- > Data is loaded into CrateDB using the most efficient method + > depending on the data source. + +- > CrateDB does not support the `binary` datatype. 
Binary will be loaded to a `text` column. + +## Iteration +2 + +- The `UNIQUE` constraint is dearly missing. Is it possible to emulate? + Otherwise, when loading data multiple times, duplicates will happen. + ``` + cr> select * from users; + +----+-------+-------------------+----------------+ + | id | name | __dlt_load_id | __dlt_id | + +----+-------+-------------------+----------------+ + | 2 | Bob | 1749406972.57687 | NFPmX5Pw4gZ7fA | + | 1 | Alice | 1749406985.809837 | 6/Xoe8jhlBUbAQ | + | 2 | Bob | 1749406985.809837 | VnP4S8AsQ/ujOg | + | 1 | Alice | 1749406972.57687 | 4HCWs9jqfwTyTQ | + +----+-------+-------------------+----------------+ + SELECT 4 rows in set (0.003 sec) + ``` + +- Provide Ibis dataset access, see + tests/load/test_read_interfaces.py + +- Software tests about geospatial concerns have been removed. + See `postgres` adapter for bringing them back. diff --git a/dlt/destinations/impl/cratedb/configuration.py b/dlt/destinations/impl/cratedb/configuration.py new file mode 100644 index 0000000000..2030c99749 --- /dev/null +++ b/dlt/destinations/impl/cratedb/configuration.py @@ -0,0 +1,27 @@ +import dataclasses +from typing import Final +from dlt.common.configuration import configspec +from dlt.destinations.impl.postgres.configuration import ( + PostgresCredentials, + PostgresClientConfiguration, +) + + +@configspec(init=False) +class CrateDbCredentials(PostgresCredentials): + drivername: Final[str] = dataclasses.field( # type: ignore[misc] + default="postgres", init=False, repr=False, compare=False + ) + + +# CrateDB does not support databases, just schemas. +# In dlt, schemas are conveyed by `dataset_name`? +del CrateDbCredentials.__dataclass_fields__["database"] + + +@configspec +class CrateDbClientConfiguration(PostgresClientConfiguration): + destination_type: Final[str] = dataclasses.field( # type: ignore[misc] + default="cratedb", init=False, repr=False, compare=False + ) + credentials: CrateDbCredentials = None diff --git a/dlt/destinations/impl/cratedb/cratedb.py b/dlt/destinations/impl/cratedb/cratedb.py new file mode 100644 index 0000000000..c14f8feadd --- /dev/null +++ b/dlt/destinations/impl/cratedb/cratedb.py @@ -0,0 +1,138 @@ +import logging +from typing import Dict, Any, Sequence, List + +from dlt.common.destination import DestinationCapabilitiesContext +from dlt.common.destination.client import ( + PreparedTableSchema, + LoadJob, + FollowupJobRequest, +) +from dlt.common.schema import TColumnHint, Schema +from dlt.destinations.impl.cratedb.configuration import CrateDbClientConfiguration +from dlt.destinations.impl.cratedb.sql_client import CrateDbSqlClient +from dlt.destinations.impl.postgres.postgres import PostgresClient +from dlt.destinations.insert_job_client import InsertValuesJobClient +from dlt.destinations.sql_client import SqlClientBase +from dlt.destinations.sql_jobs import SqlStagingReplaceFollowupJob + +# FIXME: The `UNIQUE` constraint is dearly missing. +# When loading data multiple times, duplicates will happen. +HINT_TO_CRATEDB_ATTR: Dict[TColumnHint, str] = {"unique": ""} + + +logger = logging.getLogger(__name__) + + +class CrateDbStagingReplaceJob(SqlStagingReplaceFollowupJob): + @classmethod + def generate_sql( + cls, + table_chain: Sequence[PreparedTableSchema], + sql_client: SqlClientBase[Any], + ) -> List[str]: + """ + CrateDB uses `ALTER CLUSTER SWAP TABLE`. 
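+
+        For one table in the chain, the generated statement sequence looks as
+        follows (schema and table names are illustrative only):
+
+            DROP TABLE IF EXISTS "doc"."users";
+            CREATE TABLE IF NOT EXISTS "doc"."users"
+                AS SELECT * FROM "doc_staging"."users" WHERE 1 = 0;
+            ALTER CLUSTER SWAP TABLE "doc_staging"."users" TO "doc"."users";
+            REFRESH TABLE "doc"."users";
+            CREATE TABLE IF NOT EXISTS "doc_staging"."users"
+                AS SELECT * FROM "doc"."users" WHERE 1 = 0;
+            REFRESH TABLE "doc_staging"."users";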
+ + -- https://github.com/crate/crate/issues/14833 + """ + sql: List[str] = [] + for table in table_chain: + with sql_client.with_staging_dataset(): + staging_table_name = sql_client.make_qualified_table_name(table["name"]) + table_name = sql_client.make_qualified_table_name(table["name"]) + sql.extend( + ( + # Drop destination table. + f"DROP TABLE IF EXISTS {table_name};", + # Recreate destination table, because `ALTER CLUSTER SWAP TABLE` needs it. + ( + f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM" + f" {staging_table_name} WHERE 1 = 0;" + ), + # Move the staging table to the destination schema. + f"ALTER CLUSTER SWAP TABLE {staging_table_name} TO {table_name};", + # CrateDB needs to flush writes. + f"REFRESH TABLE {table_name};", + # Recreate staging table not needed with CrateDB, because + # `ALTER CLUSTER SWAP TABLE` does not remove the source table. + ( + f"CREATE TABLE IF NOT EXISTS {staging_table_name} AS SELECT * FROM" + f" {table_name} WHERE 1 = 0;" + ), + f"REFRESH TABLE {staging_table_name};", + ) + ) + return sql + + +class CrateDbClient(PostgresClient): + def __init__( + self, + schema: Schema, + config: CrateDbClientConfiguration, + capabilities: DestinationCapabilitiesContext, + ) -> None: + sql_client = CrateDbSqlClient( + config.normalize_dataset_name(schema), + config.normalize_staging_dataset_name(schema), + config.credentials, + capabilities, + ) + InsertValuesJobClient.__init__(self, schema, config, sql_client) + self.config: CrateDbClientConfiguration = config + self.sql_client: CrateDbSqlClient = sql_client + self.active_hints = HINT_TO_CRATEDB_ATTR if self.config.create_indexes else {} + self.type_mapper = self.capabilities.get_type_mapper() + + def create_load_job( + self, + table: PreparedTableSchema, + file_path: str, + load_id: str, + restore: bool = False, + ) -> LoadJob: + """ + CrateDB only supports the "insert values" paradigm. + """ + job = InsertValuesJobClient.create_load_job(self, table, file_path, load_id, restore) + if job is not None: + return job + return None + + def _create_merge_followup_jobs( + self, table_chain: Sequence[PreparedTableSchema] + ) -> List[FollowupJobRequest]: + """ + CrateDB currently does not support "merge" followup jobs. + -- https://github.com/crate-workbench/dlt/issues/4 + + Workaround: Redirect the "merge" job to use a "replace" job instead. + """ + return [CrateDbStagingReplaceJob.from_table_chain(table_chain, self.sql_client)] + + def complete_load(self, load_id: str) -> None: + """ + Intercept to invoke a `REFRESH TABLE ...` statement. + """ + result = super().complete_load(load_id=load_id) + table_name = self.sql_client.make_qualified_table_name(self.schema.loads_table_name) + self.sql_client.execute_sql(f"REFRESH TABLE {table_name}") + return result + + def _commit_schema_update(self, schema: Schema, schema_str: str) -> None: + """ + Intercept to invoke a `REFRESH TABLE ...` statement. + """ + result = super()._commit_schema_update(schema=schema, schema_str=schema_str) + table_name = self.sql_client.make_qualified_table_name(self.schema.version_table_name) + self.sql_client.execute_sql(f"REFRESH TABLE {table_name}") + return result + + def _delete_schema_in_storage(self, schema: Schema) -> None: + """ + Intercept to invoke a `REFRESH TABLE ...` statement. 
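+        CrateDB makes writes visible to readers only after a periodic or explicit
+        refresh, so the version table is refreshed here to make the deletion
+        observable to follow-up statements within the same load.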
+ """ + result = super()._delete_schema_in_storage(schema=schema) + table_name = self.sql_client.make_qualified_table_name(self.schema.version_table_name) + self.sql_client.execute_sql(f"REFRESH TABLE {table_name}") + return result diff --git a/dlt/destinations/impl/cratedb/factory.py b/dlt/destinations/impl/cratedb/factory.py new file mode 100644 index 0000000000..eb449d6085 --- /dev/null +++ b/dlt/destinations/impl/cratedb/factory.py @@ -0,0 +1,97 @@ +from copy import deepcopy +from typing import Type, TYPE_CHECKING, List, Any, Dict + +from dlt.common.destination import Destination, DestinationCapabilitiesContext +from dlt.common.destination.typing import PreparedTableSchema +from dlt.common.schema.typing import TColumnSchema +from dlt.destinations.impl.cratedb.configuration import CrateDbClientConfiguration +from dlt.destinations.impl.cratedb.utils import escape_cratedb_literal +from dlt.destinations.impl.postgres.factory import PostgresTypeMapper, postgres + +if TYPE_CHECKING: + from dlt.destinations.impl.cratedb.cratedb import CrateDbClient + + +class CrateDbTypeMapper(PostgresTypeMapper): + """ + Adjust type mappings for CrateDB. + + - CrateDB uses `object(dynamic)` instead of `json` or `jsonb`. + - CrateDB does not support `timestamp(6) without time zone`. + - CrateDB does not support `time` for storing. + - CrateDB does not support `binary` or `bytea`. + """ + + def __new__(cls, *args: List[Any], **kwargs: Dict[str, Any]) -> "CrateDbTypeMapper": + cls.sct_to_unbound_dbt = deepcopy(PostgresTypeMapper.sct_to_unbound_dbt) + cls.sct_to_unbound_dbt["json"] = "object(dynamic)" + cls.sct_to_unbound_dbt["binary"] = "text" + + cls.sct_to_dbt = deepcopy(PostgresTypeMapper.sct_to_dbt) + cls.sct_to_dbt["timestamp"] = "timestamp with time zone" + del cls.sct_to_dbt["time"] + + cls.dbt_to_sct = deepcopy(PostgresTypeMapper.dbt_to_sct) + cls.dbt_to_sct["jsonb"] = "object(dynamic)" # type: ignore[assignment] + cls.dbt_to_sct["bytea"] = "text" + + return super().__new__(cls) + + def to_db_datetime_type( + self, + column: TColumnSchema, + table: PreparedTableSchema = None, + ) -> str: + """ + CrateDB does not support `timestamp(6) without time zone`. + To not render the SQL clause like this, nullify the `precision` attribute. + """ + column["precision"] = None + return super().to_db_datetime_type(column, table) + + +class cratedb(postgres, Destination[CrateDbClientConfiguration, "CrateDbClient"]): + spec = CrateDbClientConfiguration # type: ignore[assignment] + + def _raw_capabilities(self) -> DestinationCapabilitiesContext: + """ + Tune down capabilities for CrateDB. + """ + caps = super()._raw_capabilities() + + # CrateDB does not support transactions. + caps.supports_transactions = False + caps.supports_ddl_transactions = False + + # CrateDB does not support `TRUNCATE TABLE`, use `DELETE FROM` instead. + caps.supports_truncate_command = False + + # CrateDB's type mapping needs adjustments compared to PostgreSQL. + caps.type_mapper = CrateDbTypeMapper + + # TODO: Provide a dedicated dialect for SQLGlot. + caps.sqlglot_dialect = "postgres" + + # CrateDB needs a slightly adjusted escaping of literals. + # TODO: Escaping might need further adjustments, to be explored using integration tests. + caps.escape_literal = escape_cratedb_literal + + # CrateDB does not support direct data loading using advanced formats. + # TODO: Explore adding more formats for staged imports. 
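+        # Note: CrateDB's `COPY FROM` can bulk-import JSON and CSV files, which
+        # could back a staged `filesystem` path once staging support lands.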
+ caps.preferred_loader_file_format = "insert_values" + caps.supported_loader_file_formats = ["insert_values"] + caps.loader_file_format_selector = None + + return caps + + @property + def client_class(self) -> Type["CrateDbClient"]: + """ + Provide a different client for CrateDB. + """ + from dlt.destinations.impl.cratedb.cratedb import CrateDbClient + + return CrateDbClient + + +cratedb.register() diff --git a/dlt/destinations/impl/cratedb/readme.md b/dlt/destinations/impl/cratedb/readme.md new file mode 100644 index 0000000000..a74db46241 --- /dev/null +++ b/dlt/destinations/impl/cratedb/readme.md @@ -0,0 +1,14 @@ +# CrateDB destination adapter + +## What's inside + +- The `cratedb` adapter is heavily based on the `postgres` adapter. +- The `CrateDbSqlClient` deviates from the original `Psycopg2SqlClient` by + accounting for [CRATEDB-15161] per `SystemColumnWorkaround`. + +## Backlog + +A few items need to be resolved, see [backlog](./backlog.md). + + +[CRATEDB-15161]: https://github.com/crate/crate/issues/15161 diff --git a/dlt/destinations/impl/cratedb/sql_client.py b/dlt/destinations/impl/cratedb/sql_client.py new file mode 100644 index 0000000000..2a0a6ce0c4 --- /dev/null +++ b/dlt/destinations/impl/cratedb/sql_client.py @@ -0,0 +1,155 @@ +import platform + +from dlt.destinations.exceptions import DatabaseUndefinedRelation, DatabaseTransientException +from dlt.destinations.impl.cratedb.utils import SystemColumnWorkaround +from dlt.destinations.impl.postgres.sql_client import Psycopg2SqlClient +from dlt.common import logger +from dlt.destinations.typing import DBTransaction + +if platform.python_implementation() == "PyPy": + import psycopg2cffi as psycopg2 + from psycopg2cffi.sql import SQL, Composed, Composable +else: + import psycopg2 + +from contextlib import contextmanager +from typing import Any, AnyStr, Iterator, Optional, Sequence, List, cast + +from dlt.common.destination.dataset import DBApiCursor + +from dlt.destinations.sql_client import ( + DBApiCursorImpl, + raise_database_error, +) + + +class CrateDbApiCursorImpl(DBApiCursorImpl): + """ + Compensate for patches by `SystemColumnWorkaround`. + """ + + def _get_columns(self) -> List[str]: + if self.native_cursor.description: + return [SystemColumnWorkaround.unquirk(c[0]) for c in self.native_cursor.description] + return [] + + +class CrateDbSqlClient(Psycopg2SqlClient): + """ + CrateDB SQL client, mostly compatible with PostgreSQL, with a few deviations. + + - Use `doc` as a default search path. + - Disable transactions. + - Apply I/O patches provided by `SystemColumnWorkaround`. + """ + + def open_connection(self) -> "psycopg2.connection": + """ + Use `doc` instead of `public` as search path with CrateDB. + """ + self._conn = psycopg2.connect( + dsn=self.credentials.to_native_representation(), + options=f"-c search_path={self.fully_qualified_dataset_name()},doc", + ) + self._reset_connection() + return self._conn + + @contextmanager + @raise_database_error + def begin_transaction(self) -> Iterator[DBTransaction]: + """ + CrateDB does not support transactions. Make emitting `BEGIN TRANSACTION` a no-op. + """ + logger.warning( + "CrateDB does not support transactions. Each SQL statement is auto-committed" + " separately." + ) + yield self + + @raise_database_error + def commit_transaction(self) -> None: + """ + CrateDB does not support transactions. Make emitting `COMMIT` a no-op. + """ + pass + + @raise_database_error + def rollback_transaction(self) -> None: + """ + CrateDB does not support transactions. 
Raise an exception on `ROLLBACK`. + + TODO: Any better idea? + """ + raise NotImplementedError("CrateDB statements can not be rolled back") + + # @raise_database_error + def execute_sql( + self, sql: AnyStr, *args: Any, **kwargs: Any + ) -> Optional[Sequence[Sequence[Any]]]: + """ + Need to patch the result returned from the database. + """ + result = super().execute_sql(sql, *args, **kwargs) + return SystemColumnWorkaround.patch_result(result) + + @contextmanager + @raise_database_error + def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]: + """ + Need to patch the SQL statement and use the custom `CrateDbApiCursorImpl`. + """ + query = cast(AnyStr, SystemColumnWorkaround.patch_sql(query)) + curr: DBApiCursor = None + db_args = args if args else kwargs if kwargs else None + with self._conn.cursor() as curr: + try: + curr.execute(query, db_args) + yield CrateDbApiCursorImpl(curr) # type: ignore[abstract] + except psycopg2.Error as outer: + try: + self._reset_connection() + except psycopg2.Error: + self.close_connection() + self.open_connection() + raise outer + + @staticmethod + def _is_error_schema_unknown(exception: Exception) -> bool: + """ + CrateDB raises `Schema 'testdrive' unknown` errors when accessing schemas not including any tables yet. + + psycopg2.errors.InternalError_: Schema 'testdrive_staging' unknown + dlt.destinations.exceptions.DatabaseTransientException: Schema 'testdrive_staging' unknown + + a) Try to ignore that. + b) TODO: Refactor to synthesize an empty result, see `job_client_impl.get_stored_state`. + c) Resolve in CrateDB. + """ + msg = str(exception) + return "Schema" in msg and "unknown" in msg + + @classmethod + def _make_database_exception(cls, ex: Exception) -> Exception: + """ + Loop in additional error evaluation for CrateDB. + """ + if cls._is_error_schema_unknown(exception=ex): + raise DatabaseUndefinedRelation(ex) + return Psycopg2SqlClient._make_database_exception(ex) + + def create_dataset(self) -> None: + """ + CrateDB does not know `CREATE|DROP SCHEMA ...` statements. + A no-op is not enough, because downstream operations expect the schema to exist. + """ + self.execute_sql( + f"CREATE TABLE IF NOT EXISTS {self.fully_qualified_dataset_name()}._placeholder (id" + " INT)" + ) + + def drop_dataset(self) -> None: + """ + CrateDB does not know `CREATE|DROP SCHEMA ...` statements. + A no-op is not enough, because downstream operations expect the schema to exist. + """ + self.execute_sql(f"DROP TABLE IF EXISTS {self.fully_qualified_dataset_name()}._placeholder") diff --git a/dlt/destinations/impl/cratedb/utils.py b/dlt/destinations/impl/cratedb/utils.py new file mode 100644 index 0000000000..9e14069a37 --- /dev/null +++ b/dlt/destinations/impl/cratedb/utils.py @@ -0,0 +1,122 @@ +import json +import platform +from datetime import datetime, date, time +from typing import Union, Sequence, Any, Optional + +from dlt.common.data_writers.escape import _escape_extended, _make_sql_escape_re + +if platform.python_implementation() == "PyPy": + import psycopg2cffi as psycopg2 +import psycopg2.sql + + +# CrateDB does not accept the original `{"'": "''", "\\": "\\\\", "\n": "\\n", "\r": "\\r"}`? +SQL_ESCAPE_DICT = {"'": "''"} +SQL_ESCAPE_RE = _make_sql_escape_re(SQL_ESCAPE_DICT) + + +def _escape_extended_cratedb(v: str) -> str: + """ + The best-practice escaper for CrateDB, discovered by trial-and-error. 
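+
+    Only single quotes are doubled; backslashes, newlines, and carriage returns
+    pass through unchanged, e.g. "L'Aupillon" is rendered as 'L''Aupillon'.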
+ """ + return _escape_extended(v, prefix="'", escape_dict=SQL_ESCAPE_DICT, escape_re=SQL_ESCAPE_RE) + + +def escape_cratedb_literal(v: Any) -> Any: + """ + Based on `escape_postgres_literal`, with a mix of `escape_redshift_literal`. + + CrateDB needs a slightly adjusted escaping of literals. + Examples: "L'Aupillon" and "Pizzas d'Anarosa" from `sys.summits`. + + It possibly also doesn't support the `E'` prefix as employed by the PostgreSQL escaper? + Fortunately, the Redshift escaper came to the rescue, providing a reasonable baseline. + + CrateDB also needs support when serializing container types ARRAY vs. OBJECT. + """ + if isinstance(v, str): + return _escape_extended_cratedb(v) + if isinstance(v, (datetime, date, time)): + return f"'{v.isoformat()}'" + # CrateDB, when serializing from an incoming `json` or `jsonb` type, the type mapper + # can't know about what's actually inside, so arrays need a special treatment. + if isinstance(v, list): + v = {"array": v} + if isinstance(v, dict): + return _escape_extended_cratedb(json.dumps(v)) + "::OBJECT(DYNAMIC)" + if isinstance(v, bytes): + return f"'\\x{v.hex()}'" + if v is None: + return "NULL" + + return str(v) + + +class SystemColumnWorkaround: + """ + CrateDB reserves `_`-prefixed column names for internal purposes. + + When approaching CrateDB with such private columns which are common in + ETL frameworks, a corresponding error will be raised. + + InvalidColumnNameException["_dlt_load_id" conflicts with system column pattern] + + This class provides utility methods to work around the problem, brutally. + + See also: https://github.com/crate/crate/issues/15161 + + FIXME: Please get rid of this code by resolving the issue in CrateDB. + """ + + quirked_labels = [ + "__dlt_id", + "__dlt_load_id", + "__dlt_parent_id", + "__dlt_list_idx", + "__dlt_root_id", + ] + + @staticmethod + def quirk(thing: str) -> str: + thing = ( + thing.replace("_dlt_load_id", "__dlt_load_id") + .replace("_dlt_id", "__dlt_id") + .replace("_dlt_parent_id", "__dlt_parent_id") + .replace("_dlt_list_idx", "__dlt_list_idx") + .replace("_dlt_root_id", "__dlt_root_id") + ) + return thing + + @staticmethod + def unquirk(thing: str) -> str: + thing = ( + thing.replace("__dlt_load_id", "_dlt_load_id") + .replace("__dlt_id", "_dlt_id") + .replace("__dlt_parent_id", "_dlt_parent_id") + .replace("__dlt_list_idx", "_dlt_list_idx") + .replace("__dlt_root_id", "_dlt_root_id") + ) + return thing + + @classmethod + def patch_sql(cls, sql: Union[str, psycopg2.sql.Composed]) -> Union[str, psycopg2.sql.Composed]: + if isinstance(sql, str): + sql = cls.quirk(sql) + elif isinstance(sql, psycopg2.sql.Composed): + sql.seq[0]._wrapped = cls.quirk(sql.seq[0]._wrapped) + else: + raise NotImplementedError(f"Unsupported type '{type(sql)}' for augmenting SQL: {sql}") + return sql + + @classmethod + def patch_result(cls, result: Sequence[Sequence[Any]]) -> Optional[Sequence[Sequence[Any]]]: + if result is None: + return None + row_new = [] + for row in result: + if len(row) >= 2 and row[1] in cls.quirked_labels: + r = list(row) + r[1] = cls.unquirk(r[1]) + row = tuple(r) + row_new.append(row) + return row_new diff --git a/dlt/destinations/utils.py b/dlt/destinations/utils.py index 6e93e62cf7..047507722e 100644 --- a/dlt/destinations/utils.py +++ b/dlt/destinations/utils.py @@ -57,9 +57,9 @@ def get_resource_for_adapter(data: Any) -> DltResource: def info_schema_null_to_bool(v: str) -> bool: """Converts INFORMATION SCHEMA truth values to Python bool""" - if v in ("NO", "0"): + if v in ("NO", "0", 
False): return False - elif v in ("YES", "1"): + elif v in ("YES", "1", True): return True raise ValueError(v) diff --git a/docs/website/docs/dlt-ecosystem/destinations/cratedb.md b/docs/website/docs/dlt-ecosystem/destinations/cratedb.md new file mode 100644 index 0000000000..65de556cc0 --- /dev/null +++ b/docs/website/docs/dlt-ecosystem/destinations/cratedb.md @@ -0,0 +1,127 @@ +--- +title: CrateDB +description: CrateDB `dlt` destination +keywords: [ cratedb, destination, data warehouse ] +--- + +# CrateDB + +## Install dlt with CrateDB + +**To install the DLT library with CrateDB dependencies:** + +```sh +pip install "dlt[cratedb]" +``` + +## Setup guide + +### 1. Initialize the dlt project + +Let's start by initializing a new `dlt` project as follows: + +```sh +dlt init chess cratedb +``` + +Because CrateDB currently only supports writing to its default `doc` schema with dlt, +please replace `dataset_name="chess_players_games_data"` with `dataset_name="doc"`. + +The `dlt init` command will initialize your pipeline with `chess` as the source and +`cratedb` as the destination. + +The above command generates several files and directories, including `.dlt/secrets.toml`. + +### 2. Configure credentials + +Next, set up the CrateDB credentials in the `.dlt/secrets.toml` file as shown below. +CrateDB is compatible with PostgreSQL and uses the `psycopg2` driver, like the +`postgres` destination. + +```toml +[destination.cratedb.credentials] +host = "localhost" # CrateDB server host. +port = 5432 # CrateDB PostgreSQL TCP protocol port, default is 5432. +username = "crate" # CrateDB username, default is usually "crate". +password = "" # CrateDB password, if any. +``` + +Alternatively, You can pass a database connection string as shown below. +```toml +destination.cratedb.credentials="postgres://crate:@localhost:5432/" +``` +Keep it at the top of your TOML file, before any section starts. +Because CrateDB uses `psycopg2`, using `postgres://` is the right choice. + +Use Docker or Podman to run an instance of CrateDB for evaluation purposes. +```shell +docker run --rm -it --name=cratedb --publish=4200:4200 --publish=5432:5432 crate:latest -Cdiscovery.type=single-node +``` + +## Data loading + +Data is loaded into CrateDB using the most efficient method depending on the data source: + +- For local files, the `psycopg2` library is used to directly load files into + CrateDB tables using the `INSERT` command. +- For files in remote storage like S3 or Azure Blob Storage, + CrateDB data loading functions are used to read the files and insert the data into tables. + +## Datasets + +CrateDB currently only supports working with its default schema `doc`. +So, please use `dataset_name="doc"`. + +## Supported file formats + +- [INSERT](../file-formats/insert-format.md) is the preferred format for both direct loading and staging. + +The `cratedb` destination has a few specific deviations from the default SQL destinations: + +- CrateDB does not support the `time` datatype. Time will be loaded to a `text` column. +- CrateDB does not support the `binary` datatype. Binary will be loaded to a `text` column. +- CrateDB can produce rounding errors under certain conditions when using the `float/double` datatype. + Make sure to use the `decimal` datatype if you can’t afford to have rounding errors. + +## Supported column hints + +CrateDB supports the following [column hints](../../general-usage/schema#tables-and-columns): + +- `primary_key` - marks the column as part of the primary key. 
Multiple columns can have this hint to create a composite primary key. + +## Staging support + +CrateDB supports Amazon S3, Google Cloud Storage, and Azure Blob Storage as file staging destinations. + +`dlt` will upload CSV or JSONL files to the staging location and use CrateDB data loading functions +to load the data directly from the staged files. + +Please refer to the filesystem documentation to learn how to configure credentials for the staging destinations: + +- [Amazon S3](./filesystem.md#aws-s3) +- [Azure Blob Storage](./filesystem.md#azure-blob-storage) + +To run a pipeline with staging enabled: + +```py +pipeline = dlt.pipeline( + pipeline_name='chess_pipeline', + destination='cratedb', + staging='filesystem', # add this to activate staging + dataset_name='chess_data' +) +``` + +### dbt support + +Integration with [dbt](../transformations/dbt/dbt.md) is generally supported via [dbt-cratedb2] +but not tested by us. + +### Syncing of `dlt` state + +This destination fully supports [dlt state sync](../../general-usage/state#syncing-state-with-destination). + + +[dbt-cratedb2]: https://pypi.org/project/dbt-cratedb2/ + + diff --git a/docs/website/sidebars.js b/docs/website/sidebars.js index 85dc5029e7..ef9142f62e 100644 --- a/docs/website/sidebars.js +++ b/docs/website/sidebars.js @@ -243,6 +243,7 @@ const sidebars = { 'dlt-ecosystem/destinations/mssql', 'dlt-ecosystem/destinations/synapse', 'dlt-ecosystem/destinations/clickhouse', + 'dlt-ecosystem/destinations/cratedb', 'dlt-ecosystem/destinations/filesystem', 'dlt-ecosystem/destinations/delta-iceberg', 'dlt-ecosystem/destinations/iceberg', diff --git a/pyproject.toml b/pyproject.toml index 92786b9b00..9f91a72a6a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,6 +113,7 @@ pyiceberg = { version = ">=0.9.0", optional = true } databricks-sdk = {version = ">=0.38.0", optional = true} pywin32 = {version = ">=306", optional = true, platform = "win32"} rich-argparse = "^1.6.0" +sqlalchemy-cratedb = { version = ">=0.42.0.dev2", optional = true } [tool.poetry.extras] gcp = ["grpcio", "google-cloud-bigquery", "db-dtypes", "gcsfs"] @@ -144,6 +145,7 @@ sql_database = ["sqlalchemy"] sqlalchemy = ["sqlalchemy", "alembic"] pyiceberg = ["pyiceberg", "pyarrow", "sqlalchemy"] postgis = ["psycopg2-binary", "psycopg2cffi"] +cratedb = ["psycopg2-binary", "sqlalchemy-cratedb"] [tool.poetry.scripts] dlt = "dlt.cli._dlt:_main" diff --git a/tests/load/cratedb/__init__.py b/tests/load/cratedb/__init__.py new file mode 100644 index 0000000000..a26d405a92 --- /dev/null +++ b/tests/load/cratedb/__init__.py @@ -0,0 +1,3 @@ +from tests.utils import skip_if_not_active + +skip_if_not_active("cratedb") diff --git a/tests/load/cratedb/compose.yml b/tests/load/cratedb/compose.yml new file mode 100644 index 0000000000..27d25e65bc --- /dev/null +++ b/tests/load/cratedb/compose.yml @@ -0,0 +1,8 @@ +services: + cratedb: + image: crate:latest + container_name: dlt_cratedb + restart: unless-stopped + ports: + - 4200:4200 + - 5432:5432 diff --git a/tests/load/cratedb/conftest.py b/tests/load/cratedb/conftest.py new file mode 100644 index 0000000000..7de32913d0 --- /dev/null +++ b/tests/load/cratedb/conftest.py @@ -0,0 +1,12 @@ +import pytest + +from dlt.destinations.impl.cratedb.configuration import CrateDbCredentials + + +@pytest.fixture +def credentials() -> CrateDbCredentials: + creds = CrateDbCredentials() + creds.username = "crate" + creds.password = "" + creds.host = "localhost" + return creds diff --git a/tests/load/cratedb/test_cratedb_client.py 
b/tests/load/cratedb/test_cratedb_client.py new file mode 100644 index 0000000000..0a9abe86d5 --- /dev/null +++ b/tests/load/cratedb/test_cratedb_client.py @@ -0,0 +1,81 @@ +import os +from typing import Iterator +import pytest + +from dlt.common.configuration.resolve import ( + resolve_configuration, + ConfigFieldMissingException, +) +from dlt.common.storages import FileStorage + +from dlt.destinations.impl.cratedb.configuration import CrateDbCredentials +from dlt.destinations.impl.cratedb.cratedb import CrateDbClient + +from tests.utils import TEST_STORAGE_ROOT, delete_test_storage +from tests.load.utils import yield_client_with_storage +from tests.common.configuration.utils import environment + + +@pytest.fixture +def file_storage() -> FileStorage: + return FileStorage(TEST_STORAGE_ROOT, file_type="b", makedirs=True) + + +@pytest.fixture(autouse=True) +def auto_delete_storage() -> None: + delete_test_storage() + + +@pytest.fixture(scope="function") +def client() -> Iterator[CrateDbClient]: + yield from yield_client_with_storage("cratedb") # type: ignore[misc] + + +def test_cratedb_credentials_defaults() -> None: + pg_cred = CrateDbCredentials() + assert pg_cred.port == 5432 + assert pg_cred.connect_timeout == 15 + assert pg_cred.client_encoding is None + assert CrateDbCredentials.__config_gen_annotations__ == ["port", "connect_timeout"] + # port should be optional + resolve_configuration(pg_cred, explicit_value="cratedb://loader:loader@localhost/DLT_DATA") + assert pg_cred.port == 5432 + # preserve case + assert pg_cred.database is None + + +def test_cratedb_credentials_native_value(environment) -> None: + with pytest.raises(ConfigFieldMissingException): + resolve_configuration( + CrateDbCredentials(), explicit_value="cratedb://loader@localhost/dlt_data" + ) + # set password via env + os.environ["CREDENTIALS__PASSWORD"] = "pass" + c = resolve_configuration( + CrateDbCredentials(), explicit_value="cratedb://loader@localhost/dlt_data" + ) + assert c.is_resolved() + assert c.password == "pass" + # but if password is specified - it is final + c = resolve_configuration( + CrateDbCredentials(), + explicit_value="cratedb://loader:loader@localhost/dlt_data", + ) + assert c.is_resolved() + assert c.password == "loader" + + c = CrateDbCredentials("cratedb://loader:loader@localhost/dlt_data") + assert c.password == "loader" + assert c.database is None + + +def test_cratedb_query_params() -> None: + dsn_full = ( + "cratedb://loader:pass@localhost:5432/dlt_data?client_encoding=utf-8&connect_timeout=600" + ) + dsn_nodb = "cratedb://loader:pass@localhost:5432?client_encoding=utf-8&connect_timeout=600" + csc = CrateDbCredentials() + csc.parse_native_representation(dsn_full) + assert csc.connect_timeout == 600 + assert csc.client_encoding == "utf-8" + assert csc.to_native_representation() == dsn_nodb diff --git a/tests/load/cratedb/test_cratedb_table_builder.py b/tests/load/cratedb/test_cratedb_table_builder.py new file mode 100644 index 0000000000..d63f6ee7da --- /dev/null +++ b/tests/load/cratedb/test_cratedb_table_builder.py @@ -0,0 +1,193 @@ +from copy import deepcopy +from typing import cast + +import pytest +import sqlfluff + +from dlt.common.exceptions import TerminalValueError +from dlt.common.schema import Schema, utils +from dlt.common.utils import uniq_id +from dlt.destinations import cratedb +from dlt.destinations.impl.cratedb.configuration import ( + CrateDbClientConfiguration, + CrateDbCredentials, +) +from dlt.destinations.impl.cratedb.cratedb import ( + CrateDbClient, +) +from 
dlt.destinations.impl.postgres.postgres import PostgresClient +from tests.cases import ( + TABLE_UPDATE, + TABLE_UPDATE_ALL_INT_PRECISIONS, +) + + +@pytest.fixture +def client(empty_schema: Schema, credentials: CrateDbCredentials) -> CrateDbClient: + return create_client(empty_schema, credentials=credentials) + + +@pytest.fixture +def cs_client(empty_schema: Schema, credentials: CrateDbCredentials) -> CrateDbClient: + # change normalizer to case sensitive + empty_schema._normalizers_config["names"] = "tests.common.cases.normalizers.title_case" + empty_schema.update_normalizers() + return create_client(empty_schema, credentials=credentials) + + +def create_client(empty_schema: Schema, credentials: CrateDbCredentials) -> CrateDbClient: + # return client without opening connection + config = CrateDbClientConfiguration(credentials=credentials)._bind_dataset_name( + dataset_name="test_" + uniq_id() + ) + return cast(CrateDbClient, cratedb().client(empty_schema, config)) + + +def test_create_table(client: CrateDbClient) -> None: + # make sure we are in case insensitive mode + assert client.capabilities.generates_case_sensitive_identifiers() is False + # check if dataset name is properly folded + assert client.sql_client.dataset_name == client.config.dataset_name # identical to config + assert ( + client.sql_client.staging_dataset_name + == client.config.staging_dataset_name_layout % client.config.dataset_name + ) + # non existing table + sql = client._get_table_update_sql("event_test_table", TABLE_UPDATE, False)[0] + # FIXME: SQLFluff does not support CrateDB yet, failing on its special data types. + # sqlfluff.parse(sql, dialect="postgres") + qualified_name = client.sql_client.make_qualified_table_name("event_test_table") + assert f"CREATE TABLE {qualified_name}" in sql + assert '"col1" bigint NOT NULL' in sql + assert '"col2" double precision NOT NULL' in sql + assert '"col3" boolean NOT NULL' in sql + assert '"col4" timestamp with time zone NOT NULL' in sql + assert '"col5" varchar' in sql + assert '"col6" numeric(38,9) NOT NULL' in sql + assert '"col7" text' in sql + assert '"col8" numeric(156,78)' in sql + assert '"col9" object(dynamic) NOT NULL' in sql + assert '"col10" date NOT NULL' in sql + assert '"col11" time without time zone NOT NULL' in sql + assert '"col1_precision" smallint NOT NULL' in sql + assert '"col4_precision" timestamp with time zone NOT NULL' in sql + assert '"col5_precision" varchar(25)' in sql + assert '"col6_precision" numeric(6,2) NOT NULL' in sql + assert '"col7_precision" text' in sql + assert '"col11_precision" time without time zone NOT NULL' in sql + + +def test_create_table_all_precisions(client: CrateDbClient) -> None: + # 128 bit integer will fail + table_update = list(TABLE_UPDATE_ALL_INT_PRECISIONS) + with pytest.raises(TerminalValueError) as tv_ex: + sql = client._get_table_update_sql("event_test_table", table_update, False)[0] + assert "128" in str(tv_ex.value) + + # remove col5 HUGEINT which is last + table_update.pop() + sql = client._get_table_update_sql("event_test_table", table_update, False)[0] + sqlfluff.parse(sql, dialect="postgres") + assert '"col1_int" smallint ' in sql + assert '"col2_int" smallint ' in sql + assert '"col3_int" integer ' in sql + assert '"col4_int" bigint ' in sql + + +def test_alter_table(client: CrateDbClient) -> None: + # existing table has no columns + sql = client._get_table_update_sql("event_test_table", TABLE_UPDATE, True)[0] + # FIXME: SQLFluff does not support CrateDB yet, failing on its special data types. 
+ # sqlfluff.parse(sql, dialect="postgres") + canonical_name = client.sql_client.make_qualified_table_name("event_test_table") + # must have several ALTER TABLE statements + assert sql.count(f"ALTER TABLE {canonical_name}\nADD COLUMN") == 1 + assert "event_test_table" in sql + assert '"col1" bigint NOT NULL' in sql + assert '"col2" double precision NOT NULL' in sql + assert '"col3" boolean NOT NULL' in sql + assert '"col4" timestamp with time zone NOT NULL' in sql + assert '"col5" varchar' in sql + assert '"col6" numeric(38,9) NOT NULL' in sql + assert '"col7" text' in sql + assert '"col8" numeric(156,78)' in sql + assert '"col9" object(dynamic) NOT NULL' in sql + assert '"col10" date NOT NULL' in sql + assert '"col11" time without time zone NOT NULL' in sql + assert '"col1_precision" smallint NOT NULL' in sql + assert '"col4_precision" timestamp with time zone NOT NULL' in sql + assert '"col5_precision" varchar(25)' in sql + assert '"col6_precision" numeric(6,2) NOT NULL' in sql + assert '"col7_precision" text' in sql + assert '"col11_precision" time without time zone NOT NULL' in sql + + +def test_create_table_with_hints( + client: CrateDbClient, empty_schema: Schema, credentials: CrateDbCredentials +) -> None: + mod_update = deepcopy(TABLE_UPDATE) + # timestamp + mod_update[0]["primary_key"] = True + mod_update[0]["sort"] = True + mod_update[1]["unique"] = True + mod_update[4]["parent_key"] = True + sql = client._get_table_update_sql("event_test_table", mod_update, False)[0] + # FIXME: SQLFluff does not support CrateDB yet, failing on its special data types. + # sqlfluff.parse(sql, dialect="postgres") + assert '"col1" bigint NOT NULL' in sql + assert '"col2" double precision NOT NULL' in sql + assert '"col5" varchar ' in sql + # no hints + assert '"col3" boolean NOT NULL' in sql + assert '"col4" timestamp with time zone NOT NULL' in sql + + # same thing without indexes + client = cast( + CrateDbClient, + cratedb().client( + empty_schema, + CrateDbClientConfiguration( + create_indexes=False, + credentials=credentials, + )._bind_dataset_name(dataset_name="test_" + uniq_id()), + ), + ) + sql = client._get_table_update_sql("event_test_table", mod_update, False)[0] + # FIXME: SQLFluff does not support CrateDB yet, failing on its special data types. + # sqlfluff.parse(sql, dialect="postgres") + assert '"col2" double precision NOT NULL' in sql + + +def test_create_table_case_sensitive(cs_client: CrateDbClient) -> None: + # did we switch to case sensitive + assert cs_client.capabilities.generates_case_sensitive_identifiers() is True + # check dataset names + assert cs_client.sql_client.dataset_name.startswith("Test") + with cs_client.with_staging_dataset(): + assert cs_client.sql_client.dataset_name.endswith("staginG") + assert cs_client.sql_client.staging_dataset_name.endswith("staginG") + # check tables + cs_client.schema.update_table( + utils.new_table("event_test_table", columns=deepcopy(TABLE_UPDATE)) + ) + sql = cs_client._get_table_update_sql( + "Event_test_tablE", + list(cs_client.schema.get_table_columns("Event_test_tablE").values()), + False, + )[0] + # FIXME: SQLFluff does not support CrateDB yet, failing on its special data types. 
+ # sqlfluff.parse(sql, dialect="postgres") + # everything capitalized + assert cs_client.sql_client.fully_qualified_dataset_name(escape=False)[0] == "T" # Test + # every line starts with "Col" + for line in sql.split("\n")[1:]: + assert line.startswith('"Col') + + +def test_create_dlt_table(client: CrateDbClient) -> None: + # non existing table + sql = client._get_table_update_sql("_dlt_version", TABLE_UPDATE, False)[0] + # FIXME: SQLFluff does not support CrateDB yet, failing on its special data types. + # sqlfluff.parse(sql, dialect="postgres") + qualified_name = client.sql_client.make_qualified_table_name("_dlt_version") + assert f"CREATE TABLE IF NOT EXISTS {qualified_name}" in sql diff --git a/tests/utils.py b/tests/utils.py index 34d6a3a438..7eb5c2e65b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -48,6 +48,7 @@ # destination constants IMPLEMENTED_DESTINATIONS = { "athena", + "cratedb", "duckdb", "bigquery", "redshift",