
Commit be9dd7c

KAFKA-6397: Consumer should not block setting positions of unavailable partitions (#2593)
1 parent 39aa421 commit be9dd7c

File tree

9 files changed, +380 -339 lines


kafka/cluster.py

+4

@@ -202,6 +202,10 @@ def request_update(self):
         self._future = Future()
         return self._future
 
+    @property
+    def need_update(self):
+        return self._need_update
+
     def topics(self, exclude_internal_topics=True):
         """Get set of known topics.

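The new need_update property simply exposes the private _need_update flag that request_update() sets, so code outside the class can check whether a metadata refresh is already pending without touching internals. A minimal sketch of the read path, assuming a standalone ClusterMetadata instance (the construction and the poll-loop comment are illustrative, not part of this commit):

from kafka.cluster import ClusterMetadata

cluster = ClusterMetadata()
future = cluster.request_update()   # flags the metadata as stale and returns a Future
assert cluster.need_update          # read-only view of the internal _need_update flag
# a client poll loop would notice the flag, send a MetadataRequest,
# and resolve `future` (clearing the flag) once the response arrives
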
kafka/consumer/fetcher.py

+149 -151
Large diffs are not rendered by default.

kafka/consumer/group.py

+50 -53

@@ -572,9 +572,8 @@ def committed(self, partition, metadata=False, timeout_ms=None):
         This offset will be used as the position for the consumer
         in the event of a failure.
 
-        This call may block to do a remote call if the partition in question
-        isn't assigned to this consumer or if the consumer hasn't yet
-        initialized its cache of committed offsets.
+        This call will block to do a remote call to get the latest committed
+        offsets from the server.
 
         Arguments:
             partition (TopicPartition): The partition to check.
@@ -586,28 +585,16 @@ def committed(self, partition, metadata=False, timeout_ms=None):
 
         Raises:
             KafkaTimeoutError if timeout_ms provided
+            BrokerResponseErrors if OffsetFetchRequest raises an error.
         """
         assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
         assert self.config['group_id'] is not None, 'Requires group_id'
         if not isinstance(partition, TopicPartition):
             raise TypeError('partition must be a TopicPartition namedtuple')
-        if self._subscription.is_assigned(partition):
-            committed = self._subscription.assignment[partition].committed
-            if committed is None:
-                self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
-                committed = self._subscription.assignment[partition].committed
-        else:
-            commit_map = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms)
-            if partition in commit_map:
-                committed = commit_map[partition]
-            else:
-                committed = None
-
-        if committed is not None:
-            if metadata:
-                return committed
-            else:
-                return committed.offset
+        committed = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms)
+        if partition not in committed:
+            return None
+        return committed[partition] if metadata else committed[partition].offset
 
     def _fetch_all_topic_metadata(self):
         """A blocking call that fetches topic metadata for all topics in the
@@ -717,10 +704,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         if not self._coordinator.poll(timeout_ms=inner_timeout_ms()):
             return {}
 
-        # Fetch positions if we have partitions we're subscribed to that we
-        # don't know the offset for
-        if not self._subscription.has_all_fetch_positions():
-            self._update_fetch_positions(self._subscription.missing_fetch_positions(), timeout_ms=inner_timeout_ms())
+        has_all_fetch_positions = self._update_fetch_positions(timeout_ms=inner_timeout_ms())
 
         # If data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
@@ -737,7 +721,13 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         if records:
             return records
 
-        self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000))
+        # We do not want to be stuck blocking in poll if we are missing some positions
+        # since the offset lookup may be backing off after a failure
+        poll_timeout_ms = inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000)
+        if not has_all_fetch_positions:
+            poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])
+
+        self._client.poll(timeout_ms=poll_timeout_ms)
         # after the long poll, we should check whether the group needs to rebalance
         # prior to returning data so that the group can stabilize faster
         if self._coordinator.need_rejoin():
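
The net effect of the two _poll_once() changes above is that poll() no longer blocks the full request timeout when some assigned partitions have no usable position (for example, when their leader is temporarily unavailable): the blocking network poll is capped at retry_backoff_ms so the offset lookup can be retried on the next iteration. A hedged usage sketch from the caller's point of view (broker and topic names are placeholders):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.assign([TopicPartition('my-topic', 0)])

# Even if the partition cannot be positioned yet, poll() now returns a (possibly
# empty) batch within roughly timeout_ms instead of hanging on the offset lookup;
# the lookup is retried on subsequent polls once the partition becomes available.
records = consumer.poll(timeout_ms=1000)
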
@@ -758,12 +748,18 @@ def position(self, partition, timeout_ms=None):
         if not isinstance(partition, TopicPartition):
             raise TypeError('partition must be a TopicPartition namedtuple')
         assert self._subscription.is_assigned(partition), 'Partition is not assigned'
+
+        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout retrieving partition position')
         position = self._subscription.assignment[partition].position
-        if position is None:
-            # batch update fetch positions for any partitions without a valid position
-            self._update_fetch_positions(self._subscription.assigned_partitions(), timeout_ms=timeout_ms)
-            position = self._subscription.assignment[partition].position
-        return position.offset if position else None
+        try:
+            while position is None:
+                # batch update fetch positions for any partitions without a valid position
+                self._update_fetch_positions(timeout_ms=inner_timeout_ms())
+                position = self._subscription.assignment[partition].position
+        except KafkaTimeoutError:
+            return None
+        else:
+            return position.offset
 
     def highwater(self, partition):
         """Last known highwater offset for a partition.
@@ -1056,7 +1052,7 @@ def offsets_for_times(self, timestamps):
                 raise ValueError(
                     "The target time for partition {} is {}. The target time "
                     "cannot be negative.".format(tp, ts))
-        return self._fetcher.get_offsets_by_times(
+        return self._fetcher.offsets_by_times(
             timestamps, self.config['request_timeout_ms'])
 
     def beginning_offsets(self, partitions):
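
The only change here is that the fetcher-level helper was renamed from get_offsets_by_times to offsets_by_times; the public offsets_for_times() API is unchanged. For reference, a usage sketch (the timestamp and names are placeholders):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
tp = TopicPartition('my-topic', 0)

# Map each partition to a target timestamp in milliseconds since the epoch.
result = consumer.offsets_for_times({tp: 1500000000000})
entry = result[tp]   # OffsetAndTimestamp, or None if no message is that recent
if entry is not None:
    print(entry.offset, entry.timestamp)
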
@@ -1122,7 +1118,7 @@ def _use_consumer_group(self):
             return False
         return True
 
-    def _update_fetch_positions(self, partitions, timeout_ms=None):
+    def _update_fetch_positions(self, timeout_ms=None):
         """Set the fetch position to the committed position (if there is one)
         or reset it using the offset reset policy the user has configured.
 
@@ -1136,29 +1132,30 @@ def _update_fetch_positions(self, partitions, timeout_ms=None):
             NoOffsetForPartitionError: If no offset is stored for a given
                 partition and no offset reset policy is defined.
         """
-        inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
-        try:
-            # Lookup any positions for partitions which are awaiting reset (which may be the
-            # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
-            # this check first to avoid an unnecessary lookup of committed offsets (which
-            # typically occurs when the user is manually assigning partitions and managing
-            # their own offsets).
-            self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms())
-
-            if not self._subscription.has_all_fetch_positions(partitions):
-                # if we still don't have offsets for the given partitions, then we should either
-                # seek to the last committed position or reset using the auto reset policy
-                if (self.config['api_version'] >= (0, 8, 1) and
-                        self.config['group_id'] is not None):
-                    # first refresh commits for all assigned partitions
-                    self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=inner_timeout_ms())
-
-                    # Then, do any offset lookups in case some positions are not known
-                    self._fetcher.update_fetch_positions(partitions, timeout_ms=inner_timeout_ms())
+        if self._subscription.has_all_fetch_positions():
             return True
 
-        except KafkaTimeoutError:
-            return False
+        if (self.config['api_version'] >= (0, 8, 1) and
+                self.config['group_id'] is not None):
+            try:
+                # If there are any partitions which do not have a valid position and are not
+                # awaiting reset, then we need to fetch committed offsets. We will only do a
+                # coordinator lookup if there are partitions which have missing positions, so
+                # a consumer with manually assigned partitions can avoid a coordinator dependence
+                # by always ensuring that assigned partitions have an initial position.
+                self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
+            except KafkaTimeoutError:
+                pass
+
+        # If there are partitions still needing a position and a reset policy is defined,
+        # request reset using the default policy. If no reset strategy is defined and there
+        # are partitions with a missing position, then we will raise an exception.
+        self._subscription.reset_missing_positions()
+
+        # Finally send an asynchronous request to lookup and update the positions of any
+        # partitions which are awaiting reset.
+        self._fetcher.reset_offsets_if_needed()
+        return False
 
     def _message_generator_v2(self):
         timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
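
Both position() and the old _update_fetch_positions() rely on the timeout_ms_fn(...) helper to turn a single timeout_ms budget into a callable that hands out the remaining time and raises KafkaTimeoutError once the deadline passes. The following is an illustrative reimplementation of that pattern, not the library's actual helper; the name make_inner_timeout_ms and the fallback argument are assumptions for the sketch:

import time

from kafka.errors import KafkaTimeoutError

def make_inner_timeout_ms(timeout_ms, error_message):
    # None means "no deadline": callers may block indefinitely.
    deadline = None if timeout_ms is None else time.time() + timeout_ms / 1000.0

    def inner_timeout_ms(fallback=None):
        if deadline is None:
            return fallback
        remaining = (deadline - time.time()) * 1000.0
        if remaining <= 0:
            # the overall budget is exhausted, regardless of the per-call fallback
            raise KafkaTimeoutError(error_message)
        return remaining if fallback is None else min(remaining, fallback)

    return inner_timeout_ms

# Example: spread a 500ms budget across several blocking calls.
inner_timeout_ms = make_inner_timeout_ms(500, 'Timeout retrieving partition position')
first_wait = inner_timeout_ms()       # roughly 500ms remaining
capped_wait = inner_timeout_ms(100)   # never more than 100ms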
