From 1ce0df3c1c583931577819c7517e1fe67aff34e8 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 27 Nov 2025 23:33:22 +0100 Subject: [PATCH 1/2] Add support for `AlterTable::drop_columns` This satisfies SDK tester's `schema_migrations_input_ddl.json`. Co-authored-by: Sebastian Utz --- CHANGES.md | 1 + src/cratedb_fivetran_destination/main.py | 21 ++++++++ .../configuration.json | 11 ++++ .../schema_migrations_input_ddl.json | 53 +++++++++++++++++++ tests/test_adapter.py | 39 ++++++++++++++ tests/test_integration.py | 12 +++++ 6 files changed, 137 insertions(+) create mode 100644 tests/data/fivetran_migrations_ddl/configuration.json create mode 100644 tests/data/fivetran_migrations_ddl/schema_migrations_input_ddl.json diff --git a/CHANGES.md b/CHANGES.md index 260a94e..3e8046f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -6,6 +6,7 @@ - Runtime: Updated Fivetran SDK to commit `18e037c` - Implemented `AlterTableRecreateStatements` for recreating a table with a new schema, used for processing primary key column changes. +- Added support for `AlterTable::drop_columns` operation. ## v0.0.3 - 2025-05-23 - Dependencies: Updated to `sqlalchemy-cratedb==0.42.0.dev2` diff --git a/src/cratedb_fivetran_destination/main.py b/src/cratedb_fivetran_destination/main.py index 9055438..3c52de7 100644 --- a/src/cratedb_fivetran_destination/main.py +++ b/src/cratedb_fivetran_destination/main.py @@ -121,6 +121,7 @@ def AlterTable(self, request, context): columns_new: t.List[common_pb2.Column] = [] columns_changed: t.List[common_pb2.Column] = [] columns_common: t.List[common_pb2.Column] = [] + columns_deleted: t.List[common_pb2.Column] = [] for column in new_table.columns: column_old = columns_old.get(column.name) @@ -138,6 +139,26 @@ def AlterTable(self, request, context): columns_changed.append(column) table_info = self._table_info_from_request(request) + + if request.drop_columns: + for column in old_table.columns: + if column not in new_table.columns: + columns_deleted.append(column) + if columns_deleted: + amendments = [] + for column in columns_deleted: + amendments.append(f'DROP COLUMN "{column.name}"') + with self.engine.connect() as connection: + connection.execute( + sa.text(f"ALTER TABLE {table_info.fullname} {', '.join(amendments)}") + ) + log_message( + LOG_INFO, f"AlterTable: Successfully altered table: {table_info.fullname}" + ) + else: + log_message(LOG_INFO, "AlterTable (drop columns): Nothing changed") + return destination_sdk_pb2.AlterTableResponse(success=True) + if pk_has_changed: log_message( LOG_WARNING, diff --git a/tests/data/fivetran_migrations_ddl/configuration.json b/tests/data/fivetran_migrations_ddl/configuration.json new file mode 100644 index 0000000..17762aa --- /dev/null +++ b/tests/data/fivetran_migrations_ddl/configuration.json @@ -0,0 +1,11 @@ +{ + "writerType": "Database", + "url": "crate://", + "database": "doc", + "table": "testdrive", + "host": "localhost", + "port": "4200", + "user": "crate", + "password": "", + "enableEncryption": "false" +} diff --git a/tests/data/fivetran_migrations_ddl/schema_migrations_input_ddl.json b/tests/data/fivetran_migrations_ddl/schema_migrations_input_ddl.json new file mode 100644 index 0000000..480fbaf --- /dev/null +++ b/tests/data/fivetran_migrations_ddl/schema_migrations_input_ddl.json @@ -0,0 +1,53 @@ +{ + "create_table" : { + "transaction": { + "columns": { + "id": "INT", + "amount": "DOUBLE", + "desc": "STRING" + }, + "primary_key": ["id"] + } + }, + "ops" : [ + { + "upsert": { + "transaction": [ + {"id":1, "amount": 100.45, "desc": null}, + {"id":2, "amount": 150.33, "desc": "two"}, + {"id":3, "amount": 150.33, "desc": "two"}, + {"id":4, "amount": 150.33, "desc": "two"}, + {"id":10, "amount": 200, "desc": "three"}, + {"id":20, "amount": 50, "desc": "money"} + ] + } + } + ], + "schema_migration" : [ + { + "add_column": [ + { + "table": "transaction", + "column": "operation_time", + "data_type": "UTC_DATETIME" + } + ], + "change_column_data_type": [ + { + "table": "transaction", + "column": "amount", + "data_type": "STRING" + } + ], + "drop_column": [ + { + "table": "transaction", + "column": "desc" + } + ] + } + ], + "describe_table" : [ + "transaction" + ] +} diff --git a/tests/test_adapter.py b/tests/test_adapter.py index 0685ef5..25eedd3 100644 --- a/tests/test_adapter.py +++ b/tests/test_adapter.py @@ -220,6 +220,45 @@ def test_api_alter_table_nothing_changed(engine, capsys): assert format_log_message("AlterTable: Nothing changed", newline=True) in out +def test_api_alter_table_drop_column_nothing_changed(engine, capsys): + """ + Invoke gRPC API method `AlterTable` with `drop_columns=True`, but nothing changed. + """ + from cratedb_fivetran_destination.main import CrateDBDestinationImpl + + destination = CrateDBDestinationImpl() + + with engine.connect() as conn: + conn.execute(sa.text("CREATE TABLE testdrive.foo (id INT)")) + + # Invoke gRPC API method under test. + table: common_pb2.Table = common_pb2.Table( + name="foo", + columns=[ + common_pb2.Column( + name="id", + type=common_pb2.DataType.INT, + primary_key=False, + ) + ], + ) + config = {"url": "crate://"} + response = destination.AlterTable( + request=destination_sdk_pb2.AlterTableRequest( + table=table, schema_name="testdrive", configuration=config, drop_columns=True + ), + context=destination_sdk_pb2.AlterTableResponse(), + ) + + # Validate outcome. + assert response.success is True + assert response.warning.message == "" + + # Check log output. + out, err = capsys.readouterr() + assert format_log_message("AlterTable (drop columns): Nothing changed", newline=True) in out + + def test_api_alter_table_change_primary_key_type(engine, capsys): """ Invoke gRPC API method `AlterTable`, changing the type of the primary key. diff --git a/tests/test_integration.py b/tests/test_integration.py index 62cf693..5f502df 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -140,6 +140,18 @@ def test_integration_fivetran(capfd, services): assert "Describe Table: composite_table" in err +@pytest.mark.parametrize("services", ["./tests/data/fivetran_migrations_ddl"], indirect=True) +def test_integration_fivetran_migrations_ddl(capfd, services): + """ + Verify the Fivetran destination tester runs to completion with Fivetran test data. + """ + + # Read out stdout and stderr. + out, err = capfd.readouterr() + + assert "Describe Table: transaction" in err + + @pytest.mark.parametrize("services", ["./tests/data/cratedb_canonical"], indirect=True) def test_integration_cratedb(capfd, services, engine): """ From c1a3300a1d0726cdab508509d50fd765e463c9ca Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 28 Nov 2025 12:47:50 +0100 Subject: [PATCH 2/2] AlterTable::drop_columns: Implement suggestions by CodeRabbit --- src/cratedb_fivetran_destination/main.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cratedb_fivetran_destination/main.py b/src/cratedb_fivetran_destination/main.py index 3c52de7..6ad6991 100644 --- a/src/cratedb_fivetran_destination/main.py +++ b/src/cratedb_fivetran_destination/main.py @@ -141,8 +141,9 @@ def AlterTable(self, request, context): table_info = self._table_info_from_request(request) if request.drop_columns: + new_column_names = [column.name for column in new_table.columns] for column in old_table.columns: - if column not in new_table.columns: + if column.name not in new_column_names: columns_deleted.append(column) if columns_deleted: amendments = []