Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased
- Dependencies: Updated requirements across the board
- Removed workaround for `_`-prefixed column names.
The package now requires CrateDB 6.2 or higher.

## v0.0.3 - 2025-05-23
- Dependencies: Updated to `sqlalchemy-cratedb==0.42.0.dev2`
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ This project and repository provides:

- The source code of the `cratedb-fivetran-destination` package, which implements
the [CrateDB destination adapter for Fivetran]. It works with both [CrateDB] and
[CrateDB Cloud].
[CrateDB Cloud]. Operating the package successfully needs CrateDB 6.2 or higher.

- The public [issue tracker] for this project. Please use it
to report problems, and stay informed about their resolutions.
Expand Down
166 changes: 0 additions & 166 deletions src/cratedb_fivetran_destination/dictx.py

This file was deleted.

6 changes: 3 additions & 3 deletions src/cratedb_fivetran_destination/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from attrs import define
from toolz import dissoc

from cratedb_fivetran_destination.model import FieldMap, SqlBag, TableInfo, TypeMap
from cratedb_fivetran_destination.model import SqlBag, TableInfo, TypeMap
from fivetran_sdk import common_pb2

logger = logging.getLogger()
Expand Down Expand Up @@ -112,7 +112,7 @@ def to_sql(self) -> SqlBag:
# Translate "columns changed" instructions into migration operation
# based on altering and copying using `UPDATE ... SET ...`.
for column in self.columns_changed:
column_name = FieldMap.to_cratedb(column.name)
column_name = column.name
column_name_temporary = column_name + "_alter_tmp"
type_ = TypeMap.to_cratedb(column.type, column.params)
sqlbag.add(
Expand All @@ -137,7 +137,7 @@ def to_sql(self) -> SqlBag:

@staticmethod
def column_definition(column):
field = FieldMap.to_cratedb(column.name)
field = column.name
type_ = TypeMap.to_cratedb(column.type, column.params)
return f"{field} {type_}"

Expand Down
10 changes: 4 additions & 6 deletions src/cratedb_fivetran_destination/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

from cratedb_fivetran_destination.engine import AlterTableInplaceStatements, Processor
from cratedb_fivetran_destination.model import (
FieldMap,
FivetranKnowledge,
FivetranTable,
TableInfo,
Expand Down Expand Up @@ -83,7 +82,7 @@ def CreateTable(self, request, context):
fivetran_column: common_pb2.Column
for fivetran_column in request.table.columns:
db_column: sa.Column = sa.Column()
db_column.name = FieldMap.to_cratedb(fivetran_column.name)
db_column.name = fivetran_column.name
db_column.type = TypeMap.to_cratedb(fivetran_column.type)
db_column.primary_key = fivetran_column.primary_key
if db_column.primary_key:
Expand All @@ -92,8 +91,8 @@ def CreateTable(self, request, context):
# db_column.params(fivetran_column.params) # noqa: ERA001
table.append_column(db_column)

# Need to add the `__fivetran_deleted` column manually?
col: sa.Column = sa.Column(name="__fivetran_deleted")
# Need to add the `_fivetran_deleted` column manually? Why?
col: sa.Column = sa.Column(name="_fivetran_deleted")
col.type = sa.Boolean()
table.append_column(col)

Expand Down Expand Up @@ -238,7 +237,7 @@ def DescribeTable(self, request, context):
sa_column: sa.Column
for sa_column in sa_table.columns:
ft_column = common_pb2.Column(
name=FieldMap.to_fivetran(sa_column.name),
name=sa_column.name,
type=TypeMap.to_fivetran(sa_column.type),
primary_key=sa_column.primary_key,
)
Expand All @@ -261,7 +260,6 @@ def _files_to_records(request, files: t.List[str]):
logger.info(f"Decrypting file: {filename}")
for record in read_csv.decrypt_file(filename, value):
# Rename keys according to field map.
record = FieldMap.rename_keys(record)
FivetranKnowledge.replace_values(record)
yield record

Expand Down
41 changes: 0 additions & 41 deletions src/cratedb_fivetran_destination/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,10 @@
from attrs import define
from sqlalchemy_cratedb import ObjectType

from cratedb_fivetran_destination.dictx import OrderedDictX
from fivetran_sdk import common_pb2
from fivetran_sdk.common_pb2 import DataType


class FieldMap:
"""
Manage special knowledge about CrateDB field names.
"""

# Map special column names, because CrateDB does not allow `_` prefixes.
field_map = {
"_fivetran_id": "__fivetran_id",
"_fivetran_synced": "__fivetran_synced",
"_fivetran_deleted": "__fivetran_deleted",
}

@classmethod
def rename_keys(cls, record):
"""
Rename keys according to the field map.
"""
record = OrderedDictX(record)
for key, value in cls.field_map.items():
if key in record:
record.rename_key(key, value)
return record

@classmethod
def to_cratedb(cls, fivetran_field):
"""
Convert a Fivetran field name into a CrateDB field name.
"""
return cls.field_map.get(fivetran_field, fivetran_field)

@classmethod
def to_fivetran(cls, cratedb_field):
"""
Convert a CrateDB field name into a Fivetran field name.
"""
# TODO: Compute reverse map only once.
reverse_map = dict(zip(cls.field_map.values(), cls.field_map.keys()))
return reverse_map.get(cratedb_field, cratedb_field)


class TypeMap:
"""
Map Fivetran types to CrateDB types and back.
Expand Down
12 changes: 6 additions & 6 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ def starter():
json={"count": 42, "foo": "bar"},
xml="XML",
naive_time=86400000,
__fivetran_synced=mock.ANY,
__fivetran_id="zyx-987-abc",
__fivetran_deleted=None,
_fivetran_synced=mock.ANY,
_fivetran_id="zyx-987-abc",
_fivetran_deleted=None,
)


Expand Down Expand Up @@ -158,9 +158,9 @@ def test_integration_cratedb(capfd, services, engine):
sa.Column("json", ObjectType),
sa.Column("xml", sa.String),
sa.Column("naive_time", UserDefinedType),
sa.Column("__fivetran_synced", UserDefinedType),
sa.Column("__fivetran_id", sa.String),
sa.Column("__fivetran_deleted", sa.Boolean),
sa.Column("_fivetran_synced", UserDefinedType),
sa.Column("_fivetran_id", sa.String),
sa.Column("_fivetran_deleted", sa.Boolean),
schema="tester_reference",
quote_schema=True,
)
Expand Down
Loading