Skip to content

Commit d272455

Browse files
committed
rename methods -> list_offsets
1 parent e1df1b2 commit d272455

File tree

2 files changed

+31
-31
lines changed

2 files changed

+31
-31
lines changed

kafka/consumer/fetcher.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
272272
if not timestamps:
273273
return {}
274274

275-
future = self._send_offset_requests(timestamps)
275+
future = self._send_list_offsets_requests(timestamps)
276276
self._client.poll(future=future, timeout_ms=remaining_ms)
277277

278278
if future.succeeded():
@@ -519,7 +519,7 @@ def _deserialize(self, f, topic, bytes_):
519519
return f.deserialize(topic, bytes_)
520520
return f(bytes_)
521521

522-
def _send_offset_requests(self, timestamps):
522+
def _send_list_offsets_requests(self, timestamps):
523523
"""Fetch offsets for each partition in timestamps dict. This may send
524524
request to multiple nodes, based on who is Leader for partition.
525525
@@ -564,12 +564,12 @@ def on_fail(err):
564564
list_offsets_future.failure(err)
565565

566566
for node_id, timestamps in six.iteritems(timestamps_by_node):
567-
_f = self._send_offset_request(node_id, timestamps)
567+
_f = self._send_list_offsets_request(node_id, timestamps)
568568
_f.add_callback(on_success)
569569
_f.add_errback(on_fail)
570570
return list_offsets_future
571571

572-
def _send_offset_request(self, node_id, timestamps):
572+
def _send_list_offsets_request(self, node_id, timestamps):
573573
version = self._client.api_version(ListOffsetsRequest, max_version=3)
574574
by_topic = collections.defaultdict(list)
575575
for tp, timestamp in six.iteritems(timestamps):
@@ -596,12 +596,12 @@ def _send_offset_request(self, node_id, timestamps):
596596
future = Future()
597597

598598
_f = self._client.send(node_id, request)
599-
_f.add_callback(self._handle_offset_response, future)
599+
_f.add_callback(self._handle_list_offsets_response, future)
600600
_f.add_errback(lambda e: future.failure(e))
601601
return future
602602

603-
def _handle_offset_response(self, future, response):
604-
"""Callback for the response of the list offset call above.
603+
def _handle_list_offsets_response(self, future, response):
604+
"""Callback for the response of the ListOffsets api call
605605
606606
Arguments:
607607
future (Future): the future to update based on response

test/test_fetcher.py

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,9 @@ def test__reset_offset(fetcher, mocker):
149149
assert fetcher._subscriptions.assignment[tp].position == 1001
150150

151151

152-
def test__send_offset_requests(fetcher, mocker):
153-
tp = TopicPartition("topic_send_offset", 1)
154-
mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
152+
def test__send_list_offsets_requests(fetcher, mocker):
153+
tp = TopicPartition("topic_send_list_offsets", 1)
154+
mocked_send = mocker.patch.object(fetcher, "_send_list_offsets_request")
155155
send_futures = []
156156

157157
def send_side_effect(*args, **kw):
@@ -168,19 +168,19 @@ def send_side_effect(*args, **kw):
168168
[None, -1], itertools.cycle([0]))
169169

170170
# Leader == None
171-
fut = fetcher._send_offset_requests({tp: 0})
171+
fut = fetcher._send_list_offsets_requests({tp: 0})
172172
assert fut.failed()
173173
assert isinstance(fut.exception, StaleMetadata)
174174
assert not mocked_send.called
175175

176176
# Leader == -1
177-
fut = fetcher._send_offset_requests({tp: 0})
177+
fut = fetcher._send_list_offsets_requests({tp: 0})
178178
assert fut.failed()
179179
assert isinstance(fut.exception, LeaderNotAvailableError)
180180
assert not mocked_send.called
181181

182182
# Leader == 0, send failed
183-
fut = fetcher._send_offset_requests({tp: 0})
183+
fut = fetcher._send_list_offsets_requests({tp: 0})
184184
assert not fut.is_done
185185
assert mocked_send.called
186186
# Check that we bound the futures correctly to chain failure
@@ -189,7 +189,7 @@ def send_side_effect(*args, **kw):
189189
assert isinstance(fut.exception, NotLeaderForPartitionError)
190190

191191
# Leader == 0, send success
192-
fut = fetcher._send_offset_requests({tp: 0})
192+
fut = fetcher._send_list_offsets_requests({tp: 0})
193193
assert not fut.is_done
194194
assert mocked_send.called
195195
# Check that we bound the futures correctly to chain success
@@ -198,12 +198,12 @@ def send_side_effect(*args, **kw):
198198
assert fut.value == {tp: (10, 10000)}
199199

200200

201-
def test__send_offset_requests_multiple_nodes(fetcher, mocker):
202-
tp1 = TopicPartition("topic_send_offset", 1)
203-
tp2 = TopicPartition("topic_send_offset", 2)
204-
tp3 = TopicPartition("topic_send_offset", 3)
205-
tp4 = TopicPartition("topic_send_offset", 4)
206-
mocked_send = mocker.patch.object(fetcher, "_send_offset_request")
201+
def test__send_list_offsets_requests_multiple_nodes(fetcher, mocker):
202+
tp1 = TopicPartition("topic_send_list_offsets", 1)
203+
tp2 = TopicPartition("topic_send_list_offsets", 2)
204+
tp3 = TopicPartition("topic_send_list_offsets", 3)
205+
tp4 = TopicPartition("topic_send_list_offsets", 4)
206+
mocked_send = mocker.patch.object(fetcher, "_send_list_offsets_request")
207207
send_futures = []
208208

209209
def send_side_effect(node_id, timestamps):
@@ -218,7 +218,7 @@ def send_side_effect(node_id, timestamps):
218218

219219
# -- All node succeeded case
220220
tss = OrderedDict([(tp1, 0), (tp2, 0), (tp3, 0), (tp4, 0)])
221-
fut = fetcher._send_offset_requests(tss)
221+
fut = fetcher._send_list_offsets_requests(tss)
222222
assert not fut.is_done
223223
assert mocked_send.call_count == 2
224224

@@ -244,7 +244,7 @@ def send_side_effect(node_id, timestamps):
244244

245245
# -- First succeeded second not
246246
del send_futures[:]
247-
fut = fetcher._send_offset_requests(tss)
247+
fut = fetcher._send_list_offsets_requests(tss)
248248
assert len(send_futures) == 2
249249
send_futures[0][2].success({tp1: (11, 1001)})
250250
send_futures[1][2].failure(UnknownTopicOrPartitionError(tp1))
@@ -253,22 +253,22 @@ def send_side_effect(node_id, timestamps):
253253

254254
# -- First fails second succeeded
255255
del send_futures[:]
256-
fut = fetcher._send_offset_requests(tss)
256+
fut = fetcher._send_list_offsets_requests(tss)
257257
assert len(send_futures) == 2
258258
send_futures[0][2].failure(UnknownTopicOrPartitionError(tp1))
259259
send_futures[1][2].success({tp1: (11, 1001)})
260260
assert fut.failed()
261261
assert isinstance(fut.exception, UnknownTopicOrPartitionError)
262262

263263

264-
def test__handle_offset_response_v1(fetcher, mocker):
264+
def test__handle_list_offsets_response_v1(fetcher, mocker):
265265
# Broker returns UnsupportedForMessageFormatError, will omit partition
266266
fut = Future()
267267
res = ListOffsetsResponse[1]([
268268
("topic", [(0, 43, -1, -1)]),
269269
("topic", [(1, 0, 1000, 9999)])
270270
])
271-
fetcher._handle_offset_response(fut, res)
271+
fetcher._handle_list_offsets_response(fut, res)
272272
assert fut.succeeded()
273273
assert fut.value == {TopicPartition("topic", 1): (9999, 1000)}
274274

@@ -277,7 +277,7 @@ def test__handle_offset_response_v1(fetcher, mocker):
277277
res = ListOffsetsResponse[1]([
278278
("topic", [(0, 6, -1, -1)]),
279279
])
280-
fetcher._handle_offset_response(fut, res)
280+
fetcher._handle_list_offsets_response(fut, res)
281281
assert fut.failed()
282282
assert isinstance(fut.exception, NotLeaderForPartitionError)
283283

@@ -286,7 +286,7 @@ def test__handle_offset_response_v1(fetcher, mocker):
286286
res = ListOffsetsResponse[1]([
287287
("topic", [(0, 3, -1, -1)]),
288288
])
289-
fetcher._handle_offset_response(fut, res)
289+
fetcher._handle_list_offsets_response(fut, res)
290290
assert fut.failed()
291291
assert isinstance(fut.exception, UnknownTopicOrPartitionError)
292292

@@ -299,19 +299,19 @@ def test__handle_offset_response_v1(fetcher, mocker):
299299
("topic", [(2, 3, -1, -1)]),
300300
("topic", [(3, 0, 1000, 9999)])
301301
])
302-
fetcher._handle_offset_response(fut, res)
302+
fetcher._handle_list_offsets_response(fut, res)
303303
assert fut.failed()
304304
assert isinstance(fut.exception, NotLeaderForPartitionError)
305305

306306

307-
def test__handle_offset_response_v2_v3(fetcher, mocker):
307+
def test__handle_list_offsets_response_v2_v3(fetcher, mocker):
308308
# including a throttle_time shouldnt cause issues
309309
fut = Future()
310310
res = ListOffsetsResponse[2](
311311
123, # throttle_time_ms
312312
[("topic", [(0, 0, 1000, 9999)])
313313
])
314-
fetcher._handle_offset_response(fut, res)
314+
fetcher._handle_list_offsets_response(fut, res)
315315
assert fut.succeeded()
316316
assert fut.value == {TopicPartition("topic", 0): (9999, 1000)}
317317

@@ -321,7 +321,7 @@ def test__handle_offset_response_v2_v3(fetcher, mocker):
321321
123, # throttle_time_ms
322322
[("topic", [(0, 0, 1000, 9999)])
323323
])
324-
fetcher._handle_offset_response(fut, res)
324+
fetcher._handle_list_offsets_response(fut, res)
325325
assert fut.succeeded()
326326
assert fut.value == {TopicPartition("topic", 0): (9999, 1000)}
327327

0 commit comments

Comments
 (0)