2 changes: 2 additions & 0 deletions CHANGES.md
@@ -3,6 +3,8 @@
## In progress
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.
- Improve write operations to be closer to `target-postgres`.
- Removed workaround for `_`-prefixed column names.
The package now requires CrateDB 6.2 or higher.

## 2023-12-08 v0.0.1
- Make it work. It can run the canonical Meltano GitHub -> DB example.
1 change: 1 addition & 0 deletions README.md
@@ -18,6 +18,7 @@ and loaders, and based on the [Meltano PostgreSQL target].
In order to learn more about Singer, Meltano, and friends, navigate to the
[Singer Intro](./docs/singer-intro.md).

Operating the package successfully needs CrateDB 6.2 or higher.

@amotl amotl Nov 10, 2025

Hi @surister. Thanks. Let me loop your comment into a thread.

Should we perhaps maintain this and check for CrateDB version? I assume that this will break people trying to use it with CrateDB < 6.2.

Yes, it will not work with lower versions of CrateDB. Since the CrateDB 6.2 release might be far away (expecting 6.1 first?), this patch is probably not ready for release. Should it be made a draft again?

@amotl amotl Nov 10, 2025

I am not sure about pre-flight checks. I haven't seen many (if any) such measures in other vendors' adapters. In a perfect world, this adapter would vanish completely anyway if we could submit all specialities into the upstream target-postgres.

However, I see your point. As long as we maintain a separate adapter and, most importantly, as long as it does not have a negative performance impact in high-traffic or high-volume environments, I will not object to adding convenience features like pre-flight version checks.

@amotl amotl Nov 10, 2025

This patch includes a check for the CrateDB version, as suggested.
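
For readers following the thread, here is a minimal sketch of what such a pre-flight version check could look like. It is not the code from this patch: the names `MINIMUM_VERSION`, `parse_version`, and `check_minimum_version` are hypothetical, and the sketch assumes the server version is available as a string that was fetched once at connect time, so the check adds no per-batch cost.

```python
import re
from typing import Tuple

# Assumption: the minimum version is taken from the README note in this patch.
MINIMUM_VERSION = (6, 2, 0)


def parse_version(text: str) -> Tuple[int, int, int]:
    """Extract the leading `major.minor[.patch]` numbers from a version string."""
    match = re.match(r"\s*(\d+)\.(\d+)(?:\.(\d+))?", text)
    if match is None:
        raise ValueError(f"Unable to parse version: {text!r}")
    # A missing patch component is treated as zero.
    major, minor, patch = match.groups(default="0")
    return int(major), int(minor), int(patch)


def check_minimum_version(text: str, minimum: Tuple[int, int, int] = MINIMUM_VERSION) -> None:
    """Raise if the reported server version is lower than the required minimum."""
    found = parse_version(text)
    # Tuples compare element by element, so (6, 10, 0) is correctly newer than (6, 2, 0).
    if found < minimum:
        required = ".".join(map(str, minimum))
        raise RuntimeError(
            f"CrateDB {required} or higher is required, but the server reports {text!r}"
        )
```

Calling `check_minimum_version` once during connector setup, with whatever version string the database reports, would fail early with a readable message instead of breaking later on `_`-prefixed column names.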

@amotl amotl Nov 11, 2025

NB: The package is in alpha/workbench mode, currently seeing just 46 downloads per month. In this spirit, we can easily merge and release this improvement, to make it a fully functional and significantly enhanced package for all users of CrateDB nightly and future CrateDB 6.2 users, even if 6.2 will only be released in Jan/Feb 2026.

Everyone who is currently using the package can easily stick with the currently released version, so there is not much need to introduce or address backward compatibility issues.


## Install

90 changes: 0 additions & 90 deletions target_cratedb/sinks.py
@@ -1,8 +1,6 @@
"""CrateDB target sink class, which handles writing streams."""

import datetime
import os
import time
from typing import List, Optional, Union

import sqlalchemy as sa
@@ -20,9 +18,6 @@ class CrateDBSink(PostgresSink):

    connector_class = CrateDBConnector

    soft_delete_column_name = "__sdc_deleted_at"
    version_column_name = "__sdc_table_version"

    def __init__(self, *args, **kwargs):
        """Initialize SQL Sink. See super class for more details."""
        super().__init__(*args, **kwargs)
@@ -32,91 +27,6 @@ def __init__(self, *args, **kwargs):
        # operations on the target table.
        self.strategy_direct = MELTANO_CRATEDB_STRATEGY_DIRECT

    # Record processing

    def _add_sdc_metadata_to_record(
        self,
        record: dict,
        message: dict,
        context: dict,
    ) -> None:
        """Populate metadata _sdc columns from incoming record message.

        Record metadata specs documented at:
        https://sdk.meltano.com/en/latest/implementation/record_metadata.html

        Args:
            record: Individual record in the stream.
            message: The record message.
            context: Stream partition or context dictionary.
        """
        record["__sdc_extracted_at"] = message.get("time_extracted")
        record["__sdc_received_at"] = datetime.datetime.now(
            tz=datetime.timezone.utc,
        ).isoformat()
        record["__sdc_batched_at"] = (
            context.get("batch_start_time", None) or datetime.datetime.now(tz=datetime.timezone.utc)
        ).isoformat()
        record["__sdc_deleted_at"] = record.get("__sdc_deleted_at")
        record["__sdc_sequence"] = int(round(time.time() * 1000))
        record["__sdc_table_version"] = message.get("version")
        record["__sdc_sync_started_at"] = self.sync_started_at

    def _add_sdc_metadata_to_schema(self) -> None:
        """Add _sdc metadata columns.

        Record metadata specs documented at:
        https://sdk.meltano.com/en/latest/implementation/record_metadata.html
        """
        properties_dict = self.schema["properties"]
        for col in (
            "__sdc_extracted_at",
            "__sdc_received_at",
            "__sdc_batched_at",
            "__sdc_deleted_at",
        ):
            properties_dict[col] = {
                "type": ["null", "string"],
                "format": "date-time",
            }
        for col in ("__sdc_sequence", "__sdc_table_version", "__sdc_sync_started_at"):
            properties_dict[col] = {"type": ["null", "integer"]}

    def _remove_sdc_metadata_from_schema(self) -> None:
        """Remove _sdc metadata columns.

        Record metadata specs documented at:
        https://sdk.meltano.com/en/latest/implementation/record_metadata.html
        """
        properties_dict = self.schema["properties"]
        for col in (
            "__sdc_extracted_at",
            "__sdc_received_at",
            "__sdc_batched_at",
            "__sdc_deleted_at",
            "__sdc_sequence",
            "__sdc_table_version",
            "__sdc_sync_started_at",
        ):
            properties_dict.pop(col, None)

    def _remove_sdc_metadata_from_record(self, record: dict) -> None:
        """Remove metadata _sdc columns from incoming record message.

        Record metadata specs documented at:
        https://sdk.meltano.com/en/latest/implementation/record_metadata.html

        Args:
            record: Individual record in the stream.
        """
        record.pop("__sdc_extracted_at", None)
        record.pop("__sdc_received_at", None)
        record.pop("__sdc_batched_at", None)
        record.pop("__sdc_deleted_at", None)
        record.pop("__sdc_sequence", None)
        record.pop("__sdc_table_version", None)
        record.pop("__sdc_sync_started_at", None)

    def process_batch(self, context: dict) -> None:
        """Process a batch with the given batch context.

2 changes: 1 addition & 1 deletion target_cratedb/tests/test_standard_target.py
@@ -30,7 +30,7 @@
from importlib_resources import files as resource_files # type: ignore[no-redef]


-METADATA_COLUMN_PREFIX = "__sdc"
+METADATA_COLUMN_PREFIX = "_sdc"


@pytest.fixture(scope="session")