From 1abea55bcfd85ea186ae6f379ab1ad880b96f2d8 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Thu, 7 May 2026 15:16:08 +0200 Subject: [PATCH 1/2] fix: Fall back to drop+recreate when `RequestQueue.purge` is unsupported Storage clients that cannot purge (e.g. the Apify platform) raise `NotImplementedError`, which broke `BasicCrawler`'s automatic purge on repeated runs. Catch it, log a warning, and drop+reopen the queue so callers keep working. --- src/crawlee/storages/_request_queue.py | 12 ++++++- tests/unit/storages/test_request_queue.py | 44 +++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py index 639ef49ce6..c063cc311b 100644 --- a/src/crawlee/storages/_request_queue.py +++ b/src/crawlee/storages/_request_queue.py @@ -150,7 +150,17 @@ async def drop(self) -> None: @override async def purge(self) -> None: - await self._client.purge() + try: + await self._client.purge() + except NotImplementedError: + logger.warning( + f'Storage client "{type(self._client).__name__}" does not support purging the request queue. ' + 'Falling back to dropping and recreating it; the request queue ID may change.' + ) + await self.drop() + new_rq = await RequestQueue.open(name=self._name) + self._client = new_rq._client # noqa: SLF001 + self._id = new_rq._id # noqa: SLF001 @override async def add_request( diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index 09ce769d9e..b5418b2559 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -715,6 +715,50 @@ async def test_purge( await rq.drop() +async def test_purge_falls_back_to_drop_when_not_implemented( + storage_client: StorageClient, + caplog: pytest.LogCaptureFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that `purge` falls back to drop+recreate when the underlying client raises `NotImplementedError`. + + Some storage clients (e.g. the Apify platform client) do not support purging. In that case `purge` should + drop and recreate the queue instead of propagating the error, so that callers like `BasicCrawler` keep + working on repeated runs. + """ + rq = await RequestQueue.open( + name='purge-fallback-test', + storage_client=storage_client, + ) + + await rq.add_requests(['https://example.com/1', 'https://example.com/2']) + metadata = await rq.get_metadata() + assert metadata.pending_request_count == 2 + + async def _raise_not_implemented(self: object) -> None: + raise NotImplementedError('Purge is not supported.') + + monkeypatch.setattr(type(rq._client), 'purge', _raise_not_implemented) + + with caplog.at_level('WARNING'): + await rq.purge() + + assert any('does not support purging' in rec.message for rec in caplog.records) + + # The queue should be empty and usable. + metadata = await rq.get_metadata() + assert metadata.pending_request_count == 0 + assert metadata.total_request_count == 0 + assert metadata.handled_request_count == 0 + + await rq.add_request('https://example.com/after-purge') + request = await rq.fetch_next_request() + assert request is not None + assert request.url == 'https://example.com/after-purge' + + await rq.drop() + + async def test_open_with_alias( storage_client: StorageClient, ) -> None: From 9278cac8dd80a84d145f3fe2f4a4704f5f8a6058 Mon Sep 17 00:00:00 2001 From: Vlada Dusek Date: Thu, 7 May 2026 16:26:16 +0200 Subject: [PATCH 2/2] if missing purge, do not drop named storages --- src/crawlee/storages/_request_queue.py | 17 ++++-- tests/unit/storages/test_request_queue.py | 66 +++++++++++++++++++---- 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/src/crawlee/storages/_request_queue.py b/src/crawlee/storages/_request_queue.py index c063cc311b..286dfde3a4 100644 --- a/src/crawlee/storages/_request_queue.py +++ b/src/crawlee/storages/_request_queue.py @@ -153,12 +153,23 @@ async def purge(self) -> None: try: await self._client.purge() except NotImplementedError: + client_name = type(self._client).__name__ + if self._name is not None: + logger.warning( + f'Storage client "{client_name}" does not support purging the request queue. ' + f'Skipping purge for named queue "{self._name}" to avoid destroying persistent data; ' + 'the queue contents are left intact.' + ) + return logger.warning( - f'Storage client "{type(self._client).__name__}" does not support purging the request queue. ' - 'Falling back to dropping and recreating it; the request queue ID may change.' + f'Storage client "{client_name}" does not support purging the request queue. ' + 'Falling back to dropping and recreating the unnamed queue; the request queue ID may change.' ) await self.drop() - new_rq = await RequestQueue.open(name=self._name) + # Override `purge_on_start` so the storage client does not try to purge the freshly recreated + # (and necessarily empty) queue and re-raise the same `NotImplementedError`. + recreate_config = service_locator.get_configuration().model_copy(update={'purge_on_start': False}) + new_rq = await RequestQueue.open(configuration=recreate_config) self._client = new_rq._client # noqa: SLF001 self._id = new_rq._id # noqa: SLF001 diff --git a/tests/unit/storages/test_request_queue.py b/tests/unit/storages/test_request_queue.py index b5418b2559..d77d524150 100644 --- a/tests/unit/storages/test_request_queue.py +++ b/tests/unit/storages/test_request_queue.py @@ -715,21 +715,19 @@ async def test_purge( await rq.drop() -async def test_purge_falls_back_to_drop_when_not_implemented( +async def test_purge_falls_back_to_drop_for_unnamed_queue_when_not_implemented( storage_client: StorageClient, caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch, ) -> None: - """Test that `purge` falls back to drop+recreate when the underlying client raises `NotImplementedError`. + """Test that `purge` falls back to drop+recreate for unnamed queues when the client raises `NotImplementedError`. - Some storage clients (e.g. the Apify platform client) do not support purging. In that case `purge` should - drop and recreate the queue instead of propagating the error, so that callers like `BasicCrawler` keep - working on repeated runs. + Some storage clients (e.g. the Apify platform client) do not support purging. For the default unnamed queue + used by `BasicCrawler`, `purge` should drop and recreate the queue so that callers keep working on repeated + runs. Named queues are handled separately to avoid destroying persistent data. """ - rq = await RequestQueue.open( - name='purge-fallback-test', - storage_client=storage_client, - ) + rq = await RequestQueue.open(storage_client=storage_client) + assert rq.name is None await rq.add_requests(['https://example.com/1', 'https://example.com/2']) metadata = await rq.get_metadata() @@ -743,13 +741,16 @@ async def _raise_not_implemented(self: object) -> None: with caplog.at_level('WARNING'): await rq.purge() - assert any('does not support purging' in rec.message for rec in caplog.records) + assert any( + 'does not support purging' in rec.message and 'dropping and recreating' in rec.message for rec in caplog.records + ) - # The queue should be empty and usable. + # The queue should be empty, usable, and backed by a fresh client (id may differ for backends that mint new ids). metadata = await rq.get_metadata() assert metadata.pending_request_count == 0 assert metadata.total_request_count == 0 assert metadata.handled_request_count == 0 + assert rq.id is not None await rq.add_request('https://example.com/after-purge') request = await rq.fetch_next_request() @@ -759,6 +760,49 @@ async def _raise_not_implemented(self: object) -> None: await rq.drop() +async def test_purge_skips_named_queue_when_not_implemented( + storage_client: StorageClient, + caplog: pytest.LogCaptureFixture, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Test that `purge` is a logged no-op for named queues when the client raises `NotImplementedError`. + + Named queues are considered persistent (e.g. shared across runs on the Apify platform), so falling back + to drop+recreate would silently destroy user data. Instead `purge` logs a warning and leaves the queue + intact. + """ + rq = await RequestQueue.open( + name='purge-fallback-named-test', + storage_client=storage_client, + ) + original_id = rq.id + + await rq.add_requests(['https://example.com/1', 'https://example.com/2']) + metadata = await rq.get_metadata() + assert metadata.pending_request_count == 2 + + async def _raise_not_implemented(self: object) -> None: + raise NotImplementedError('Purge is not supported.') + + monkeypatch.setattr(type(rq._client), 'purge', _raise_not_implemented) + + with caplog.at_level('WARNING'): + await rq.purge() + + assert any( + 'does not support purging' in rec.message and 'Skipping purge for named queue' in rec.message + for rec in caplog.records + ) + + # Queue identity and contents must be preserved. + assert rq.id == original_id + metadata = await rq.get_metadata() + assert metadata.pending_request_count == 2 + assert metadata.total_request_count == 2 + + await rq.drop() + + async def test_open_with_alias( storage_client: StorageClient, ) -> None: