Skip to content

Commit 5b28fec

Browse files
committed
KAFKA-6397: Consumer should not block setting positions of unavailable partitions
1 parent 39aa421 commit 5b28fec

File tree

7 files changed

+335
-291
lines changed

7 files changed

+335
-291
lines changed

kafka/cluster.py

+4
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ def request_update(self):
202202
self._future = Future()
203203
return self._future
204204

205+
@property
206+
def need_update(self):
207+
return self._need_update
208+
205209
def topics(self, exclude_internal_topics=True):
206210
"""Get set of known topics.
207211

kafka/consumer/fetcher.py

+140-138
Large diffs are not rendered by default.

kafka/consumer/group.py

+33-27
Original file line numberDiff line numberDiff line change
@@ -717,10 +717,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
717717
if not self._coordinator.poll(timeout_ms=inner_timeout_ms()):
718718
return {}
719719

720-
# Fetch positions if we have partitions we're subscribed to that we
721-
# don't know the offset for
722-
if not self._subscription.has_all_fetch_positions():
723-
self._update_fetch_positions(self._subscription.missing_fetch_positions(), timeout_ms=inner_timeout_ms())
720+
has_all_fetch_positions = self._update_fetch_positions(timeout_ms=inner_timeout_ms())
724721

725722
# If data is available already, e.g. from a previous network client
726723
# poll() call to commit, then just return it immediately
@@ -758,12 +755,18 @@ def position(self, partition, timeout_ms=None):
758755
if not isinstance(partition, TopicPartition):
759756
raise TypeError('partition must be a TopicPartition namedtuple')
760757
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
758+
759+
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout retrieving partition position')
761760
position = self._subscription.assignment[partition].position
762-
if position is None:
763-
# batch update fetch positions for any partitions without a valid position
764-
self._update_fetch_positions(self._subscription.assigned_partitions(), timeout_ms=timeout_ms)
765-
position = self._subscription.assignment[partition].position
766-
return position.offset if position else None
761+
try:
762+
while position is None:
763+
# batch update fetch positions for any partitions without a valid position
764+
self._update_fetch_positions(timeout_ms=inner_timeout_ms())
765+
position = self._subscription.assignment[partition].position
766+
except KafkaTimeoutError:
767+
return None
768+
else:
769+
return position.offset
767770

768771
def highwater(self, partition):
769772
"""Last known highwater offset for a partition.
@@ -1122,7 +1125,7 @@ def _use_consumer_group(self):
11221125
return False
11231126
return True
11241127

1125-
def _update_fetch_positions(self, partitions, timeout_ms=None):
1128+
def _update_fetch_positions(self, timeout_ms=None):
11261129
"""Set the fetch position to the committed position (if there is one)
11271130
or reset it using the offset reset policy the user has configured.
11281131
@@ -1136,25 +1139,28 @@ def _update_fetch_positions(self, partitions, timeout_ms=None):
11361139
NoOffsetForPartitionError: If no offset is stored for a given
11371140
partition and no offset reset policy is defined.
11381141
"""
1142+
if self._subscription.has_all_fetch_positions():
1143+
return True
1144+
11391145
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions')
11401146
try:
1141-
# Lookup any positions for partitions which are awaiting reset (which may be the
1142-
# case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
1143-
# this check first to avoid an unnecessary lookup of committed offsets (which
1144-
# typically occurs when the user is manually assigning partitions and managing
1145-
# their own offsets).
1146-
self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms())
1147-
1148-
if not self._subscription.has_all_fetch_positions(partitions):
1149-
# if we still don't have offsets for the given partitions, then we should either
1150-
# seek to the last committed position or reset using the auto reset policy
1151-
if (self.config['api_version'] >= (0, 8, 1) and
1152-
self.config['group_id'] is not None):
1153-
# first refresh commits for all assigned partitions
1154-
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=inner_timeout_ms())
1155-
1156-
# Then, do any offset lookups in case some positions are not known
1157-
self._fetcher.update_fetch_positions(partitions, timeout_ms=inner_timeout_ms())
1147+
if (self.config['api_version'] >= (0, 8, 1) and
1148+
self.config['group_id'] is not None):
1149+
# If there are any partitions which do not have a valid position and are not
1150+
# awaiting reset, then we need to fetch committed offsets. We will only do a
1151+
# coordinator lookup if there are partitions which have missing positions, so
1152+
# a consumer with manually assigned partitions can avoid a coordinator dependence
1153+
# by always ensuring that assigned partitions have an initial position.
1154+
self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=inner_timeout_ms())
1155+
1156+
# If there are partitions still needing a position and a reset policy is defined,
1157+
# request reset using the default policy. If no reset strategy is defined and there
1158+
# are partitions with a missing position, then we will raise an exception.
1159+
self._subscription.reset_missing_positions()
1160+
1161+
# Finally send an asynchronous request to lookup and update the positions of any
1162+
# partitions which are awaiting reset.
1163+
self._fetcher.reset_offsets_if_needed(timeout_ms=inner_timeout_ms())
11581164
return True
11591165

