KAFKA-6397: Consumer should not block setting positions of unavailable partitions #2593

Merged
11 commits merged on Apr 23, 2025
4 changes: 4 additions & 0 deletions kafka/cluster.py
@@ -202,6 +202,10 @@ def request_update(self):
self._future = Future()
return self._future

@property
def need_update(self):
return self._need_update
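
For illustration, a minimal sketch of observing the new read-only property. Constructing ClusterMetadata directly is not typical (it is normally managed by the client internals) and is done here only to show the flag:

from kafka.cluster import ClusterMetadata

metadata = ClusterMetadata()

# request_update() marks the metadata as stale and returns a Future that
# completes once a fresh MetadataResponse has been processed.
update_future = metadata.request_update()

# The new property exposes that flag without reaching into the private
# _need_update attribute, so callers such as the fetcher can defer work
# while a refresh is pending.
if metadata.need_update:
    print('metadata refresh pending; deferring offset lookups')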

def topics(self, exclude_internal_topics=True):
"""Get set of known topics.

300 changes: 149 additions & 151 deletions kafka/consumer/fetcher.py

Large diffs are not rendered by default.

103 changes: 50 additions & 53 deletions kafka/consumer/group.py
@@ -572,9 +572,8 @@ def committed(self, partition, metadata=False, timeout_ms=None):
This offset will be used as the position for the consumer
in the event of a failure.

This call may block to do a remote call if the partition in question
isn't assigned to this consumer or if the consumer hasn't yet
initialized its cache of committed offsets.
This call will block to do a remote call to get the latest committed
offsets from the server.

Arguments:
partition (TopicPartition): The partition to check.
@@ -586,28 +585,16 @@ def committed(self, partition, metadata=False, timeout_ms=None):

Raises:
KafkaTimeoutError if timeout_ms provided
BrokerResponseErrors if OffsetFetchRequest raises an error.
"""
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
assert self.config['group_id'] is not None, 'Requires group_id'
if not isinstance(partition, TopicPartition):
raise TypeError('partition must be a TopicPartition namedtuple')
if self._subscription.is_assigned(partition):
committed = self._subscription.assignment[partition].committed
if committed is None:
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
committed = self._subscription.assignment[partition].committed
else:
commit_map = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms)
if partition in commit_map:
committed = commit_map[partition]
else:
committed = None

if committed is not None:
if metadata:
return committed
else:
return committed.offset
committed = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms)
if partition not in committed:
return None
return committed[partition] if metadata else committed[partition].offset
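
As a usage illustration of the reworked committed() above (broker address, group and topic names are placeholders), a caller now always triggers a remote OffsetFetch and should be prepared for a timeout or a missing offset:

from kafka import KafkaConsumer, TopicPartition
from kafka.errors import KafkaTimeoutError

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='my-group')
tp = TopicPartition('my-topic', 0)

try:
    # Blocks on a remote call to the group coordinator for the latest committed offset.
    offset = consumer.committed(tp, timeout_ms=5000)
except KafkaTimeoutError:
    offset = None  # the lookup did not complete within timeout_ms

if offset is None:
    print('no committed offset known for %s' % (tp,))
else:
    print('last committed offset: %d' % offset)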

def _fetch_all_topic_metadata(self):
"""A blocking call that fetches topic metadata for all topics in the
@@ -717,10 +704,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
if not self._coordinator.poll(timeout_ms=inner_timeout_ms()):
return {}

# Fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
self._update_fetch_positions(self._subscription.missing_fetch_positions(), timeout_ms=inner_timeout_ms())
has_all_fetch_positions = self._update_fetch_positions(timeout_ms=inner_timeout_ms())

# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
@@ -737,7 +721,13 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
if records:
return records

self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000))
# We do not want to be stuck blocking in poll if we are missing some positions
# since the offset lookup may be backing off after a failure
poll_timeout_ms = inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000)
if not has_all_fetch_positions:
poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])

self._client.poll(timeout_ms=poll_timeout_ms)
# after the long poll, we should check whether the group needs to rebalance
# prior to returning data so that the group can stabilize faster
if self._coordinator.need_rejoin():
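
The intent of the capped poll timeout above can be read as a small standalone sketch (the function name and parameters are illustrative; retry_backoff_ms defaults to 100 ms in kafka-python):

def bounded_poll_timeout_ms(remaining_ms, time_to_next_poll_ms,
                            has_all_fetch_positions, retry_backoff_ms=100):
    # Never sleep past the caller's deadline or the next required coordinator poll.
    timeout = min(remaining_ms, time_to_next_poll_ms)
    # If some partitions still lack a valid position (for example because an offset
    # lookup is backing off after a failure), wake up after at most retry_backoff_ms
    # so the lookup can be retried instead of blocking for the full poll interval.
    if not has_all_fetch_positions:
        timeout = min(timeout, retry_backoff_ms)
    return timeout
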
@@ -758,12 +748,18 @@ def position(self, partition, timeout_ms=None):
if not isinstance(partition, TopicPartition):
raise TypeError('partition must be a TopicPartition namedtuple')
assert self._subscription.is_assigned(partition), 'Partition is not assigned'

inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout retrieving partition position')
position = self._subscription.assignment[partition].position
if position is None:
# batch update fetch positions for any partitions without a valid position
self._update_fetch_positions(self._subscription.assigned_partitions(), timeout_ms=timeout_ms)
position = self._subscription.assignment[partition].position
return position.offset if position else None
try:
while position is None:
# batch update fetch positions for any partitions without a valid position
self._update_fetch_positions(timeout_ms=inner_timeout_ms())
position = self._subscription.assignment[partition].position
except KafkaTimeoutError:
return None
else:
return position.offset
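
A usage sketch for the updated position() above (topic, partition and broker address are placeholders); note that a timeout now yields None rather than an exception:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])

# Retries the batched position update until the offset is known or timeout_ms expires.
pos = consumer.position(tp, timeout_ms=5000)
if pos is None:
    print('position not yet available (lookup timed out or is still backing off)')
else:
    print('next offset to fetch for %s: %d' % (tp, pos))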

def highwater(self, partition):
"""Last known highwater offset for a partition.
@@ -1056,7 +1052,7 @@ def offsets_for_times(self, timestamps):
raise ValueError(
"The target time for partition {} is {}. The target time "
"cannot be negative.".format(tp, ts))
return self._fetcher.get_offsets_by_times(
return self._fetcher.offsets_by_times(
timestamps, self.config['request_timeout_ms'])
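
The public offsets_for_times() API is unchanged by the internal rename above; a brief usage sketch (placeholder names, timestamp in epoch milliseconds):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
tp = TopicPartition('my-topic', 0)

# Look up the earliest offset whose timestamp is >= the given epoch milliseconds.
result = consumer.offsets_for_times({tp: 1700000000000})
entry = result.get(tp)
if entry is None:
    print('no message at or after that timestamp')
else:
    print('offset %d has timestamp %d' % (entry.offset, entry.timestamp))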

def beginning_offsets(self, partitions):
@@ -1122,7 +1118,7 @@ def _use_consumer_group(self):
return False
return True

def _update_fetch_positions(self, partitions, timeout_ms=None):
def _update_fetch_positions(self, timeout_ms=None):
"""Set the fetch position to the committed position (if there is one)
or reset it using the offset reset policy the user has configured.

@@ -1136,29 +1132,30 @@ def _update_fetch_positions(self, timeout_ms=None):
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined.
"""
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
try:
# Lookup any positions for partitions which are awaiting reset (which may be the
# case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
# this check first to avoid an unnecessary lookup of committed offsets (which
# typically occurs when the user is manually assigning partitions and managing
# their own offsets).
self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms())

if not self._subscription.has_all_fetch_positions(partitions):
# if we still don't have offsets for the given partitions, then we should either
# seek to the last committed position or reset using the auto reset policy
if (self.config['api_version'] >= (0, 8, 1) and
self.config['group_id'] is not None):
# first refresh commits for all assigned partitions
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=inner_timeout_ms())

# Then, do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions, timeout_ms=inner_timeout_ms())
if self._subscription.has_all_fetch_positions():
return True

except KafkaTimeoutError:
return False
if (self.config['api_version'] >= (0, 8, 1) and
self.config['group_id'] is not None):
try:
# If there are any partitions which do not have a valid position and are not
# awaiting reset, then we need to fetch committed offsets. We will only do a
# coordinator lookup if there are partitions which have missing positions, so
# a consumer with manually assigned partitions can avoid a coordinator dependence
# by always ensuring that assigned partitions have an initial position.
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
except KafkaTimeoutError:
pass

# If there are partitions still needing a position and a reset policy is defined,
# request reset using the default policy. If no reset strategy is defined and there
# are partitions with a missing position, then we will raise an exception.
self._subscription.reset_missing_positions()

# Finally send an asynchronous request to lookup and update the positions of any
# partitions which are awaiting reset.
self._fetcher.reset_offsets_if_needed()
return False
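
As the comment in _update_fetch_positions above notes, a consumer with manually assigned partitions can avoid the coordinator dependence entirely by giving every assigned partition an initial position. A minimal sketch (topic, offset and broker address are placeholders):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')  # no group_id
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])
consumer.seek(tp, 42)  # explicit starting offset, so no committed-offset fetch is needed

while True:
    # With a position already set, _update_fetch_positions() has nothing to look up
    # and poll() is never capped waiting on pending offset resets.
    records = consumer.poll(timeout_ms=1000)
    for record in records.get(tp, []):
        print(record.offset, record.value)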

def _message_generator_v2(self):
timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())