Skip to content

Commit 06a4172

Browse files
committed
Dont long poll in consumer if waiting to send offset resets
1 parent 8349bc7 commit 06a4172

File tree

1 file changed

+23
-19
lines changed

1 file changed

+23
-19
lines changed

kafka/consumer/group.py

+23-19
Original file line numberDiff line numberDiff line change
@@ -734,7 +734,13 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
734734
if records:
735735
return records
736736

737-
self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000))
737+
# We do not want to be stuck blocking in poll if we are missing some positions
738+
# since the offset lookup may be backing off after a failure
739+
poll_timeout_ms = inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000)
740+
if not has_all_fetch_positions:
741+
poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])
742+
743+
self._client.poll(timeout_ms=poll_timeout_ms)
738744
# after the long poll, we should check whether the group needs to rebalance
739745
# prior to returning data so that the group can stabilize faster
740746
if self._coordinator.need_rejoin():
@@ -1142,29 +1148,27 @@ def _update_fetch_positions(self, timeout_ms=None):
11421148
if self._subscription.has_all_fetch_positions():
11431149
return True
11441150

1145-
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
1146-
try:
1147-
if (self.config['api_version'] >= (0, 8, 1) and
1148-
self.config['group_id'] is not None):
1151+
if (self.config['api_version'] >= (0, 8, 1) and
1152+
self.config['group_id'] is not None):
1153+
try:
11491154
# If there are any partitions which do not have a valid position and are not
11501155
# awaiting reset, then we need to fetch committed offsets. We will only do a
11511156
# coordinator lookup if there are partitions which have missing positions, so
11521157
# a consumer with manually assigned partitions can avoid a coordinator dependence
11531158
# by always ensuring that assigned partitions have an initial position.
1154-
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=inner_timeout_ms())
1155-
1156-
# If there are partitions still needing a position and a reset policy is defined,
1157-
# request reset using the default policy. If no reset strategy is defined and there
1158-
# are partitions with a missing position, then we will raise an exception.
1159-
self._subscription.reset_missing_positions()
1160-
1161-
# Finally send an asynchronous request to lookup and update the positions of any
1162-
# partitions which are awaiting reset.
1163-
self._fetcher.reset_offsets_if_needed()
1164-
return True
1165-
1166-
except KafkaTimeoutError:
1167-
return False
1159+
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
1160+
except KafkaTimeoutError:
1161+
pass
1162+
1163+
# If there are partitions still needing a position and a reset policy is defined,
1164+
# request reset using the default policy. If no reset strategy is defined and there
1165+
# are partitions with a missing position, then we will raise an exception.
1166+
self._subscription.reset_missing_positions()
1167+
1168+
# Finally send an asynchronous request to lookup and update the positions of any
1169+
# partitions which are awaiting reset.
1170+
self._fetcher.reset_offsets_if_needed()
1171+
return False
11681172

11691173
def _message_generator_v2(self):
11701174
timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())

0 commit comments

Comments
 (0)