Commit 1594e38

Support Fetch Request/Response v6 in consumer (#2500)
1 parent 837a600 commit 1594e38

File tree

6 files changed: +83 −66 lines

kafka/consumer/fetcher.py

Lines changed: 61 additions & 48 deletions

@@ -411,10 +411,10 @@ def _message_generator(self):

             tp = self._next_partition_records.topic_partition

-            # We can ignore any prior signal to drop pending message sets
+            # We can ignore any prior signal to drop pending record batches
             # because we are starting from a fresh one where fetch_offset == position
             # i.e., the user seek()'d to this position
-            self._subscriptions.assignment[tp].drop_pending_message_set = False
+            self._subscriptions.assignment[tp].drop_pending_record_batch = False

             for msg in self._next_partition_records.take():

@@ -430,12 +430,12 @@ def _message_generator(self):
                     break

                 # If there is a seek during message iteration,
-                # we should stop unpacking this message set and
+                # we should stop unpacking this record batch and
                 # wait for a new fetch response that aligns with the
                 # new seek position
-                elif self._subscriptions.assignment[tp].drop_pending_message_set:
-                    log.debug("Skipping remainder of message set for partition %s", tp)
-                    self._subscriptions.assignment[tp].drop_pending_message_set = False
+                elif self._subscriptions.assignment[tp].drop_pending_record_batch:
+                    log.debug("Skipping remainder of record batch for partition %s", tp)
+                    self._subscriptions.assignment[tp].drop_pending_record_batch = False
                     self._next_partition_records = None
                     break

@@ -451,16 +451,16 @@ def _message_generator(self):

         self._next_partition_records = None

-    def _unpack_message_set(self, tp, records):
+    def _unpack_records(self, tp, records):
         try:
             batch = records.next_batch()
             while batch is not None:

                 # Try DefaultsRecordBatch / message log format v2
                 # base_offset, last_offset_delta, and control batches
                 try:
-                    self._subscriptions.assignment[tp].last_offset_from_message_batch = batch.base_offset + \
-                        batch.last_offset_delta
+                    self._subscriptions.assignment[tp].last_offset_from_record_batch = batch.base_offset + \
+                        batch.last_offset_delta
                     # Control batches have a single record indicating whether a transaction
                     # was aborted or committed.
                     # When isolation_level is READ_COMMITTED (currently unsupported)

@@ -673,17 +673,18 @@ def _create_fetch_requests(self):
        """
        # create the fetch info as a dict of lists of partition info tuples
        # which can be passed to FetchRequest() via .items()
+       version = self._client.api_version(FetchRequest, max_version=6)
        fetchable = collections.defaultdict(lambda: collections.defaultdict(list))

        for partition in self._fetchable_partitions():
            node_id = self._client.cluster.leader_for_partition(partition)

            # advance position for any deleted compacted messages if required
-           if self._subscriptions.assignment[partition].last_offset_from_message_batch:
-               next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_message_batch + 1
+           if self._subscriptions.assignment[partition].last_offset_from_record_batch:
+               next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_record_batch + 1
                if next_offset_from_batch_header > self._subscriptions.assignment[partition].position:
                    log.debug(
-                       "Advance position for partition %s from %s to %s (last message batch location plus one)"
+                       "Advance position for partition %s from %s to %s (last record batch location plus one)"
                        " to correct for deleted compacted messages and/or transactional control records",
                        partition, self._subscriptions.assignment[partition].position, next_offset_from_batch_header)
                    self._subscriptions.assignment[partition].position = next_offset_from_batch_header

@@ -697,52 +698,60 @@ def _create_fetch_requests(self):
                self._client.cluster.request_update()

            elif self._client.in_flight_request_count(node_id) == 0:
-               partition_info = (
-                   partition.partition,
-                   position,
-                   self.config['max_partition_fetch_bytes']
-               )
+               if version < 5:
+                   partition_info = (
+                       partition.partition,
+                       position,
+                       self.config['max_partition_fetch_bytes']
+                   )
+               else:
+                   partition_info = (
+                       partition.partition,
+                       position,
+                       -1,  # log_start_offset is used internally by brokers / replicas only
+                       self.config['max_partition_fetch_bytes'],
+                   )
                fetchable[node_id][partition.topic].append(partition_info)
                log.debug("Adding fetch request for partition %s at offset %d",
                          partition, position)
            else:
                log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
                        partition, node_id)

-       version = self._client.api_version(FetchRequest, max_version=4)
        requests = {}
        for node_id, partition_data in six.iteritems(fetchable):
-           if version < 3:
+           # As of version == 3 partitions will be returned in order as
+           # they are requested, so to avoid starvation with
+           # `fetch_max_bytes` option we need this shuffle
+           # NOTE: we do have partition_data in random order due to usage
+           #       of unordered structures like dicts, but that does not
+           #       guarantee equal distribution, and starting in Python3.6
+           #       dicts retain insert order.
+           partition_data = list(partition_data.items())
+           random.shuffle(partition_data)
+
+           if version <= 2:
+               requests[node_id] = FetchRequest[version](
+                   -1,  # replica_id
+                   self.config['fetch_max_wait_ms'],
+                   self.config['fetch_min_bytes'],
+                   partition_data)
+           elif version == 3:
                requests[node_id] = FetchRequest[version](
                    -1,  # replica_id
                    self.config['fetch_max_wait_ms'],
                    self.config['fetch_min_bytes'],
-                   partition_data.items())
+                   self.config['fetch_max_bytes'],
+                   partition_data)
            else:
-               # As of version == 3 partitions will be returned in order as
-               # they are requested, so to avoid starvation with
-               # `fetch_max_bytes` option we need this shuffle
-               # NOTE: we do have partition_data in random order due to usage
-               #       of unordered structures like dicts, but that does not
-               #       guarantee equal distribution, and starting in Python3.6
-               #       dicts retain insert order.
-               partition_data = list(partition_data.items())
-               random.shuffle(partition_data)
-               if version == 3:
-                   requests[node_id] = FetchRequest[version](
-                       -1,  # replica_id
-                       self.config['fetch_max_wait_ms'],
-                       self.config['fetch_min_bytes'],
-                       self.config['fetch_max_bytes'],
-                       partition_data)
-               else:
-                   requests[node_id] = FetchRequest[version](
-                       -1,  # replica_id
-                       self.config['fetch_max_wait_ms'],
-                       self.config['fetch_min_bytes'],
-                       self.config['fetch_max_bytes'],
-                       self._isolation_level,
-                       partition_data)
+               # through v6
+               requests[node_id] = FetchRequest[version](
+                   -1,  # replica_id
+                   self.config['fetch_max_wait_ms'],
+                   self.config['fetch_min_bytes'],
+                   self.config['fetch_max_bytes'],
+                   self._isolation_level,
+                   partition_data)
        return requests

@@ -821,7 +830,7 @@ def _parse_fetched_data(self, completed_fetch):
                log.debug("Adding fetched record for partition %s with"
                          " offset %d to buffered record list", tp,
                          position)
-               unpacked = list(self._unpack_message_set(tp, records))
+               unpacked = list(self._unpack_records(tp, records))
                parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
                if unpacked:
                    last_offset = unpacked[-1].offset

@@ -845,7 +854,9 @@ def _parse_fetched_data(self, completed_fetch):
                self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count)

            elif error_type in (Errors.NotLeaderForPartitionError,
-                               Errors.UnknownTopicOrPartitionError):
+                               Errors.UnknownTopicOrPartitionError,
+                               Errors.KafkaStorageError):
+               log.debug("Error fetching partition %s: %s", tp, error_type.__name__)
                self._client.cluster.request_update()
            elif error_type is Errors.OffsetOutOfRangeError:
                position = self._subscriptions.assignment[tp].position

@@ -862,8 +873,10 @@ def _parse_fetched_data(self, completed_fetch):
            elif error_type is Errors.TopicAuthorizationFailedError:
                log.warning("Not authorized to read from topic %s.", tp.topic)
                raise Errors.TopicAuthorizationFailedError(set(tp.topic))
-           elif error_type is Errors.UnknownError:
-               log.warning("Unknown error fetching data for topic-partition %s", tp)
+           elif error_type.is_retriable:
+               log.debug("Retriable error fetching partition %s: %s", tp, error_type())
+               if error_type.invalid_metadata:
+                   self._client.cluster.request_update()
            else:
                raise error_type('Unexpected error while fetching data')
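
Taken together, the fetcher now negotiates the Fetch API up to v6 and gates the per-partition tuple shape on the negotiated version: v5+ inserts a log_start_offset slot that a consumer always fills with -1, since it is only meaningful for broker-to-broker replica fetches. A minimal standalone sketch of that branching, using a hypothetical helper name rather than the library's own code:

def build_partition_info(version, partition, position, max_partition_fetch_bytes):
    # Fetch v0-v4 use (partition, offset, max_bytes); v5+ insert a
    # log_start_offset field, which a consumer always sends as -1 because
    # it is only used internally by brokers / replicas.
    if version < 5:
        return (partition, position, max_partition_fetch_bytes)
    return (partition, position, -1, max_partition_fetch_bytes)


# The same partition at offset 123 under both tuple shapes:
assert build_partition_info(4, 0, 123, 1048576) == (0, 123, 1048576)
assert build_partition_info(6, 0, 123, 1048576) == (0, 123, -1, 1048576)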

kafka/consumer/subscription_state.py

Lines changed: 6 additions & 6 deletions

@@ -381,10 +381,10 @@ def __init__(self):
         self.reset_strategy = None  # the reset strategy if awaitingReset is set
         self._position = None  # offset exposed to the user
         self.highwater = None
-        self.drop_pending_message_set = False
-        # The last message offset hint available from a message batch with
+        self.drop_pending_record_batch = False
+        # The last message offset hint available from a record batch with
         # magic=2 which includes deleted compacted messages
-        self.last_offset_from_message_batch = None
+        self.last_offset_from_record_batch = None

     def _set_position(self, offset):
         assert self.has_valid_position, 'Valid position required'

@@ -399,16 +399,16 @@ def await_reset(self, strategy):
         self.awaiting_reset = True
         self.reset_strategy = strategy
         self._position = None
-        self.last_offset_from_message_batch = None
+        self.last_offset_from_record_batch = None
         self.has_valid_position = False

     def seek(self, offset):
         self._position = offset
         self.awaiting_reset = False
         self.reset_strategy = None
         self.has_valid_position = True
-        self.drop_pending_message_set = True
-        self.last_offset_from_message_batch = None
+        self.drop_pending_record_batch = True
+        self.last_offset_from_record_batch = None

     def pause(self):
         self.paused = True
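
The renamed flag is one half of a handshake: seek() raises drop_pending_record_batch, and the fetcher's message generator checks it to abandon the record batch it is currently unpacking. A toy illustration of the pattern, with hypothetical names independent of the library:

class PartitionState(object):
    def __init__(self):
        self.position = 0
        self.drop_pending_record_batch = False

    def seek(self, offset):
        # Mirrors subscription_state.seek(): move the position and signal
        # any in-progress batch iteration to stop.
        self.position = offset
        self.drop_pending_record_batch = True


def iterate_batch(state, offsets):
    # Mirrors the check in fetcher._message_generator(): a seek mid-batch
    # drops the remainder of the stale batch.
    for offset in offsets:
        if state.drop_pending_record_batch:
            state.drop_pending_record_batch = False
            return
        state.position = offset + 1


state = PartitionState()
iterate_batch(state, range(5))       # no seek: the whole batch is consumed
assert state.position == 5
state.seek(100)
iterate_batch(state, range(5, 10))   # stale batch after a seek: dropped at once
assert state.position == 100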

kafka/producer/record_accumulator.py

Lines changed: 1 addition & 1 deletion

@@ -156,7 +156,7 @@ class RecordAccumulator(object):
             will also impact the compression ratio (more batching means better
             compression). Default: None.
         linger_ms (int): An artificial delay time to add before declaring a
-            messageset (that isn't full) ready for sending. This allows
+            record batch (that isn't full) ready for sending. This allows
             time for more records to arrive. Setting a non-zero linger_ms
             will trade off some latency for potentially better throughput
             due to more batching (and hence fewer, larger requests).
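
For reference, linger_ms is surfaced on the public producer config, where a small delay often buys noticeably larger batches. An illustrative usage, assuming a broker on localhost:

from kafka import KafkaProducer

# Illustrative settings: wait up to 10 ms for more records before sending
# a partially-full batch, trading a little latency for larger batches and
# fewer requests.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',  # assumed local broker
    linger_ms=10,
)
producer.send('my-topic', b'hello')
producer.flush()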

kafka/protocol/fetch.py

Lines changed: 9 additions & 7 deletions

@@ -14,7 +14,7 @@ class FetchResponse_v0(Response):
             ('partition', Int32),
             ('error_code', Int16),
             ('highwater_offset', Int64),
-            ('message_set', Bytes)))))
+            ('records', Bytes)))))
     )

@@ -29,7 +29,7 @@ class FetchResponse_v1(Response):
             ('partition', Int32),
             ('error_code', Int16),
             ('highwater_offset', Int64),
-            ('message_set', Bytes)))))
+            ('records', Bytes)))))
     )

@@ -46,6 +46,7 @@ class FetchResponse_v3(Response):


 class FetchResponse_v4(Response):
+    # Adds message format v2
     API_KEY = 1
     API_VERSION = 4
     SCHEMA = Schema(

@@ -60,7 +61,7 @@ class FetchResponse_v4(Response):
             ('aborted_transactions', Array(
                 ('producer_id', Int64),
                 ('first_offset', Int64))),
-            ('message_set', Bytes)))))
+            ('records', Bytes)))))
     )

@@ -80,7 +81,7 @@ class FetchResponse_v5(Response):
             ('aborted_transactions', Array(
                 ('producer_id', Int64),
                 ('first_offset', Int64))),
-            ('message_set', Bytes)))))
+            ('records', Bytes)))))
     )

@@ -115,7 +116,7 @@ class FetchResponse_v7(Response):
             ('aborted_transactions', Array(
                 ('producer_id', Int64),
                 ('first_offset', Int64))),
-            ('message_set', Bytes)))))
+            ('records', Bytes)))))
     )

@@ -156,7 +157,7 @@ class FetchResponse_v11(Response):
                 ('producer_id', Int64),
                 ('first_offset', Int64))),
             ('preferred_read_replica', Int32),
-            ('message_set', Bytes)))))
+            ('records', Bytes)))))
     )

@@ -211,6 +212,7 @@ class FetchRequest_v3(Request):


 class FetchRequest_v4(Request):
     # Adds isolation_level field
+    # Adds message format v2
     API_KEY = 1
     API_VERSION = 4
     RESPONSE_TYPE = FetchResponse_v4

@@ -264,7 +266,7 @@ class FetchRequest_v6(Request):


 class FetchRequest_v7(Request):
     """
-    Add incremental fetch requests
+    Add incremental fetch requests (see KIP-227)
     """
     API_KEY = 1
     API_VERSION = 7
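
Since FetchRequest is indexed by version (the fetcher calls FetchRequest[version](...)), a v6 request can also be built by hand. A hedged sketch with illustrative values, following the field order the fetcher uses for v4 and above:

from kafka.protocol.fetch import FetchRequest

# Field order follows the v4+ constructor calls in fetcher.py:
# replica_id, max_wait_time, min_bytes, max_bytes, isolation_level, topics.
# v5/v6 partition tuples are (partition, offset, log_start_offset, max_bytes).
request = FetchRequest[6](
    -1,           # replica_id: always -1 for ordinary consumers
    500,          # max_wait_time (ms)
    1,            # min_bytes
    52428800,     # max_bytes (50 MB)
    0,            # isolation_level: 0 = READ_UNCOMMITTED
    [('my-topic', [(0, 123, -1, 1048576)])])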

kafka/protocol/produce.py

Lines changed: 4 additions & 2 deletions

@@ -47,6 +47,7 @@ class ProduceResponse_v2(Response):


 class ProduceResponse_v3(Response):
+    # Adds support for message format v2
     API_KEY = 0
     API_VERSION = 3
     SCHEMA = ProduceResponse_v2.SCHEMA

@@ -141,7 +142,7 @@ class ProduceRequest_v0(ProduceRequest):
             ('topic', String('utf-8')),
             ('partitions', Array(
                 ('partition', Int32),
-                ('messages', Bytes)))))
+                ('records', Bytes)))))
     )

@@ -158,6 +159,7 @@ class ProduceRequest_v2(ProduceRequest):


 class ProduceRequest_v3(ProduceRequest):
+    # Adds support for message format v2
     API_VERSION = 3
     RESPONSE_TYPE = ProduceResponse_v3
     SCHEMA = Schema(

@@ -168,7 +170,7 @@ class ProduceRequest_v3(ProduceRequest):
             ('topic', String('utf-8')),
             ('partitions', Array(
                 ('partition', Int32),
-                ('messages', Bytes)))))
+                ('records', Bytes)))))
     )
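
ProduceRequest_v3 is the produce-side counterpart: its records field is the first to carry message format v2 batches. A hedged construction sketch with illustrative values; the transactional_id/required_acks/timeout header fields are an assumption from the library's v3 schema, not shown in this diff:

from kafka.protocol.produce import ProduceRequest

records_bytes = b''  # would be a serialized message format v2 record batch
request = ProduceRequest[3](
    None,         # transactional_id: unused outside transactions
    1,            # required_acks
    30000,        # timeout (ms)
    [('my-topic', [(0, records_bytes)])])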

test/test_fetcher.py

Lines changed: 2 additions & 2 deletions

@@ -399,7 +399,7 @@ def test__handle_fetch_error(fetcher, caplog, exception, log_level):
     assert caplog.records[0].levelname == logging.getLevelName(log_level)


-def test__unpack_message_set(fetcher):
+def test__unpack_records(fetcher):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition('foo', 0)
     messages = [

@@ -408,7 +408,7 @@ def test__unpack_message_set(fetcher):
         (None, b"c", None),
     ]
     memory_records = MemoryRecords(_build_record_batch(messages))
-    records = list(fetcher._unpack_message_set(tp, memory_records))
+    records = list(fetcher._unpack_records(tp, memory_records))
     assert len(records) == 3
     assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
     assert records[0].value == b'a'
