Adding Kafka end_time support for batch processing and SASL optionality. #607
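For context, a minimal usage sketch of what the new parameter enables, assuming the resource is imported from this repository's sources/kafka package; the pipeline name, destination, topic, and timestamps below are placeholders rather than part of this PR:

```python
import dlt
from pendulum import datetime

from kafka import kafka_consumer  # assumes the sources/kafka package from this repo

# Hypothetical pipeline setup; names and destination are placeholders.
pipeline = dlt.pipeline(
    pipeline_name="kafka_backfill",
    destination="duckdb",
    dataset_name="kafka_data",
)

# Read only messages produced between the two timestamps (bounded batch/backfill run).
resource = kafka_consumer(
    topics=["orders"],
    start_from=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 2),
)

info = pipeline.run(resource)
print(info)
```

With both bounds set, each partition is read from the offset at start_from up to (but not including) the first offset at or after end_time, which makes bounded, repeatable batch runs possible.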

Open: wants to merge 2 commits into base: master
37 changes: 34 additions & 3 deletions sources/kafka/__init__.py
@@ -35,6 +35,7 @@ def kafka_consumer(
batch_size: Optional[int] = 3000,
batch_timeout: Optional[int] = 3,
start_from: Optional[TAnyDateTime] = None,
end_time: Optional[TAnyDateTime] = None,
) -> Iterable[TDataItem]:
"""Extract recent messages from the given Kafka topics.

@@ -56,6 +57,8 @@ def kafka_consumer(
consume, in seconds.
start_from (Optional[TAnyDateTime]): A timestamp, at which to start
reading. Older messages are ignored.
end_time (Optional[TAnyDateTime]): A timestamp, at which to stop
reading. Newer messages are ignored.

Yields:
Iterable[TDataItem]: Kafka messages.
@@ -78,7 +81,23 @@ def kafka_consumer(
if start_from is not None:
start_from = ensure_pendulum_datetime(start_from)

tracker = OffsetTracker(consumer, topics, dlt.current.resource_state(), start_from)
if end_time is not None:
end_time = ensure_pendulum_datetime(end_time)

if start_from is None:
raise ValueError("`start_from` must be provided if `end_time` is provided")

if start_from > end_time:
raise ValueError("`start_from` must be before `end_time`")

tracker = OffsetTracker(
consumer, topics, dlt.current.resource_state(), start_from, end_time
)

else:
tracker = OffsetTracker(
consumer, topics, dlt.current.resource_state(), start_from
)

# read messages up to the maximum offsets,
# not waiting for new messages
@@ -97,7 +116,19 @@ def kafka_consumer(
else:
raise err
else:
batch.append(msg_processor(msg))
tracker.renew(msg)
topic = msg.topic()
partition = str(msg.partition())
current_offset = msg.offset()
max_offset = tracker[topic][partition]["max"]

# Only process the message if it's within the partition's max offset
if current_offset < max_offset:
batch.append(msg_processor(msg))
tracker.renew(msg)
else:
logger.info(
f"Skipping message on {topic} partition {partition} at offset {current_offset} "
f"- beyond max offset {max_offset}"
)

yield batch
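As a reading aid for the skip check above: OffsetTracker (see helpers.py below) is a plain dict keyed by topic name and then by partition number as a string, each entry holding a "cur" and a "max" offset. A rough illustration with made-up values:

```python
# Illustrative shape only; real values come from the consumer, the pipeline state,
# and the optional start_from / end_time timestamps.
tracker = {
    "orders": {
        "0": {"cur": 1500, "max": 2000},  # consume offsets 1500..1999
        "1": {"cur": 980, "max": 1200},
    }
}

msg_topic, msg_partition, msg_offset = "orders", 0, 2001
# Mirrors the condition in the diff: anything at or past "max" is skipped.
within_range = msg_offset < tracker[msg_topic][str(msg_partition)]["max"]
assert not within_range
```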
76 changes: 58 additions & 18 deletions sources/kafka/helpers.py
@@ -1,14 +1,14 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from confluent_kafka import Consumer, Message, TopicPartition # type: ignore
from confluent_kafka.admin import AdminClient, TopicMetadata # type: ignore
from confluent_kafka.admin import TopicMetadata # type: ignore

from dlt import config, secrets
from dlt.common import pendulum
from dlt.common.configuration import configspec
from dlt.common.configuration.specs import CredentialsConfiguration
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.typing import DictStrAny, TSecretValue, TAnyDateTime
from dlt.common.typing import DictStrAny, TSecretValue
from dlt.common.utils import digest128


@@ -54,23 +54,26 @@ def default_msg_processor(msg: Message) -> Dict[str, Any]:
class OffsetTracker(dict): # type: ignore
"""Object to control offsets of the given topics.

Tracks all the partitions of the given topics with two params:
current offset and maximum offset (partition length).
Tracks all the partitions of the given topics with three params:
current offset, maximum offset (partition length), and an end time.

Args:
consumer (confluent_kafka.Consumer): Kafka consumer.
topic_names (List): Names of topics to track.
pl_state (DictStrAny): Pipeline current state.
start_from (Optional[pendulum.DateTime]): A timestamp, after which messages
are read. Older messages are ignored.
end_time (Optional[pendulum.DateTime]): A timestamp, before which messages
are read. Newer messages are ignored.
"""

def __init__(
self,
consumer: Consumer,
topic_names: List[str],
pl_state: DictStrAny,
start_from: pendulum.DateTime = None,
start_from: Optional[pendulum.DateTime] = None,
end_time: Optional[pendulum.DateTime] = None,
):
super().__init__()

@@ -82,7 +85,7 @@ def __init__(
"offsets", {t_name: {} for t_name in topic_names}
)

self._init_partition_offsets(start_from)
self._init_partition_offsets(start_from, end_time)

def _read_topics(self, topic_names: List[str]) -> Dict[str, TopicMetadata]:
"""Read the given topics metadata from Kafka.
@@ -104,7 +107,11 @@ def _read_topics(self, topic_names: List[str]) -> Dict[str, TopicMetadata]:

return tracked_topics

def _init_partition_offsets(self, start_from: pendulum.DateTime) -> None:
def _init_partition_offsets(
self,
start_from: Optional[pendulum.DateTime] = None,
end_time: Optional[pendulum.DateTime] = None,
) -> None:
"""Designate current and maximum offsets for every partition.

Current offsets are read from the state, if present. Set equal
@@ -113,6 +120,8 @@ def _init_partition_offsets(self, start_from: pendulum.DateTime) -> None:
Args:
start_from (pendulum.DateTime): A timestamp, at which to start
reading. Older messages are ignored.
end_time (pendulum.DateTime): A timestamp, before which messages
are read. Newer messages are ignored.
"""
all_parts = []
for t_name, topic in self._topics.items():
@@ -128,27 +137,49 @@ def _init_partition_offsets(self, start_from: pendulum.DateTime) -> None:
for part in topic.partitions
]

# get offsets for the timestamp, if given
if start_from is not None:
# get offsets for the timestamp ranges, if given
if start_from is not None and end_time is not None:
Contributor:

What if (start_from is not None) and (end_time is None)?

Author:

Hmm, I'm not sure about this one; I don't think the parentheses make it more readable (it is a simple AND). I also did a quick check of the folder, and everything there is written without parentheses (in helpers.py, for instance).

Contributor:

Oh, sorry, I wasn't clear. I meant: what happens if start_from is specified and end_time is None? It seems that case is not covered.

Author:

Done! I added back the original fallback scenario for when start_from is the only parameter provided: we read from that start up to the max offset. Let me know your thoughts!

start_ts_offsets = self._consumer.offsets_for_times(parts)
end_ts_offsets = self._consumer.offsets_for_times(
[
TopicPartition(t_name, part, end_time.int_timestamp * 1000)
for part in topic.partitions
]
)
elif start_from is not None:
ts_offsets = self._consumer.offsets_for_times(parts)

# designate current and maximum offsets for every partition
for i, part in enumerate(parts):
max_offset = self._consumer.get_watermark_offsets(part)[1]

if start_from is not None:
if start_from is not None and end_time is not None:
if start_ts_offsets[i].offset != -1:
cur_offset = start_ts_offsets[i].offset
else:
cur_offset = max_offset - 1
if end_ts_offsets[i].offset != -1:
end_offset = end_ts_offsets[i].offset
else:
end_offset = max_offset

elif start_from is not None:
if ts_offsets[i].offset != -1:
cur_offset = ts_offsets[i].offset
else:
cur_offset = max_offset - 1

end_offset = max_offset

else:
cur_offset = (
self._cur_offsets[t_name].get(str(part.partition), -1) + 1
)
end_offset = max_offset

self[t_name][str(part.partition)] = {
"cur": cur_offset,
"max": max_offset,
"max": end_offset,
}

parts[i].offset = cur_offset
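The offset resolution above relies on confluent_kafka's Consumer.offsets_for_times, which takes TopicPartition objects whose offset field carries a timestamp in epoch milliseconds and returns, per partition, the earliest offset whose message timestamp is at or after that time, or -1 when no such message exists (hence the fallbacks to the watermark offsets). A standalone sketch with placeholder broker, topic, and timestamp:

```python
from confluent_kafka import Consumer, TopicPartition

# Placeholder configuration; group.id is only needed so the consumer can be created.
consumer = Consumer({"bootstrap.servers": "localhost:9092", "group.id": "offset-probe"})

end_time_ms = 1_700_000_000_000  # what the diff computes as end_time.int_timestamp * 1000
[tp] = consumer.offsets_for_times([TopicPartition("orders", 0, end_time_ms)])

# tp.offset is the first offset with a timestamp >= end_time_ms,
# or -1 if every message in partition 0 is older than end_time_ms.
print(tp.offset)
```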
@@ -200,9 +231,11 @@ class KafkaCredentials(CredentialsConfiguration):
bootstrap_servers: str = config.value
group_id: str = config.value
security_protocol: str = config.value
sasl_mechanisms: str = config.value
sasl_username: str = config.value
sasl_password: TSecretValue = secrets.value

# Optional SASL credentials
sasl_mechanisms: Optional[str] = config.value
sasl_username: Optional[str] = config.value
sasl_password: Optional[TSecretValue] = secrets.value

def init_consumer(self) -> Consumer:
"""Init a Kafka consumer from this credentials.
@@ -214,9 +247,16 @@ def init_consumer(self) -> Consumer:
"bootstrap.servers": self.bootstrap_servers,
"group.id": self.group_id,
"security.protocol": self.security_protocol,
"sasl.mechanisms": self.sasl_mechanisms,
"sasl.username": self.sasl_username,
"sasl.password": self.sasl_password,
"auto.offset.reset": "earliest",
}

if self.sasl_mechanisms and self.sasl_username and self.sasl_password:
config.update(
{
"sasl.mechanisms": self.sasl_mechanisms,
"sasl.username": self.sasl_username,
"sasl.password": self.sasl_password,
}
)

return Consumer(config)
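A sketch of how the optional SASL block plays out in the final consumer configuration; the broker address and credentials are placeholders, and the dicts mirror what init_consumer builds in the two cases:

```python
# Case 1: no SASL credentials configured (e.g. a local PLAINTEXT broker).
# The SASL keys are simply not added to the config.
plain_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "dlt_kafka_loader",
    "security.protocol": "PLAINTEXT",
    "auto.offset.reset": "earliest",
}

# Case 2: all three SASL fields are set, so they are merged into the same config.
sasl_config = {
    **plain_config,
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "kafka_user",
    "sasl.password": "kafka_password",
}
```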