11601166
except KafkaTimeoutError:

kafka/consumer/subscription_state.py

+73-37
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
import logging
1616
import random
1717
import re
18+
import time
1819

1920
from kafka.vendor import six
2021

21-
from kafka.errors import IllegalStateError
22+
import kafka.errors as Errors
2223
from kafka.protocol.list_offsets import OffsetResetStrategy
2324
from kafka.structs import OffsetAndMetadata
2425
from kafka.util import ensure_valid_topic_name
@@ -52,10 +53,6 @@ class SubscriptionState(object):
5253
Note that pause state as well as fetch/consumed positions are not preserved
5354
when partition assignment is changed whether directly by the user or
5455
through a group rebalance.
55-
56-
This class also maintains a cache of the latest commit position for each of
57-
the assigned partitions. This is updated through committed() and can be used
58-
to set the initial fetch position (e.g. Fetcher._reset_offset() ).
5956
"""
6057
_SUBSCRIPTION_EXCEPTION_MESSAGE = (
6158
"You must choose only one way to configure your consumer:"
@@ -85,18 +82,16 @@ def __init__(self, offset_reset_strategy='earliest'):
8582
self._group_subscription = set()
8683
self._user_assignment = set()
8784
self.assignment = OrderedDict()
88-
self.listener = None
89-
90-
# initialize to true for the consumers to fetch offset upon starting up
91-
self.needs_fetch_committed_offsets = True
85+
self.rebalance_listener = None
86+
self.listeners = []
9287

9388
def _set_subscription_type(self, subscription_type):
9489
if not isinstance(subscription_type, SubscriptionType):
9590
raise ValueError('SubscriptionType enum required')
9691
if self.subscription_type == SubscriptionType.NONE:
9792
self.subscription_type = subscription_type
9893
elif self.subscription_type != subscription_type:
99-
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
94+
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
10095

10196
def subscribe(self, topics=(), pattern=None, listener=None):
10297
"""Subscribe to a list of topics, or a topic regex pattern.
@@ -135,7 +130,7 @@ def subscribe(self, topics=(), pattern=None, listener=None):
135130
"""
136131
assert topics or pattern, 'Must provide topics or pattern'
137132
if (topics and pattern):
138-
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
133+
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
139134

140135
elif pattern:
141136
self._set_subscription_type(SubscriptionType.AUTO_PATTERN)
@@ -150,7 +145,7 @@ def subscribe(self, topics=(), pattern=None, listener=None):
150145

151146
if listener and not isinstance(listener, ConsumerRebalanceListener):
152147
raise TypeError('listener must be a ConsumerRebalanceListener')
153-
self.listener = listener
148+
self.rebalance_listener = listener
154149

155150
def change_subscription(self, topics):
156151
"""Change the topic subscription.
@@ -166,7 +161,7 @@ def change_subscription(self, topics):
166161
- a topic name does not consist of ASCII-characters/'-'/'_'/'.'
167162
"""
168163
if not self.partitions_auto_assigned():
169-
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
164+
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
170165

171166
if isinstance(topics, six.string_types):
172167
topics = [topics]
@@ -193,13 +188,13 @@ def group_subscribe(self, topics):
193188
topics (list of str): topics to add to the group subscription
194189
"""
195190
if not self.partitions_auto_assigned():
196-
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
191+
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
197192
self._group_subscription.update(topics)
198193

