diff --git a/CHANGES.md b/CHANGES.md index 72057dc..c2f590c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,8 @@ ## Unreleased - Dependencies: Updated requirements across the board +- Removed workaround for `_`-prefixed column names. + The package now requires CrateDB 6.2 or higher. ## v0.0.3 - 2025-05-23 - Dependencies: Updated to `sqlalchemy-cratedb==0.42.0.dev2` diff --git a/README.md b/README.md index 832f45c..61970ea 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,7 @@ This project and repository provides: - The source code of the `cratedb-fivetran-destination` package, which implements the [CrateDB destination adapter for Fivetran]. It works with both [CrateDB] and - [CrateDB Cloud]. + [CrateDB Cloud]. The package requires CrateDB 6.2 or higher. - The public [issue tracker] for this project. Please use it to report problems, and stay informed about their resolutions. diff --git a/src/cratedb_fivetran_destination/dictx.py b/src/cratedb_fivetran_destination/dictx.py deleted file mode 100644 index 66bda77..0000000 --- a/src/cratedb_fivetran_destination/dictx.py +++ /dev/null @@ -1,166 +0,0 @@ -""" -OrderedDictX by Zuzu Corneliu. - -For the keeping of order case (the other one is trivial, remove old and add new -one): I was not satisfied with the ordered-dictionary needing reconstruction -(at least partially), obviously for efficiency reasons, so I've put together a -class (OrderedDictX) that extends OrderedDict and allows you to do key changes -efficiently, i.e. in O(1) complexity. The implementation can also be adjusted -for the now-ordered built-in dict class. - -It uses 2 extra dictionaries to remap the changed keys ("external" - i.e. as -they appear externally to the user) to the ones in the underlying OrderedDict -("internal") - the dictionaries will only hold keys that were changed so as -long as no key changing is done they will be empty.
- -As expected, the splicing method is extremely slow (didn't expect it to be that -much slower either though) and uses a lot of memory, and the O(N) solution of -@Ashwini Chaudhary (bug-fixed though, del also needed) is also slower, 17X -times in this example. - -Of course, this solution being O(1), compared to the O(N) OrderedDictRaymond -the time difference becomes much more apparent as the dictionary size -increases, e.g. for 5 times more elements (100000), the O(N) is 100X slower. - -https://stackoverflow.com/questions/16475384/rename-a-dictionary-key/75115645#75115645 -""" - -from collections import OrderedDict - - -class OrderedDictX(OrderedDict): - def __init__(self, *args, **kwargs): - # Mappings from new->old (ext2int), old->new (int2ext). - # Only the keys that are changed (internal key - # doesn't match what the user sees) are contained. - self._keys_ext2int = OrderedDict() - self._keys_int2ext = OrderedDict() - self.update(*args, **kwargs) - - def rename_key(self, k_old, k_new): - # Validate that the old key is part of the dict - if not self.__contains__(k_old): - raise KeyError(f"Cannot rename key {k_old} to {k_new}: {k_old} not existing in dict") - - # Return if no changing is actually to be done - if len(OrderedDict.fromkeys([k_old, k_new])) == 1: - return - - # Validate that the new key would not conflict with another one - if self.__contains__(k_new): - raise KeyError(f"Cannot rename key {k_old} to {k_new}: {k_new} already in dict") - - # Change the key using internal dicts mechanism - if k_old in self._keys_ext2int: - # Revert change temporarily - k_old_int = self._keys_ext2int[k_old] - del self._keys_ext2int[k_old] - k_old = k_old_int - # Check if new key matches the internal key - if len(OrderedDict.fromkeys([k_old, k_new])) == 1: - del self._keys_int2ext[k_old] - return - - # Finalize key change - self._keys_ext2int[k_new] = k_old - self._keys_int2ext[k_old] = k_new - - def __contains__(self, k) -> bool: - if k in self._keys_ext2int: - return 
True - if not super().__contains__(k): - return False - return k not in self._keys_int2ext - - def __getitem__(self, k): - if not self.__contains__(k): - # Intentionally raise KeyError in ext2int - return self._keys_ext2int[k] - return super().__getitem__(self._keys_ext2int.get(k, k)) - - def __setitem__(self, k, v): - if k in self._keys_ext2int: - return super().__setitem__(self._keys_ext2int[k], v) - # If the key exists in the internal state but was renamed to a k_ext, - # employ this trick: make it such that it appears as if k_ext has also been renamed to k - if k in self._keys_int2ext: - k_ext = self._keys_int2ext[k] - self._keys_ext2int[k] = k_ext - k = k_ext - return super().__setitem__(k, v) - - def __delitem__(self, k): - if not self.__contains__(k): - # Intentionally raise KeyError in ext2int - del self._keys_ext2int[k] - if k in self._keys_ext2int: - k_int = self._keys_ext2int[k] - del self._keys_ext2int[k] - del self._keys_int2ext[k_int] - k = k_int - return super().__delitem__(k) - - def __iter__(self): - yield from self.keys() - - def __reversed__(self): - for k in reversed(super().keys()): - yield self._keys_int2ext.get(k, k) - - def __eq__(self, other: object) -> bool: - if not isinstance(other, dict): - return False - if len(self) != len(other): - return False - for (k, v), (k_other, v_other) in zip(self.items(), other.items()): - if k != k_other or v != v_other: - return False - return True - - def update(self, *args, **kwargs): - for k, v in OrderedDict(*args, **kwargs).items(): - self.__setitem__(k, v) - - def popitem(self, last=True) -> tuple: - if not last: - k = next(iter(self.keys())) - else: - k = next(iter(reversed(self.keys()))) - v = self.__getitem__(k) - self.__delitem__(k) - return k, v - - class OrderedDictXKeysView: - def __init__(self, odx: "OrderedDictX", orig_keys): - self._odx = odx - self._orig_keys = orig_keys - - def __iter__(self): - for k in self._orig_keys: - yield self._odx._keys_int2ext.get(k, k) - - def 
__reversed__(self): - for k in reversed(self._orig_keys): - yield self._odx._keys_int2ext.get(k, k) - - class OrderedDictXItemsView: - def __init__(self, odx: "OrderedDictX", orig_items): - self._odx = odx - self._orig_items = orig_items - - def __iter__(self): - for k, v in self._orig_items: - yield self._odx._keys_int2ext.get(k, k), v - - def __reversed__(self): - for k, v in reversed(self._orig_items): - yield self._odx._keys_int2ext.get(k, k), v - - def keys(self): - return self.OrderedDictXKeysView(self, super().keys()) - - def items(self): - return self.OrderedDictXItemsView(self, super().items()) - - def copy(self): - return OrderedDictX(self.items()) diff --git a/src/cratedb_fivetran_destination/engine.py b/src/cratedb_fivetran_destination/engine.py index 0ba689a..7de4932 100644 --- a/src/cratedb_fivetran_destination/engine.py +++ b/src/cratedb_fivetran_destination/engine.py @@ -6,7 +6,7 @@ from attrs import define from toolz import dissoc -from cratedb_fivetran_destination.model import FieldMap, SqlBag, TableInfo, TypeMap +from cratedb_fivetran_destination.model import SqlBag, TableInfo, TypeMap from fivetran_sdk import common_pb2 logger = logging.getLogger() @@ -112,7 +112,7 @@ def to_sql(self) -> SqlBag: # Translate "columns changed" instructions into migration operation # based on altering and copying using `UPDATE ... SET ...`. 
for column in self.columns_changed: - column_name = FieldMap.to_cratedb(column.name) + column_name = column.name column_name_temporary = column_name + "_alter_tmp" type_ = TypeMap.to_cratedb(column.type, column.params) sqlbag.add( @@ -137,7 +137,7 @@ def to_sql(self) -> SqlBag: @staticmethod def column_definition(column): - field = FieldMap.to_cratedb(column.name) + field = column.name type_ = TypeMap.to_cratedb(column.type, column.params) return f"{field} {type_}" diff --git a/src/cratedb_fivetran_destination/main.py b/src/cratedb_fivetran_destination/main.py index 705d644..d37f073 100644 --- a/src/cratedb_fivetran_destination/main.py +++ b/src/cratedb_fivetran_destination/main.py @@ -9,7 +9,6 @@ from cratedb_fivetran_destination.engine import AlterTableInplaceStatements, Processor from cratedb_fivetran_destination.model import ( - FieldMap, FivetranKnowledge, FivetranTable, TableInfo, @@ -83,7 +82,7 @@ def CreateTable(self, request, context): fivetran_column: common_pb2.Column for fivetran_column in request.table.columns: db_column: sa.Column = sa.Column() - db_column.name = FieldMap.to_cratedb(fivetran_column.name) + db_column.name = fivetran_column.name db_column.type = TypeMap.to_cratedb(fivetran_column.type) db_column.primary_key = fivetran_column.primary_key if db_column.primary_key: @@ -92,8 +91,8 @@ def CreateTable(self, request, context): # db_column.params(fivetran_column.params) # noqa: ERA001 table.append_column(db_column) - # Need to add the `__fivetran_deleted` column manually? - col: sa.Column = sa.Column(name="__fivetran_deleted") + # Need to add the `_fivetran_deleted` column manually? Why? 
+ col: sa.Column = sa.Column(name="_fivetran_deleted") col.type = sa.Boolean() table.append_column(col) @@ -238,7 +237,7 @@ def DescribeTable(self, request, context): sa_column: sa.Column for sa_column in sa_table.columns: ft_column = common_pb2.Column( - name=FieldMap.to_fivetran(sa_column.name), + name=sa_column.name, type=TypeMap.to_fivetran(sa_column.type), primary_key=sa_column.primary_key, ) @@ -261,7 +260,6 @@ def _files_to_records(request, files: t.List[str]): logger.info(f"Decrypting file: {filename}") for record in read_csv.decrypt_file(filename, value): # Rename keys according to field map. - record = FieldMap.rename_keys(record) FivetranKnowledge.replace_values(record) yield record diff --git a/src/cratedb_fivetran_destination/model.py b/src/cratedb_fivetran_destination/model.py index d1b3f71..6bc15a7 100644 --- a/src/cratedb_fivetran_destination/model.py +++ b/src/cratedb_fivetran_destination/model.py @@ -6,51 +6,10 @@ from attrs import define from sqlalchemy_cratedb import ObjectType -from cratedb_fivetran_destination.dictx import OrderedDictX from fivetran_sdk import common_pb2 from fivetran_sdk.common_pb2 import DataType -class FieldMap: - """ - Manage special knowledge about CrateDB field names. - """ - - # Map special column names, because CrateDB does not allow `_` prefixes. - field_map = { - "_fivetran_id": "__fivetran_id", - "_fivetran_synced": "__fivetran_synced", - "_fivetran_deleted": "__fivetran_deleted", - } - - @classmethod - def rename_keys(cls, record): - """ - Rename keys according to the field map. - """ - record = OrderedDictX(record) - for key, value in cls.field_map.items(): - if key in record: - record.rename_key(key, value) - return record - - @classmethod - def to_cratedb(cls, fivetran_field): - """ - Convert a Fivetran field name into a CrateDB field name. 
- """ - return cls.field_map.get(fivetran_field, fivetran_field) - - @classmethod - def to_fivetran(cls, cratedb_field): - """ - Convert a CrateDB field name into a Fivetran field name. - """ - # TODO: Compute reverse map only once. - reverse_map = dict(zip(cls.field_map.values(), cls.field_map.keys())) - return reverse_map.get(cratedb_field, cratedb_field) - - class TypeMap: """ Map Fivetran types to CrateDB types and back. diff --git a/tests/test_integration.py b/tests/test_integration.py index a9542da..cc7cc09 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -95,9 +95,9 @@ def starter(): json={"count": 42, "foo": "bar"}, xml="XML", naive_time=86400000, - __fivetran_synced=mock.ANY, - __fivetran_id="zyx-987-abc", - __fivetran_deleted=None, + _fivetran_synced=mock.ANY, + _fivetran_id="zyx-987-abc", + _fivetran_deleted=None, ) @@ -158,9 +158,9 @@ def test_integration_cratedb(capfd, services, engine): sa.Column("json", ObjectType), sa.Column("xml", sa.String), sa.Column("naive_time", UserDefinedType), - sa.Column("__fivetran_synced", UserDefinedType), - sa.Column("__fivetran_id", sa.String), - sa.Column("__fivetran_deleted", sa.Boolean), + sa.Column("_fivetran_synced", UserDefinedType), + sa.Column("_fivetran_id", sa.String), + sa.Column("_fivetran_deleted", sa.Boolean), schema="tester_reference", quote_schema=True, )