diff --git a/kafka/cluster.py b/kafka/cluster.py index c92d1d05b..ae822a401 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -202,6 +202,10 @@ def request_update(self): self._future = Future() return self._future + @property + def need_update(self): + return self._need_update + def topics(self, exclude_internal_topics=True): """Get set of known topics. diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 14dc8a30d..ceca1d9b6 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -67,6 +67,7 @@ class Fetcher(six.Iterator): 'check_crcs': True, 'metrics': None, 'metric_group_prefix': 'consumer', + 'request_timeout_ms': 30000, 'retry_backoff_ms': 100, 'enable_incremental_fetch_sessions': True, 'isolation_level': 'read_uncommitted', @@ -135,6 +136,7 @@ def __init__(self, client, subscriptions, **configs): self._isolation_level = ISOLATION_LEVEL_CONFIG[self.config['isolation_level']] self._session_handlers = {} self._nodes_with_pending_fetch_requests = set() + self._cached_list_offsets_exception = None self._next_in_line_exception_metadata = None def send_fetches(self): @@ -156,24 +158,6 @@ def send_fetches(self): self._clean_done_fetch_futures() return futures - def reset_offsets_if_needed(self, partitions, timeout_ms=None): - """Lookup and set offsets for any partitions which are awaiting an - explicit reset. - - Arguments: - partitions (set of TopicPartitions): the partitions to reset - - Raises: - KafkaTimeoutError if timeout_ms provided - """ - needs_offset_reset = set() - for tp in partitions: - if self._subscriptions.is_assigned(tp) and self._subscriptions.is_offset_reset_needed(tp): - needs_offset_reset.add(tp) - - if needs_offset_reset: - self._reset_offsets(needs_offset_reset, timeout_ms=timeout_ms) - def _clean_done_fetch_futures(self): while True: if not self._fetch_futures: @@ -187,62 +171,7 @@ def in_flight_fetches(self): self._clean_done_fetch_futures() return bool(self._fetch_futures) - def update_fetch_positions(self, partitions, timeout_ms=None): - """Update the fetch positions for the provided partitions. - - Arguments: - partitions (list of TopicPartitions): partitions to update - - Raises: - NoOffsetForPartitionError: if no offset is stored for a given - partition and no reset policy is available - KafkaTimeoutError if timeout_ms provided. 
- """ - needs_offset_reset = set() - # reset the fetch position to the committed position - for tp in partitions: - if not self._subscriptions.is_assigned(tp) or self._subscriptions.has_valid_position(tp): - continue - - if self._subscriptions.is_offset_reset_needed(tp): - needs_offset_reset.add(tp) - elif self._subscriptions.assignment[tp].committed is None: - # there's no committed position, so we need to reset with the - # default strategy - self._subscriptions.need_offset_reset(tp) - needs_offset_reset.add(tp) - else: - committed = self._subscriptions.assignment[tp].committed.offset - log.debug("Resetting offset for partition %s to the committed" - " offset %s", tp, committed) - self._subscriptions.seek(tp, committed) - - if needs_offset_reset: - self._reset_offsets(needs_offset_reset, timeout_ms=timeout_ms) - - def get_offsets_by_times(self, timestamps, timeout_ms): - offsets = self._retrieve_offsets(timestamps, timeout_ms) - for tp in timestamps: - if tp not in offsets: - offsets[tp] = None - return offsets - - def beginning_offsets(self, partitions, timeout_ms): - return self.beginning_or_end_offset( - partitions, OffsetResetStrategy.EARLIEST, timeout_ms) - - def end_offsets(self, partitions, timeout_ms): - return self.beginning_or_end_offset( - partitions, OffsetResetStrategy.LATEST, timeout_ms) - - def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): - timestamps = dict([(tp, timestamp) for tp in partitions]) - offsets = self._retrieve_offsets(timestamps, timeout_ms) - for tp in timestamps: - offsets[tp] = offsets[tp].offset - return offsets - - def _reset_offsets(self, partitions, timeout_ms=None): + def reset_offsets_if_needed(self): """Reset offsets for the given partitions using the offset reset strategy. Arguments: @@ -252,28 +181,24 @@ def _reset_offsets(self, partitions, timeout_ms=None): NoOffsetForPartitionError: if no offset reset strategy is defined KafkaTimeoutError if timeout_ms provided """ + # Raise exception from previous offset fetch if there is one + exc, self._cached_list_offsets_exception = self._cached_list_offsets_exception, None + if exc: + raise exc + + partitions = self._subscriptions.partitions_needing_reset() + if not partitions: + return + offset_resets = dict() for tp in partitions: ts = self._subscriptions.assignment[tp].reset_strategy - if not ts: - raise NoOffsetForPartitionError(tp) - offset_resets[tp] = ts - - offsets = self._retrieve_offsets(offset_resets, timeout_ms=timeout_ms) + if ts: + offset_resets[tp] = ts - for partition in partitions: - if partition not in offsets: - raise NoOffsetForPartitionError(partition) + self._reset_offsets_async(offset_resets) - # we might lose the assignment while fetching the offset, - # so check it is still active - if self._subscriptions.is_assigned(partition): - offset = offsets[partition].offset - log.debug("Resetting offset for partition %s to offset %s.", - partition, offset) - self._subscriptions.seek(partition, offset) - - def _retrieve_offsets(self, timestamps, timeout_ms=None): + def offsets_by_times(self, timestamps, timeout_ms=None): """Fetch offset for each partition passed in ``timestamps`` map. Blocks until offsets are obtained, a non-retriable exception is raised @@ -283,6 +208,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=None): timestamps: {TopicPartition: int} dict with timestamps to fetch offsets by. -1 for the latest available, -2 for the earliest available. Otherwise timestamp is treated as epoch milliseconds. 
+ timeout_ms (int, optional): The maximum time in milliseconds to block. Returns: {TopicPartition: OffsetAndTimestamp}: Mapping of partition to @@ -293,11 +219,19 @@ def _retrieve_offsets(self, timestamps, timeout_ms=None): Raises: KafkaTimeoutError if timeout_ms provided """ + offsets = self._fetch_offsets_by_times(timestamps, timeout_ms) + for tp in timestamps: + if tp not in offsets: + offsets[tp] = None + return offsets + + def _fetch_offsets_by_times(self, timestamps, timeout_ms=None): if not timestamps: return {} inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout fetching offsets') timestamps = copy.copy(timestamps) + fetched_offsets = dict() while True: if not timestamps: return {} @@ -310,31 +244,42 @@ def _retrieve_offsets(self, timestamps, timeout_ms=None): break if future.succeeded(): - return future.value - if not future.retriable(): + fetched_offsets.update(future.value[0]) + if not future.value[1]: + return fetched_offsets + + timestamps = {tp: timestamps[tp] for tp in future.value[1]} + + elif not future.retriable(): raise future.exception # pylint: disable-msg=raising-bad-type - if future.exception.invalid_metadata: + if future.exception.invalid_metadata or self._client.cluster.need_update: refresh_future = self._client.cluster.request_update() self._client.poll(future=refresh_future, timeout_ms=inner_timeout_ms()) if not future.is_done: break - - # Issue #1780 - # Recheck partition existence after after a successful metadata refresh - if refresh_future.succeeded() and isinstance(future.exception, Errors.StaleMetadata): - log.debug("Stale metadata was raised, and we now have an updated metadata. Rechecking partition existence") - unknown_partition = future.exception.args[0] # TopicPartition from StaleMetadata - if self._client.cluster.leader_for_partition(unknown_partition) is None: - log.debug("Removed partition %s from offsets retrieval" % (unknown_partition, )) - timestamps.pop(unknown_partition) else: time.sleep(inner_timeout_ms(self.config['retry_backoff_ms']) / 1000) raise Errors.KafkaTimeoutError( "Failed to get offsets by timestamps in %s ms" % (timeout_ms,)) + def beginning_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.EARLIEST, timeout_ms) + + def end_offsets(self, partitions, timeout_ms): + return self.beginning_or_end_offset( + partitions, OffsetResetStrategy.LATEST, timeout_ms) + + def beginning_or_end_offset(self, partitions, timestamp, timeout_ms): + timestamps = dict([(tp, timestamp) for tp in partitions]) + offsets = self._fetch_offsets_by_times(timestamps, timeout_ms) + for tp in timestamps: + offsets[tp] = offsets[tp].offset + return offsets + def fetched_records(self, max_records=None, update_offsets=True): """Returns previously fetched records and updates consumed offsets. 
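
The two hunks above fold the old get_offsets_by_times/_retrieve_offsets pair into offsets_by_times/_fetch_offsets_by_times while keeping the blocking semantics of the public lookups. A minimal sketch of how these paths surface through KafkaConsumer (broker address, topic name and timestamp below are placeholders, not part of this change):

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    tp = TopicPartition('my-topic', 0)

    # Earliest/latest lookups reuse the same ListOffsets path with -2/-1 timestamps.
    print(consumer.beginning_offsets([tp]))
    print(consumer.end_offsets([tp]))

    # Timestamp lookup returns OffsetAndTimestamp, or None when no matching offset exists;
    # per the updated integration test, an unknown topic now surfaces as KafkaTimeoutError
    # instead of a None entry.
    offsets = consumer.offsets_for_times({tp: 1609459200000})
    if offsets[tp] is not None:
        print(offsets[tp].offset, offsets[tp].timestamp)
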
@@ -449,6 +394,53 @@ def _append(self, drained, part, max_records, update_offsets): part.drain() return 0 + def _reset_offset_if_needed(self, partition, timestamp, offset): + # we might lose the assignment while fetching the offset, or the user might seek to a different offset, + # so verify it is still assigned and still in need of the requested reset + if not self._subscriptions.is_assigned(partition): + log.debug("Skipping reset of partition %s since it is no longer assigned", partition) + elif not self._subscriptions.is_offset_reset_needed(partition): + log.debug("Skipping reset of partition %s since reset is no longer needed", partition) + elif timestamp and not timestamp == self._subscriptions.assignment[partition].reset_strategy: + log.debug("Skipping reset of partition %s since an alternative reset has been requested", partition) + else: + log.info("Resetting offset for partition %s to offset %s.", partition, offset) + self._subscriptions.seek(partition, offset) + + def _reset_offsets_async(self, timestamps): + timestamps_by_node = self._group_list_offset_requests(timestamps) + + for node_id, timestamps_and_epochs in six.iteritems(timestamps_by_node): + if not self._client.ready(node_id): + continue + partitions = set(timestamps_and_epochs.keys()) + expire_at = time.time() + self.config['request_timeout_ms'] / 1000 + self._subscriptions.set_reset_pending(partitions, expire_at) + + def on_success(result): + fetched_offsets, partitions_to_retry = result + if partitions_to_retry: + self._subscriptions.reset_failed(partitions_to_retry, time.time() + self.config['retry_backoff_ms'] / 1000) + self._client.cluster.request_update() + + for partition, offset in six.iteritems(fetched_offsets): + ts, _epoch = timestamps_and_epochs[partition] + self._reset_offset_if_needed(partition, ts, offset.offset) + + def on_failure(error): + self._subscriptions.reset_failed(partitions, time.time() + self.config['retry_backoff_ms'] / 1000) + self._client.cluster.request_update() + + if not getattr(error, 'retriable', False): + if not self._cached_list_offsets_exception: + self._cached_list_offsets_exception = error + else: + log.error("Discarding error in ListOffsetResponse because another error is pending: %s", error) + + future = self._send_list_offsets_request(node_id, timestamps_and_epochs) + future.add_callback(on_success) + future.add_errback(on_failure) + def _send_list_offsets_requests(self, timestamps): """Fetch offsets for each partition in timestamps dict. This may send request to multiple nodes, based on who is Leader for partition. @@ -460,39 +452,22 @@ def _send_list_offsets_requests(self, timestamps): Returns: Future: resolves to a mapping of retrieved offsets """ - timestamps_by_node = collections.defaultdict(dict) - for partition, timestamp in six.iteritems(timestamps): - node_id = self._client.cluster.leader_for_partition(partition) - if node_id is None: - if partition.topic not in self._client.cluster.topics(): - log.warning("Could not lookup offsets for partition %s since no metadata is available for topic. " - "Wait for metadata refresh and try again", partition) - else: - log.warning("Could not lookup offsets for partition %s since no metadata is available for it. 
" - "Wait for metadata refresh and try again", partition) - self._client.add_topic(partition.topic) - return Future().failure(Errors.StaleMetadata(partition)) - elif node_id == -1: - log.debug("Leader for partition %s unavailable for fetching " - "offset, wait for metadata refresh", partition) - return Future().failure( - Errors.LeaderNotAvailableError(partition)) - else: - leader_epoch = -1 - timestamps_by_node[node_id][partition] = (timestamp, leader_epoch) + timestamps_by_node = self._group_list_offset_requests(timestamps) + if not timestamps_by_node: + return Future().failure(Errors.StaleMetadata()) - # Aggregate results until we have all + # Aggregate results until we have all responses list_offsets_future = Future() - responses = [] - node_count = len(timestamps_by_node) + fetched_offsets = dict() + partitions_to_retry = set() + remaining_responses = [len(timestamps_by_node)] # list for mutable / 2.7 hack - def on_success(value): - responses.append(value) - if len(responses) == node_count: - offsets = {} - for r in responses: - offsets.update(r) - list_offsets_future.success(offsets) + def on_success(remaining_responses, value): + remaining_responses[0] -= 1 # noqa: F823 + fetched_offsets.update(value[0]) + partitions_to_retry.update(value[1]) + if not remaining_responses[0] and not list_offsets_future.is_done: + list_offsets_future.success((fetched_offsets, partitions_to_retry)) def on_fail(err): if not list_offsets_future.is_done: @@ -500,12 +475,31 @@ def on_fail(err): for node_id, timestamps in six.iteritems(timestamps_by_node): _f = self._send_list_offsets_request(node_id, timestamps) - _f.add_callback(on_success) + _f.add_callback(on_success, remaining_responses) _f.add_errback(on_fail) return list_offsets_future + def _group_list_offset_requests(self, timestamps): + timestamps_by_node = collections.defaultdict(dict) + for partition, timestamp in six.iteritems(timestamps): + node_id = self._client.cluster.leader_for_partition(partition) + if node_id is None: + self._client.add_topic(partition.topic) + log.debug("Partition %s is unknown for fetching offset", partition) + self._client.cluster.request_update() + elif node_id == -1: + log.debug("Leader for partition %s unavailable for fetching " + "offset, wait for metadata refresh", partition) + self._client.cluster.request_update() + else: + leader_epoch = -1 + timestamps_by_node[node_id][partition] = (timestamp, leader_epoch) + return dict(timestamps_by_node) + def _send_list_offsets_request(self, node_id, timestamps_and_epochs): version = self._client.api_version(ListOffsetsRequest, max_version=4) + if self.config['isolation_level'] == 'read_committed' and version < 2: + raise Errors.UnsupportedVersionError('read_committed isolation level requires ListOffsetsRequest >= v2') by_topic = collections.defaultdict(list) for tp, (timestamp, leader_epoch) in six.iteritems(timestamps_and_epochs): if version >= 4: @@ -526,12 +520,12 @@ def _send_list_offsets_request(self, node_id, timestamps_and_epochs): self._isolation_level, list(six.iteritems(by_topic))) - # Client returns a future that only fails on network issues # so create a separate future and attach a callback to update it # based on response error codes future = Future() + log.debug("Sending ListOffsetRequest %s to broker %s", request, node_id) _f = self._client.send(node_id, request) _f.add_callback(self._handle_list_offsets_response, future) _f.add_errback(lambda e: future.failure(e)) @@ -547,7 +541,9 @@ def _handle_list_offsets_response(self, future, response): Raises: 
AssertionError: if response does not match partition """ - timestamp_offset_map = {} + fetched_offsets = dict() + partitions_to_retry = set() + unauthorized_topics = set() for topic, part_data in response.topics: for partition_info in part_data: partition, error_code = partition_info[:2] @@ -572,10 +568,11 @@ def _handle_list_offsets_response(self, future, response): "Fetched offset %s, timestamp %s, leader_epoch %s", partition, offset, timestamp, leader_epoch) if offset != UNKNOWN_OFFSET: - timestamp_offset_map[partition] = OffsetAndTimestamp(offset, timestamp, leader_epoch) + fetched_offsets[partition] = OffsetAndTimestamp(offset, timestamp, leader_epoch) elif error_type is Errors.UnsupportedForMessageFormatError: - # The message format on the broker side is before 0.10.0, - # we simply put None in the response. + # The message format on the broker side is before 0.10.0, which means it does not + # support timestamps. We treat this case the same as if we weren't able to find an + # offset corresponding to the requested timestamp and leave it out of the result. log.debug("Cannot search by timestamp for partition %s because the" " message format version is before 0.10.0", partition) elif error_type in (Errors.NotLeaderForPartitionError, @@ -583,22 +580,23 @@ def _handle_list_offsets_response(self, future, response): Errors.KafkaStorageError): log.debug("Attempt to fetch offsets for partition %s failed due" " to %s, retrying.", error_type.__name__, partition) - future.failure(error_type(partition)) - return + partitions_to_retry.add(partition) elif error_type is Errors.UnknownTopicOrPartitionError: log.warning("Received unknown topic or partition error in ListOffsets " "request for partition %s. The topic/partition " + "may not exist or the user may not have Describe access " "to it.", partition) - future.failure(error_type(partition)) - return + partitions_to_retry.add(partition) + elif error_type is Errors.TopicAuthorizationFailedError: + unauthorized_topics.add(topic) else: log.warning("Attempt to fetch offsets for partition %s failed due to:" " %s", partition, error_type.__name__) - future.failure(error_type(partition)) - return - if not future.is_done: - future.success(timestamp_offset_map) + partitions_to_retry.add(partition) + if unauthorized_topics: + future.failure(Errors.TopicAuthorizationFailedError(unauthorized_topics)) + else: + future.success((fetched_offsets, partitions_to_retry)) def _fetchable_partitions(self): fetchable = self._subscriptions.fetchable_partitions() @@ -636,17 +634,17 @@ def _create_fetch_requests(self): elif not self._client.connected(node_id) and self._client.connection_delay(node_id) > 0: # If we try to send during the reconnect backoff window, then the request is just # going to be failed anyway before being sent, so skip the send for now - log.log(0, "Skipping fetch for partition %s because node %s is awaiting reconnect backoff", + log.debug("Skipping fetch for partition %s because node %s is awaiting reconnect backoff", partition, node_id) elif self._client.throttle_delay(node_id) > 0: # If we try to send while throttled, then the request is just # going to be failed anyway before being sent, so skip the send for now - log.log(0, "Skipping fetch for partition %s because node %s is throttled", + log.debug("Skipping fetch for partition %s because node %s is throttled", partition, node_id) elif node_id in self._nodes_with_pending_fetch_requests: - log.log(0, "Skipping fetch for partition %s because there is a pending fetch request to node %s", + 
log.debug("Skipping fetch for partition %s because there is a pending fetch request to node %s", partition, node_id) continue @@ -851,7 +849,7 @@ def _parse_fetched_data(self, completed_fetch): " current offset %d", tp, fetch_offset, position.offset) elif self._subscriptions.has_default_offset_reset_policy(): log.info("Fetch offset %s is out of range for topic-partition %s", fetch_offset, tp) - self._subscriptions.need_offset_reset(tp) + self._subscriptions.request_offset_reset(tp) else: raise Errors.OffsetOutOfRangeError({tp: fetch_offset}) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 078f49c39..a86ececf4 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -572,9 +572,8 @@ def committed(self, partition, metadata=False, timeout_ms=None): This offset will be used as the position for the consumer in the event of a failure. - This call may block to do a remote call if the partition in question - isn't assigned to this consumer or if the consumer hasn't yet - initialized its cache of committed offsets. + This call will block to do a remote call to get the latest committed + offsets from the server. Arguments: partition (TopicPartition): The partition to check. @@ -586,28 +585,16 @@ def committed(self, partition, metadata=False, timeout_ms=None): Raises: KafkaTimeoutError if timeout_ms provided + BrokerResponseErrors if OffsetFetchRequest raises an error. """ assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1' assert self.config['group_id'] is not None, 'Requires group_id' if not isinstance(partition, TopicPartition): raise TypeError('partition must be a TopicPartition namedtuple') - if self._subscription.is_assigned(partition): - committed = self._subscription.assignment[partition].committed - if committed is None: - self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms) - committed = self._subscription.assignment[partition].committed - else: - commit_map = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms) - if partition in commit_map: - committed = commit_map[partition] - else: - committed = None - - if committed is not None: - if metadata: - return committed - else: - return committed.offset + committed = self._coordinator.fetch_committed_offsets([partition], timeout_ms=timeout_ms) + if partition not in committed: + return None + return committed[partition] if metadata else committed[partition].offset def _fetch_all_topic_metadata(self): """A blocking call that fetches topic metadata for all topics in the @@ -717,10 +704,7 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True): if not self._coordinator.poll(timeout_ms=inner_timeout_ms()): return {} - # Fetch positions if we have partitions we're subscribed to that we - # don't know the offset for - if not self._subscription.has_all_fetch_positions(): - self._update_fetch_positions(self._subscription.missing_fetch_positions(), timeout_ms=inner_timeout_ms()) + has_all_fetch_positions = self._update_fetch_positions(timeout_ms=inner_timeout_ms()) # If data is available already, e.g. 
from a previous network client # poll() call to commit, then just return it immediately @@ -737,7 +721,13 @@ def _poll_once(self, timeout_ms, max_records, update_offsets=True): if records: return records - self._client.poll(timeout_ms=inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000)) + # We do not want to be stuck blocking in poll if we are missing some positions + # since the offset lookup may be backing off after a failure + poll_timeout_ms = inner_timeout_ms(self._coordinator.time_to_next_poll() * 1000) + if not has_all_fetch_positions: + poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms']) + + self._client.poll(timeout_ms=poll_timeout_ms) # after the long poll, we should check whether the group needs to rebalance # prior to returning data so that the group can stabilize faster if self._coordinator.need_rejoin(): @@ -758,12 +748,18 @@ def position(self, partition, timeout_ms=None): if not isinstance(partition, TopicPartition): raise TypeError('partition must be a TopicPartition namedtuple') assert self._subscription.is_assigned(partition), 'Partition is not assigned' + + inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout retrieving partition position') position = self._subscription.assignment[partition].position - if position is None: - # batch update fetch positions for any partitions without a valid position - self._update_fetch_positions(self._subscription.assigned_partitions(), timeout_ms=timeout_ms) - position = self._subscription.assignment[partition].position - return position.offset if position else None + try: + while position is None: + # batch update fetch positions for any partitions without a valid position + self._update_fetch_positions(timeout_ms=inner_timeout_ms()) + position = self._subscription.assignment[partition].position + except KafkaTimeoutError: + return None + else: + return position.offset def highwater(self, partition): """Last known highwater offset for a partition. @@ -1056,7 +1052,7 @@ def offsets_for_times(self, timestamps): raise ValueError( "The target time for partition {} is {}. The target time " "cannot be negative.".format(tp, ts)) - return self._fetcher.get_offsets_by_times( + return self._fetcher.offsets_by_times( timestamps, self.config['request_timeout_ms']) def beginning_offsets(self, partitions): @@ -1122,7 +1118,7 @@ def _use_consumer_group(self): return False return True - def _update_fetch_positions(self, partitions, timeout_ms=None): + def _update_fetch_positions(self, timeout_ms=None): """Set the fetch position to the committed position (if there is one) or reset it using the offset reset policy the user has configured. @@ -1136,29 +1132,30 @@ def _update_fetch_positions(self, partitions, timeout_ms=None): NoOffsetForPartitionError: If no offset is stored for a given partition and no offset reset policy is defined. """ - inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout updating fetch positions') - try: - # Lookup any positions for partitions which are awaiting reset (which may be the - # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do - # this check first to avoid an unnecessary lookup of committed offsets (which - # typically occurs when the user is manually assigning partitions and managing - # their own offsets). 
- self._fetcher.reset_offsets_if_needed(partitions, timeout_ms=inner_timeout_ms()) - - if not self._subscription.has_all_fetch_positions(partitions): - # if we still don't have offsets for the given partitions, then we should either - # seek to the last committed position or reset using the auto reset policy - if (self.config['api_version'] >= (0, 8, 1) and - self.config['group_id'] is not None): - # first refresh commits for all assigned partitions - self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=inner_timeout_ms()) - - # Then, do any offset lookups in case some positions are not known - self._fetcher.update_fetch_positions(partitions, timeout_ms=inner_timeout_ms()) + if self._subscription.has_all_fetch_positions(): return True - except KafkaTimeoutError: - return False + if (self.config['api_version'] >= (0, 8, 1) and + self.config['group_id'] is not None): + try: + # If there are any partitions which do not have a valid position and are not + # awaiting reset, then we need to fetch committed offsets. We will only do a + # coordinator lookup if there are partitions which have missing positions, so + # a consumer with manually assigned partitions can avoid a coordinator dependence + # by always ensuring that assigned partitions have an initial position. + self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms) + except KafkaTimeoutError: + pass + + # If there are partitions still needing a position and a reset policy is defined, + # request reset using the default policy. If no reset strategy is defined and there + # are partitions with a missing position, then we will raise an exception. + self._subscription.reset_missing_positions() + + # Finally send an asynchronous request to lookup and update the positions of any + # partitions which are awaiting reset. + self._fetcher.reset_offsets_if_needed() + return False def _message_generator_v2(self): timeout_ms = 1000 * max(0, self._consumer_timeout - time.time()) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 0f479a55b..cc3675b1d 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -15,10 +15,11 @@ import logging import random import re +import time from kafka.vendor import six -from kafka.errors import IllegalStateError +import kafka.errors as Errors from kafka.protocol.list_offsets import OffsetResetStrategy from kafka.structs import OffsetAndMetadata from kafka.util import ensure_valid_topic_name @@ -52,10 +53,6 @@ class SubscriptionState(object): Note that pause state as well as fetch/consumed positions are not preserved when partition assignment is changed whether directly by the user or through a group rebalance. - - This class also maintains a cache of the latest commit position for each of - the assigned partitions. This is updated through committed() and can be used - to set the initial fetch position (e.g. Fetcher._reset_offset() ). 
""" _SUBSCRIPTION_EXCEPTION_MESSAGE = ( "You must choose only one way to configure your consumer:" @@ -85,10 +82,8 @@ def __init__(self, offset_reset_strategy='earliest'): self._group_subscription = set() self._user_assignment = set() self.assignment = OrderedDict() - self.listener = None - - # initialize to true for the consumers to fetch offset upon starting up - self.needs_fetch_committed_offsets = True + self.rebalance_listener = None + self.listeners = [] def _set_subscription_type(self, subscription_type): if not isinstance(subscription_type, SubscriptionType): @@ -96,7 +91,7 @@ def _set_subscription_type(self, subscription_type): if self.subscription_type == SubscriptionType.NONE: self.subscription_type = subscription_type elif self.subscription_type != subscription_type: - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) + raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) def subscribe(self, topics=(), pattern=None, listener=None): """Subscribe to a list of topics, or a topic regex pattern. @@ -135,7 +130,7 @@ def subscribe(self, topics=(), pattern=None, listener=None): """ assert topics or pattern, 'Must provide topics or pattern' if (topics and pattern): - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) + raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) elif pattern: self._set_subscription_type(SubscriptionType.AUTO_PATTERN) @@ -150,7 +145,7 @@ def subscribe(self, topics=(), pattern=None, listener=None): if listener and not isinstance(listener, ConsumerRebalanceListener): raise TypeError('listener must be a ConsumerRebalanceListener') - self.listener = listener + self.rebalance_listener = listener def change_subscription(self, topics): """Change the topic subscription. @@ -166,7 +161,7 @@ def change_subscription(self, topics): - a topic name does not consist of ASCII-characters/'-'/'_'/'.' """ if not self.partitions_auto_assigned(): - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) + raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) if isinstance(topics, six.string_types): topics = [topics] @@ -193,13 +188,13 @@ def group_subscribe(self, topics): topics (list of str): topics to add to the group subscription """ if not self.partitions_auto_assigned(): - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) + raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) self._group_subscription.update(topics) def reset_group_subscription(self): """Reset the group's subscription to only contain topics subscribed by this consumer.""" if not self.partitions_auto_assigned(): - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) + raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) assert self.subscription is not None, 'Subscription required' self._group_subscription.intersection_update(self.subscription) @@ -226,7 +221,6 @@ def assign_from_user(self, partitions): self._user_assignment = set(partitions) self._set_assignment({partition: self.assignment.get(partition, TopicPartitionState()) for partition in partitions}) - self.needs_fetch_committed_offsets = True def assign_from_subscribed(self, assignments): """Update the assignment to the specified partitions @@ -241,16 +235,14 @@ def assign_from_subscribed(self, assignments): consumer instance. 
""" if not self.partitions_auto_assigned(): - raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) + raise Errors.IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) for tp in assignments: if tp.topic not in self.subscription: raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,)) - # after rebalancing, we always reinitialize the assignment value # randomized ordering should improve balance for short-lived consumers self._set_assignment({partition: TopicPartitionState() for partition in assignments}, randomize=True) - self.needs_fetch_committed_offsets = True log.info("Updated partition assignment: %s", assignments) def _set_assignment(self, partition_states, randomize=False): @@ -300,8 +292,10 @@ def seek(self, partition, offset): Arguments: partition (TopicPartition): partition for seek operation - offset (int): message offset in partition + offset (int or OffsetAndMetadata): message offset in partition """ + if not isinstance(offset, (int, OffsetAndMetadata)): + raise TypeError("offset must be type in or OffsetAndMetadata") self.assignment[partition].seek(offset) def assigned_partitions(self): @@ -333,7 +327,7 @@ def all_consumed_offsets(self): all_consumed[partition] = state.position return all_consumed - def need_offset_reset(self, partition, offset_reset_strategy=None): + def request_offset_reset(self, partition, offset_reset_strategy=None): """Mark partition for offset reset using specified or default strategy. Arguments: @@ -342,7 +336,11 @@ def need_offset_reset(self, partition, offset_reset_strategy=None): """ if offset_reset_strategy is None: offset_reset_strategy = self._default_offset_reset_strategy - self.assignment[partition].await_reset(offset_reset_strategy) + self.assignment[partition].reset(offset_reset_strategy) + + def set_reset_pending(self, partitions, next_allowed_reset_time): + for partition in partitions: + self.assignment[partition].set_reset_pending(next_allowed_reset_time) def has_default_offset_reset_policy(self): """Return True if default offset reset policy is Earliest or Latest""" @@ -351,24 +349,41 @@ def has_default_offset_reset_policy(self): def is_offset_reset_needed(self, partition): return self.assignment[partition].awaiting_reset - def has_all_fetch_positions(self, partitions=None): - if partitions is None: - partitions = self.assigned_partitions() - for tp in partitions: - if not self.has_valid_position(tp): + def has_all_fetch_positions(self): + for state in six.itervalues(self.assignment): + if not state.has_valid_position: return False return True def missing_fetch_positions(self): missing = set() for partition, state in six.iteritems(self.assignment): - if not state.has_valid_position: + if state.is_missing_position(): missing.add(partition) return missing def has_valid_position(self, partition): return partition in self.assignment and self.assignment[partition].has_valid_position + def reset_missing_positions(self): + partitions_with_no_offsets = set() + for tp, state in six.iteritems(self.assignment): + if state.is_missing_position(): + if self._default_offset_reset_strategy == OffsetResetStrategy.NONE: + partitions_with_no_offsets.add(tp) + else: + state.reset(self._default_offset_reset_strategy) + + if partitions_with_no_offsets: + raise Errors.NoOffsetForPartitionError(partitions_with_no_offsets) + + def partitions_needing_reset(self): + partitions = set() + for tp, state in six.iteritems(self.assignment): + if state.awaiting_reset and state.is_reset_allowed(): + partitions.add(tp) + return partitions + def 
is_assigned(self, partition): return partition in self.assignment @@ -384,6 +399,10 @@ def pause(self, partition): def resume(self, partition): self.assignment[partition].resume() + def reset_failed(self, partitions, next_retry_time): + for partition in partitions: + self.assignment[partition].reset_failed(next_retry_time) + def move_partition_to_end(self, partition): if partition in self.assignment: try: @@ -398,14 +417,12 @@ def position(self, partition): class TopicPartitionState(object): def __init__(self): - self.committed = None # last committed OffsetAndMetadata - self.has_valid_position = False # whether we have valid position self.paused = False # whether this partition has been paused by the user - self.awaiting_reset = False # whether we are awaiting reset self.reset_strategy = None # the reset strategy if awaiting_reset is set self._position = None # OffsetAndMetadata exposed to the user self.highwater = None self.drop_pending_record_batch = False + self.next_allowed_retry_time = None def _set_position(self, offset): assert self.has_valid_position, 'Valid position required' @@ -417,18 +434,37 @@ def _get_position(self): position = property(_get_position, _set_position, None, "last position") - def await_reset(self, strategy): - self.awaiting_reset = True + def reset(self, strategy): + assert strategy is not None self.reset_strategy = strategy self._position = None - self.has_valid_position = False + self.next_allowed_retry_time = None + + def is_reset_allowed(self): + return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.time() + + @property + def awaiting_reset(self): + return self.reset_strategy is not None + + def set_reset_pending(self, next_allowed_retry_time): + self.next_allowed_retry_time = next_allowed_retry_time + + def reset_failed(self, next_allowed_retry_time): + self.next_allowed_retry_time = next_allowed_retry_time + + @property + def has_valid_position(self): + return self._position is not None + + def is_missing_position(self): + return not self.has_valid_position and not self.awaiting_reset def seek(self, offset): - self._position = OffsetAndMetadata(offset, '', -1) - self.awaiting_reset = False + self._position = offset if isinstance(offset, OffsetAndMetadata) else OffsetAndMetadata(offset, '', -1) self.reset_strategy = None - self.has_valid_position = True self.drop_pending_record_batch = True + self.next_allowed_retry_time = None def pause(self): self.paused = True diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 2944c7ec7..d4943da31 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -229,10 +229,6 @@ def _on_join_complete(self, generation, member_id, protocol, assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes) - # set the flag to refresh last committed offsets - self._subscription.needs_fetch_committed_offsets = True - - # update partition assignment try: self._subscription.assign_from_subscribed(assignment.partitions()) except ValueError as e: @@ -253,13 +249,13 @@ def _on_join_complete(self, generation, member_id, protocol, assigned, self.group_id) # execute the user's callback after rebalance - if self._subscription.listener: + if self._subscription.rebalance_listener: try: - self._subscription.listener.on_partitions_assigned(assigned) + self._subscription.rebalance_listener.on_partitions_assigned(assigned) except Exception: - log.exception("User provided listener %s for group %s" + log.exception("User provided rebalance listener %s for 
group %s" " failed on partition assignment: %s", - self._subscription.listener, self.group_id, + self._subscription.rebalance_listener, self.group_id, assigned) def poll(self, timeout_ms=None): @@ -360,14 +356,14 @@ def _on_join_prepare(self, generation, member_id, timeout_ms=None): # execute the user's callback before rebalance log.info("Revoking previously assigned partitions %s for group %s", self._subscription.assigned_partitions(), self.group_id) - if self._subscription.listener: + if self._subscription.rebalance_listener: try: revoked = set(self._subscription.assigned_partitions()) - self._subscription.listener.on_partitions_revoked(revoked) + self._subscription.rebalance_listener.on_partitions_revoked(revoked) except Exception: - log.exception("User provided subscription listener %s" + log.exception("User provided subscription rebalance listener %s" " for group %s failed on_partitions_revoked", - self._subscription.listener, self.group_id) + self._subscription.rebalance_listener, self.group_id) self._is_leader = False self._subscription.reset_group_subscription() @@ -398,13 +394,11 @@ def need_rejoin(self): def refresh_committed_offsets_if_needed(self, timeout_ms=None): """Fetch committed offsets for assigned partitions.""" - if self._subscription.needs_fetch_committed_offsets: - offsets = self.fetch_committed_offsets(self._subscription.assigned_partitions(), timeout_ms=timeout_ms) - for partition, offset in six.iteritems(offsets): - # verify assignment is still active - if self._subscription.is_assigned(partition): - self._subscription.assignment[partition].committed = offset - self._subscription.needs_fetch_committed_offsets = False + missing_fetch_positions = set(self._subscription.missing_fetch_positions()) + offsets = self.fetch_committed_offsets(missing_fetch_positions, timeout_ms=timeout_ms) + for partition, offset in six.iteritems(offsets): + log.debug("Setting offset for partition %s to the committed offset %s", partition, offset.offset); + self._subscription.seek(partition, offset.offset) def fetch_committed_offsets(self, partitions, timeout_ms=None): """Fetch the current committed offsets for specified partitions @@ -505,7 +499,6 @@ def _do_commit_offsets_async(self, offsets, callback=None): offsets.values())) if callback is None: callback = self.config['default_offset_commit_callback'] - self._subscription.needs_fetch_committed_offsets = True future = self._send_offset_commit_request(offsets) future.add_both(lambda res: self.completed_offset_commits.appendleft((callback, offsets, res))) return future @@ -703,8 +696,6 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): if error_type is Errors.NoError: log.debug("Group %s committed offset %s for partition %s", self.group_id, offset, tp) - if self._subscription.is_assigned(tp): - self._subscription.assignment[tp].committed = offset elif error_type is Errors.GroupAuthorizationFailedError: log.error("Not authorized to commit offsets for group %s", self.group_id) diff --git a/kafka/errors.py b/kafka/errors.py index 900dcd5e2..dfdc75015 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -77,6 +77,10 @@ class NoBrokersAvailable(KafkaError): invalid_metadata = True +class NoOffsetForPartitionError(KafkaError): + pass + + class NodeNotReadyError(KafkaError): retriable = True diff --git a/test/integration/test_consumer_integration.py b/test/integration/test_consumer_integration.py index b181845a4..71cf2642d 100644 --- a/test/integration/test_consumer_integration.py +++ 
b/test/integration/test_consumer_integration.py @@ -9,7 +9,7 @@ from kafka.vendor.six.moves import range import kafka.codec -from kafka.errors import UnsupportedCodecError, UnsupportedVersionError +from kafka.errors import KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError from kafka.structs import TopicPartition, OffsetAndTimestamp from test.testutil import Timer, assert_message_count, env_kafka_version, random_string @@ -300,4 +300,5 @@ def test_kafka_consumer_offsets_for_times_errors(kafka_consumer_factory, topic): with pytest.raises(ValueError): consumer.offsets_for_times({tp: -1}) - assert consumer.offsets_for_times({bad_tp: 0}) == {bad_tp: None} + with pytest.raises(KafkaTimeoutError): + consumer.offsets_for_times({bad_tp: 0}) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 8c114c90f..bfd3a2187 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -231,17 +231,23 @@ def test_need_rejoin(coordinator): def test_refresh_committed_offsets_if_needed(mocker, coordinator): + tp0 = TopicPartition('foobar', 0) + tp1 = TopicPartition('foobar', 1) mocker.patch.object(ConsumerCoordinator, 'fetch_committed_offsets', return_value = { - TopicPartition('foobar', 0): OffsetAndMetadata(123, '', -1), - TopicPartition('foobar', 1): OffsetAndMetadata(234, '', -1)}) - coordinator._subscription.assign_from_user([TopicPartition('foobar', 0)]) - assert coordinator._subscription.needs_fetch_committed_offsets is True + tp0: OffsetAndMetadata(123, '', -1), + tp1: OffsetAndMetadata(234, '', -1)}) + coordinator._subscription.assign_from_user([tp0, tp1]) + coordinator._subscription.request_offset_reset(tp0) + coordinator._subscription.request_offset_reset(tp1) + assert coordinator._subscription.is_offset_reset_needed(tp0) + assert coordinator._subscription.is_offset_reset_needed(tp1) coordinator.refresh_committed_offsets_if_needed() assignment = coordinator._subscription.assignment - assert assignment[TopicPartition('foobar', 0)].committed == OffsetAndMetadata(123, '', -1) - assert TopicPartition('foobar', 1) not in assignment - assert coordinator._subscription.needs_fetch_committed_offsets is False + assert assignment[tp0].position == OffsetAndMetadata(123, '', -1) + assert assignment[tp1].position == OffsetAndMetadata(234, '', -1) + assert not coordinator._subscription.is_offset_reset_needed(tp0) + assert not coordinator._subscription.is_offset_reset_needed(tp1) def test_fetch_committed_offsets(mocker, coordinator): diff --git a/test/test_fetcher.py b/test/test_fetcher.py index cc4789e6d..740fa1bab 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -18,9 +18,10 @@ from kafka.protocol.fetch import FetchRequest, FetchResponse from kafka.protocol.list_offsets import ListOffsetsResponse, OffsetResetStrategy from kafka.errors import ( - StaleMetadata, LeaderNotAvailableError, NotLeaderForPartitionError, + StaleMetadata, NotLeaderForPartitionError, UnknownTopicOrPartitionError, OffsetOutOfRangeError ) +from kafka.future import Future from kafka.record.memory_records import MemoryRecordsBuilder, MemoryRecords from kafka.structs import OffsetAndMetadata, OffsetAndTimestamp, TopicPartition @@ -107,47 +108,41 @@ def test_create_fetch_requests(fetcher, mocker, api_version, fetch_version): assert set([r.API_VERSION for (r, _offsets) in requests_and_offsets]) == set([fetch_version]) -def test_update_fetch_positions(fetcher, topic, mocker): - mocker.patch.object(fetcher, '_reset_offsets') +def test_reset_offsets_if_needed(fetcher, topic, mocker): + 
mocker.patch.object(fetcher, '_reset_offsets_async') partition = TopicPartition(topic, 0) - # unassigned partition - fetcher.update_fetch_positions([TopicPartition('fizzbuzz', 0)]) - assert fetcher._reset_offsets.call_count == 0 - # fetchable partition (has offset, not paused) - fetcher.update_fetch_positions([partition]) - assert fetcher._reset_offsets.call_count == 0 - - # partition needs reset, no committed offset - fetcher._subscriptions.need_offset_reset(partition) - fetcher._subscriptions.assignment[partition].awaiting_reset = False - fetcher.update_fetch_positions([partition]) - fetcher._reset_offsets.assert_called_with(set([partition]), timeout_ms=None) + fetcher.reset_offsets_if_needed() + assert fetcher._reset_offsets_async.call_count == 0 + + # partition needs reset, no valid position + fetcher._subscriptions.request_offset_reset(partition) + fetcher.reset_offsets_if_needed() + fetcher._reset_offsets_async.assert_called_with({partition: OffsetResetStrategy.EARLIEST}) assert fetcher._subscriptions.assignment[partition].awaiting_reset is True - fetcher.update_fetch_positions([partition]) - fetcher._reset_offsets.assert_called_with(set([partition]), timeout_ms=None) + fetcher.reset_offsets_if_needed() + fetcher._reset_offsets_async.assert_called_with({partition: OffsetResetStrategy.EARLIEST}) - # partition needs reset, has committed offset - fetcher._reset_offsets.reset_mock() - fetcher._subscriptions.need_offset_reset(partition) - fetcher._subscriptions.assignment[partition].awaiting_reset = False - fetcher._subscriptions.assignment[partition].committed = OffsetAndMetadata(123, '', -1) - mocker.patch.object(fetcher._subscriptions, 'seek') - fetcher.update_fetch_positions([partition]) - assert fetcher._reset_offsets.call_count == 0 - fetcher._subscriptions.seek.assert_called_with(partition, 123) + # partition needs reset, has valid position + fetcher._reset_offsets_async.reset_mock() + fetcher._subscriptions.request_offset_reset(partition) + fetcher._subscriptions.seek(partition, 123) + fetcher.reset_offsets_if_needed() + assert fetcher._reset_offsets_async.call_count == 0 -def test__reset_offsets(fetcher, mocker): +def test__reset_offsets_async(fetcher, mocker): tp = TopicPartition("topic", 0) fetcher._subscriptions.subscribe(topics=["topic"]) fetcher._subscriptions.assign_from_subscribed([tp]) - fetcher._subscriptions.need_offset_reset(tp) - mocked = mocker.patch.object(fetcher, '_retrieve_offsets') - - mocked.return_value = {tp: OffsetAndTimestamp(1001, None, -1)} - fetcher._reset_offsets([tp]) + fetcher._subscriptions.request_offset_reset(tp) + fetched_offsets = {tp: OffsetAndTimestamp(1001, None, -1)} + mocker.patch.object(fetcher._client, 'ready', return_value=True) + mocker.patch.object(fetcher, '_send_list_offsets_request', + return_value=Future().success((fetched_offsets, set()))) + mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) + fetcher._reset_offsets_async({tp: OffsetResetStrategy.EARLIEST}) assert not fetcher._subscriptions.assignment[tp].awaiting_reset assert fetcher._subscriptions.assignment[tp].position.offset == 1001 @@ -180,7 +175,7 @@ def send_side_effect(*args, **kw): # Leader == -1 fut = fetcher._send_list_offsets_requests({tp: 0}) assert fut.failed() - assert isinstance(fut.exception, LeaderNotAvailableError) + assert isinstance(fut.exception, StaleMetadata) assert not mocked_send.called # Leader == 0, send failed @@ -197,9 +192,9 @@ def send_side_effect(*args, **kw): assert not fut.is_done assert mocked_send.called # 
Check that we bound the futures correctly to chain success - send_futures.pop().success({tp: (10, 10000)}) + send_futures.pop().success(({tp: (10, 10000)}, set())) assert fut.succeeded() - assert fut.value == {tp: (10, 10000)} + assert fut.value == ({tp: (10, 10000)}, set()) def test__send_list_offsets_requests_multiple_nodes(fetcher, mocker): @@ -233,7 +228,7 @@ def send_side_effect(node_id, timestamps): req_by_node[node] = timestamps if node == 0: # Say tp3 does not have any messages so it's missing - f.success({tp1: (11, 1001)}) + f.success(({tp1: (11, 1001)}, set())) else: second_future = f assert req_by_node == { @@ -243,15 +238,15 @@ def send_side_effect(node_id, timestamps): # We only resolved 1 future so far, so result future is not yet ready assert not fut.is_done - second_future.success({tp2: (12, 1002), tp4: (14, 1004)}) + second_future.success(({tp2: (12, 1002), tp4: (14, 1004)}, set())) assert fut.succeeded() - assert fut.value == {tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)} + assert fut.value == ({tp1: (11, 1001), tp2: (12, 1002), tp4: (14, 1004)}, set()) # -- First succeeded second not del send_futures[:] fut = fetcher._send_list_offsets_requests(tss) assert len(send_futures) == 2 - send_futures[0][2].success({tp1: (11, 1001)}) + send_futures[0][2].success(({tp1: (11, 1001)}, set())) send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1)) assert fut.failed() assert isinstance(fut.exception, UnknownTopicOrPartitionError) @@ -261,7 +256,7 @@ def send_side_effect(node_id, timestamps): fut = fetcher._send_list_offsets_requests(tss) assert len(send_futures) == 2 send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1)) - send_futures[1][2].success({tp1: (11, 1001)}) + send_futures[1][2].success(({tp1: (11, 1001)}, set())) assert fut.failed() assert isinstance(fut.exception, UnknownTopicOrPartitionError) @@ -275,7 +270,7 @@ def test__handle_list_offsets_response_v1(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 1): OffsetAndTimestamp(9999, 1000, -1)} + assert fut.value == ({TopicPartition("topic", 1): OffsetAndTimestamp(9999, 1000, -1)}, set()) # Broker returns NotLeaderForPartitionError fut = Future() @@ -283,8 +278,8 @@ def test__handle_list_offsets_response_v1(fetcher, mocker): ("topic", [(0, 6, -1, -1)]), ]) fetcher._handle_list_offsets_response(fut, res) - assert fut.failed() - assert isinstance(fut.exception, NotLeaderForPartitionError) + assert fut.succeeded() + assert fut.value == ({}, set([TopicPartition("topic", 0)])) # Broker returns UnknownTopicOrPartitionError fut = Future() @@ -292,21 +287,21 @@ def test__handle_list_offsets_response_v1(fetcher, mocker): ("topic", [(0, 3, -1, -1)]), ]) fetcher._handle_list_offsets_response(fut, res) - assert fut.failed() - assert isinstance(fut.exception, UnknownTopicOrPartitionError) + assert fut.succeeded() + assert fut.value == ({}, set([TopicPartition("topic", 0)])) # Broker returns many errors and 1 result - # Will fail on 1st error and return fut = Future() res = ListOffsetsResponse[1]([ - ("topic", [(0, 43, -1, -1)]), - ("topic", [(1, 6, -1, -1)]), - ("topic", [(2, 3, -1, -1)]), + ("topic", [(0, 43, -1, -1)]), # not retriable + ("topic", [(1, 6, -1, -1)]), # retriable + ("topic", [(2, 3, -1, -1)]), # retriable ("topic", [(3, 0, 1000, 9999)]) ]) fetcher._handle_list_offsets_response(fut, res) - assert fut.failed() - assert isinstance(fut.exception, NotLeaderForPartitionError) + assert fut.succeeded() + assert fut.value 
== ({TopicPartition("topic", 3): OffsetAndTimestamp(9999, 1000, -1)}, + set([TopicPartition("topic", 1), TopicPartition("topic", 2)])) def test__handle_list_offsets_response_v2_v3(fetcher, mocker): @@ -318,7 +313,7 @@ def test__handle_list_offsets_response_v2_v3(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)} + assert fut.value == ({TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set()) # v3 response is the same format fut = Future() @@ -328,7 +323,7 @@ def test__handle_list_offsets_response_v2_v3(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)} + assert fut.value == ({TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, -1)}, set()) def test__handle_list_offsets_response_v4_v5(fetcher, mocker): @@ -340,7 +335,7 @@ def test__handle_list_offsets_response_v4_v5(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)} + assert fut.value == ({TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set()) # v5 response is the same format fut = Future() @@ -350,7 +345,7 @@ def test__handle_list_offsets_response_v4_v5(fetcher, mocker): ]) fetcher._handle_list_offsets_response(fut, res) assert fut.succeeded() - assert fut.value == {TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)} + assert fut.value == ({TopicPartition("topic", 0): OffsetAndTimestamp(9999, 1000, 1234)}, set()) def test_fetched_records(fetcher, topic, mocker): @@ -628,15 +623,19 @@ def test_partition_records_compacted_offset(mocker): assert msgs[0].offset == fetch_offset + 1 -def test_update_fetch_positions_paused(subscription_state, client, mocker): +def test_reset_offsets_paused(subscription_state, client, mocker): fetcher = Fetcher(client, subscription_state) tp = TopicPartition('foo', 0) subscription_state.assign_from_user([tp]) subscription_state.pause(tp) # paused partition does not have a valid position - subscription_state.need_offset_reset(tp, OffsetResetStrategy.LATEST) + subscription_state.request_offset_reset(tp, OffsetResetStrategy.LATEST) - mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(10, 1, -1)}) - fetcher.update_fetch_positions([tp]) + fetched_offsets = {tp: OffsetAndTimestamp(10, 1, -1)} + mocker.patch.object(fetcher._client, 'ready', return_value=True) + mocker.patch.object(fetcher, '_send_list_offsets_request', + return_value=Future().success((fetched_offsets, set()))) + mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) + fetcher.reset_offsets_if_needed() assert not subscription_state.is_offset_reset_needed(tp) assert not subscription_state.is_fetchable(tp) # because tp is paused @@ -644,14 +643,19 @@ def test_update_fetch_positions_paused(subscription_state, client, mocker): assert subscription_state.position(tp) == OffsetAndMetadata(10, '', -1) -def test_update_fetch_positions_paused_without_valid(subscription_state, client, mocker): +def test_reset_offsets_paused_without_valid(subscription_state, client, mocker): fetcher = Fetcher(client, subscription_state) tp = TopicPartition('foo', 0) subscription_state.assign_from_user([tp]) subscription_state.pause(tp) # paused partition does not have a valid position + 
subscription_state.reset_missing_positions() - mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(0, 1, -1)}) - fetcher.update_fetch_positions([tp]) + fetched_offsets = {tp: OffsetAndTimestamp(0, 1, -1)} + mocker.patch.object(fetcher._client, 'ready', return_value=True) + mocker.patch.object(fetcher, '_send_list_offsets_request', + return_value=Future().success((fetched_offsets, set()))) + mocker.patch.object(fetcher._client.cluster, "leader_for_partition", return_value=0) + fetcher.reset_offsets_if_needed() assert not subscription_state.is_offset_reset_needed(tp) assert not subscription_state.is_fetchable(tp) # because tp is paused @@ -659,16 +663,16 @@ def test_update_fetch_positions_paused_without_valid(subscription_state, client, assert subscription_state.position(tp) == OffsetAndMetadata(0, '', -1) -def test_update_fetch_positions_paused_with_valid(subscription_state, client, mocker): +def test_reset_offsets_paused_with_valid(subscription_state, client, mocker): fetcher = Fetcher(client, subscription_state) tp = TopicPartition('foo', 0) subscription_state.assign_from_user([tp]) - subscription_state.assignment[tp].committed = OffsetAndMetadata(0, '', -1) - subscription_state.seek(tp, 10) + subscription_state.seek(tp, 0) + subscription_state.assignment[tp].position = OffsetAndMetadata(10, '', -1) subscription_state.pause(tp) # paused partition already has a valid position - mocker.patch.object(fetcher, '_retrieve_offsets', return_value={tp: OffsetAndTimestamp(0, 1, -1)}) - fetcher.update_fetch_positions([tp]) + mocker.patch.object(fetcher, '_fetch_offsets_by_times', return_value={tp: OffsetAndTimestamp(0, 1, -1)}) + fetcher.reset_offsets_if_needed() assert not subscription_state.is_offset_reset_needed(tp) assert not subscription_state.is_fetchable(tp) # because tp is paused
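
For illustration, the reworked SubscriptionState/TopicPartitionState turns awaiting_reset and has_valid_position into properties derived from reset_strategy and _position. A small sketch of the resulting transitions, using only classes touched in this diff (topic name and offsets are arbitrary):

    from kafka.consumer.subscription_state import SubscriptionState
    from kafka.protocol.list_offsets import OffsetResetStrategy
    from kafka.structs import TopicPartition

    state = SubscriptionState(offset_reset_strategy='earliest')
    tp = TopicPartition('my-topic', 0)
    state.assign_from_user([tp])

    assert not state.has_all_fetch_positions()         # no position yet
    state.request_offset_reset(tp, OffsetResetStrategy.LATEST)
    assert state.is_offset_reset_needed(tp)             # awaiting_reset follows reset_strategy
    assert tp in state.partitions_needing_reset()       # allowed until set_reset_pending()/reset_failed()

    state.seek(tp, 42)                                   # a seek clears the pending reset
    assert not state.is_offset_reset_needed(tp)
    assert state.has_all_fetch_positions()               # position is now set
    assert state.position(tp).offset == 42
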
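Taken together, the consumer-facing effect is that resets requested via seek_to_beginning()/seek_to_end() or the auto offset reset policy are no longer resolved by a blocking lookup inside _update_fetch_positions(); Fetcher.reset_offsets_if_needed() now issues the ListOffsets requests asynchronously from within poll(), and committed() always queries the group coordinator rather than a local cache. A rough end-to-end sketch from the public API (broker address, topic and group id are placeholders):

    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             group_id='my-group',
                             enable_auto_commit=False)
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])

    consumer.seek_to_beginning(tp)   # only marks tp as awaiting reset; returns immediately
    consumer.poll(timeout_ms=500)    # reset_offsets_if_needed() sends the ListOffsets request here
    print(consumer.position(tp))     # blocks until the reset resolves (None on timeout)

    # committed() now always issues an OffsetFetch to the coordinator and may block
    # up to the timeout; it returns None if the group has no commit for tp.
    print(consumer.committed(tp))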