199194
def reset_group_subscription(self):
200195
"""Reset the group's subscription to only contain topics subscribed by this consumer."""
201196
if not self.partitions_auto_assigned():
202-
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
197+
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
203198
assert self.subscription is not None, 'Subscription required'
204199
self._group_subscription.intersection_update(self.subscription)
205200

@@ -226,7 +221,6 @@ def assign_from_user(self, partitions):
226221
self._user_assignment = set(partitions)
227222
self._set_assignment({partition: self.assignment.get(partition, TopicPartitionState())
228223
for partition in partitions})
229-
self.needs_fetch_committed_offsets = True
230224

231225
def assign_from_subscribed(self, assignments):
232226
"""Update the assignment to the specified partitions
@@ -241,16 +235,14 @@ def assign_from_subscribed(self, assignments):
241235
consumer instance.
242236
"""
243237
if not self.partitions_auto_assigned():
244-
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
238+
raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
245239

246240
for tp in assignments:
247241
if tp.topic not in self.subscription:
248242
raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,))
249243

250-
# after rebalancing, we always reinitialize the assignment value
251244
# randomized ordering should improve balance for short-lived consumers
252245
self._set_assignment({partition: TopicPartitionState() for partition in assignments}, randomize=True)
253-
self.needs_fetch_committed_offsets = True
254246
log.info("Updated partition assignment: %s", assignments)
255247

