diff --git a/docs/03_concepts/code/03_dataset_exports.py b/docs/03_concepts/code/03_dataset_exports.py
index 78f0f5b9..4f0c01c4 100644
--- a/docs/03_concepts/code/03_dataset_exports.py
+++ b/docs/03_concepts/code/03_dataset_exports.py
@@ -11,14 +11,14 @@ async def main() -> None:
         await dataset.export_to(
             content_type='csv',
             key='data.csv',
-            to_key_value_store_name='my-cool-key-value-store',
+            to_kvs_name='my-cool-key-value-store',
         )
 
         # Export the data as JSON
         await dataset.export_to(
             content_type='json',
             key='data.json',
-            to_key_value_store_name='my-cool-key-value-store',
+            to_kvs_name='my-cool-key-value-store',
         )
 
         # Print the exported records
diff --git a/docs/03_concepts/code/conditional_actor_charge.py b/docs/03_concepts/code/conditional_actor_charge.py
index 926c591d..08e2d073 100644
--- a/docs/03_concepts/code/conditional_actor_charge.py
+++ b/docs/03_concepts/code/conditional_actor_charge.py
@@ -6,8 +6,7 @@ async def main() -> None:
         # Check the dataset because there might already be items
         # if the run migrated or was restarted
         default_dataset = await Actor.open_dataset()
-        dataset_info = await default_dataset.get_info()
-        charged_items = dataset_info.item_count if dataset_info else 0
+        charged_items = default_dataset.metadata.item_count
 
         # highlight-start
         if Actor.get_charging_manager().get_pricing_info().is_pay_per_event:
diff --git a/pyproject.toml b/pyproject.toml
index 44ffe667..46157847 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,7 +36,8 @@ keywords = [
 dependencies = [
     "apify-client>=1.9.2",
     "apify-shared>=1.3.0",
-    "crawlee~=0.6.0",
+    "cachetools>=5.5.0",
+    "crawlee@git+https://github.com/apify/crawlee-python.git@new-storage-clients",
     "cryptography>=42.0.0",
     "httpx>=0.27.0",
     # TODO: ensure compatibility with the latest version of lazy-object-proxy
@@ -72,12 +73,16 @@ dev = [
     "pytest~=8.4.0",
     "respx~=0.22.0",
     "ruff~=0.11.0",
-    "setuptools", # setuptools are used by pytest but not explicitly required
+    "setuptools",  # setuptools are used by pytest but not explicitly required
+    "types-cachetools>=6.0.0.20250525",
 ]
 
 [tool.hatch.build.targets.wheel]
 packages = ["src/apify"]
 
+[tool.hatch.metadata]
+allow-direct-references = true
+
 [tool.ruff]
 line-length = 120
 include = ["src/**/*.py", "tests/**/*.py", "docs/**/*.py", "website/**/*.py"]
diff --git a/src/apify/_actor.py b/src/apify/_actor.py
index b8fcdc05..7b5cdabd 100644
--- a/src/apify/_actor.py
+++ b/src/apify/_actor.py
@@ -88,7 +88,7 @@ def __init__(
 
         # Create an instance of the cloud storage client, the local storage client is obtained
         # from the service locator.
-        self._cloud_storage_client = ApifyStorageClient.from_config(config=self._configuration)
+        self._cloud_storage_client = ApifyStorageClient()
 
         # Set the event manager based on whether the Actor is running on the platform or locally.
self._event_manager = ( diff --git a/src/apify/apify_storage_client/__init__.py b/src/apify/apify_storage_client/__init__.py index 8b6d517c..4af7c8ee 100644 --- a/src/apify/apify_storage_client/__init__.py +++ b/src/apify/apify_storage_client/__init__.py @@ -1,3 +1,11 @@ -from apify.apify_storage_client._apify_storage_client import ApifyStorageClient +from ._dataset_client import ApifyDatasetClient +from ._key_value_store_client import ApifyKeyValueStoreClient +from ._request_queue_client import ApifyRequestQueueClient +from ._storage_client import ApifyStorageClient -__all__ = ['ApifyStorageClient'] +__all__ = [ + 'ApifyDatasetClient', + 'ApifyKeyValueStoreClient', + 'ApifyRequestQueueClient', + 'ApifyStorageClient', +] diff --git a/src/apify/apify_storage_client/_apify_storage_client.py b/src/apify/apify_storage_client/_apify_storage_client.py deleted file mode 100644 index 51e3fc24..00000000 --- a/src/apify/apify_storage_client/_apify_storage_client.py +++ /dev/null @@ -1,72 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from typing_extensions import override - -from apify_client import ApifyClientAsync -from crawlee._utils.crypto import crypto_random_object_id -from crawlee.storage_clients import StorageClient - -from apify._utils import docs_group -from apify.apify_storage_client._dataset_client import DatasetClient -from apify.apify_storage_client._dataset_collection_client import DatasetCollectionClient -from apify.apify_storage_client._key_value_store_client import KeyValueStoreClient -from apify.apify_storage_client._key_value_store_collection_client import KeyValueStoreCollectionClient -from apify.apify_storage_client._request_queue_client import RequestQueueClient -from apify.apify_storage_client._request_queue_collection_client import RequestQueueCollectionClient - -if TYPE_CHECKING: - from apify._configuration import Configuration - - -@docs_group('Classes') -class ApifyStorageClient(StorageClient): - """A storage client implementation based on the Apify platform storage.""" - - def __init__(self, *, configuration: Configuration) -> None: - self._client_key = crypto_random_object_id() - self._apify_client = ApifyClientAsync( - token=configuration.token, - api_url=configuration.api_base_url, - max_retries=8, - min_delay_between_retries_millis=500, - timeout_secs=360, - ) - self._configuration = configuration - - @classmethod - def from_config(cls, config: Configuration) -> ApifyStorageClient: - return cls(configuration=config) - - @override - def dataset(self, id: str) -> DatasetClient: - return DatasetClient(self._apify_client.dataset(id)) - - @override - def datasets(self) -> DatasetCollectionClient: - return DatasetCollectionClient(self._apify_client.datasets()) - - @override - def key_value_store(self, id: str) -> KeyValueStoreClient: - return KeyValueStoreClient(self._apify_client.key_value_store(id), self._configuration.api_public_base_url) - - @override - def key_value_stores(self) -> KeyValueStoreCollectionClient: - return KeyValueStoreCollectionClient(self._apify_client.key_value_stores()) - - @override - def request_queue(self, id: str) -> RequestQueueClient: - return RequestQueueClient(self._apify_client.request_queue(id, client_key=self._client_key)) - - @override - def request_queues(self) -> RequestQueueCollectionClient: - return RequestQueueCollectionClient(self._apify_client.request_queues()) - - @override - async def purge_on_start(self) -> None: - pass - - @override - def get_rate_limit_errors(self) -> dict[int, int]: - 
return self._apify_client.stats.rate_limit_errors diff --git a/src/apify/apify_storage_client/_dataset_client.py b/src/apify/apify_storage_client/_dataset_client.py index 93c8d575..a1a4e899 100644 --- a/src/apify/apify_storage_client/_dataset_client.py +++ b/src/apify/apify_storage_client/_dataset_client.py @@ -1,190 +1,183 @@ from __future__ import annotations -from typing import TYPE_CHECKING +import asyncio +from logging import getLogger +from typing import TYPE_CHECKING, Any from typing_extensions import override -from crawlee.storage_clients._base import DatasetClient as BaseDatasetClient +from apify_client import ApifyClientAsync +from crawlee.storage_clients._base import DatasetClient from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata if TYPE_CHECKING: from collections.abc import AsyncIterator - from contextlib import AbstractAsyncContextManager - - from httpx import Response + from datetime import datetime from apify_client.clients import DatasetClientAsync - from crawlee._types import JsonSerializable - + from crawlee.configuration import Configuration -class DatasetClient(BaseDatasetClient): - """Dataset resource client implementation based on the Apify platform storage.""" +logger = getLogger(__name__) - def __init__(self, apify_dataset_client: DatasetClientAsync) -> None: - self._client = apify_dataset_client - @override - async def get(self) -> DatasetMetadata | None: - result = await self._client.get() - return DatasetMetadata.model_validate(result) if result else None +class ApifyDatasetClient(DatasetClient): + """An Apify platform implementation of the dataset client.""" - @override - async def update( + def __init__( self, *, - name: str | None = None, - ) -> DatasetMetadata: - return DatasetMetadata.model_validate( - await self._client.update( - name=name, - ) + id: str, + name: str | None, + created_at: datetime, + accessed_at: datetime, + modified_at: datetime, + item_count: int, + api_client: DatasetClientAsync, + ) -> None: + """Initialize a new instance. + + Preferably use the `ApifyDatasetClient.open` class method to create a new instance. 
+ """ + self._metadata = DatasetMetadata( + id=id, + name=name, + created_at=created_at, + accessed_at=accessed_at, + modified_at=modified_at, + item_count=item_count, ) + self._api_client = api_client + """The Apify dataset client for API operations.""" + + self._lock = asyncio.Lock() + """A lock to ensure that only one operation is performed at a time.""" + + @property @override - async def delete(self) -> None: - await self._client.delete() + def metadata(self) -> DatasetMetadata: + return self._metadata @override - async def list_items( - self, + @classmethod + async def open( + cls, *, - offset: int | None = 0, - limit: int | None = BaseDatasetClient._LIST_ITEMS_LIMIT, # noqa: SLF001 - clean: bool = False, - desc: bool = False, - fields: list[str] | None = None, - omit: list[str] | None = None, - unwind: str | None = None, - skip_empty: bool = False, - skip_hidden: bool = False, - flatten: list[str] | None = None, - view: str | None = None, - ) -> DatasetItemsListPage: - return DatasetItemsListPage.model_validate( - vars( - await self._client.list_items( - offset=offset, - limit=limit, - clean=clean, - desc=desc, - fields=fields, - omit=omit, - unwind=unwind, - skip_empty=skip_empty, - skip_hidden=skip_hidden, - flatten=flatten, - view=view, - ) - ) + id: str | None, + name: str | None, + configuration: Configuration, + ) -> ApifyDatasetClient: + token = getattr(configuration, 'token', None) + api_url = getattr(configuration, 'api_base_url', 'https://api.apify.com') + + # Otherwise, create a new one. + apify_client_async = ApifyClientAsync( + token=token, + api_url=api_url, + max_retries=8, + min_delay_between_retries_millis=500, + timeout_secs=360, + ) + + apify_datasets_client = apify_client_async.datasets() + + metadata = DatasetMetadata.model_validate( + await apify_datasets_client.get_or_create(name=id if id is not None else name), + ) + + apify_dataset_client = apify_client_async.dataset(dataset_id=metadata.id) + + return cls( + id=metadata.id, + name=metadata.name, + created_at=metadata.created_at, + accessed_at=metadata.accessed_at, + modified_at=metadata.modified_at, + item_count=metadata.item_count, + api_client=apify_dataset_client, ) @override - async def iterate_items( - self, - *, - offset: int = 0, - limit: int | None = None, - clean: bool = False, - desc: bool = False, - fields: list[str] | None = None, - omit: list[str] | None = None, - unwind: str | None = None, - skip_empty: bool = False, - skip_hidden: bool = False, - ) -> AsyncIterator[dict]: - async for item in self._client.iterate_items( - offset=offset, - limit=limit, - clean=clean, - desc=desc, - fields=fields, - omit=omit, - unwind=unwind, - skip_empty=skip_empty, - skip_hidden=skip_hidden, - ): - yield item + async def purge(self) -> None: + # TODO: better? 
+ # https://github.com/apify/apify-sdk-python/issues/469 + async with self._lock: + await self._api_client.delete() @override - async def get_items_as_bytes( + async def drop(self) -> None: + async with self._lock: + await self._api_client.delete() + + @override + async def push_data(self, data: list[Any] | dict[str, Any]) -> None: + async with self._lock: + await self._api_client.push_items(items=data) + await self._update_metadata() + + @override + async def get_data( self, *, - item_format: str = 'json', - offset: int | None = None, - limit: int | None = None, - desc: bool = False, + offset: int = 0, + limit: int | None = 999_999_999_999, clean: bool = False, - bom: bool = False, - delimiter: str | None = None, + desc: bool = False, fields: list[str] | None = None, omit: list[str] | None = None, unwind: str | None = None, skip_empty: bool = False, - skip_header_row: bool = False, skip_hidden: bool = False, - xml_root: str | None = None, - xml_row: str | None = None, flatten: list[str] | None = None, - ) -> bytes: - return await self._client.get_items_as_bytes( - item_format=item_format, + view: str | None = None, + ) -> DatasetItemsListPage: + response = await self._api_client.list_items( offset=offset, limit=limit, - desc=desc, clean=clean, - bom=bom, - delimiter=delimiter, + desc=desc, fields=fields, omit=omit, unwind=unwind, skip_empty=skip_empty, - skip_header_row=skip_header_row, skip_hidden=skip_hidden, - xml_root=xml_root, - xml_row=xml_row, flatten=flatten, + view=view, ) + result = DatasetItemsListPage.model_validate(vars(response)) + await self._update_metadata() + return result @override - async def stream_items( + async def iterate_items( self, *, - item_format: str = 'json', - offset: int | None = None, + offset: int = 0, limit: int | None = None, - desc: bool = False, clean: bool = False, - bom: bool = False, - delimiter: str | None = None, + desc: bool = False, fields: list[str] | None = None, omit: list[str] | None = None, unwind: str | None = None, skip_empty: bool = False, - skip_header_row: bool = False, skip_hidden: bool = False, - xml_root: str | None = None, - xml_row: str | None = None, - ) -> AbstractAsyncContextManager[Response | None]: - return self._client.stream_items( - item_format=item_format, + ) -> AsyncIterator[dict]: + async for item in self._api_client.iterate_items( offset=offset, limit=limit, - desc=desc, clean=clean, - bom=bom, - delimiter=delimiter, + desc=desc, fields=fields, omit=omit, unwind=unwind, skip_empty=skip_empty, - skip_header_row=skip_header_row, skip_hidden=skip_hidden, - xml_root=xml_root, - xml_row=xml_row, - ) + ): + yield item - @override - async def push_items(self, items: JsonSerializable) -> None: - await self._client.push_items( - items=items, - ) + await self._update_metadata() + + async def _update_metadata(self) -> None: + """Update the dataset metadata file with current information.""" + metadata = await self._api_client.get() + self._metadata = DatasetMetadata.model_validate(metadata) diff --git a/src/apify/apify_storage_client/_dataset_collection_client.py b/src/apify/apify_storage_client/_dataset_collection_client.py deleted file mode 100644 index f8ffc3e8..00000000 --- a/src/apify/apify_storage_client/_dataset_collection_client.py +++ /dev/null @@ -1,51 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from typing_extensions import override - -from crawlee.storage_clients._base import DatasetCollectionClient as BaseDatasetCollectionClient -from crawlee.storage_clients.models import 
DatasetListPage, DatasetMetadata - -if TYPE_CHECKING: - from apify_client.clients import DatasetCollectionClientAsync - - -class DatasetCollectionClient(BaseDatasetCollectionClient): - """Dataset collection resource client implementation based on the Apify platform storage.""" - - def __init__(self, apify_dataset_collection_client: DatasetCollectionClientAsync) -> None: - self._client = apify_dataset_collection_client - - @override - async def get_or_create( - self, - *, - id: str | None = None, - name: str | None = None, - schema: dict | None = None, - ) -> DatasetMetadata: - return DatasetMetadata.model_validate( - await self._client.get_or_create( - name=id if id is not None else name, - schema=schema, - ) - ) - - @override - async def list( - self, - *, - unnamed: bool = False, - limit: int | None = None, - offset: int | None = None, - desc: bool = False, - ) -> DatasetListPage: - return DatasetListPage.model_validate( - await self._client.list( - unnamed=unnamed, - limit=limit, - offset=offset, - desc=desc, - ) - ) diff --git a/src/apify/apify_storage_client/_key_value_store_client.py b/src/apify/apify_storage_client/_key_value_store_client.py index 49883b3f..2d423f1d 100644 --- a/src/apify/apify_storage_client/_key_value_store_client.py +++ b/src/apify/apify_storage_client/_key_value_store_client.py @@ -1,90 +1,172 @@ from __future__ import annotations -from contextlib import asynccontextmanager +import asyncio +from logging import getLogger from typing import TYPE_CHECKING, Any from typing_extensions import override from yarl import URL -from crawlee.storage_clients._base import KeyValueStoreClient as BaseKeyValueStoreClient -from crawlee.storage_clients.models import KeyValueStoreListKeysPage, KeyValueStoreMetadata, KeyValueStoreRecord +from apify_client import ApifyClientAsync +from crawlee.storage_clients._base import KeyValueStoreClient +from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecord, KeyValueStoreRecordMetadata +from ._models import KeyValueStoreListKeysPage from apify._crypto import create_hmac_signature if TYPE_CHECKING: from collections.abc import AsyncIterator - from contextlib import AbstractAsyncContextManager - - from httpx import Response + from datetime import datetime from apify_client.clients import KeyValueStoreClientAsync + from crawlee.configuration import Configuration +logger = getLogger(__name__) -class KeyValueStoreClient(BaseKeyValueStoreClient): - """Key-value store resource client implementation based on the Apify platform storage.""" - - def __init__(self, apify_key_value_store_client: KeyValueStoreClientAsync, api_public_base_url: str) -> None: - self._client = apify_key_value_store_client - self._api_public_base_url = api_public_base_url - @override - async def get(self) -> KeyValueStoreMetadata | None: - result = await self._client.get() - return KeyValueStoreMetadata.model_validate(result) if result else None +class ApifyKeyValueStoreClient(KeyValueStoreClient): + """An Apify platform implementation of the key-value store client.""" - @override - async def update( + def __init__( self, *, - name: str | None = None, - ) -> KeyValueStoreMetadata: - return KeyValueStoreMetadata.model_validate(await self._client.update()) + id: str, + name: str | None, + created_at: datetime, + accessed_at: datetime, + modified_at: datetime, + api_client: KeyValueStoreClientAsync, + ) -> None: + """Initialize a new instance. + + Preferably use the `ApifyKeyValueStoreClient.open` class method to create a new instance. 
+ """ + self._metadata = KeyValueStoreMetadata( + id=id, + name=name, + created_at=created_at, + accessed_at=accessed_at, + modified_at=modified_at, + ) + + self._api_client = api_client + """The Apify key-value store client for API operations.""" + + self._lock = asyncio.Lock() + """A lock to ensure that only one operation is performed at a time.""" + @property @override - async def delete(self) -> None: - await self._client.delete() + def metadata(self) -> KeyValueStoreMetadata: + return self._metadata @override - async def list_keys( - self, + @classmethod + async def open( + cls, *, - limit: int = 1000, - exclusive_start_key: str | None = None, - ) -> KeyValueStoreListKeysPage: - return KeyValueStoreListKeysPage.model_validate(await self._client.list_keys()) + id: str | None, + name: str | None, + configuration: Configuration, + ) -> ApifyKeyValueStoreClient: + token = getattr(configuration, 'token', None) + api_url = getattr(configuration, 'api_base_url', 'https://api.apify.com') + + # Otherwise, create a new one. + apify_client_async = ApifyClientAsync( + token=token, + api_url=api_url, + max_retries=8, + min_delay_between_retries_millis=500, + timeout_secs=360, + ) + + apify_kvss_client = apify_client_async.key_value_stores() + + metadata = KeyValueStoreMetadata.model_validate( + await apify_kvss_client.get_or_create(name=id if id is not None else name), + ) + + apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=metadata.id) + + return cls( + id=metadata.id, + name=metadata.name, + created_at=metadata.created_at, + accessed_at=metadata.accessed_at, + modified_at=metadata.modified_at, + api_client=apify_kvs_client, + ) @override - async def get_record(self, key: str) -> KeyValueStoreRecord | None: - result = await self._client.get_record(key) - return KeyValueStoreRecord.model_validate(result) if result else None + async def purge(self) -> None: + # TODO: better? 
+ # https://github.com/apify/apify-sdk-python/issues/469 + async with self._lock: + await self._api_client.delete() @override - async def get_record_as_bytes(self, key: str) -> KeyValueStoreRecord | None: - result = await self._client.get_record_as_bytes(key) - return KeyValueStoreRecord.model_validate(result) if result else None + async def drop(self) -> None: + async with self._lock: + await self._api_client.delete() @override - async def stream_record(self, key: str) -> AbstractAsyncContextManager[KeyValueStoreRecord[Response] | None]: - return self._stream_record_internal(key) + async def get_value(self, key: str) -> KeyValueStoreRecord | None: + response = await self._api_client.get_record(key) + record = KeyValueStoreRecord.model_validate(response) if response else None + await self._update_metadata() + return record - @asynccontextmanager - async def _stream_record_internal(self, key: str) -> AsyncIterator[KeyValueStoreRecord[Response] | None]: - async with self._client.stream_record(key) as response: - yield KeyValueStoreRecord.model_validate(response) + @override + async def set_value(self, key: str, value: Any, content_type: str | None = None) -> None: + async with self._lock: + await self._api_client.set_record( + key=key, + value=value, + content_type=content_type, + ) + await self._update_metadata() @override - async def set_record(self, key: str, value: Any, content_type: str | None = None) -> None: - await self._client.set_record( - key=key, - value=value, - content_type=content_type, - ) + async def delete_value(self, key: str) -> None: + async with self._lock: + await self._api_client.delete_record(key=key) + await self._update_metadata() @override - async def delete_record(self, key: str) -> None: - await self._client.delete_record( - key=key, - ) + async def iterate_keys( + self, + *, + exclusive_start_key: str | None = None, + limit: int | None = None, + ) -> AsyncIterator[KeyValueStoreRecordMetadata]: + count = 0 + + while True: + response = await self._api_client.list_keys(exclusive_start_key=exclusive_start_key) + list_key_page = KeyValueStoreListKeysPage.model_validate(response) + + for item in list_key_page.items: + # Convert KeyValueStoreKeyInfo to KeyValueStoreRecordMetadata + record_metadata = KeyValueStoreRecordMetadata( + key=item.key, + size=item.size, + content_type='application/octet-stream', # Content type not available from list_keys + ) + yield record_metadata + count += 1 + + # If we've reached the limit, stop yielding + if limit and count >= limit: + break + + # If we've reached the limit or there are no more pages, exit the loop + if (limit and count >= limit) or not list_key_page.is_truncated: + break + + exclusive_start_key = list_key_page.next_exclusive_start_key + + await self._update_metadata() async def get_public_url(self, key: str) -> str: """Get a URL for the given key that may be used to publicly access the value in the remote key-value store. @@ -92,18 +174,23 @@ async def get_public_url(self, key: str) -> str: Args: key: The key for which the URL should be generated. 
""" - if self._client.resource_id is None: + if self._api_client.resource_id is None: raise ValueError('resource_id cannot be None when generating a public URL') public_url = ( - URL(self._api_public_base_url) / 'v2' / 'key-value-stores' / self._client.resource_id / 'records' / key + URL(self._api_client.base_url) / 'v2' / 'key-value-stores' / self._api_client.resource_id / 'records' / key ) - key_value_store = await self.get() + key_value_store = self.metadata - if key_value_store is not None and isinstance(key_value_store.model_extra, dict): + if key_value_store and key_value_store.model_extra: url_signing_secret_key = key_value_store.model_extra.get('urlSigningSecretKey') if url_signing_secret_key: public_url = public_url.with_query(signature=create_hmac_signature(url_signing_secret_key, key)) return str(public_url) + + async def _update_metadata(self) -> None: + """Update the key-value store metadata with current information.""" + metadata = await self._api_client.get() + self._metadata = KeyValueStoreMetadata.model_validate(metadata) diff --git a/src/apify/apify_storage_client/_key_value_store_collection_client.py b/src/apify/apify_storage_client/_key_value_store_collection_client.py deleted file mode 100644 index 0d4caca7..00000000 --- a/src/apify/apify_storage_client/_key_value_store_collection_client.py +++ /dev/null @@ -1,51 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from typing_extensions import override - -from crawlee.storage_clients._base import KeyValueStoreCollectionClient as BaseKeyValueStoreCollectionClient -from crawlee.storage_clients.models import KeyValueStoreListPage, KeyValueStoreMetadata - -if TYPE_CHECKING: - from apify_client.clients import KeyValueStoreCollectionClientAsync - - -class KeyValueStoreCollectionClient(BaseKeyValueStoreCollectionClient): - """Key-value store collection resource client implementation based on the Apify platform storage.""" - - def __init__(self, apify_dataset_collection_client: KeyValueStoreCollectionClientAsync) -> None: - self._client = apify_dataset_collection_client - - @override - async def get_or_create( - self, - *, - id: str | None = None, - name: str | None = None, - schema: dict | None = None, - ) -> KeyValueStoreMetadata: - return KeyValueStoreMetadata.model_validate( - await self._client.get_or_create( - name=id if id is not None else name, - schema=schema, - ) - ) - - @override - async def list( - self, - *, - unnamed: bool = False, - limit: int | None = None, - offset: int | None = None, - desc: bool = False, - ) -> KeyValueStoreListPage: - return KeyValueStoreListPage.model_validate( - await self._client.list( - unnamed=unnamed, - limit=limit, - offset=offset, - desc=desc, - ) - ) diff --git a/src/apify/apify_storage_client/_models.py b/src/apify/apify_storage_client/_models.py new file mode 100644 index 00000000..dd94ec56 --- /dev/null +++ b/src/apify/apify_storage_client/_models.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import Annotated + +from pydantic import BaseModel, ConfigDict, Field + +from crawlee import Request +from crawlee._utils.docs import docs_group + + +@docs_group('Data structures') +class ProlongRequestLockResponse(BaseModel): + """Response to prolong request lock calls.""" + + model_config = ConfigDict(populate_by_name=True) + + lock_expires_at: Annotated[datetime, Field(alias='lockExpiresAt')] + + +@docs_group('Data structures') +class RequestQueueHead(BaseModel): + """Model for request queue 
head. + + Represents a collection of requests retrieved from the beginning of a queue, + including metadata about the queue's state and lock information for the requests. + """ + + model_config = ConfigDict(populate_by_name=True) + + limit: Annotated[int | None, Field(alias='limit', default=None)] + """The maximum number of requests that were requested from the queue.""" + + had_multiple_clients: Annotated[bool, Field(alias='hadMultipleClients', default=False)] + """Indicates whether the queue has been accessed by multiple clients (consumers).""" + + queue_modified_at: Annotated[datetime, Field(alias='queueModifiedAt')] + """The timestamp when the queue was last modified.""" + + lock_time: Annotated[timedelta | None, Field(alias='lockSecs', default=None)] + """The duration for which the returned requests are locked and cannot be processed by other clients.""" + + queue_has_locked_requests: Annotated[bool | None, Field(alias='queueHasLockedRequests', default=False)] + """Indicates whether the queue contains any locked requests.""" + + items: Annotated[list[Request], Field(alias='items', default_factory=list[Request])] + """The list of request objects retrieved from the beginning of the queue.""" + + +class KeyValueStoreKeyInfo(BaseModel): + """Model for a key-value store key info.""" + + model_config = ConfigDict(populate_by_name=True) + + key: Annotated[str, Field(alias='key')] + size: Annotated[int, Field(alias='size')] + + +class KeyValueStoreListKeysPage(BaseModel): + """Model for listing keys in the key-value store.""" + + model_config = ConfigDict(populate_by_name=True) + + count: Annotated[int, Field(alias='count')] + limit: Annotated[int, Field(alias='limit')] + is_truncated: Annotated[bool, Field(alias='isTruncated')] + items: Annotated[list[KeyValueStoreKeyInfo], Field(alias='items', default_factory=list)] + exclusive_start_key: Annotated[str | None, Field(alias='exclusiveStartKey', default=None)] + next_exclusive_start_key: Annotated[str | None, Field(alias='nextExclusiveStartKey', default=None)] + + +class CachedRequest(BaseModel): + """Pydantic model for cached request information.""" + + id: str + """The ID of the request.""" + + was_already_handled: bool + """Whether the request was already handled.""" + + hydrated: Request | None = None + """The hydrated request object (the original one).""" + + lock_expires_at: datetime | None = None + """The expiration time of the lock on the request.""" + + forefront: bool = False + """Whether the request was added to the forefront of the queue.""" diff --git a/src/apify/apify_storage_client/_request_queue_client.py b/src/apify/apify_storage_client/_request_queue_client.py index 036eb2ab..6352b1a9 100644 --- a/src/apify/apify_storage_client/_request_queue_client.py +++ b/src/apify/apify_storage_client/_request_queue_client.py @@ -1,176 +1,628 @@ from __future__ import annotations -from typing import TYPE_CHECKING +import asyncio +from collections import deque +from datetime import datetime, timedelta, timezone +from logging import getLogger +from typing import TYPE_CHECKING, Final +from cachetools import LRUCache from typing_extensions import override +from apify_client import ApifyClientAsync from crawlee import Request -from crawlee.storage_clients._base import RequestQueueClient as BaseRequestQueueClient -from crawlee.storage_clients.models import ( - BatchRequestsOperationResponse, - ProcessedRequest, - ProlongRequestLockResponse, - RequestQueueHead, - RequestQueueHeadWithLocks, - RequestQueueMetadata, -) +from crawlee._utils.requests 
import unique_key_to_request_id +from crawlee.storage_clients._base import RequestQueueClient +from crawlee.storage_clients.models import AddRequestsResponse, ProcessedRequest, RequestQueueMetadata + +from ._models import CachedRequest, ProlongRequestLockResponse, RequestQueueHead if TYPE_CHECKING: from collections.abc import Sequence from apify_client.clients import RequestQueueClientAsync + from crawlee.configuration import Configuration +logger = getLogger(__name__) -class RequestQueueClient(BaseRequestQueueClient): - """Request queue resource client implementation based on the Apify platform storage.""" - def __init__(self, apify_request_queue_client: RequestQueueClientAsync) -> None: - self._client = apify_request_queue_client +class ApifyRequestQueueClient(RequestQueueClient): + """An Apify platform implementation of the request queue client.""" - @override - async def get(self) -> RequestQueueMetadata | None: - result = await self._client.get() - return RequestQueueMetadata.model_validate({'resourceDirectory': ''} | result) if result else None + _DEFAULT_LOCK_TIME: Final[timedelta] = timedelta(minutes=3) + """The default lock time for requests in the queue.""" - @override - async def update( + _MAX_CACHED_REQUESTS: Final[int] = 1_000_000 + """Maximum number of requests that can be cached.""" + + def __init__( self, *, - name: str | None = None, - ) -> RequestQueueMetadata: - return RequestQueueMetadata.model_validate( - {'resourceDirectory': ''} - | await self._client.update( - name=name, - ) + id: str, + name: str | None, + created_at: datetime, + accessed_at: datetime, + modified_at: datetime, + had_multiple_clients: bool, + handled_request_count: int, + pending_request_count: int, + stats: dict, + total_request_count: int, + api_client: RequestQueueClientAsync, + ) -> None: + """Initialize a new instance. + + Preferably use the `ApifyRequestQueueClient.open` class method to create a new instance. 
+ """ + self._metadata = RequestQueueMetadata( + id=id, + name=name, + created_at=created_at, + accessed_at=accessed_at, + modified_at=modified_at, + had_multiple_clients=had_multiple_clients, + handled_request_count=handled_request_count, + pending_request_count=pending_request_count, + stats=stats, + total_request_count=total_request_count, ) + self._api_client = api_client + """The Apify request queue client for API operations.""" + + self._lock = asyncio.Lock() + """A lock to ensure that only one operation is performed at a time.""" + + self._queue_head = deque[str]() + """A deque to store request IDs in the queue head.""" + + self._requests_cache: LRUCache[str, CachedRequest] = LRUCache(maxsize=self._MAX_CACHED_REQUESTS) + """A cache to store request objects.""" + + self._queue_has_locked_requests: bool | None = None + """Whether the queue has requests locked by another client.""" + + self._should_check_for_forefront_requests = False + """Whether to check for forefront requests in the next list_head call.""" + + @property @override - async def delete(self) -> None: - await self._client.delete() + def metadata(self) -> RequestQueueMetadata: + return self._metadata @override - async def list_head(self, *, limit: int | None = None) -> RequestQueueHead: - return RequestQueueHead.model_validate( - await self._client.list_head( - limit=limit, - ), + @classmethod + async def open( + cls, + *, + id: str | None, + name: str | None, + configuration: Configuration, + ) -> ApifyRequestQueueClient: + # Get API credentials + token = getattr(configuration, 'token', None) + api_url = getattr(configuration, 'api_base_url', 'https://api.apify.com') + + # Create a new API client + apify_client_async = ApifyClientAsync( + token=token, + api_url=api_url, + max_retries=8, + min_delay_between_retries_millis=500, + timeout_secs=360, + ) + + apify_rqs_client = apify_client_async.request_queues() + + # Get or create the request queue + metadata = RequestQueueMetadata.model_validate( + await apify_rqs_client.get_or_create(name=id if id is not None else name), + ) + + apify_rq_client = apify_client_async.request_queue(request_queue_id=metadata.id) + + # Create the client instance + return cls( + id=metadata.id, + name=metadata.name, + created_at=metadata.created_at, + accessed_at=metadata.accessed_at, + modified_at=metadata.modified_at, + had_multiple_clients=metadata.had_multiple_clients, + handled_request_count=metadata.handled_request_count, + pending_request_count=metadata.pending_request_count, + stats=metadata.stats, + total_request_count=metadata.total_request_count, + api_client=apify_rq_client, ) @override - async def list_and_lock_head(self, *, lock_secs: int, limit: int | None = None) -> RequestQueueHeadWithLocks: - return RequestQueueHeadWithLocks.model_validate( - await self._client.list_and_lock_head( - lock_secs=lock_secs, - limit=limit, + async def purge(self) -> None: + # TODO: better? + # https://github.com/apify/apify-sdk-python/issues/469 + async with self._lock: + await self._api_client.delete() + + @override + async def drop(self) -> None: + async with self._lock: + await self._api_client.delete() + + @override + async def add_batch_of_requests( + self, + requests: Sequence[Request], + *, + forefront: bool = False, + ) -> AddRequestsResponse: + """Add a batch of requests to the queue. + + Args: + requests: The requests to add. + forefront: Whether to add the requests to the beginning of the queue. + + Returns: + Response containing information about the added requests. 
+ """ + # Prepare requests for API by converting to dictionaries + requests_dict = [request.model_dump(by_alias=True) for request in requests] + + # Remove 'id' fields from requests as the API doesn't accept them + for request_dict in requests_dict: + if 'id' in request_dict: + del request_dict['id'] + + # Send requests to API + response = await self._api_client.batch_add_requests(requests=requests_dict, forefront=forefront) + + # Update metadata after adding requests + await self._update_metadata() + + return AddRequestsResponse.model_validate(response) + + @override + async def get_request(self, request_id: str) -> Request | None: + """Get a request by ID. + + Args: + request_id: The ID of the request to get. + + Returns: + The request or None if not found. + """ + response = await self._api_client.get_request(request_id) + await self._update_metadata() + + if response is None: + return None + + return Request.model_validate(**response) + + @override + async def fetch_next_request(self) -> Request | None: + """Return the next request in the queue to be processed. + + Once you successfully finish processing of the request, you need to call `mark_request_as_handled` + to mark the request as handled in the queue. If there was some error in processing the request, call + `reclaim_request` instead, so that the queue will give the request to some other consumer + in another call to the `fetch_next_request` method. + + Returns: + The request or `None` if there are no more pending requests. + """ + # Ensure the queue head has requests if available + await self._ensure_head_is_non_empty() + + # If queue head is empty after ensuring, there are no requests + if not self._queue_head: + return None + + # Get the next request ID from the queue head + next_request_id = self._queue_head.popleft() + request = await self._get_or_hydrate_request(next_request_id) + + # Handle potential inconsistency where request might not be in the main table yet + if request is None: + logger.debug( + 'Cannot find a request from the beginning of queue, will be retried later', + extra={'nextRequestId': next_request_id}, ) - ) + return None + + # If the request was already handled, skip it + if request.handled_at is not None: + logger.debug( + 'Request fetched from the beginning of queue was already handled', + extra={'nextRequestId': next_request_id}, + ) + return None + + return request @override - async def add_request( + async def mark_request_as_handled(self, request: Request) -> ProcessedRequest | None: + """Mark a request as handled after successful processing. + + Handled requests will never again be returned by the `fetch_next_request` method. + + Args: + request: The request to mark as handled. + + Returns: + Information about the queue operation. `None` if the given request was not in progress. 
+ """ + # Set the handled_at timestamp if not already set + if request.handled_at is None: + request.handled_at = datetime.now(tz=timezone.utc) + + try: + # Update the request in the API + processed_request = await self._update_request(request) + processed_request.unique_key = request.unique_key + + # Update the cache with the handled request + cache_key = unique_key_to_request_id(request.unique_key) + self._cache_request( + cache_key, + processed_request, + forefront=False, + hydrated_request=request, + ) + + # Update metadata after marking request as handled + await self._update_metadata() + except Exception as exc: + logger.debug(f'Error marking request {request.id} as handled: {exc!s}') + return None + else: + return processed_request + + @override + async def reclaim_request( self, request: Request, *, forefront: bool = False, - ) -> ProcessedRequest: - return ProcessedRequest.model_validate( - {'id': request.id, 'uniqueKey': request.unique_key} - | await self._client.add_request( - request=request.model_dump( - by_alias=True, - exclude={ - 'id', - }, - ), + ) -> ProcessedRequest | None: + """Reclaim a failed request back to the queue. + + The request will be returned for processing later again by another call to `fetch_next_request`. + + Args: + request: The request to return to the queue. + forefront: Whether to add the request to the head or the end of the queue. + + Returns: + Information about the queue operation. `None` if the given request was not in progress. + """ + try: + # Update the request in the API + processed_request = await self._update_request(request, forefront=forefront) + processed_request.unique_key = request.unique_key + + # Update the cache + cache_key = unique_key_to_request_id(request.unique_key) + self._cache_request( + cache_key, + processed_request, forefront=forefront, + hydrated_request=request, ) - ) - @override - async def get_request(self, request_id: str) -> Request | None: - result = await self._client.get_request(request_id) - return Request.model_validate(result) if result else None + # If we're adding to the forefront, we need to check for forefront requests + # in the next list_head call + if forefront: + self._should_check_for_forefront_requests = True + + # Try to release the lock on the request + try: + await self._delete_request_lock(request.id, forefront=forefront) + except Exception as err: + logger.debug(f'Failed to delete request lock for request {request.id}', exc_info=err) + + # Update metadata after reclaiming request + await self._update_metadata() + except Exception as exc: + logger.debug(f'Error reclaiming request {request.id}: {exc!s}') + return None + else: + return processed_request @override - async def update_request( + async def is_empty(self) -> bool: + """Check if the queue is empty. + + Returns: + True if the queue is empty, False otherwise. + """ + head = await self._list_head(limit=1, lock_time=None) + return len(head.items) == 0 + + async def _ensure_head_is_non_empty(self) -> None: + """Ensure that the queue head has requests if they are available in the queue.""" + # If queue head has adequate requests, skip fetching more + if len(self._queue_head) > 1 and not self._should_check_for_forefront_requests: + return + + # Fetch requests from the API and populate the queue head + await self._list_head(lock_time=self._DEFAULT_LOCK_TIME) + + async def _get_or_hydrate_request(self, request_id: str) -> Request | None: + """Get a request by ID, either from cache or by fetching from API. 
+ + Args: + request_id: The ID of the request to get. + + Returns: + The request if found and valid, otherwise None. + """ + # First check if the request is in our cache + cached_entry = self._requests_cache.get(request_id) + + if cached_entry and cached_entry.hydrated: + # If we have the request hydrated in cache, check if lock is expired + if cached_entry.lock_expires_at and cached_entry.lock_expires_at < datetime.now(tz=timezone.utc): + # Try to prolong the lock if it's expired + try: + lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds()) + response = await self._prolong_request_lock( + request_id, forefront=cached_entry.forefront, lock_secs=lock_secs + ) + cached_entry.lock_expires_at = response.lock_expires_at + except Exception: + # If prolonging the lock fails, we lost the request + logger.debug(f'Failed to prolong lock for request {request_id}, returning None') + return None + + return cached_entry.hydrated + + # If not in cache or not hydrated, fetch the request + try: + # Try to acquire or prolong the lock + lock_secs = int(self._DEFAULT_LOCK_TIME.total_seconds()) + await self._prolong_request_lock(request_id, forefront=False, lock_secs=lock_secs) + + # Fetch the request data + request = await self.get_request(request_id) + + # If request is not found, release lock and return None + if not request: + await self._delete_request_lock(request_id) + return None + + # Update cache with hydrated request + cache_key = unique_key_to_request_id(request.unique_key) + self._cache_request( + cache_key, + ProcessedRequest( + id=request_id, + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=request.handled_at is not None, + ), + forefront=False, + hydrated_request=request, + ) + except Exception as exc: + logger.debug(f'Error fetching or locking request {request_id}: {exc!s}') + return None + else: + return request + + async def _update_request( self, request: Request, *, forefront: bool = False, ) -> ProcessedRequest: + """Update a request in the queue. + + Args: + request: The updated request. + forefront: Whether to put the updated request in the beginning or the end of the queue. + + Returns: + The updated request + """ + response = await self._api_client.update_request( + request=request.model_dump(by_alias=True), + forefront=forefront, + ) + return ProcessedRequest.model_validate( - {'id': request.id, 'uniqueKey': request.unique_key} - | await self._client.update_request( - request=request.model_dump( - by_alias=True, + {'id': request.id, 'uniqueKey': request.unique_key} | response, + ) + + async def _list_head( + self, + *, + lock_time: timedelta | None = None, + limit: int = 25, + ) -> RequestQueueHead: + """Retrieve requests from the beginning of the queue. + + Args: + lock_time: Duration for which to lock the retrieved requests. + If None, requests will not be locked. + limit: Maximum number of requests to retrieve. + + Returns: + A collection of requests from the beginning of the queue. 
+ """ + # Return from cache if available and we're not checking for new forefront requests + if self._queue_head and not self._should_check_for_forefront_requests: + logger.debug(f'Using cached queue head with {len(self._queue_head)} requests') + + # Create a list of requests from the cached queue head + items = [] + for request_id in list(self._queue_head)[:limit]: + cached_request = self._requests_cache.get(request_id) + if cached_request and cached_request.hydrated: + items.append(cached_request.hydrated) + + return RequestQueueHead( + limit=limit, + had_multiple_clients=self._metadata.had_multiple_clients, + queue_modified_at=self._metadata.modified_at, + items=items, + queue_has_locked_requests=self._queue_has_locked_requests, + lock_time=lock_time, + ) + + # Otherwise fetch from API + lock_time = lock_time or self._DEFAULT_LOCK_TIME + lock_secs = int(lock_time.total_seconds()) + + response = await self._api_client.list_and_lock_head( + lock_secs=lock_secs, + limit=limit, + ) + + # Update the queue head cache + self._queue_has_locked_requests = response.get('queueHasLockedRequests', False) + + # Clear current queue head if we're checking for forefront requests + if self._should_check_for_forefront_requests: + self._queue_head.clear() + self._should_check_for_forefront_requests = False + + # Process and cache the requests + head_id_buffer = list[str]() + forefront_head_id_buffer = list[str]() + + for request_data in response.get('items', []): + request = Request.model_validate(request_data) + + # Skip requests without ID or unique key + if not request.id or not request.unique_key: + logger.debug( + 'Skipping request from queue head, missing ID or unique key', + extra={ + 'id': request.id, + 'unique_key': request.unique_key, + }, + ) + continue + + # Check if this request was already cached and if it was added to forefront + cache_key = unique_key_to_request_id(request.unique_key) + cached_request = self._requests_cache.get(cache_key) + forefront = cached_request.forefront if cached_request else False + + # Add to appropriate buffer based on forefront flag + if forefront: + forefront_head_id_buffer.insert(0, request.id) + else: + head_id_buffer.append(request.id) + + # Cache the request + self._cache_request( + cache_key, + ProcessedRequest( + id=request.id, + unique_key=request.unique_key, + was_already_present=True, + was_already_handled=False, ), forefront=forefront, + hydrated_request=request, ) - ) - @override - async def delete_request(self, request_id: str) -> None: - await self._client.delete_request(request_id) + # Update the queue head deque + for request_id in head_id_buffer: + self._queue_head.append(request_id) - @override - async def prolong_request_lock( + for request_id in forefront_head_id_buffer: + self._queue_head.appendleft(request_id) + + return RequestQueueHead.model_validate(response) + + async def _prolong_request_lock( self, request_id: str, *, forefront: bool = False, lock_secs: int, ) -> ProlongRequestLockResponse: - return ProlongRequestLockResponse.model_validate( - await self._client.prolong_request_lock( - request_id=request_id, - forefront=forefront, - lock_secs=lock_secs, - ) - ) + """Prolong the lock on a specific request in the queue. - @override - async def delete_request_lock( - self, - request_id: str, - *, - forefront: bool = False, - ) -> None: - await self._client.delete_request_lock( + Args: + request_id: The identifier of the request whose lock is to be prolonged. 
+ forefront: Whether to put the request in the beginning or the end of the queue after lock expires. + lock_secs: The additional amount of time, in seconds, that the request will remain locked. + + Returns: + A response containing the time at which the lock will expire. + """ + response = await self._api_client.prolong_request_lock( request_id=request_id, forefront=forefront, + lock_secs=lock_secs, ) - @override - async def batch_add_requests( + result = ProlongRequestLockResponse( + lock_expires_at=datetime.fromisoformat(response['lockExpiresAt'].replace('Z', '+00:00')) + ) + + # Update the cache with the new lock expiration + for cached_request in self._requests_cache.values(): + if cached_request.id == request_id: + cached_request.lock_expires_at = result.lock_expires_at + break + + return result + + async def _delete_request_lock( self, - requests: Sequence[Request], + request_id: str, *, forefront: bool = False, - ) -> BatchRequestsOperationResponse: - return BatchRequestsOperationResponse.model_validate( - await self._client.batch_add_requests( - requests=[ - r.model_dump( - by_alias=True, - exclude={ - 'id', - }, - ) - for r in requests - ], + ) -> None: + """Delete the lock on a specific request in the queue. + + Args: + request_id: ID of the request to delete the lock. + forefront: Whether to put the request in the beginning or the end of the queue after the lock is deleted. + """ + try: + await self._api_client.delete_request_lock( + request_id=request_id, forefront=forefront, ) - ) - @override - async def batch_delete_requests(self, requests: list[Request]) -> BatchRequestsOperationResponse: - return BatchRequestsOperationResponse.model_validate( - await self._client.batch_delete_requests( - requests=[ - r.model_dump( - by_alias=True, - ) - for r in requests - ], - ) + # Update the cache to remove the lock + for cached_request in self._requests_cache.values(): + if cached_request.id == request_id: + cached_request.lock_expires_at = None + break + except Exception as err: + logger.debug(f'Failed to delete request lock for request {request_id}', exc_info=err) + + def _cache_request( + self, + cache_key: str, + processed_request: ProcessedRequest, + *, + forefront: bool, + hydrated_request: Request | None = None, + ) -> None: + """Cache a request for future use. + + Args: + cache_key: The key to use for caching the request. + processed_request: The processed request information. + forefront: Whether the request was added to the forefront of the queue. + hydrated_request: The hydrated request object, if available. 
+ """ + self._requests_cache[cache_key] = CachedRequest( + id=processed_request.id, + was_already_handled=processed_request.was_already_handled, + hydrated=hydrated_request, + lock_expires_at=None, + forefront=forefront, ) + + async def _update_metadata(self) -> None: + """Update the request queue metadata with current information.""" + metadata = await self._api_client.get() + self._metadata = RequestQueueMetadata.model_validate(metadata) diff --git a/src/apify/apify_storage_client/_request_queue_collection_client.py b/src/apify/apify_storage_client/_request_queue_collection_client.py deleted file mode 100644 index 5bf28836..00000000 --- a/src/apify/apify_storage_client/_request_queue_collection_client.py +++ /dev/null @@ -1,51 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from typing_extensions import override - -from crawlee.storage_clients._base import RequestQueueCollectionClient as BaseRequestQueueCollectionClient -from crawlee.storage_clients.models import RequestQueueListPage, RequestQueueMetadata - -if TYPE_CHECKING: - from apify_client.clients import RequestQueueCollectionClientAsync - - -class RequestQueueCollectionClient(BaseRequestQueueCollectionClient): - """Request queue collection resource client implementation based on the Apify platform storage.""" - - def __init__(self, apify_request_queue_collection_client: RequestQueueCollectionClientAsync) -> None: - self._client = apify_request_queue_collection_client - - @override - async def get_or_create( - self, - *, - id: str | None = None, - name: str | None = None, - schema: dict | None = None, - ) -> RequestQueueMetadata: - return RequestQueueMetadata.model_validate( - {'resourceDirectory': ''} - | await self._client.get_or_create( - name=id if id is not None else name, - ) - ) - - @override - async def list( - self, - *, - unnamed: bool = False, - limit: int | None = None, - offset: int | None = None, - desc: bool = False, - ) -> RequestQueueListPage: - return RequestQueueListPage.model_validate( - await self._client.list( - unnamed=unnamed, - limit=limit, - offset=offset, - desc=desc, - ) - ) diff --git a/src/apify/apify_storage_client/_storage_client.py b/src/apify/apify_storage_client/_storage_client.py new file mode 100644 index 00000000..b00ea9f3 --- /dev/null +++ b/src/apify/apify_storage_client/_storage_client.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +from typing_extensions import override + +from crawlee.configuration import Configuration +from crawlee.storage_clients._base import StorageClient + +from ._dataset_client import ApifyDatasetClient +from ._key_value_store_client import ApifyKeyValueStoreClient +from ._request_queue_client import ApifyRequestQueueClient + + +class ApifyStorageClient(StorageClient): + """Apify storage client.""" + + @override + async def create_dataset_client( + self, + *, + id: str | None = None, + name: str | None = None, + configuration: Configuration | None = None, + ) -> ApifyDatasetClient: + configuration = configuration or Configuration.get_global_configuration() + client = await ApifyDatasetClient.open(id=id, name=name, configuration=configuration) + + if configuration.purge_on_start: + await client.drop() + client = await ApifyDatasetClient.open(id=id, name=name, configuration=configuration) + + return client + + @override + async def create_kvs_client( + self, + *, + id: str | None = None, + name: str | None = None, + configuration: Configuration | None = None, + ) -> ApifyKeyValueStoreClient: + configuration = configuration 
or Configuration.get_global_configuration() + client = await ApifyKeyValueStoreClient.open(id=id, name=name, configuration=configuration) + + if configuration.purge_on_start: + await client.drop() + client = await ApifyKeyValueStoreClient.open(id=id, name=name, configuration=configuration) + + return client + + @override + async def create_rq_client( + self, + *, + id: str | None = None, + name: str | None = None, + configuration: Configuration | None = None, + ) -> ApifyRequestQueueClient: + configuration = configuration or Configuration.get_global_configuration() + client = await ApifyRequestQueueClient.open(id=id, name=name, configuration=configuration) + + if configuration.purge_on_start: + await client.drop() + client = await ApifyRequestQueueClient.open(id=id, name=name, configuration=configuration) + + return client diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py index 509c4d8a..ee6147e8 100644 --- a/src/apify/scrapy/extensions/_httpcache.py +++ b/src/apify/scrapy/extensions/_httpcache.py @@ -51,10 +51,14 @@ def open_spider(self, spider: Spider) -> None: kvs_name = get_kvs_name(spider.name) async def open_kvs() -> KeyValueStore: - config = Configuration.get_global_configuration() - if config.is_at_home: - storage_client = ApifyStorageClient.from_config(config) - return await KeyValueStore.open(name=kvs_name, storage_client=storage_client) + configuration = Configuration.get_global_configuration() + if configuration.is_at_home: + storage_client = ApifyStorageClient() + return await KeyValueStore.open( + name=kvs_name, + configuration=configuration, + storage_client=storage_client, + ) return await KeyValueStore.open(name=kvs_name) logger.debug("Starting background thread for cache storage's event loop") diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py index a243a368..d3b9b949 100644 --- a/src/apify/scrapy/scheduler.py +++ b/src/apify/scrapy/scheduler.py @@ -49,10 +49,13 @@ def open(self, spider: Spider) -> Deferred[None] | None: self.spider = spider async def open_rq() -> RequestQueue: - config = Configuration.get_global_configuration() - if config.is_at_home: - storage_client = ApifyStorageClient.from_config(config) - return await RequestQueue.open(storage_client=storage_client) + configuration = Configuration.get_global_configuration() + if configuration.is_at_home: + storage_client = ApifyStorageClient() + return await RequestQueue.open( + configuration=configuration, + storage_client=storage_client, + ) return await RequestQueue.open() try: diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1cd800f1..b4e649af 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -15,7 +15,7 @@ from apify_client import ApifyClient, ApifyClientAsync from apify_shared.consts import ActorJobStatus, ActorSourceType, ApifyEnvVars from crawlee import service_locator -from crawlee.storages import _creation_management +from crawlee.storages import Dataset, KeyValueStore, RequestQueue import apify._actor from ._utils import generate_unique_resource_name @@ -65,12 +65,15 @@ def _prepare_test_env() -> None: service_locator._storage_client = None # Clear creation-related caches to ensure no state is carried over between tests. 
diff --git a/src/apify/scrapy/extensions/_httpcache.py b/src/apify/scrapy/extensions/_httpcache.py
index 509c4d8a..ee6147e8 100644
--- a/src/apify/scrapy/extensions/_httpcache.py
+++ b/src/apify/scrapy/extensions/_httpcache.py
@@ -51,10 +51,14 @@ def open_spider(self, spider: Spider) -> None:
         kvs_name = get_kvs_name(spider.name)
 
         async def open_kvs() -> KeyValueStore:
-            config = Configuration.get_global_configuration()
-            if config.is_at_home:
-                storage_client = ApifyStorageClient.from_config(config)
-                return await KeyValueStore.open(name=kvs_name, storage_client=storage_client)
+            configuration = Configuration.get_global_configuration()
+            if configuration.is_at_home:
+                storage_client = ApifyStorageClient()
+                return await KeyValueStore.open(
+                    name=kvs_name,
+                    configuration=configuration,
+                    storage_client=storage_client,
+                )
             return await KeyValueStore.open(name=kvs_name)
 
         logger.debug("Starting background thread for cache storage's event loop")
diff --git a/src/apify/scrapy/scheduler.py b/src/apify/scrapy/scheduler.py
index a243a368..d3b9b949 100644
--- a/src/apify/scrapy/scheduler.py
+++ b/src/apify/scrapy/scheduler.py
@@ -49,10 +49,13 @@ def open(self, spider: Spider) -> Deferred[None] | None:
         self.spider = spider
 
         async def open_rq() -> RequestQueue:
-            config = Configuration.get_global_configuration()
-            if config.is_at_home:
-                storage_client = ApifyStorageClient.from_config(config)
-                return await RequestQueue.open(storage_client=storage_client)
+            configuration = Configuration.get_global_configuration()
+            if configuration.is_at_home:
+                storage_client = ApifyStorageClient()
+                return await RequestQueue.open(
+                    configuration=configuration,
+                    storage_client=storage_client,
+                )
             return await RequestQueue.open()
 
         try:
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 1cd800f1..b4e649af 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -15,7 +15,7 @@ from apify_client import ApifyClient, ApifyClientAsync
 from apify_shared.consts import ActorJobStatus, ActorSourceType, ApifyEnvVars
 from crawlee import service_locator
-from crawlee.storages import _creation_management
+from crawlee.storages import Dataset, KeyValueStore, RequestQueue
 
 import apify._actor
 
 from ._utils import generate_unique_resource_name
@@ -65,12 +65,15 @@ def _prepare_test_env() -> None:
         service_locator._storage_client = None
 
         # Clear creation-related caches to ensure no state is carried over between tests.
-        monkeypatch.setattr(_creation_management, '_cache_dataset_by_id', {})
-        monkeypatch.setattr(_creation_management, '_cache_dataset_by_name', {})
-        monkeypatch.setattr(_creation_management, '_cache_kvs_by_id', {})
-        monkeypatch.setattr(_creation_management, '_cache_kvs_by_name', {})
-        monkeypatch.setattr(_creation_management, '_cache_rq_by_id', {})
-        monkeypatch.setattr(_creation_management, '_cache_rq_by_name', {})
+        Dataset._cache_by_id.clear()
+        Dataset._cache_by_name.clear()
+        Dataset._default_instance = None
+        KeyValueStore._cache_by_id.clear()
+        KeyValueStore._cache_by_name.clear()
+        KeyValueStore._default_instance = None
+        RequestQueue._cache_by_id.clear()
+        RequestQueue._cache_by_name.clear()
+        RequestQueue._default_instance = None
 
         # Verify that the test environment was set up correctly.
         assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path)
diff --git a/tests/integration/test_actor_dataset.py b/tests/integration/test_actor_dataset.py
index 20a71750..52de59c5 100644
--- a/tests/integration/test_actor_dataset.py
+++ b/tests/integration/test_actor_dataset.py
@@ -104,8 +104,8 @@ async def main() -> None:
             dataset_by_name_2 = await Actor.open_dataset(name=dataset_name)
             assert dataset_by_name_1 is dataset_by_name_2
 
-            dataset_by_id_1 = await Actor.open_dataset(id=dataset_by_name_1._id)
-            dataset_by_id_2 = await Actor.open_dataset(id=dataset_by_name_1._id)
+            dataset_by_id_1 = await Actor.open_dataset(id=dataset_by_name_1.metadata.id)
+            dataset_by_id_2 = await Actor.open_dataset(id=dataset_by_name_1.metadata.id)
 
             assert dataset_by_id_1 is dataset_by_name_1
             assert dataset_by_id_2 is dataset_by_id_1
@@ -129,7 +129,7 @@ async def test_force_cloud(
 
     async with Actor:
         dataset = await Actor.open_dataset(name=dataset_name, force_cloud=True)
-        dataset_id = dataset._id
+        dataset_id = dataset.metadata.id
 
         await dataset.push_data(dataset_item)
diff --git a/tests/integration/test_actor_key_value_store.py b/tests/integration/test_actor_key_value_store.py
index 6b6dd767..8b54f8a9 100644
--- a/tests/integration/test_actor_key_value_store.py
+++ b/tests/integration/test_actor_key_value_store.py
@@ -45,8 +45,8 @@ async def main() -> None:
             kvs_by_name_2 = await Actor.open_key_value_store(name=kvs_name)
             assert kvs_by_name_1 is kvs_by_name_2
 
-            kvs_by_id_1 = await Actor.open_key_value_store(id=kvs_by_name_1._id)
-            kvs_by_id_2 = await Actor.open_key_value_store(id=kvs_by_name_1._id)
+            kvs_by_id_1 = await Actor.open_key_value_store(id=kvs_by_name_1.metadata.id)
+            kvs_by_id_2 = await Actor.open_key_value_store(id=kvs_by_name_1.metadata.id)
 
             assert kvs_by_id_1 is kvs_by_name_1
             assert kvs_by_id_2 is kvs_by_id_1
@@ -69,7 +69,7 @@ async def test_force_cloud(
 
     async with Actor:
         key_value_store = await Actor.open_key_value_store(name=key_value_store_name, force_cloud=True)
-        key_value_store_id = key_value_store._id
+        key_value_store_id = key_value_store.metadata.id
 
         await key_value_store.set_value('foo', 'bar')
@@ -208,15 +208,15 @@ async def main() -> None:
             default_store_id = Actor.config.default_key_value_store_id
             record_key = 'public-record-key'
 
-            store = await Actor.open_key_value_store()
+            kvs = await Actor.open_key_value_store()
 
-            assert isinstance(store.storage_object.model_extra, dict)
-            url_signing_secret_key = store.storage_object.model_extra.get('urlSigningSecretKey')
+            assert isinstance(kvs.metadata.model_extra, dict)
+            url_signing_secret_key = kvs.metadata.model_extra.get('urlSigningSecretKey')
             assert url_signing_secret_key is not None
 
-            await store.set_value(record_key, {'exposedData': 'test'}, 'application/json')
+            await kvs.set_value(record_key, {'exposedData': 'test'}, 'application/json')
 
-            record_url = await store.get_public_url(record_key)
+            record_url = await kvs.get_public_url(record_key)
 
             signature = create_hmac_signature(url_signing_secret_key, record_key)
             assert (
diff --git a/tests/integration/test_actor_request_queue.py b/tests/integration/test_actor_request_queue.py
index 06e8529e..41cb7bb7 100644
--- a/tests/integration/test_actor_request_queue.py
+++ b/tests/integration/test_actor_request_queue.py
@@ -46,8 +46,8 @@ async def main() -> None:
             rq_by_name_2 = await Actor.open_request_queue(name=rq_name)
             assert rq_by_name_1 is rq_by_name_2
 
-            rq_by_id_1 = await Actor.open_request_queue(id=rq_by_name_1._id)
-            rq_by_id_2 = await Actor.open_request_queue(id=rq_by_name_1._id)
+            rq_by_id_1 = await Actor.open_request_queue(id=rq_by_name_1.metadata.id)
+            rq_by_id_2 = await Actor.open_request_queue(id=rq_by_name_1.metadata.id)
 
             assert rq_by_id_1 is rq_by_name_1
             assert rq_by_id_2 is rq_by_id_1
@@ -70,7 +70,7 @@ async def test_force_cloud(
 
    async with Actor:
        request_queue = await Actor.open_request_queue(name=request_queue_name, force_cloud=True)
-        request_queue_id = request_queue._id
+        request_queue_id = request_queue.metadata.id
 
        request_info = await request_queue.add_request(Request.from_url('http://example.com'))
diff --git a/tests/integration/test_request_queue.py b/tests/integration/test_request_queue.py
index 4bce884a..e6d9f9f3 100644
--- a/tests/integration/test_request_queue.py
+++ b/tests/integration/test_request_queue.py
@@ -53,7 +53,7 @@ async def main() -> None:
             # I have seen it get stuck on this call
             rq = await Actor.open_request_queue()
             # Add some requests
-            await rq.add_requests_batched([f'https://example.com/{i}' for i in range(desired_request_count)])
+            await rq.add_requests([f'https://example.com/{i}' for i in range(desired_request_count)])
 
             handled_request_count = 0
             while next_request := await rq.fetch_next_request():
@@ -87,7 +87,7 @@ async def main() -> None:
             # I have seen it get stuck on this call
             rq = await Actor.open_request_queue()
             # Add some requests
-            await rq.add_requests_batched(
+            await rq.add_requests(
                 [
                     Request.from_url(f'https://example.com/{i}', unique_key=str(i - 1 if i % 4 == 1 else i))
                     for i in range(desired_request_count)
diff --git a/tests/unit/actor/test_actor_dataset.py b/tests/unit/actor/test_actor_dataset.py
index ef6282bb..cedfb237 100644
--- a/tests/unit/actor/test_actor_dataset.py
+++ b/tests/unit/actor/test_actor_dataset.py
@@ -1,16 +1,12 @@
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
-
 import pytest
 
 from apify_shared.consts import ActorEnvVars
+from crawlee.storage_clients import MemoryStorageClient
 
 from apify import Actor
 
-if TYPE_CHECKING:
-    from crawlee.storage_clients import MemoryStorageClient
-
 # NOTE: We only test the dataset methods available on Actor class/instance.
 # Actual tests for the implementations are in storages/.
@@ -31,24 +27,24 @@ async def test_open_dataset_returns_same_references() -> None:
         dataset_by_name_2 = await Actor.open_dataset(name=dataset_name)
         assert dataset_by_name_1 is dataset_by_name_2
 
-        dataset_by_id_1 = await Actor.open_dataset(id=dataset_by_name_1._id)
-        dataset_by_id_2 = await Actor.open_dataset(id=dataset_by_name_1._id)
+        dataset_by_id_1 = await Actor.open_dataset(id=dataset_by_name_1.metadata.id)
+        dataset_by_id_2 = await Actor.open_dataset(id=dataset_by_name_1.metadata.id)
 
         assert dataset_by_id_1 is dataset_by_name_1
         assert dataset_by_id_2 is dataset_by_id_1
 
 
-async def test_open_dataset_uses_env_var(
-    monkeypatch: pytest.MonkeyPatch,
-    memory_storage_client: MemoryStorageClient,
-) -> None:
+async def test_open_dataset_uses_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
+    memory_storage_client = MemoryStorageClient()
+
     default_dataset_id = 'my-new-default-id'
     monkeypatch.setenv(ActorEnvVars.DEFAULT_DATASET_ID, default_dataset_id)
 
     async with Actor:
         ddt = await Actor.open_dataset()
-        assert ddt._id == default_dataset_id
-        await memory_storage_client.dataset(ddt._id).delete()
+        assert ddt.metadata.id == default_dataset_id
+        dataset = await memory_storage_client.create_dataset_client(id=ddt.metadata.id)
+        await dataset.drop()
 
 
 async def test_push_data_to_dataset() -> None:
@@ -57,8 +53,5 @@ async def test_push_data_to_dataset() -> None:
     desired_item_count = 100
     await dataset.push_data([{'id': i} for i in range(desired_item_count)])
 
-    dataset_info = await dataset.get_info()
-    assert dataset_info is not None
-
     list_page = await dataset.get_data(limit=desired_item_count)
     assert {item['id'] for item in list_page.items} == set(range(desired_item_count))
diff --git a/tests/unit/actor/test_actor_key_value_store.py b/tests/unit/actor/test_actor_key_value_store.py
index 821065e1..2ee27c31 100644
--- a/tests/unit/actor/test_actor_key_value_store.py
+++ b/tests/unit/actor/test_actor_key_value_store.py
@@ -1,20 +1,16 @@
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
-
 import pytest
 
 from apify_shared.consts import ApifyEnvVars
 from apify_shared.utils import json_dumps
+from crawlee.storage_clients import MemoryStorageClient
 
 from ..test_crypto import PRIVATE_KEY_PASSWORD, PRIVATE_KEY_PEM_BASE64, PUBLIC_KEY
 from apify import Actor
 from apify._consts import ENCRYPTED_INPUT_VALUE_PREFIX
 from apify._crypto import public_encrypt
 
-if TYPE_CHECKING:
-    from crawlee.storage_clients import MemoryStorageClient
-
 # NOTE: We only test the key-value store methods available on Actor class/instance.
 # Actual tests for the implementations are in storages/.
@@ -29,8 +25,8 @@ async def test_open_returns_same_references() -> None:
         kvs_by_name_2 = await Actor.open_key_value_store(name=kvs_name)
         assert kvs_by_name_1 is kvs_by_name_2
 
-        kvs_by_id_1 = await Actor.open_key_value_store(id=kvs_by_name_1._id)
-        kvs_by_id_2 = await Actor.open_key_value_store(id=kvs_by_name_1._id)
+        kvs_by_id_1 = await Actor.open_key_value_store(id=kvs_by_name_1.metadata.id)
+        kvs_by_id_2 = await Actor.open_key_value_store(id=kvs_by_name_1.metadata.id)
 
         assert kvs_by_id_1 is kvs_by_name_1
         assert kvs_by_id_2 is kvs_by_id_1
@@ -50,29 +46,31 @@ async def test_set_and_get_value() -> None:
     assert value == test_value
 
 
-async def test_get_input(memory_storage_client: MemoryStorageClient) -> None:
+async def test_get_input() -> None:
+    memory_storage_client = MemoryStorageClient()
+
     input_key = 'INPUT'
     test_input = {'foo': 'bar'}
 
-    await memory_storage_client.key_value_stores().get_or_create(id='default')
-    await memory_storage_client.key_value_store('default').set_record(
+    kvs_client = await memory_storage_client.create_kvs_client()
+
+    await kvs_client.set_value(
         key=input_key,
         value=json_dumps(test_input),
         content_type='application/json',
     )
 
     async with Actor as my_actor:
-        input = await my_actor.get_input()  # noqa: A001
-        assert input['foo'] == test_input['foo']
+        actor_input = await my_actor.get_input()
+        assert actor_input['foo'] == test_input['foo']
 
 
-async def test_get_input_with_encrypted_secrets(
-    monkeypatch: pytest.MonkeyPatch,
-    memory_storage_client: MemoryStorageClient,
-) -> None:
+async def test_get_input_with_encrypted_secrets(monkeypatch: pytest.MonkeyPatch) -> None:
     monkeypatch.setenv(ApifyEnvVars.INPUT_SECRETS_PRIVATE_KEY_FILE, PRIVATE_KEY_PEM_BASE64)
     monkeypatch.setenv(ApifyEnvVars.INPUT_SECRETS_PRIVATE_KEY_PASSPHRASE, PRIVATE_KEY_PASSWORD)
 
+    memory_storage_client = MemoryStorageClient()
+
     input_key = 'INPUT'
     secret_string = 'secret-string'
     encrypted_secret = public_encrypt(secret_string, public_key=PUBLIC_KEY)
@@ -81,14 +79,15 @@ async def test_get_input_with_encrypted_secrets(
         'secret': f'{ENCRYPTED_INPUT_VALUE_PREFIX}:{encrypted_secret["encrypted_password"]}:{encrypted_secret["encrypted_value"]}',  # noqa: E501
     }
 
-    await memory_storage_client.key_value_stores().get_or_create(id='default')
-    await memory_storage_client.key_value_store('default').set_record(
+    kvs_client = await memory_storage_client.create_kvs_client()
+
+    await kvs_client.set_value(
         key=input_key,
         value=json_dumps(input_with_secret),
         content_type='application/json',
     )
 
     async with Actor as my_actor:
-        input = await my_actor.get_input()  # noqa: A001
-        assert input['foo'] == input_with_secret['foo']
-        assert input['secret'] == secret_string
+        actor_input = await my_actor.get_input()
+        assert actor_input['foo'] == input_with_secret['foo']
+        assert actor_input['secret'] == secret_string
diff --git a/tests/unit/actor/test_actor_request_queue.py b/tests/unit/actor/test_actor_request_queue.py
index 5504715f..4450e5d1 100644
--- a/tests/unit/actor/test_actor_request_queue.py
+++ b/tests/unit/actor/test_actor_request_queue.py
@@ -23,7 +23,7 @@ async def test_open_returns_same_references() -> None:
         rq_by_name_2 = await Actor.open_key_value_store(name=rq_name)
         assert rq_by_name_1 is rq_by_name_2
 
-        rq_by_id_1 = await Actor.open_key_value_store(id=rq_by_name_1._id)
-        rq_by_id_2 = await Actor.open_key_value_store(id=rq_by_name_1._id)
+        rq_by_id_1 = await Actor.open_key_value_store(id=rq_by_name_1.metadata.id)
+        rq_by_id_2 = await Actor.open_key_value_store(id=rq_by_name_1.metadata.id)
 
         assert rq_by_id_1 is rq_by_name_1
         assert rq_by_id_2 is rq_by_id_1
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index 6f336cd6..1b225203 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -12,9 +12,7 @@
 from apify_client import ApifyClientAsync
 from apify_shared.consts import ApifyEnvVars
 from crawlee import service_locator
-from crawlee.configuration import Configuration as CrawleeConfiguration
-from crawlee.storage_clients import MemoryStorageClient
-from crawlee.storages import _creation_management
+from crawlee.storages import Dataset, KeyValueStore, RequestQueue
 
 import apify._actor
@@ -56,12 +54,15 @@ def _prepare_test_env() -> None:
         service_locator._storage_client = None
 
         # Clear creation-related caches to ensure no state is carried over between tests.
-        monkeypatch.setattr(_creation_management, '_cache_dataset_by_id', {})
-        monkeypatch.setattr(_creation_management, '_cache_dataset_by_name', {})
-        monkeypatch.setattr(_creation_management, '_cache_kvs_by_id', {})
-        monkeypatch.setattr(_creation_management, '_cache_kvs_by_name', {})
-        monkeypatch.setattr(_creation_management, '_cache_rq_by_id', {})
-        monkeypatch.setattr(_creation_management, '_cache_rq_by_name', {})
+        Dataset._cache_by_id.clear()
+        Dataset._cache_by_name.clear()
+        Dataset._default_instance = None
+        KeyValueStore._cache_by_id.clear()
+        KeyValueStore._cache_by_name.clear()
+        KeyValueStore._default_instance = None
+        RequestQueue._cache_by_id.clear()
+        RequestQueue._cache_by_name.clear()
+        RequestQueue._default_instance = None
 
         # Verify that the test environment was set up correctly.
         assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path)
@@ -196,12 +197,3 @@ def getattr_override(apify_client_instance: Any, attr_name: str) -> Any:
 
 @pytest.fixture
 def apify_client_async_patcher(monkeypatch: pytest.MonkeyPatch) -> ApifyClientAsyncPatcher:
     return ApifyClientAsyncPatcher(monkeypatch)
-
-
-@pytest.fixture
-def memory_storage_client() -> MemoryStorageClient:
-    configuration = CrawleeConfiguration()
-    configuration.persist_storage = True
-    configuration.write_metadata = True
-
-    return MemoryStorageClient.from_config(configuration)
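Side note (not part of the patch): the removed `memory_storage_client` fixture is not replaced; the rewritten unit tests construct `MemoryStorageClient()` inline. If a shared fixture were still wanted, a minimal sketch under that assumption could look like this:

import pytest

from crawlee.storage_clients import MemoryStorageClient


@pytest.fixture
def memory_storage_client() -> MemoryStorageClient:
    # Hypothetical replacement: the new client takes no arguments, so the old
    # persist_storage/write_metadata configuration flags are no longer needed.
    return MemoryStorageClient()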
diff --git a/uv.lock b/uv.lock
index 7f8a01cd..1bc5db03 100644
--- a/uv.lock
+++ b/uv.lock
@@ -32,11 +32,12 @@ wheels = [
 
 [[package]]
 name = "apify"
-version = "2.6.0"
+version = "2.6.1"
 source = { editable = "." }
 dependencies = [
     { name = "apify-client" },
     { name = "apify-shared" },
+    { name = "cachetools" },
     { name = "crawlee" },
     { name = "cryptography" },
     { name = "httpx" },
@@ -66,13 +67,15 @@ dev = [
     { name = "respx" },
     { name = "ruff" },
     { name = "setuptools" },
+    { name = "types-cachetools" },
 ]
 
 [package.metadata]
 requires-dist = [
     { name = "apify-client", specifier = ">=1.9.2" },
     { name = "apify-shared", specifier = ">=1.3.0" },
-    { name = "crawlee", specifier = "~=0.6.0" },
+    { name = "cachetools", specifier = ">=5.5.0" },
+    { name = "crawlee", git = "https://github.com/apify/crawlee-python.git?rev=new-storage-clients" },
     { name = "cryptography", specifier = ">=42.0.0" },
     { name = "httpx", specifier = ">=0.27.0" },
     { name = "lazy-object-proxy", specifier = "<1.11.0" },
@@ -98,6 +101,7 @@ dev = [
     { name = "respx", specifier = "~=0.22.0" },
     { name = "ruff", specifier = "~=0.11.0" },
     { name = "setuptools" },
+    { name = "types-cachetools", specifier = ">=6.0.0.20250525" },
 ]
 
 [[package]]
@@ -630,8 +634,8 @@ toml = [
 
 [[package]]
 name = "crawlee"
-version = "0.6.10"
-source = { registry = "https://pypi.org/simple" }
+version = "0.6.11"
+source = { git = "https://github.com/apify/crawlee-python.git?rev=new-storage-clients#78efb4ddf234e731a1c784a2280a8b1bec812573" }
 dependencies = [
     { name = "apify-fingerprint-datapoints" },
     { name = "browserforge" },
@@ -652,10 +656,6 @@ dependencies = [
     { name = "typing-extensions" },
     { name = "yarl" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/ed/93/20033411bffaf199e44b759fc45be45fabc1d8c357bc4d0bb080713724dc/crawlee-0.6.10.tar.gz", hash = "sha256:a06e9aa19611868712df81ca4b7dc482633f921456bf3cf1a5432ce3836fd432", size = 24135107, upload-time = "2025-06-02T12:10:17.67Z" }
-wheels = [
-    { url = "https://files.pythonhosted.org/packages/5a/12/2c6c41438f24760ebe044d5e88eebb35c51178de9aec39b695d0845cbff7/crawlee-0.6.10-py3-none-any.whl", hash = "sha256:081565d0a3f11d21798ec11929f4b0c17e3ba7a84f33251c9b6b0e6457d05367", size = 260863, upload-time = "2025-06-02T12:10:14.994Z" },
-]
 
 [[package]]
 name = "cryptography"
@@ -2306,6 +2306,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/b6/33/38da585b06978d262cc2b2b45bc57ee75f0ce5e0b4ef1cab1b86461e9298/typeapi-2.2.4-py3-none-any.whl", hash = "sha256:bd6d5e5907fa47e0303bf254e7cc8712d4be4eb26d7ffaedb67c9e7844c53bb8", size = 26387, upload-time = "2025-01-29T11:40:12.328Z" },
 ]
 
+[[package]]
+name = "types-cachetools"
+version = "6.0.0.20250525"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/03/d0/55ff0eeda141436c1bd2142cd026906870c661b3f7755070d6da7ea7210f/types_cachetools-6.0.0.20250525.tar.gz", hash = "sha256:baf06f234cac3aeb44c07893447ba03ecdb6c0742ba2607e28a35d38e6821b02", size = 8925, upload-time = "2025-05-25T03:13:53.498Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/47/8c/4ab0a17ece30fe608270b89cf066387051862899fff9f54ab12511fc7fdd/types_cachetools-6.0.0.20250525-py3-none-any.whl", hash = "sha256:1de8f0fe4bdcb187a48d2026c1e3672830f67943ad2bf3486abe031b632f1252", size = 8938, upload-time = "2025-05-25T03:13:52.406Z" },
+]
+
 [[package]]
 name = "typing-extensions"
 version = "4.14.0"