2 changes: 2 additions & 0 deletions dlt/destinations/__init__.py
@@ -1,6 +1,7 @@
from dlt.destinations.impl.postgres.factory import postgres
from dlt.destinations.impl.snowflake.factory import snowflake
from dlt.destinations.impl.filesystem.factory import filesystem
from dlt.destinations.impl.cratedb.factory import cratedb
from dlt.destinations.impl.duckdb.factory import duckdb
from dlt.destinations.impl.dummy.factory import dummy
from dlt.destinations.impl.mssql.factory import mssql
@@ -23,6 +24,7 @@
"postgres",
"snowflake",
"filesystem",
"cratedb",
"duckdb",
"dummy",
"mssql",
Empty file.
74 changes: 74 additions & 0 deletions dlt/destinations/impl/cratedb/backlog.md
@@ -0,0 +1,74 @@
# CrateDB destination adapter backlog

## Iteration +1

- Check if the Redshift literal escaper is the right choice for all
data types it is handling.

- Currently only works with CrateDB's default dataset / schema `doc`,
  which is why an out-of-the-box `dlt init chess cratedb` fails with
  `ERROR: Schema 'chess_players_games_data' unknown`.
  Thoughts: Most probably, dlt introspects the available schemas, does not
  find the requested one in CrateDB, and fails. It does not know that
  CrateDB would create the schema transparently, without needing an
  explicit `CREATE SCHEMA ...` operation. A possible workaround is
  sketched below.
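
  A workaround sketch (an untested assumption, not part of this PR): pin
  the pipeline's dataset to CrateDB's default `doc` schema so that dlt's
  schema introspection finds an existing schema.

  ```python
  import dlt

  # Hypothetical workaround: target CrateDB's default `doc` schema so that
  # dlt does not look for a schema CrateDB has not materialized yet.
  pipeline = dlt.pipeline(
      pipeline_name="chess",
      destination="cratedb",
      dataset_name="doc",  # CrateDB's default schema
  )
  ```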

- `ERROR: mismatched input 'ABORT' expecting {...`

- Documentation: Describe how JSONB/ARRAY types are mapped to CrateDB.

- The `merge` followup job is currently defunct with CrateDB;
  it has been replaced by a `replace` job.

- Troubleshooting: Workloads specifying certain write dispositions currently
need to be invoked twice. On the first invocation, this happens:

dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package 1750628689.026369 with exception:

<class 'dlt.common.destination.exceptions.DestinationSchemaTampered'>
Schema frankfurter content was changed - by a loader or by destination code - from the moment it was retrieved by load package. Such schema cannot reliably be updated nor saved. Current version hash: ZUJEaD/gSBvY9vbgx3urdi93mJppp7CMlRaLuJkxJCs= != stored version hash Cocqcq+4Qzo8zLOc9R3uYjhjLZr6D/QwgS03TtFG2wI=. If you are using destination client directly, without storing schema in load package, you should first save it into schema storage. You can also use schema._bump_version() in test code to remove modified flag.

- Is it possible to use an `adapter` to better support CrateDB's special
  data types? For example, full round-tripping using `sys.summits` is not
  possible yet, due to the `coordinates` column, e.g. when DuckDB is
  selected as the peer database.

- Is it possible to provide other direct loaders from the `sqlalchemy`
adapter?

- Addressing CrateDB per a `cratedb://` URL will raise errors, e.g. in `ingestr`:
  `Following fields are missing: ['password', 'username', 'host'] in configuration with spec CrateDbCredentials`

- Add staging support
> CrateDB supports Amazon S3, Google Cloud Storage, and Azure Blob
> Storage as file staging destinations.
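
  For orientation, a minimal sketch of how dlt wires a staging destination
  for other adapters; staging for CrateDB is not implemented in this PR,
  so this is only an assumption about the eventual shape.

  ```python
  import dlt

  # Hypothetical shape only: staging support for CrateDB does not exist yet.
  # This mirrors how dlt configures staging for other destinations.
  pipeline = dlt.pipeline(
      pipeline_name="stage_demo",
      destination="cratedb",
      staging="filesystem",  # S3, GCS, or Azure Blob behind the filesystem destination
      dataset_name="doc",
  )
  ```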

## Verify

- > Data is loaded into CrateDB using the most efficient method
> depending on the data source.

- > CrateDB does not support the `binary` datatype. Binary will be loaded to a `text` column.

## Iteration +2

- The `UNIQUE` constraint is dearly missing. Is it possible to emulate it?
  Otherwise, loading the same data multiple times produces duplicates,
  as the following listing shows; a detection sketch comes after it.
```
cr> select * from users;
+----+-------+-------------------+----------------+
| id | name | __dlt_load_id | __dlt_id |
+----+-------+-------------------+----------------+
| 2 | Bob | 1749406972.57687 | NFPmX5Pw4gZ7fA |
| 1 | Alice | 1749406985.809837 | 6/Xoe8jhlBUbAQ |
| 2 | Bob | 1749406985.809837 | VnP4S8AsQ/ujOg |
| 1 | Alice | 1749406972.57687 | 4HCWs9jqfwTyTQ |
+----+-------+-------------------+----------------+
SELECT 4 rows in set (0.003 sec)
```
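
  A duplicate-detection sketch (an assumption, not part of this PR), using
  plain `psycopg2` over CrateDB's PostgreSQL wire protocol; table and
  column names are taken from the listing above, connection parameters are
  placeholders.

  ```python
  import psycopg2

  # CrateDB cannot enforce UNIQUE, so detect duplicate logical keys after
  # a load instead.
  conn = psycopg2.connect(host="localhost", port=5432, user="crate")
  with conn.cursor() as cur:
      cur.execute(
          "SELECT id, count(*) FROM doc.users GROUP BY id HAVING count(*) > 1"
      )
      for key, count in cur.fetchall():
          print(f"duplicate id={key} occurs {count} times")
  conn.close()
  ```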

- Provide Ibis dataset access, see
tests/load/test_read_interfaces.py

- Software tests covering geospatial concerns have been removed.
  See the `postgres` adapter for bringing them back.
27 changes: 27 additions & 0 deletions dlt/destinations/impl/cratedb/configuration.py
@@ -0,0 +1,27 @@
import dataclasses
from typing import Final
from dlt.common.configuration import configspec
from dlt.destinations.impl.postgres.configuration import (
PostgresCredentials,
PostgresClientConfiguration,
)


@configspec(init=False)
class CrateDbCredentials(PostgresCredentials):
drivername: Final[str] = dataclasses.field( # type: ignore[misc]
default="postgres", init=False, repr=False, compare=False
)


# CrateDB does not support databases, just schemas.
# In dlt, schemas are conveyed by `dataset_name`?
del CrateDbCredentials.__dataclass_fields__["database"]


@configspec
class CrateDbClientConfiguration(PostgresClientConfiguration):
destination_type: Final[str] = dataclasses.field( # type: ignore[misc]
default="cratedb", init=False, repr=False, compare=False
)
credentials: CrateDbCredentials = None
138 changes: 138 additions & 0 deletions dlt/destinations/impl/cratedb/cratedb.py
@@ -0,0 +1,138 @@
import logging
from typing import Dict, Any, Sequence, List

from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.destination.client import (
PreparedTableSchema,
LoadJob,
FollowupJobRequest,
)
from dlt.common.schema import TColumnHint, Schema
from dlt.destinations.impl.cratedb.configuration import CrateDbClientConfiguration
from dlt.destinations.impl.cratedb.sql_client import CrateDbSqlClient
from dlt.destinations.impl.postgres.postgres import PostgresClient
from dlt.destinations.insert_job_client import InsertValuesJobClient
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.sql_jobs import SqlStagingReplaceFollowupJob

# FIXME: The `UNIQUE` constraint is dearly missing.
# When loading data multiple times, duplicates will happen.
HINT_TO_CRATEDB_ATTR: Dict[TColumnHint, str] = {"unique": ""}


logger = logging.getLogger(__name__)


class CrateDbStagingReplaceJob(SqlStagingReplaceFollowupJob):
@classmethod
def generate_sql(
cls,
table_chain: Sequence[PreparedTableSchema],
sql_client: SqlClientBase[Any],
) -> List[str]:
"""
CrateDB uses `ALTER CLUSTER SWAP TABLE`.

-- https://github.com/crate/crate/issues/14833
"""
sql: List[str] = []
for table in table_chain:
with sql_client.with_staging_dataset():
staging_table_name = sql_client.make_qualified_table_name(table["name"])
table_name = sql_client.make_qualified_table_name(table["name"])
sql.extend(
(
# Drop destination table.
f"DROP TABLE IF EXISTS {table_name};",
# Recreate destination table, because `ALTER CLUSTER SWAP TABLE` needs it.
(
f"CREATE TABLE IF NOT EXISTS {table_name} AS SELECT * FROM"
f" {staging_table_name} WHERE 1 = 0;"
),
# Move the staging table to the destination schema.
f"ALTER CLUSTER SWAP TABLE {staging_table_name} TO {table_name};",
# CrateDB needs to flush writes.
f"REFRESH TABLE {table_name};",
# Recreate staging table not needed with CrateDB, because
# `ALTER CLUSTER SWAP TABLE` does not remove the source table.
(
f"CREATE TABLE IF NOT EXISTS {staging_table_name} AS SELECT * FROM"
f" {table_name} WHERE 1 = 0;"
),
f"REFRESH TABLE {staging_table_name};",
)
)
return sql


class CrateDbClient(PostgresClient):
def __init__(
self,
schema: Schema,
config: CrateDbClientConfiguration,
capabilities: DestinationCapabilitiesContext,
) -> None:
sql_client = CrateDbSqlClient(
config.normalize_dataset_name(schema),
config.normalize_staging_dataset_name(schema),
config.credentials,
capabilities,
)
InsertValuesJobClient.__init__(self, schema, config, sql_client)
self.config: CrateDbClientConfiguration = config
self.sql_client: CrateDbSqlClient = sql_client
self.active_hints = HINT_TO_CRATEDB_ATTR if self.config.create_indexes else {}
self.type_mapper = self.capabilities.get_type_mapper()

def create_load_job(
self,
table: PreparedTableSchema,
file_path: str,
load_id: str,
restore: bool = False,
) -> LoadJob:
"""
CrateDB only supports the "insert values" paradigm.
"""
return InsertValuesJobClient.create_load_job(self, table, file_path, load_id, restore)

def _create_merge_followup_jobs(
self, table_chain: Sequence[PreparedTableSchema]
) -> List[FollowupJobRequest]:
"""
CrateDB currently does not support "merge" followup jobs.
-- https://github.com/crate-workbench/dlt/issues/4

Workaround: Redirect the "merge" job to use a "replace" job instead.
"""
return [CrateDbStagingReplaceJob.from_table_chain(table_chain, self.sql_client)]

def complete_load(self, load_id: str) -> None:
"""
Intercept to invoke a `REFRESH TABLE ...` statement.
"""
result = super().complete_load(load_id=load_id)
table_name = self.sql_client.make_qualified_table_name(self.schema.loads_table_name)
self.sql_client.execute_sql(f"REFRESH TABLE {table_name}")
return result

def _commit_schema_update(self, schema: Schema, schema_str: str) -> None:
"""
Intercept to invoke a `REFRESH TABLE ...` statement.
"""
result = super()._commit_schema_update(schema=schema, schema_str=schema_str)
table_name = self.sql_client.make_qualified_table_name(self.schema.version_table_name)
self.sql_client.execute_sql(f"REFRESH TABLE {table_name}")
return result

def _delete_schema_in_storage(self, schema: Schema) -> None:
"""
Intercept to invoke a `REFRESH TABLE ...` statement.
"""
result = super()._delete_schema_in_storage(schema=schema)
table_name = self.sql_client.make_qualified_table_name(self.schema.version_table_name)
self.sql_client.execute_sql(f"REFRESH TABLE {table_name}")
return result
97 changes: 97 additions & 0 deletions dlt/destinations/impl/cratedb/factory.py
@@ -0,0 +1,97 @@
from copy import deepcopy
from typing import Type, TYPE_CHECKING, List, Any, Dict

from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.destination.typing import PreparedTableSchema
from dlt.common.schema.typing import TColumnSchema
from dlt.destinations.impl.cratedb.configuration import CrateDbClientConfiguration
from dlt.destinations.impl.cratedb.utils import escape_cratedb_literal
from dlt.destinations.impl.postgres.factory import PostgresTypeMapper, postgres

if TYPE_CHECKING:
from dlt.destinations.impl.cratedb.cratedb import CrateDbClient


class CrateDbTypeMapper(PostgresTypeMapper):
"""
Adjust type mappings for CrateDB.

- CrateDB uses `object(dynamic)` instead of `json` or `jsonb`.
- CrateDB does not support `timestamp(6) without time zone`.
- CrateDB does not support `time` for storing.
- CrateDB does not support `binary` or `bytea`.
"""

def __new__(cls, *args: List[Any], **kwargs: Dict[str, Any]) -> "CrateDbTypeMapper":
cls.sct_to_unbound_dbt = deepcopy(PostgresTypeMapper.sct_to_unbound_dbt)
cls.sct_to_unbound_dbt["json"] = "object(dynamic)"
cls.sct_to_unbound_dbt["binary"] = "text"

cls.sct_to_dbt = deepcopy(PostgresTypeMapper.sct_to_dbt)
cls.sct_to_dbt["timestamp"] = "timestamp with time zone"
del cls.sct_to_dbt["time"]

cls.dbt_to_sct = deepcopy(PostgresTypeMapper.dbt_to_sct)
cls.dbt_to_sct["jsonb"] = "object(dynamic)" # type: ignore[assignment]
cls.dbt_to_sct["bytea"] = "text"

return super().__new__(cls)

def to_db_datetime_type(
self,
column: TColumnSchema,
table: PreparedTableSchema = None,
) -> str:
"""
CrateDB does not support `timestamp(6) without time zone`.
To not render the SQL clause like this, nullify the `precision` attribute.
"""
column["precision"] = None
return super().to_db_datetime_type(column, table)


class cratedb(postgres, Destination[CrateDbClientConfiguration, "CrateDbClient"]):
spec = CrateDbClientConfiguration # type: ignore[assignment]

def _raw_capabilities(self) -> DestinationCapabilitiesContext:
"""
Tune down capabilities for CrateDB.
"""
caps = super()._raw_capabilities()

# CrateDB does not support transactions.
caps.supports_transactions = False
caps.supports_ddl_transactions = False

# CrateDB does not support `TRUNCATE TABLE`, use `DELETE FROM` instead.
caps.supports_truncate_command = False

# CrateDB's type mapping needs adjustments compared to PostgreSQL.
caps.type_mapper = CrateDbTypeMapper

# TODO: Provide a dedicated dialect for SQLGlot.
caps.sqlglot_dialect = "postgres"

# CrateDB needs a slightly adjusted escaping of literals.
# TODO: Escaping might need further adjustments, to be explored using integration tests.
caps.escape_literal = escape_cratedb_literal

# CrateDB does not support direct data loading using advanced formats.
# TODO: Explore adding more formats for staged imports.
caps.preferred_loader_file_format = "insert_values"
caps.supported_loader_file_formats = ["insert_values"]
caps.loader_file_format_selector = None

return caps

@property
def client_class(self) -> Type["CrateDbClient"]:
"""
Provide a different client for CrateDB.
"""
from dlt.destinations.impl.cratedb.cratedb import CrateDbClient

return CrateDbClient


cratedb.register()
14 changes: 14 additions & 0 deletions dlt/destinations/impl/cratedb/readme.md
@@ -0,0 +1,14 @@
# CrateDB destination adapter

## What's inside

- The `cratedb` adapter is heavily based on the `postgres` adapter.
- The `CrateDbSqlClient` deviates from the original `Psycopg2SqlClient` by
accounting for [CRATEDB-15161] per `SystemColumnWorkaround`.
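
## Usage

A minimal usage sketch; it assumes this branch is installed, and the
connection string shape is an assumption derived from `configuration.py`
(CrateDB speaks the PostgreSQL wire protocol and has no databases, only
schemas).

```python
import dlt
from dlt.destinations import cratedb

# Connection string and data are placeholders.
pipeline = dlt.pipeline(
    pipeline_name="cratedb_demo",
    destination=cratedb("postgres://crate:@localhost:5432/"),
    dataset_name="doc",
)
info = pipeline.run([{"id": 1, "name": "Alice"}], table_name="users")
print(info)
```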

## Backlog

A few items need to be resolved, see [backlog](./backlog.md).


[CRATEDB-15161]: https://github.com/crate/crate/issues/15161