61 changes: 56 additions & 5 deletions docs/supported-sources/kafka.md
@@ -10,29 +10,80 @@ The URI format for Apache Kafka is as follows:
kafka://?bootstrap_servers=localhost:9092&group_id=test_group&security_protocol=SASL_SSL&sasl_mechanisms=PLAIN&sasl_username=example_username&sasl_password=example_secret&batch_size=1000&batch_timeout=3
```

URI parameters:
### URI parameters

Connectivity options:
- `bootstrap_servers`: Required, the Kafka server or servers to connect to, typically in the form of a host and port, e.g. `localhost:9092`.
- `group_id`: Required, the consumer group ID used for identifying the client when consuming messages.
- `security_protocol`: The protocol used to communicate with brokers, e.g. `SASL_SSL` for secure communication.
- `sasl_mechanisms`: The SASL mechanism to be used for authentication, e.g. `PLAIN`.
- `sasl_username`: The username for SASL authentication.
- `sasl_password`: The password for SASL authentication.

Transfer options:
- `batch_size`: The number of messages to fetch in a single batch, defaults to 3000.
- `batch_timeout`: The maximum time to wait for messages, defaults to 3 seconds.
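
For example, a source URI that fetches smaller batches and waits a little longer for each one might look like this (the values are illustrative, not recommendations):
```
kafka://?bootstrap_servers=localhost:9092&group_id=test_group&batch_size=500&batch_timeout=10
```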

Decoding options:
- `key_type`: The data type of the Kafka event `key` field. Possible values: `json`.
- `value_type`: The data type of the Kafka event `value` field. Possible values: `json`.
- `format`: The output format/layout. Possible values: `standard_v1`, `standard_v2`, `flexible`.
- `include`: Which fields to include in the output, comma-separated.
- `select`: Which field to select (pick) into the output.

Review comment — @amotl (Contributor, Author), Aug 3, 2025:

Todo: Add a remark to the `select` explanation, a compressed version of: Always use `select=value` to select the value of the Kafka event. This has been chosen deliberately to adhere to the Kafka jargon, even though ingestr's internal event layout relays this field as `data`.

The URI is used to connect to the Kafka brokers for ingesting messages.
When using the `include` or `select` option, the decoder will automatically
select the `flexible` output format.
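
For instance, a source URI that decodes the event `value` as JSON and selects it into the output (and therefore implicitly uses the `flexible` format) could look like this; the host and group ID are placeholders:
```
kafka://?bootstrap_servers=localhost:9092&group_id=example_group&value_type=json&select=value
```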
Review comment on lines +35 to +36 — @amotl (Contributor, Author):

@karakanb: Does this statement help with the question you had about the flexible output format?

### Group ID
The group ID is used to identify the consumer group that reads messages from a topic. Kafka uses the group ID to manage consumer offsets and assign partitions to consumers, which means that the group ID is the key to reading messages from the correct partition and position in the topic.
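
As a minimal sketch, two pipelines can consume the same topic independently simply by using distinct group IDs; the group names and destination files below are illustrative:

```sh
# Pipeline A tracks its offsets under the "analytics" consumer group.
ingestr ingest \
--source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=analytics' \
--source-table 'my-topic' \
--dest-uri 'duckdb:///analytics.duckdb' \
--dest-table 'dest.my_topic'

# Pipeline B uses the "audit" group and reads the same topic independently.
ingestr ingest \
--source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=audit' \
--source-table 'my-topic' \
--dest-uri 'duckdb:///audit.duckdb' \
--dest-table 'dest.my_topic'
```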

Once you have your Kafka server, credentials, and group ID set up, here's a sample command to ingest messages from a Kafka topic into a DuckDB database:
## Examples

### Kafka to DuckDB

Once you have your Kafka server, credentials, and group ID set up,
here are a few sample commands to ingest messages from a Kafka topic into a destination database:

Transfer data into DuckDB using the traditional `standard_v1` output format.
The result of this command will be a table in the `kafka.duckdb` database with JSON columns.
```sh
ingestr ingest \
--source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=test_group' \
--source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=test' \
--source-table 'my-topic' \
--dest-uri duckdb:///kafka.duckdb \
--dest-uri 'duckdb:///kafka.duckdb' \
--dest-table 'dest.my_topic'
```

The result of this command will be a table in the `kafka.duckdb` database with JSON columns.
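
To inspect the result, you can query the DuckDB file directly (assuming the DuckDB CLI is installed):
```sh
duckdb kafka.duckdb "select * from dest.my_topic limit 5;"
```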
### Kafka to PostgreSQL

Transfer the Kafka event `value` into a PostgreSQL destination table,
decoding it from JSON and using the `flexible` output format.
First, produce a sample JSON event into the `demo` topic, here using `kcat`:
```sh
echo '{"sensor_id":1,"ts":"2025-06-01 10:00","reading":42.42}' | kcat -P -b localhost -t demo
```
```sh
ingestr ingest \
--source-uri 'kafka://?bootstrap_servers=localhost:9092&group_id=test&value_type=json&select=value' \
--source-table 'demo' \
--dest-uri 'postgres://postgres:postgres@localhost:5432/?sslmode=disable' \
--dest-table 'public.kafka_demo'
```

Review comment — Contributor: Would the same work without the `select`, but with the format set to `flexible`?

Reply — @amotl (Contributor, Author): I think using `select` implicitly sets the format to `flexible`.

The result of this command will be a `public.kafka_demo` table that uses
the top-level JSON keys of the Kafka event `value` as table columns.
```sh
psql "postgresql://postgres:postgres@localhost:5432/" \
-c '\d+ public.kafka_demo' \
-c 'select * from public.kafka_demo;'
```
```text
Table "public.kafka_demo"

Column | Type |
--------------+--------------------------+
sensor_id | bigint |
ts | timestamp with time zone |
reading | double precision |
_dlt_load_id | character varying |
_dlt_id | character varying |
```
149 changes: 141 additions & 8 deletions ingestr/main_test.py
@@ -30,6 +30,7 @@
import requests
import sqlalchemy
from confluent_kafka import Producer # type: ignore
from confluent_kafka.admin import AdminClient # type: ignore
from dlt.sources.filesystem import glob_files
from fsspec.implementations.memory import MemoryFileSystem # type: ignore
from sqlalchemy.pool import NullPool
@@ -1284,19 +1285,39 @@ def as_datetime2(date_str: str) -> datetime:
return datetime.strptime(date_str, "%Y-%m-%d")


@pytest.fixture(scope="session")
def kafka_service():
"""
Provide a Kafka service container for the whole test session.
"""
container = KafkaContainer("confluentinc/cp-kafka:7.6.0")
container.start()
yield container
container.stop()


@pytest.fixture(scope="function")
def kafka(kafka_service):
"""
Provide the Kafka service container with a clean slate.
Before invoking the test case, delete all relevant topics.
"""
admin = AdminClient({"bootstrap.servers": kafka_service.get_bootstrap_server()})
admin.delete_topics(["test_topic"])
admin.poll(1)
return kafka_service


@pytest.mark.parametrize(
"dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
)
def test_kafka_to_db(dest):
def test_kafka_to_db_incremental(kafka, dest):
"""
Validate standard Kafka event decoding, focusing on both metadata and data payload.
"""
with ThreadPoolExecutor() as executor:
dest_future = executor.submit(dest.start)
source_future = executor.submit(
KafkaContainer("confluentinc/cp-kafka:7.6.0").start, timeout=120
)
dest_uri = dest_future.result()
kafka = source_future.result()

# kafka = KafkaContainer("confluentinc/cp-kafka:7.6.0").start(timeout=60)

# Create Kafka producer
producer = Producer({"bootstrap.servers": kafka.get_bootstrap_server()})
Expand Down Expand Up @@ -1357,7 +1378,119 @@ def get_output_table():
assert res[2] == ("message3",)
assert res[3] == ("message4",)

kafka.stop()

@pytest.mark.parametrize(
"dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
)
def test_kafka_to_db_decode_json(kafka, dest):
"""
Validate slightly more advanced Kafka event decoding, focusing on the payload value this time.

This exercise uses the `value_type=json` and `select=value` URL parameters.
"""
with ThreadPoolExecutor() as executor:
dest_future = executor.submit(dest.start)
dest_uri = dest_future.result()

# Create Kafka producer
producer = Producer({"bootstrap.servers": kafka.get_bootstrap_server()})

# Create topic and send messages
topic = "test_topic"
messages = [
{"id": 1, "temperature": 42.42, "humidity": 82},
{"id": 2, "temperature": 451.00, "humidity": 15},
]

for message in messages:
producer.produce(topic, json.dumps(message))
producer.flush()

def run():
res = invoke_ingest_command(
f"kafka://?bootstrap_servers={kafka.get_bootstrap_server()}&group_id=test_group&value_type=json&select=value",
"test_topic",
dest_uri,
"testschema.output",
)
assert res.exit_code == 0

def get_output_table():
dest_engine = sqlalchemy.create_engine(dest_uri)
with dest_engine.connect() as conn:
res = (
conn.execute(
"SELECT id, temperature, humidity FROM testschema.output WHERE temperature >= 38.00 ORDER BY id ASC"
)
.mappings()
.fetchall()
)
dest_engine.dispose()
return res

run()

res = get_output_table()
assert len(res) == 2
assert res[0] == messages[0]
assert res[1] == messages[1]


@pytest.mark.parametrize(
"dest", list(DESTINATIONS.values()), ids=list(DESTINATIONS.keys())
)
def test_kafka_to_db_include_metadata(kafka, dest):
"""
Validate slightly more advanced Kafka event decoding, focusing on metadata this time.

This exercise uses the `include=` URL parameter.
"""
with ThreadPoolExecutor() as executor:
dest_future = executor.submit(dest.start)
dest_uri = dest_future.result()

# Create Kafka producer
producer = Producer({"bootstrap.servers": kafka.get_bootstrap_server()})

# Create topic and send messages
topic = "test_topic"
messages = [
{"id": 1, "temperature": 42.42, "humidity": 82},
{"id": 2, "temperature": 451.00, "humidity": 15},
]

for message in messages:
producer.produce(topic=topic, value=json.dumps(message), key="test")
producer.flush()

def run():
res = invoke_ingest_command(
f"kafka://?bootstrap_servers={kafka.get_bootstrap_server()}&group_id=test_group&include=partition,topic,key,offset,ts",
"test_topic",
dest_uri,
"testschema.output",
)
assert res.exit_code == 0

def get_output_table():
dest_engine = sqlalchemy.create_engine(dest_uri)
with dest_engine.connect() as conn:
res = (
conn.execute(
'SELECT "partition", "topic", "key", "offset" FROM testschema.output ORDER BY "ts__value" ASC'
)
.mappings()
.fetchall()
)
dest_engine.dispose()
return res

run()

res = get_output_table()
assert len(res) == 2
assert res[0] == {"partition": 0, "topic": "test_topic", "key": "test", "offset": 0}
assert res[1] == {"partition": 0, "topic": "test_topic", "key": "test", "offset": 1}


@pytest.mark.parametrize(
16 changes: 8 additions & 8 deletions ingestr/src/kafka/__init__.py
@@ -1,6 +1,6 @@
"""A source to extract Kafka messages.

When extraction starts, partitions length is checked -
When extraction starts, partition length is checked -
data is read only up to it, overriding the default Kafka's
behavior of waiting for new messages in endless loop.
"""
@@ -16,8 +16,8 @@

from .helpers import (
KafkaCredentials,
KafkaEventProcessor,
OffsetTracker,
default_msg_processor,
)


@@ -29,9 +29,7 @@
def kafka_consumer(
topics: Union[str, List[str]],
credentials: Union[KafkaCredentials, Consumer] = dlt.secrets.value,
msg_processor: Optional[
Callable[[Message], Dict[str, Any]]
] = default_msg_processor,
msg_processor: Optional[Callable[[Message], Dict[str, Any]]] = None,
batch_size: Optional[int] = 3000,
batch_timeout: Optional[int] = 3,
start_from: Optional[TAnyDateTime] = None,
@@ -49,17 +47,19 @@ def kafka_consumer(
Auth credentials or an initiated Kafka consumer. By default,
is taken from secrets.
msg_processor(Optional[Callable]): A function-converter,
which'll process every Kafka message after it's read and
before it's transfered to the destination.
which will process every Kafka message after it is read and
before it is transferred to the destination.
batch_size (Optional[int]): Messages batch size to read at once.
batch_timeout (Optional[int]): Maximum time to wait for a batch
consume, in seconds.
to be consumed in seconds.
start_from (Optional[TAnyDateTime]): A timestamp, at which to start
reading. Older messages are ignored.

Yields:
Iterable[TDataItem]: Kafka messages.
"""
msg_processor = msg_processor or KafkaEventProcessor().process

if not isinstance(topics, list):
topics = [topics]
