Commit ee10e2a

[Backport 8.16] Allow retries for statuses other than 429 in streaming bulk (#2702)
Authored by github-actions[bot], miguelgrinberg, pquentin, and ayayron

Co-authored-by: Miguel Grinberg <[email protected]>
Co-authored-by: Quentin Pradet <[email protected]>
Co-authored-by: Aaron Hoffer <[email protected]>
1 parent 0544a4b commit ee10e2a
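
In practice, the new ``retry_on_status`` parameter widens the previously hard-coded 429-only retry behavior. A minimal sketch of the resulting sync API (the host URL, index name, and document shape are illustrative, not from this commit):

from elasticsearch import Elasticsearch, helpers

client = Elasticsearch("http://localhost:9200")
actions = ({"_index": "my-index", "_id": i, "value": i} for i in range(1000))

# Retry chunks rejected with 429 (the old hard-coded status) or 503,
# waiting initial_backoff seconds first, then doubling up to max_backoff.
for ok, item in helpers.streaming_bulk(
    client,
    actions,
    max_retries=3,
    retry_on_status=(429, 503),
    initial_backoff=2,
    max_backoff=600,
    raise_on_error=False,
):
    if not ok:
        print("failed:", item)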

File tree: 4 files changed, +116 -20 lines


elasticsearch/_async/helpers.py (+19, -10)
@@ -173,6 +173,7 @@ async def async_streaming_bulk(
     max_backoff: float = 600,
     yield_ok: bool = True,
     ignore_status: Union[int, Collection[int]] = (),
+    retry_on_status: Union[int, Collection[int]] = (429,),
     *args: Any,
     **kwargs: Any,
 ) -> AsyncIterable[Tuple[bool, Dict[str, Any]]]:
@@ -184,10 +185,11 @@ async def async_streaming_bulk(
     entire input is consumed and sent.
 
     If you specify ``max_retries`` it will also retry any documents that were
-    rejected with a ``429`` status code. To do this it will wait (**by calling
-    asyncio.sleep**) for ``initial_backoff`` seconds and then,
-    every subsequent rejection for the same chunk, for double the time every
-    time up to ``max_backoff`` seconds.
+    rejected with a ``429`` status code. Use ``retry_on_status`` to
+    configure which status codes will be retried. To do this it will wait
+    (**by calling asyncio.sleep which will block**) for ``initial_backoff`` seconds
+    and then, every subsequent rejection for the same chunk, for double the time
+    every time up to ``max_backoff`` seconds.
 
     :arg client: instance of :class:`~elasticsearch.AsyncElasticsearch` to use
     :arg actions: iterable or async iterable containing the actions to be executed
@@ -200,8 +202,11 @@ async def async_streaming_bulk(
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
         (`None` if data line should be omitted).
+    :arg retry_on_status: HTTP status code that will trigger a retry.
+        (if `None` is specified only status 429 will retry).
     :arg max_retries: maximum number of times a document will be retried when
-        ``429`` is received, set to 0 (default) for no retries on ``429``
+        retry_on_status (defaulting to ``429``) is received,
+        set to 0 (default) for no retries
     :arg initial_backoff: number of seconds we should wait before the first
         retry. Any subsequent retries will be powers of ``initial_backoff *
         2**retry_number``
@@ -213,6 +218,9 @@ async def async_streaming_bulk(
     client = client.options()
     client._client_meta = (("h", "bp"),)
 
+    if isinstance(retry_on_status, int):
+        retry_on_status = (retry_on_status,)
+
     async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
         async for item in aiter(actions):
             yield expand_action_callback(item)
@@ -264,11 +272,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
                 ):
                     if not ok:
                         action, info = info.popitem()
-                        # retry if retries enabled, we get 429, and we are not
-                        # in the last attempt
+                        # retry if retries enabled, we are not in the last attempt,
+                        # and status in retry_on_status (defaulting to 429)
                         if (
                             max_retries
-                            and info["status"] == 429
+                            and info["status"] in retry_on_status
                             and (attempt + 1) <= max_retries
                         ):
                             # _process_bulk_chunk expects strings so we need to
@@ -281,8 +289,9 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
                         yield ok, info
 
             except ApiError as e:
-                # suppress 429 errors since we will retry them
-                if attempt == max_retries or e.status_code != 429:
+                # suppress any status in retry_on_status (429 by default)
+                # since we will retry them
+                if attempt == max_retries or e.status_code not in retry_on_status:
                     raise
             else:
                 if not to_retry:
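
The async helper mirrors the sync one. A sketch under the same assumptions (hypothetical host and index); note that a bare int replaces the ``(429,)`` default rather than extending it, per the ``isinstance`` normalization above:

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk


async def main() -> None:
    client = AsyncElasticsearch("http://localhost:9200")
    actions = ({"_index": "my-index", "_id": i, "value": i} for i in range(100))

    # retry_on_status=522 is normalized to (522,), so 429 is no longer
    # retried here; pass (429, 522) to keep both.
    async for ok, item in async_streaming_bulk(
        client,
        actions,
        max_retries=2,
        retry_on_status=522,
        raise_on_error=False,
    ):
        if not ok:
            print("failed:", item)

    await client.close()


asyncio.run(main())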

elasticsearch/helpers/actions.py (+19, -10)
@@ -374,6 +374,7 @@ def streaming_bulk(
     max_backoff: float = 600,
     yield_ok: bool = True,
     ignore_status: Union[int, Collection[int]] = (),
+    retry_on_status: Union[int, Collection[int]] = (429,),
     span_name: str = "helpers.streaming_bulk",
     *args: Any,
     **kwargs: Any,
@@ -386,10 +387,11 @@ def streaming_bulk(
     entire input is consumed and sent.
 
     If you specify ``max_retries`` it will also retry any documents that were
-    rejected with a ``429`` status code. To do this it will wait (**by calling
-    time.sleep which will block**) for ``initial_backoff`` seconds and then,
-    every subsequent rejection for the same chunk, for double the time every
-    time up to ``max_backoff`` seconds.
+    rejected with a ``429`` status code. Use ``retry_on_status`` to
+    configure which status codes will be retried. To do this it will wait
+    (**by calling time.sleep which will block**) for ``initial_backoff`` seconds
+    and then, every subsequent rejection for the same chunk, for double the time
+    every time up to ``max_backoff`` seconds.
 
     :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
     :arg actions: iterable containing the actions to be executed
@@ -402,8 +404,11 @@ def streaming_bulk(
     :arg expand_action_callback: callback executed on each action passed in,
         should return a tuple containing the action line and the data line
         (`None` if data line should be omitted).
+    :arg retry_on_status: HTTP status code that will trigger a retry.
+        (if `None` is specified only status 429 will retry).
     :arg max_retries: maximum number of times a document will be retried when
-        ``429`` is received, set to 0 (default) for no retries on ``429``
+        retry_on_status (defaulting to ``429``) is received,
+        set to 0 (default) for no retries
     :arg initial_backoff: number of seconds we should wait before the first
         retry. Any subsequent retries will be powers of ``initial_backoff *
         2**retry_number``
@@ -415,6 +420,9 @@ def streaming_bulk(
     client = client.options()
     client._client_meta = (("h", "bp"),)
 
+    if isinstance(retry_on_status, int):
+        retry_on_status = (retry_on_status,)
+
     serializer = client.transport.serializers.get_serializer("application/json")
 
     bulk_data: List[
@@ -458,11 +466,11 @@
                 ):
                     if not ok:
                         action, info = info.popitem()
-                        # retry if retries enabled, we get 429, and we are not
-                        # in the last attempt
+                        # retry if retries enabled, we are not in the last attempt,
+                        # and status in retry_on_status (defaulting to 429)
                         if (
                             max_retries
-                            and info["status"] == 429
+                            and info["status"] in retry_on_status
                             and (attempt + 1) <= max_retries
                         ):
                             # _process_bulk_chunk expects bytes so we need to
@@ -475,8 +483,9 @@
                         yield ok, info
 
             except ApiError as e:
-                # suppress 429 errors since we will retry them
-                if attempt == max_retries or e.status_code != 429:
+                # suppress any status in retry_on_status (429 by default)
+                # since we will retry them
+                if attempt == max_retries or e.status_code not in retry_on_status:
                     raise
             else:
                 if not to_retry:
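
For reference, the backoff schedule described in both docstrings (``initial_backoff * 2**retry_number``, capped at ``max_backoff``) is easy to tabulate; with the defaults of 2 and 600 seconds:

initial_backoff, max_backoff = 2, 600

# Seconds slept before retry number n, per the docstring formula.
delays = [min(max_backoff, initial_backoff * 2**n) for n in range(10)]
print(delays)  # [2, 4, 8, 16, 32, 64, 128, 256, 512, 600]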

test_elasticsearch/test_async/test_server/test_helpers.py (+39)
@@ -293,6 +293,45 @@ async def streaming_bulk():
             await streaming_bulk()
         assert 4 == failing_client._called
 
+    async def test_connection_timeout_is_retried_with_retry_status_callback(
+        self, async_client
+    ):
+        failing_client = FailingBulkClient(
+            async_client,
+            fail_with=ApiError(
+                message="Connection timed out!",
+                body={},
+                meta=ApiResponseMeta(
+                    status=522, headers={}, http_version="1.1", duration=0, node=None
+                ),
+            ),
+        )
+        docs = [
+            {"_index": "i", "_id": 47, "f": "v"},
+            {"_index": "i", "_id": 45, "f": "v"},
+            {"_index": "i", "_id": 42, "f": "v"},
+        ]
+
+        results = [
+            x
+            async for x in helpers.async_streaming_bulk(
+                failing_client,
+                docs,
+                raise_on_exception=False,
+                raise_on_error=False,
+                chunk_size=1,
+                retry_on_status=522,
+                max_retries=1,
+                initial_backoff=0,
+            )
+        ]
+        assert 3 == len(results)
+        assert [True, True, True] == [r[0] for r in results]
+        await async_client.indices.refresh(index="i")
+        res = await async_client.search(index="i")
+        assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
+        assert 4 == failing_client._called
+
 
 class TestBulk:
     async def test_bulk_works_with_single_item(self, async_client):

test_elasticsearch/test_server/test_helpers.py (+39)
@@ -288,6 +288,45 @@ def streaming_bulk():
     assert 4 == failing_client._called
 
 
+def test_connection_timeout_is_retried_with_retry_status_callback(sync_client):
+    failing_client = FailingBulkClient(
+        sync_client,
+        fail_with=ApiError(
+            message="Connection timed out!",
+            body={},
+            meta=ApiResponseMeta(
+                status=522, headers={}, http_version="1.1", duration=0, node=None
+            ),
+        ),
+    )
+    docs = [
+        {"_index": "i", "_id": 47, "f": "v"},
+        {"_index": "i", "_id": 45, "f": "v"},
+        {"_index": "i", "_id": 42, "f": "v"},
+    ]
+
+    results = list(
+        helpers.streaming_bulk(
+            failing_client,
+            docs,
+            index="i",
+            raise_on_exception=False,
+            raise_on_error=False,
+            chunk_size=1,
+            retry_on_status=522,
+            max_retries=1,
+            initial_backoff=0,
+        )
+    )
+    assert 3 == len(results)
+    print(results)
+    assert [True, True, True] == [r[0] for r in results]
+    sync_client.indices.refresh(index="i")
+    res = sync_client.search(index="i")
+    assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
+    assert 4 == failing_client._called
+
+
 def test_bulk_works_with_single_item(sync_client):
     docs = [{"answer": 42, "_id": 1}]
     success, failed = helpers.bulk(sync_client, docs, index="test-index", refresh=True)
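
Reading the assertions shared by both new tests: with ``chunk_size=1`` the three documents become three separate bulk calls, and the ``FailingBulkClient`` wrapper (defined earlier in these test modules) raises the configured 522 ``ApiError`` on one of them. Because ``retry_on_status=522`` and ``max_retries=1``, that chunk is retried once and succeeds, so the client is called four times in total (three chunks plus one retry) and all three documents end up searchable in index ``i``.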
