Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/crawlee/storages/_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,17 @@ async def drop(self) -> None:

@override
async def purge(self) -> None:
    """Purge all requests from the request queue.

    Delegates to the storage client's `purge`. Some storage clients (e.g. the
    Apify platform client) do not implement purging and raise
    `NotImplementedError`; in that case fall back to dropping the queue and
    reopening it under the same name, so callers that purge between runs keep
    working. In the fallback path the request queue ID may change.
    """
    try:
        await self._client.purge()
    except NotImplementedError:
        logger.warning(
            f'Storage client "{type(self._client).__name__}" does not support purging the request queue. '
            'Falling back to dropping and recreating it; the request queue ID may change.'
        )
        await self.drop()
        # NOTE(review): the queue is reopened with default storage client /
        # configuration — a custom storage_client used for the original open
        # is presumably not propagated here; confirm this is intended.
        new_rq = await RequestQueue.open(name=self._name)
        # Adopt the freshly created queue's client and ID in place, so this
        # instance remains valid for callers holding a reference to it.
        self._client = new_rq._client  # noqa: SLF001
        self._id = new_rq._id  # noqa: SLF001

@override
async def add_request(
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/storages/test_request_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,50 @@ async def test_purge(
await rq.drop()


async def test_purge_falls_back_to_drop_when_not_implemented(
    storage_client: StorageClient,
    caplog: pytest.LogCaptureFixture,
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Verify `purge` degrades gracefully when the client cannot purge.

    Storage clients that do not support purging (e.g. the Apify platform
    client) raise `NotImplementedError`. In that case `purge` should drop and
    recreate the queue rather than propagate the error, so callers such as
    `BasicCrawler` keep working on repeated runs.
    """
    queue = await RequestQueue.open(
        name='purge-fallback-test',
        storage_client=storage_client,
    )

    await queue.add_requests(['https://example.com/1', 'https://example.com/2'])
    assert (await queue.get_metadata()).pending_request_count == 2

    async def _unsupported_purge(self: object) -> None:
        raise NotImplementedError('Purge is not supported.')

    # Patch at the class level so the client's bound `purge` raises.
    monkeypatch.setattr(type(queue._client), 'purge', _unsupported_purge)

    with caplog.at_level('WARNING'):
        await queue.purge()

    assert any('does not support purging' in record.message for record in caplog.records)

    # After the fallback the queue must be empty and fully usable.
    meta = await queue.get_metadata()
    assert meta.pending_request_count == 0
    assert meta.total_request_count == 0
    assert meta.handled_request_count == 0

    await queue.add_request('https://example.com/after-purge')
    fetched = await queue.fetch_next_request()
    assert fetched is not None
    assert fetched.url == 'https://example.com/after-purge'

    await queue.drop()


async def test_open_with_alias(
storage_client: StorageClient,
) -> None:
Expand Down
Loading