Commit 3e28b42

Merge pull request #420 from toddpalino/master
Initial support for consumer coordinator
2 parents ee6b9cb + da03827

4 files changed: +226 −7 lines

kafka/client.py (+128)
@@ -98,6 +98,26 @@ def _get_leader_for_partition(self, topic, partition):
         # Otherwise return the BrokerMetadata
         return self.brokers[meta.leader]
 
+    def _get_coordinator_for_group(self, group):
+        """
+        Returns the coordinator broker for a consumer group.
+
+        ConsumerCoordinatorNotAvailableCode will be raised if the coordinator
+        does not currently exist for the group.
+
+        OffsetsLoadInProgressCode is raised if the coordinator is available
+        but is still loading offsets from the internal topic.
+        """
+
+        resp = self.send_consumer_metadata_request(group)
+
+        # If there's a problem with finding the coordinator, raise the
+        # provided error
+        kafka.common.check_error(resp)
+
+        # Otherwise return the BrokerMetadata
+        return BrokerMetadata(resp.nodeId, resp.host, resp.port)
+
     def _next_id(self):
         """Generate a new correlation id"""
         # modulo to keep w/i int32
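
Note: for orientation, here is a minimal sketch of how the new coordinator lookup might be exercised once this commit is applied. The broker address and group name are placeholders, and calling the private helper directly is for illustration only.

# Hedged sketch, not part of this commit: locate the coordinator for a
# consumer group and handle the case where it is not yet available.
from kafka.client import KafkaClient
from kafka.common import ConsumerCoordinatorNotAvailableCode

client = KafkaClient('localhost:9092')  # placeholder bootstrap address
try:
    broker = client._get_coordinator_for_group(b'my-group')
    print('coordinator for my-group: %s:%d (node %d)'
          % (broker.host, broker.port, broker.nodeId))
except ConsumerCoordinatorNotAvailableCode:
    # The internal offsets topic may not exist yet; callers typically
    # retry after a short delay.
    pass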
@@ -254,6 +274,96 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
         # Return responses in the same order as provided
         return [responses[tp] for tp in original_ordering]
 
+    def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
+        """
+        Send a list of requests to the consumer coordinator for the group
+        specified using the supplied encode/decode functions. As the payloads
+        that use consumer-aware requests do not contain the group (e.g.
+        OffsetFetchRequest), all payloads must be for a single group.
+
+        Arguments:
+
+        group: the name of the consumer group (str) the payloads are for
+        payloads: list of object-like entities with topic (str) and
+                  partition (int) attributes; payloads with duplicate
+                  topic+partition are not supported.
+
+        encoder_fn: a method to encode the list of payloads to a request body,
+                    must accept client_id, correlation_id, and payloads as
+                    keyword arguments
+
+        decoder_fn: a method to decode a response body into response objects.
+                    The response objects must be object-like and have topic
+                    and partition attributes
+
+        Returns:
+
+        List of response objects in the same order as the supplied payloads
+        """
+        # encoders / decoders do not maintain ordering currently
+        # so we need to keep this so we can rebuild order before returning
+        original_ordering = [(p.topic, p.partition) for p in payloads]
+
+        broker = self._get_coordinator_for_group(group)
+
+        # Send the list of request payloads and collect the responses and
+        # errors
+        responses = {}
+        requestId = self._next_id()
+        log.debug('Request %s to %s: %s', requestId, broker, payloads)
+        request = encoder_fn(client_id=self.client_id,
+                             correlation_id=requestId, payloads=payloads)
+
+        # Send the request, recv the response
+        try:
+            conn = self._get_conn(broker.host.decode('utf-8'), broker.port)
+            conn.send(requestId, request)
+
+        except ConnectionError as e:
+            log.warning('ConnectionError attempting to send request %s '
+                        'to server %s: %s', requestId, broker, e)
+
+            for payload in payloads:
+                topic_partition = (payload.topic, payload.partition)
+                responses[topic_partition] = FailedPayloadsError(payload)
+
+        # No exception, try to get response
+        else:
+
+            # decoder_fn=None signals that the server is expected to not
+            # send a response. This probably only applies to
+            # ProduceRequest w/ acks = 0
+            if decoder_fn is None:
+                log.debug('Request %s does not expect a response '
+                          '(skipping conn.recv)', requestId)
+                for payload in payloads:
+                    topic_partition = (payload.topic, payload.partition)
+                    responses[topic_partition] = None
+                return []
+
+            try:
+                response = conn.recv(requestId)
+            except ConnectionError as e:
+                log.warning('ConnectionError attempting to receive a '
+                            'response to request %s from server %s: %s',
+                            requestId, broker, e)
+
+                for payload in payloads:
+                    topic_partition = (payload.topic, payload.partition)
+                    responses[topic_partition] = FailedPayloadsError(payload)
+
+            else:
+                _resps = []
+                for payload_response in decoder_fn(response):
+                    topic_partition = (payload_response.topic,
+                                       payload_response.partition)
+                    responses[topic_partition] = payload_response
+                    _resps.append(payload_response)
+                log.debug('Response %s: %s', requestId, _resps)
+
+        # Return responses in the same order as provided
+        return [responses[tp] for tp in original_ordering]
+
     def __repr__(self):
         return '<KafkaClient client_id=%s>' % (self.client_id)
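
Note: a hedged sketch of the encoder/decoder contract this method expects. The client and payloads objects here are placeholders; the real pairing appears in send_offset_fetch_request_kafka later in this diff.

import functools
from kafka.protocol import KafkaProtocol

# Bake the group into the encoder with functools.partial so the generic
# request loop only has to pass client_id/correlation_id/payloads, then
# pair it with the matching decoder.
encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
                            group=b'my-group', from_kafka=True)
decoder = KafkaProtocol.decode_offset_fetch_response
resps = client._send_consumer_aware_request(b'my-group', payloads,
                                            encoder, decoder)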

@@ -446,6 +556,13 @@ def send_metadata_request(self, payloads=[], fail_on_error=True,
 
         return self._send_broker_unaware_request(payloads, encoder, decoder)
 
+    def send_consumer_metadata_request(self, payloads=[], fail_on_error=True,
+                                       callback=None):
+        encoder = KafkaProtocol.encode_consumer_metadata_request
+        decoder = KafkaProtocol.decode_consumer_metadata_response
+
+        return self._send_broker_unaware_request(payloads, encoder, decoder)
+
     def send_produce_request(self, payloads=[], acks=1, timeout=1000,
                              fail_on_error=True, callback=None):
         """
@@ -550,3 +667,14 @@ def send_offset_fetch_request(self, group, payloads=[],
 
         return [resp if not callback else callback(resp) for resp in resps
                 if not fail_on_error or not self._raise_on_response_error(resp)]
+
+    def send_offset_fetch_request_kafka(self, group, payloads=[],
+                                        fail_on_error=True, callback=None):
+
+        encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
+                                    group=group, from_kafka=True)
+        decoder = KafkaProtocol.decode_offset_fetch_response
+        resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
+
+        return [resp if not callback else callback(resp) for resp in resps
+                if not fail_on_error or not self._raise_on_response_error(resp)]
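
Note: taken together, the new client methods let offsets committed to Kafka (rather than Zookeeper) be fetched through the group's coordinator. A hedged usage sketch, with placeholder topic, group, and broker values:

from kafka.client import KafkaClient
from kafka.common import OffsetFetchRequest

client = KafkaClient('localhost:9092')  # placeholder address
payloads = [OffsetFetchRequest(b'my-topic', 0),
            OffsetFetchRequest(b'my-topic', 1)]

# Fetch Kafka-committed offsets (API version 1); the request is routed
# to the group's coordinator via _send_consumer_aware_request
for resp in client.send_offset_fetch_request_kafka(b'my-group', payloads):
    print('%s[%d] -> offset %d' % (resp.topic, resp.partition, resp.offset))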

kafka/common.py (+22)
@@ -13,6 +13,13 @@
 MetadataResponse = namedtuple("MetadataResponse",
                               ["brokers", "topics"])
 
+# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
+ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest",
+                                     ["groups"])
+
+ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
+                                      ["error", "nodeId", "host", "port"])
+
 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
 ProduceRequest = namedtuple("ProduceRequest",
                             ["topic", "partition", "messages"])
@@ -160,6 +167,21 @@ class StaleLeaderEpochCodeError(BrokerResponseError):
     message = 'STALE_LEADER_EPOCH_CODE'
 
 
+class OffsetsLoadInProgressCode(BrokerResponseError):
+    errno = 14
+    message = 'OFFSETS_LOAD_IN_PROGRESS_CODE'
+
+
+class ConsumerCoordinatorNotAvailableCode(BrokerResponseError):
+    errno = 15
+    message = 'CONSUMER_COORDINATOR_NOT_AVAILABLE_CODE'
+
+
+class NotCoordinatorForConsumerCode(BrokerResponseError):
+    errno = 16
+    message = 'NOT_COORDINATOR_FOR_CONSUMER_CODE'
+
+
 class KafkaUnavailableError(KafkaError):
     pass
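
Note: these three classes map broker error codes 14–16 from the consumer metadata and offset APIs. Because the coordinator loads group offsets asynchronously after startup or a coordinator move, callers may want to retry on error 14. A hypothetical helper (not part of this commit) sketching that pattern:

import time

from kafka.common import OffsetsLoadInProgressCode

def fetch_committed_offsets(client, group, payloads, retries=5, delay=1.0):
    # Hypothetical retry wrapper: error 14 means the coordinator exists
    # but is still loading offsets from the internal offsets topic
    for attempt in range(retries):
        try:
            return client.send_offset_fetch_request_kafka(group, payloads)
        except OffsetsLoadInProgressCode:
            time.sleep(delay)
    raise OffsetsLoadInProgressCode('coordinator still loading after %d tries'
                                    % retries)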

kafka/protocol.py (+47 −6)
@@ -14,7 +14,8 @@
     MetadataResponse, ProduceResponse, FetchResponse,
     OffsetResponse, OffsetCommitResponse, OffsetFetchResponse,
     ProtocolError, BufferUnderflowError, ChecksumError,
-    ConsumerFetchSizeTooSmall, UnsupportedCodecError
+    ConsumerFetchSizeTooSmall, UnsupportedCodecError,
+    ConsumerMetadataResponse
 )
 from kafka.util import (
     crc32, read_short_string, read_int_string, relative_unpack,
@@ -43,19 +44,21 @@ class KafkaProtocol(object):
     METADATA_KEY = 3
     OFFSET_COMMIT_KEY = 8
     OFFSET_FETCH_KEY = 9
+    CONSUMER_METADATA_KEY = 10
 
     ###################
     #   Private API   #
     ###################
 
     @classmethod
-    def _encode_message_header(cls, client_id, correlation_id, request_key):
+    def _encode_message_header(cls, client_id, correlation_id, request_key,
+                               version=0):
         """
         Encode the common request envelope
         """
         return struct.pack('>hhih%ds' % len(client_id),
                            request_key,          # ApiKey
-                           0,                    # ApiVersion
+                           version,              # ApiVersion
                            correlation_id,       # CorrelationId
                            len(client_id),       # ClientId size
                            client_id)            # ClientId
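
Note: the new version parameter only changes the ApiVersion field of the fixed envelope; the layout is otherwise the big-endian header described in the Kafka protocol guide. A small illustrative check with assumed values:

import struct

# Illustrative only: build the fixed envelope by hand for a 3-byte
# client id and confirm where ApiVersion lands (bytes 2-3 of the header)
header = struct.pack('>hhih3s',
                     9,       # ApiKey (OffsetFetch)
                     1,       # ApiVersion
                     7,       # CorrelationId
                     3,       # ClientId size
                     b'cid')  # ClientId
assert struct.unpack('>h', header[2:4]) == (1,)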
@@ -429,6 +432,38 @@ def decode_metadata_response(cls, data):
 
         return MetadataResponse(brokers, topic_metadata)
 
+    @classmethod
+    def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
+        """
+        Encode a ConsumerMetadataRequest
+
+        Arguments:
+            client_id: string
+            correlation_id: int
+            payloads: string (consumer group)
+        """
+        message = []
+        message.append(cls._encode_message_header(client_id, correlation_id,
+                                                  KafkaProtocol.CONSUMER_METADATA_KEY))
+        message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
+
+        msg = b''.join(message)
+        return write_int_string(msg)
+
+    @classmethod
+    def decode_consumer_metadata_response(cls, data):
+        """
+        Decode bytes to a ConsumerMetadataResponse
+
+        Arguments:
+            data: bytes to decode
+        """
+        ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
+        (host, cur) = read_short_string(data, cur)
+        ((port,), cur) = relative_unpack('>i', data, cur)
+
+        return ConsumerMetadataResponse(error, nodeId, host, port)
+
     @classmethod
     def encode_offset_commit_request(cls, client_id, correlation_id,
                                      group, payloads):
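
Note: since encode_consumer_metadata_request frames its output with write_int_string, the first four bytes of any encoded request are the size of the rest. A quick sanity check (the client id and group are arbitrary):

import struct

from kafka.protocol import KafkaProtocol

# The frame starts with a 4-byte big-endian size prefix added by
# write_int_string; everything after it is the request body
frame = KafkaProtocol.encode_consumer_metadata_request(b'cid', 4, b'g1')
(size,) = struct.unpack('>i', frame[:4])
assert size == len(frame) - 4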
@@ -481,21 +516,27 @@ def decode_offset_commit_response(cls, data):
 
     @classmethod
     def encode_offset_fetch_request(cls, client_id, correlation_id,
-                                    group, payloads):
+                                    group, payloads, from_kafka=False):
         """
-        Encode some OffsetFetchRequest structs
+        Encode some OffsetFetchRequest structs. The request is encoded using
+        version 0 if from_kafka is false, indicating a request for Zookeeper
+        offsets. It is encoded using version 1 otherwise, indicating a request
+        for Kafka offsets.
 
         Arguments:
             client_id: string
             correlation_id: int
             group: string, the consumer group you are fetching offsets for
             payloads: list of OffsetFetchRequest
+            from_kafka: bool, default False, set True for Kafka-committed offsets
         """
         grouped_payloads = group_by_topic_and_partition(payloads)
 
         message = []
+        reqver = 1 if from_kafka else 0
         message.append(cls._encode_message_header(client_id, correlation_id,
-                                                  KafkaProtocol.OFFSET_FETCH_KEY))
+                                                  KafkaProtocol.OFFSET_FETCH_KEY,
+                                                  version=reqver))
 
         message.append(write_short_string(group))
         message.append(struct.pack('>i', len(grouped_payloads)))
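
Note: the from_kafka flag is the only difference between the two encodings; version 0 asks the broker for Zookeeper-stored offsets and version 1 for offsets stored in Kafka's internal offsets topic (added in Kafka 0.8.2). A brief hedged comparison with placeholder values:

from kafka.common import OffsetFetchRequest
from kafka.protocol import KafkaProtocol

payloads = [OffsetFetchRequest(b'my-topic', 0)]

# Version 0: offsets read from Zookeeper (the pre-0.8.2 storage)
zk_req = KafkaProtocol.encode_offset_fetch_request(
    b'cid', 1, b'my-group', payloads)

# Version 1: offsets read from Kafka's internal offsets topic
kafka_req = KafkaProtocol.encode_offset_fetch_request(
    b'cid', 1, b'my-group', payloads, from_kafka=True)

# The two frames differ only in the ApiVersion field of the header
assert zk_req != kafka_req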

test/test_protocol.py (+29 −1)
@@ -13,7 +13,7 @@
     ProduceResponse, FetchResponse, OffsetAndMessage,
     BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition,
     KafkaUnavailableError, UnsupportedCodecError, ConsumerFetchSizeTooSmall,
-    ProtocolError
+    ProtocolError, ConsumerMetadataResponse
 )
 from kafka.protocol import (
     ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
@@ -560,6 +560,34 @@ def test_decode_metadata_response(self):
         decoded = KafkaProtocol.decode_metadata_response(encoded)
         self.assertEqual(decoded, (node_brokers, topic_partitions))
 
+    def test_encode_consumer_metadata_request(self):
+        expected = b"".join([
+            struct.pack(">i", 17),          # Total length of the request
+            struct.pack('>h', 10),          # API key consumer metadata
+            struct.pack('>h', 0),           # API version
+            struct.pack('>i', 4),           # Correlation ID
+            struct.pack('>h3s', 3, b"cid"), # The client ID
+            struct.pack('>h2s', 2, b"g1"),  # Group "g1"
+        ])
+
+        encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1")
+
+        self.assertEqual(encoded, expected)
+
+    def test_decode_consumer_metadata_response(self):
+        encoded = b"".join([
+            struct.pack(">i", 42),                                # Correlation ID
+            struct.pack(">h", 0),                                 # No Error
+            struct.pack(">i", 1),                                 # Broker ID
+            struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"), # Broker Host
+            struct.pack(">i", 1000),                              # Broker Port
+        ])
+
+        results = KafkaProtocol.decode_consumer_metadata_response(encoded)
+        self.assertEqual(results,
+            ConsumerMetadataResponse(error=0, nodeId=1,
+                                     host=b'brokers1.kafka.rdio.com', port=1000)
+        )
+
     def test_encode_offset_request(self):
         expected = b"".join([
             struct.pack(">i", 21),          # Total length of the request
