Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Runtime: Updated Fivetran SDK to commit `18e037c`
- Implemented `AlterTableRecreateStatements` for recreating a table
with a new schema, used for processing primary key column changes.
- Added support for `AlterTable::drop_columns` operation.

## v0.0.3 - 2025-05-23
- Dependencies: Updated to `sqlalchemy-cratedb==0.42.0.dev2`
Expand Down
22 changes: 22 additions & 0 deletions src/cratedb_fivetran_destination/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def AlterTable(self, request, context):
columns_new: t.List[common_pb2.Column] = []
columns_changed: t.List[common_pb2.Column] = []
columns_common: t.List[common_pb2.Column] = []
columns_deleted: t.List[common_pb2.Column] = []

for column in new_table.columns:
column_old = columns_old.get(column.name)
Expand All @@ -138,6 +139,27 @@ def AlterTable(self, request, context):
columns_changed.append(column)

table_info = self._table_info_from_request(request)

if request.drop_columns:
new_column_names = [column.name for column in new_table.columns]
for column in old_table.columns:
if column.name not in new_column_names:
columns_deleted.append(column)
if columns_deleted:
amendments = []
for column in columns_deleted:
amendments.append(f'DROP COLUMN "{column.name}"')
with self.engine.connect() as connection:
connection.execute(
sa.text(f"ALTER TABLE {table_info.fullname} {', '.join(amendments)}")
)
log_message(
LOG_INFO, f"AlterTable: Successfully altered table: {table_info.fullname}"
)
else:
log_message(LOG_INFO, "AlterTable (drop columns): Nothing changed")
return destination_sdk_pb2.AlterTableResponse(success=True)

if pk_has_changed:
log_message(
LOG_WARNING,
Expand Down
11 changes: 11 additions & 0 deletions tests/data/fivetran_migrations_ddl/configuration.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"writerType": "Database",
"url": "crate://",
"database": "doc",
"table": "testdrive",
"host": "localhost",
"port": "4200",
"user": "crate",
"password": "",
"enableEncryption": "false"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"create_table" : {
"transaction": {
"columns": {
"id": "INT",
"amount": "DOUBLE",
"desc": "STRING"
},
"primary_key": ["id"]
}
},
"ops" : [
{
"upsert": {
"transaction": [
{"id":1, "amount": 100.45, "desc": null},
{"id":2, "amount": 150.33, "desc": "two"},
{"id":3, "amount": 150.33, "desc": "two"},
{"id":4, "amount": 150.33, "desc": "two"},
{"id":10, "amount": 200, "desc": "three"},
{"id":20, "amount": 50, "desc": "money"}
]
}
}
],
"schema_migration" : [
{
"add_column": [
{
"table": "transaction",
"column": "operation_time",
"data_type": "UTC_DATETIME"
}
],
"change_column_data_type": [
{
"table": "transaction",
"column": "amount",
"data_type": "STRING"
Comment on lines +35 to +39
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The target data type should actually be DOUBLE, but that currently fails.

Suggested change
"change_column_data_type": [
{
"table": "transaction",
"column": "amount",
"data_type": "STRING"
"change_column_data_type": [
{
"table": "transaction",
"column": "amount",
"data_type": "DOUBLE"

}
],
"drop_column": [
{
"table": "transaction",
"column": "desc"
}
]
}
],
"describe_table" : [
"transaction"
]
}
39 changes: 39 additions & 0 deletions tests/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,45 @@ def test_api_alter_table_nothing_changed(engine, capsys):
assert format_log_message("AlterTable: Nothing changed", newline=True) in out


def test_api_alter_table_drop_column_nothing_changed(engine, capsys):
"""
Invoke gRPC API method `AlterTable` with `drop_columns=True`, but nothing changed.
"""
from cratedb_fivetran_destination.main import CrateDBDestinationImpl

destination = CrateDBDestinationImpl()

with engine.connect() as conn:
conn.execute(sa.text("CREATE TABLE testdrive.foo (id INT)"))

# Invoke gRPC API method under test.
table: common_pb2.Table = common_pb2.Table(
name="foo",
columns=[
common_pb2.Column(
name="id",
type=common_pb2.DataType.INT,
primary_key=False,
)
],
)
config = {"url": "crate://"}
response = destination.AlterTable(
request=destination_sdk_pb2.AlterTableRequest(
table=table, schema_name="testdrive", configuration=config, drop_columns=True
),
context=destination_sdk_pb2.AlterTableResponse(),
)

# Validate outcome.
assert response.success is True
assert response.warning.message == ""

# Check log output.
out, err = capsys.readouterr()
assert format_log_message("AlterTable (drop columns): Nothing changed", newline=True) in out
Comment on lines +253 to +259
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe "nothing changed" should converge into a warning message instead? Let's track this on behalf of an "API polishing" ticket, please. /cc @coderabbitai

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Create a GitHub issue to track the API polishing suggestion

gh issue create \
  --title "API polishing: Convert 'nothing changed' messages to warnings" \
  --body "## Context

During review of PR #106, a suggestion was made to convert 'nothing changed' log messages into warning messages for better API consistency.

Currently, when \`AlterTable\` operations result in no changes (e.g., when \`drop_columns=True\` but no columns are actually dropped), the system logs informational messages like:
- \`AlterTable: Nothing changed\`
- \`AlterTable (drop columns): Nothing changed\`

## Suggestion

These 'nothing changed' scenarios should converge into warning messages instead of informational log messages, to better signal to users that the operation had no effect.

## References

- PR: https://github.com/crate/cratedb-fivetran-destination/pull/106
- Comment: https://github.com/crate/cratedb-fivetran-destination/pull/106#discussion_r2571402896
- Requested by: @amotl" \
  --assignee amotl

Length of output: 64


@amotl, I've created issue #108 to track this API polishing suggestion: #108

The issue captures the idea of converting "nothing changed" messages to warnings for better API consistency and has been assigned to you.



def test_api_alter_table_change_primary_key_type(engine, capsys):
"""
Invoke gRPC API method `AlterTable`, changing the type of the primary key.
Expand Down
12 changes: 12 additions & 0 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,18 @@ def test_integration_fivetran(capfd, services):
assert "Describe Table: composite_table" in err


@pytest.mark.parametrize("services", ["./tests/data/fivetran_migrations_ddl"], indirect=True)
def test_integration_fivetran_migrations_ddl(capfd, services):
"""
Verify the Fivetran destination tester runs to completion with Fivetran test data.
"""

# Read out stdout and stderr.
out, err = capfd.readouterr()

assert "Describe Table: transaction" in err


@pytest.mark.parametrize("services", ["./tests/data/cratedb_canonical"], indirect=True)
def test_integration_cratedb(capfd, services, engine):
"""
Expand Down