Commit d29f1b9

[kafka] feat: Adding end_time functionality to kafka_consumer

This commit adds end_time functionality to the kafka_consumer function, making it more batch-processing friendly: bounding the read window with a fixed end timestamp lets repeated runs consume the same set of messages, allowing the user to achieve idempotency.

1 parent cd2f2c1 commit d29f1b9

3 files changed: +75, -13 lines
sources/kafka/__init__.py

Lines changed: 34 additions & 3 deletions
@@ -35,6 +35,7 @@ def kafka_consumer(
     batch_size: Optional[int] = 3000,
     batch_timeout: Optional[int] = 3,
     start_from: Optional[TAnyDateTime] = None,
+    end_time: Optional[TAnyDateTime] = None,
 ) -> Iterable[TDataItem]:
     """Extract recent messages from the given Kafka topics.

@@ -56,6 +57,8 @@ def kafka_consumer(
             consume, in seconds.
         start_from (Optional[TAnyDateTime]): A timestamp, at which to start
             reading. Older messages are ignored.
+        end_time (Optional[TAnyDateTime]): A timestamp, at which to stop
+            reading. Newer messages are ignored.

     Yields:
         Iterable[TDataItem]: Kafka messages.
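
For context, a minimal sketch of how the new parameter could be used from a dlt pipeline; the topic name, window, and pipeline settings are illustrative and not part of this commit:

    import dlt
    import pendulum

    from kafka import kafka_consumer  # the resource defined in sources/kafka/__init__.py

    # Read a fixed, reproducible window of messages. Re-running the
    # pipeline over the same window consumes the same offsets, which is
    # what makes the resource idempotent for batch processing.
    data = kafka_consumer(
        topics=["replay_topic"],  # illustrative topic name
        start_from=pendulum.datetime(2023, 12, 1, 0, 0, 0),
        end_time=pendulum.datetime(2023, 12, 1, 1, 0, 0),
    )

    pipeline = dlt.pipeline(
        pipeline_name="kafka_window",
        destination="duckdb",
        dataset_name="kafka_messages",
    )
    pipeline.run(data)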
@@ -78,7 +81,23 @@ def kafka_consumer(
     if start_from is not None:
         start_from = ensure_pendulum_datetime(start_from)

-    tracker = OffsetTracker(consumer, topics, dlt.current.resource_state(), start_from)
+    if end_time is not None:
+        end_time = ensure_pendulum_datetime(end_time)
+
+        if start_from is None:
+            raise ValueError("`start_from` must be provided if `end_time` is provided")
+
+        if start_from > end_time:
+            raise ValueError("`start_from` must be before `end_time`")
+
+        tracker = OffsetTracker(
+            consumer, topics, dlt.current.resource_state(), start_from, end_time
+        )
+
+    else:
+        tracker = OffsetTracker(
+            consumer, topics, dlt.current.resource_state(), start_from
+        )

     # read messages up to the maximum offsets,
     # not waiting for new messages
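
Both parameters are TAnyDateTime, so ISO strings, stdlib datetimes, and pendulum instances are all accepted; a small sketch of the normalization the guards above rely on, using dlt's ensure_pendulum_datetime helper (values are illustrative):

    from dlt.common.time import ensure_pendulum_datetime

    # Both values normalize to timezone-aware pendulum.DateTime objects,
    # so the `start_from > end_time` comparison is well defined.
    start = ensure_pendulum_datetime("2023-12-01T00:00:00+00:00")
    end = ensure_pendulum_datetime("2023-12-01T01:00:00+00:00")
    assert start < end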
@@ -97,7 +116,19 @@ def kafka_consumer(
             else:
                 raise err
         else:
-            batch.append(msg_processor(msg))
-            tracker.renew(msg)
+            topic = msg.topic()
+            partition = str(msg.partition())
+            current_offset = msg.offset()
+            max_offset = tracker[topic][partition]["max"]
+
+            # Only process the message if it's within the partition's max offset
+            if current_offset < max_offset:
+                batch.append(msg_processor(msg))
+                tracker.renew(msg)
+            else:
+                logger.info(
+                    f"Skipping message on {topic} partition {partition} at offset {current_offset} "
+                    f"- beyond max offset {max_offset}"
+                )

         yield batch
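
The consume loop now stops processing a partition's messages once they cross the tracker's precomputed "max" offset. Outside of dlt, the same bounded-read pattern looks roughly like this with plain confluent_kafka; the broker address, topic, and group id are placeholders:

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "group.id": "bounded-reader",           # placeholder group id
        "auto.offset.reset": "earliest",
    })
    consumer.assign([TopicPartition("replay_topic", 0)])  # placeholder topic

    # Freeze the upper bound before reading, like the tracker's "max"
    # offset: anything produced after this point is ignored.
    _, max_offset = consumer.get_watermark_offsets(TopicPartition("replay_topic", 0))

    while True:
        msg = consumer.poll(timeout=3)
        if msg is None:
            break
        if msg.error():
            continue
        if msg.offset() >= max_offset:
            break  # beyond the frozen bound: stop, do not process
        print(msg.topic(), msg.partition(), msg.offset())

    consumer.close()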

sources/kafka/helpers.py

Lines changed: 41 additions & 10 deletions
@@ -1,7 +1,7 @@
 from typing import Any, Dict, List

 from confluent_kafka import Consumer, Message, TopicPartition  # type: ignore
-from confluent_kafka.admin import AdminClient, TopicMetadata  # type: ignore
+from confluent_kafka.admin import TopicMetadata  # type: ignore

 from dlt import config, secrets
 from dlt.common import pendulum
@@ -54,23 +54,26 @@ def default_msg_processor(msg: Message) -> Dict[str, Any]:
 class OffsetTracker(dict):  # type: ignore
     """Object to control offsets of the given topics.

-    Tracks all the partitions of the given topics with two params:
-    current offset and maximum offset (partition length).
+    Tracks all the partitions of the given topics with three params:
+    current offset, maximum offset (partition length), and an end time.

     Args:
         consumer (confluent_kafka.Consumer): Kafka consumer.
         topic_names (List): Names of topics to track.
         pl_state (DictStrAny): Pipeline current state.
         start_from (Optional[pendulum.DateTime]): A timestamp, after which messages
             are read. Older messages are ignored.
+        end_time (Optional[pendulum.DateTime]): A timestamp, before which messages
+            are read. Newer messages are ignored.
     """

     def __init__(
         self,
         consumer: Consumer,
         topic_names: List[str],
         pl_state: DictStrAny,
-        start_from: pendulum.DateTime = None,
+        start_from: Optional[pendulum.DateTime] = None,
+        end_time: Optional[pendulum.DateTime] = None,
     ):
         super().__init__()

@@ -82,7 +85,7 @@ def __init__(
             "offsets", {t_name: {} for t_name in topic_names}
         )

-        self._init_partition_offsets(start_from)
+        self._init_partition_offsets(start_from, end_time)

     def _read_topics(self, topic_names: List[str]) -> Dict[str, TopicMetadata]:
         """Read the given topics metadata from Kafka.
@@ -104,7 +107,11 @@ def _read_topics(self, topic_names: List[str]) -> Dict[str, TopicMetadata]:

         return tracked_topics

-    def _init_partition_offsets(self, start_from: pendulum.DateTime) -> None:
+    def _init_partition_offsets(
+        self,
+        start_from: Optional[pendulum.DateTime] = None,
+        end_time: Optional[pendulum.DateTime] = None,
+    ) -> None:
         """Designate current and maximum offsets for every partition.

         Current offsets are read from the state, if present. Set equal

@@ -113,6 +120,8 @@ def _init_partition_offsets(self, start_from: pendulum.DateTime) -> None:
         Args:
             start_from (pendulum.DateTime): A timestamp, at which to start
                 reading. Older messages are ignored.
+            end_time (pendulum.DateTime): A timestamp, before which messages
+                are read. Newer messages are ignored.
         """
         all_parts = []
         for t_name, topic in self._topics.items():
@@ -128,27 +137,49 @@ def _init_partition_offsets(self, start_from: pendulum.DateTime) -> None:
                 for part in topic.partitions
             ]

-            # get offsets for the timestamp, if given
-            if start_from is not None:
+            # get offsets for the timestamp ranges, if given
+            if start_from is not None and end_time is not None:
+                start_ts_offsets = self._consumer.offsets_for_times(parts)
+                end_ts_offsets = self._consumer.offsets_for_times(
+                    [
+                        TopicPartition(t_name, part, end_time.int_timestamp * 1000)
+                        for part in topic.partitions
+                    ]
+                )
+            elif start_from is not None:
                 ts_offsets = self._consumer.offsets_for_times(parts)

             # designate current and maximum offsets for every partition
             for i, part in enumerate(parts):
                 max_offset = self._consumer.get_watermark_offsets(part)[1]

-                if start_from is not None:
+                if start_from is not None and end_time is not None:
+                    if start_ts_offsets[i].offset != -1:
+                        cur_offset = start_ts_offsets[i].offset
+                    else:
+                        cur_offset = max_offset - 1
+                    if end_ts_offsets[i].offset != -1:
+                        end_offset = end_ts_offsets[i].offset
+                    else:
+                        end_offset = max_offset
+
+                elif start_from is not None:
                     if ts_offsets[i].offset != -1:
                         cur_offset = ts_offsets[i].offset
                     else:
                         cur_offset = max_offset - 1
+
+                    end_offset = max_offset
+
                 else:
                     cur_offset = (
                         self._cur_offsets[t_name].get(str(part.partition), -1) + 1
                     )
+                    end_offset = max_offset

                 self[t_name][str(part.partition)] = {
                     "cur": cur_offset,
-                    "max": max_offset,
+                    "max": end_offset,
                 }

                 parts[i].offset = cur_offset
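
The end bound is resolved with Consumer.offsets_for_times, which treats the offset field of each TopicPartition as a millisecond timestamp and returns the earliest offset at or after it, or -1 if no such message exists (hence the max_offset fallbacks above). A minimal standalone sketch, with broker, group id, and topic as placeholders:

    import pendulum
    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "group.id": "ts-lookup",                # placeholder group id
    })

    end_time = pendulum.datetime(2023, 12, 1, 1, 0, 0)

    # offsets_for_times expects the timestamp (ms since epoch) passed
    # through the `offset` field of each TopicPartition.
    query = [TopicPartition("replay_topic", 0, end_time.int_timestamp * 1000)]
    result = consumer.offsets_for_times(query, timeout=10)

    for tp in result:
        # offset == -1 means no message at/after the timestamp: fall back
        # to the high watermark, as _init_partition_offsets does.
        print(tp.topic, tp.partition, tp.offset)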

sources/kafka/sources/kafka/__init__.py

Whitespace-only changes.