256248
def _set_assignment(self, partition_states, randomize=False):
@@ -300,8 +292,10 @@ def seek(self, partition, offset):
300292
301293
Arguments:
302294
partition (TopicPartition): partition for seek operation
303-
offset (int): message offset in partition
295+
offset (int or OffsetAndMetadata): message offset in partition
304296
"""
297+
if not isinstance(offset, (int, OffsetAndMetadata)):
298+
raise TypeError("offset must be type in or OffsetAndMetadata")
305299
self.assignment[partition].seek(offset)
306300

307301
def assigned_partitions(self):
@@ -333,7 +327,7 @@ def all_consumed_offsets(self):
333327
all_consumed[partition] = state.position
334328
return all_consumed
335329

336-
def need_offset_reset(self, partition, offset_reset_strategy=None):
330+
def request_offset_reset(self, partition, offset_reset_strategy=None):
337331
"""Mark partition for offset reset using specified or default strategy.
338332
339333
Arguments:
@@ -342,7 +336,11 @@ def need_offset_reset(self, partition, offset_reset_strategy=None):
342336
"""
343337
if offset_reset_strategy is None:
344338
offset_reset_strategy = self._default_offset_reset_strategy
345-
self.assignment[partition].await_reset(offset_reset_strategy)
339+
self.assignment[partition].reset(offset_reset_strategy)
340+
341+
def set_reset_pending(self, partitions, next_allowed_reset_time):
342+
for partition in partitions:
343+
self.assignment[partition].set_reset_pending(next_allowed_reset_time)
346344

347345
def has_default_offset_reset_policy(self):
348346
"""Return True if default offset reset policy is Earliest or Latest"""
@@ -351,24 +349,41 @@ def has_default_offset_reset_policy(self):
351349
def is_offset_reset_needed(self, partition):
352350
return self.assignment[partition].awaiting_reset
353351

354-
def has_all_fetch_positions(self, partitions=None):
355-
if partitions is None:
356-
partitions = self.assigned_partitions()
357-
for tp in partitions:
358-
if not self.has_valid_position(tp):
352+
def has_all_fetch_positions(self):
353+
for state in six.itervalues(self.assignment):
354+
if not state.has_valid_position:
359355
return False
360356
return True
361357

362358
def missing_fetch_positions(self):
363359
missing = set()
364360
for partition, state in six.iteritems(self.assignment):
365-
if not state.has_valid_position:
361+
if state.is_missing_position():
366362
missing.add(partition)
367363
return missing
368364

369365
def has_valid_position(self, partition):
370366
return partition in self.assignment and self.assignment[partition].has_valid_position
371367

368+
def reset_missing_positions(self):
369+
partitions_with_no_offsets = set()
370+
for tp, state in six.iteritems(self.assignment):
371+
if state.is_missing_position():
372+
if self._default_offset_reset_strategy == OffsetResetStrategy.NONE:
373+
partitions_with_no_offsets.add(tp)
374+
else:
375+
state.reset(self._default_offset_reset_strategy)
376+
377+
if partitions_with_no_offsets:
378+
raise Errors.NoOffsetForPartitionError(partitions_with_no_offsets)
379+
380+
def partitions_needing_reset(self):
381+
partitions = set()
382+
for tp, state in six.iteritems(self.assignment):
383+
if state.awaiting_reset and state.is_reset_allowed():
384+
partitions.add(tp)
385+
return partitions
386+
372387
def is_assigned(self, partition):
373388
return partition in self.assignment
374389

@@ -384,6 +399,10 @@ def pause(self, partition):
384399
def resume(self, partition):
385400
self.assignment[partition].resume()
386401

402+
def reset_failed(self, partitions, next_retry_time):
403+
for partition in partitions:
404+
self.assignment[partition].reset_failed(next_retry_time)
405+
387406
def move_partition_to_end(self, partition):
388407
if partition in self.assignment:
389408
try:
@@ -398,14 +417,12 @@ def position(self, partition):
398417

399418
class TopicPartitionState(object):
400419
def __init__(self):
401-
self.committed = None # last committed OffsetAndMetadata
402-
self.has_valid_position = False # whether we have valid position
403420
self.paused = False # whether this partition has been paused by the user
404-
self.awaiting_reset = False # whether we are awaiting reset
405421
self.reset_strategy = None # the reset strategy if awaiting_reset is set
406422
self._position = None # OffsetAndMetadata exposed to the user
407423
self.highwater = None
408424
self.drop_pending_record_batch = False
425+
self.next_allowed_retry_time = None
409426

410427
def _set_position(self, offset):
411428
assert self.has_valid_position, 'Valid position required'
@@ -417,18 +434,37 @@ def _get_position(self):
417434

418435
position = property(_get_position, _set_position, None, "last position")
419436

420-
def await_reset(self, strategy):
421-
self.awaiting_reset = True
437+
def reset(self, strategy):
438+
assert strategy is not None
422439
self.reset_strategy = strategy
423440
self._position = None
424-
self.has_valid_position = False
441+
self.next_allowed_retry_time = None
442+
443+
def is_reset_allowed(self):
444+
return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.time()
445+
446+
@property
447+
def awaiting_reset(self):
448+
return self.reset_strategy is not None
449+
450+
def set_reset_pending(self, next_allowed_retry_time):
451+
self.next_allowed_retry_time = next_allowed_retry_time
452+
453+
def reset_failed(self, next_allowed_retry_time):
454+
self.next_allowed_retry_time = next_allowed_retry_time
455+
456+
@property
457+
def has_valid_position(self):
458+
return self._position is not None
459+
460+
def is_missing_position(self):
461+
return not self.has_valid_position and not self.awaiting_reset
425462

426463
def seek(self, offset):
427-
self._position = OffsetAndMetadata(offset, '', -1)
428-
self.awaiting_reset = False
464+
self._position = offset if isinstance(offset, OffsetAndMetadata) else OffsetAndMetadata(offset, '', -1)
429465
self.reset_strategy = None
430-
self.has_valid_position = True
431466
self.drop_pending_record_batch = True
467+
self.next_allowed_retry_time = None
432468

433469
def pause(self):
434470
self.paused = True

0 commit comments

Comments
 (0)