Commit de1c03f

refactor!: Introduce new storage client system (#1194)
## Description

- I consolidated all commits from #1107 into this new PR.
- The previous storage-clients implementation was completely replaced with a redesigned set of clients, including:
  - a new storage-client interface,
  - an in-memory storage client,
  - a file-system storage client,
  - an Apify storage client (implemented in the SDK; see apify/apify-sdk-python#470),
  - and various adjustments elsewhere in the codebase.
- The old "memory plus persist" client has been split into separate memory and file-system implementations.
- The `Configuration.persist_storage` and `Configuration.persist_metadata` options were removed.
- All old collection clients have been removed; they are no longer needed.
- Each storage client now prints warnings if you pass method arguments it does not support.
- The creation management modules in the storage clients and storages were removed.
- Storage client parameters (e.g. `purge_on_start`, or `token` and `base_api_url` for the Apify client) are configured via the `Configuration`.
- Every storage, and its corresponding client, now provides both a `purge` method (which clears all items but preserves the storage and metadata) and a `drop` method (which removes the entire storage, metadata included); see the sketch after this list.
- All unused types, models, and helper utilities have been removed.
- The detailed, per-storage/client changes are listed below.
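A minimal sketch of the `purge` vs `drop` distinction described above, using the in-memory client and the default dataset; only methods listed in this PR are used:

```python
import asyncio

from crawlee.storage_clients import MemoryStorageClient
from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open(storage_client=MemoryStorageClient())
    await dataset.push_data([{'a': 1}, {'a': 2}])

    # purge() clears all items but keeps the storage and its metadata,
    # so the same dataset can be reused immediately.
    await dataset.purge()

    # drop() removes the entire storage, metadata included; opening the
    # same dataset afterwards creates a fresh one.
    await dataset.drop()


if __name__ == '__main__':
    asyncio.run(main())
```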
### Dataset

- Properties:
  - `id`
  - `name`
  - `metadata`
- Methods:
  - `open`
  - `purge` (new method)
  - `drop`
  - `push_data`
  - `get_data`
  - `iterate_items`
  - `list_items` (new method)
  - `export_to`
- Breaking changes:
  - `from_storage_object` method has been removed - use the `open` method with `name` or `id` instead.
  - `get_info` -> `metadata` property
  - `storage_object` -> `metadata` property
  - `set_metadata` method has been removed (it wasn't propagated to clients) - do we want to support it (e.g. for renaming)?
  - `write_to_json` method has been removed - use `export_to` instead (see the sketch after the example below).
  - `write_to_csv` method has been removed - use `export_to` instead.

```python
import asyncio

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open(storage_client=FileSystemStorageClient())
    print(f'default dataset - ID: {dataset.id}, name: {dataset.name}')

    await dataset.push_data({'name': 'John'})
    await dataset.push_data({'name': 'John', 'age': 20})
    await dataset.push_data({})

    dataset_with_name = await Dataset.open(
        name='my_dataset',
        storage_client=FileSystemStorageClient(),
    )
    print(f'named dataset - ID: {dataset_with_name.id}, name: {dataset_with_name.name}')

    await dataset_with_name.push_data([{'age': 30}, {'age': 25}])

    print('Default dataset items:')
    async for item in dataset.iterate_items(skip_empty=True):
        print(item)

    print('Named dataset items:')
    async for item in dataset_with_name.iterate_items():
        print(item)

    items = await dataset.get_data()
    print(items)

    dataset_by_id = await Dataset.open(id=dataset_with_name.id)
    print(f'dataset by ID - ID: {dataset_by_id.id}, name: {dataset_by_id.name}')


if __name__ == '__main__':
    asyncio.run(main())
```
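Since `write_to_json` and `write_to_csv` are gone, migrations go through `export_to`. A hedged sketch of that replacement; the `key` and `content_type` parameter names are assumptions for illustration and are not taken from this commit:

```python
import asyncio

from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open()
    await dataset.push_data([{'name': 'John'}, {'name': 'Jane'}])

    # Previously: await dataset.write_to_csv(...) / await dataset.write_to_json(...)
    # Now a single method covers both formats; the parameter names below
    # (key, content_type) are assumed for illustration.
    await dataset.export_to(key='results.csv', content_type='csv')
    await dataset.export_to(key='results.json', content_type='json')


if __name__ == '__main__':
    asyncio.run(main())
```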
### Key-value store

- Properties:
  - `id`
  - `name`
  - `metadata`
- Methods:
  - `open`
  - `purge` (new method)
  - `drop`
  - `get_value`
  - `set_value`
  - `delete_value` (new method; the Apify platform's `set_value` supports setting an empty value for a key, so a separate method for deleting is useful)
  - `iterate_keys`
  - `list_keys` (new method)
  - `get_public_url`
  - `get_auto_saved_value`
  - `persist_autosaved_values` (see the sketch after the example below)
- Breaking changes:
  - `from_storage_object` method has been removed - use the `open` method with `name` or `id` instead.
  - `get_info` -> `metadata` property
  - `storage_object` -> `metadata` property
  - `set_metadata` method has been removed (it wasn't propagated to clients) - do we want to support it (e.g. for renaming)?

```python
import asyncio

import requests

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import KeyValueStore


async def main() -> None:
    print('Opening key-value store "my_kvs"...')
    storage_client = FileSystemStorageClient()
    kvs = await KeyValueStore.open(name='my_kvs', storage_client=storage_client)

    print('Setting value to "file.json"...')
    await kvs.set_value('file.json', {'key': 'value'})

    print('Setting value to "file.jpg"...')
    response = requests.get('https://avatars.githubusercontent.com/u/25082181')
    await kvs.set_value('file.jpg', response.content)

    print('Iterating over keys:')
    async for key in kvs.iterate_keys():
        print(f'Key: {key}')

    print('Listing keys:')
    keys = [key.key for key in await kvs.list_keys()]
    print(f'Keys: {keys}')

    for key in keys:
        print(f'Getting value of {key}...')
        value = await kvs.get_value(key)
        print(f'Value: {str(value)[:100]}')

    print('Deleting value of "file.json"...')
    await kvs.delete_value('file.json')

    kvs_default = await KeyValueStore.open(storage_client=storage_client)

    special_key = 'key with spaces/and/slashes!@#$%^&*()'
    test_value = 'Special key value'
    await kvs_default.set_value(key=special_key, value=test_value)

    record = await kvs_default.get_value(key=special_key)
    assert record is not None
    assert record == test_value

    result = await kvs_default.list_keys()
    print(f'kvs_default list keys = {result}')

    kvs_2 = await KeyValueStore.open()
    result = await kvs_2.list_keys()
    print(f'kvs_2 list keys = {result}')


if __name__ == '__main__':
    asyncio.run(main())
```
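The `get_auto_saved_value` / `persist_autosaved_values` pair listed above targets state that should survive restarts. A minimal sketch, assuming `get_auto_saved_value(key, default_value)` returns a mutable dict that the store persists on its own schedule:

```python
import asyncio

from crawlee.storages import KeyValueStore


async def main() -> None:
    kvs = await KeyValueStore.open()

    # The returned dict is cached and persisted back to the store automatically;
    # the (key, default_value) signature is assumed here for illustration.
    state = await kvs.get_auto_saved_value('crawl_state', {'processed': 0})
    state['processed'] += 1

    # Force a write of all auto-saved values instead of waiting for the
    # periodic persistence.
    await kvs.persist_autosaved_values()


if __name__ == '__main__':
    asyncio.run(main())
```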
### Request queue

- Properties:
  - `id`
  - `name`
  - `metadata`
- Methods:
  - `open`
  - `purge` (new method)
  - `drop`
  - `add_request`
  - `add_requests_batched` -> `add_requests`
  - `fetch_next_request`
  - `get_request`
  - `mark_request_as_handled`
  - `reclaim_request`
  - `is_empty`
  - `is_finished`
- Breaking changes:
  - `from_storage_object` method has been removed - use the `open` method with `name` or `id` instead.
  - `get_info` -> `metadata` property
  - `storage_object` -> `metadata` property
  - `set_metadata` method has been removed (it wasn't propagated to clients) - do we want to support it (e.g. for renaming)?
  - `get_handled_count` method has been removed - use `metadata.handled_request_count` instead.
  - `get_total_count` method has been removed - use `metadata.total_request_count` instead.
  - `resource_directory` was removed from `RequestQueueMetadata` - use the `path_to...` property instead.
  - `RequestQueueHead` model has been removed - use `RequestQueueHeadWithLocks` instead.
- Notes:
  - The new RQ `add_requests` accepts a `forefront` arg (the Apify API supports it).

```python
import asyncio

from crawlee import Request
from crawlee.configuration import Configuration
from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import RequestQueue


async def main() -> None:
    rq = await RequestQueue.open(
        name='my-queue',
        storage_client=FileSystemStorageClient(),
        configuration=Configuration(purge_on_start=True),
    )

    print(f'RequestQueue: {rq}')
    print(f'RequestQueue client: {rq._client}')

    await rq.add_requests(
        requests=[
            Request.from_url('https://example.com', use_extended_unique_key=True),
            Request.from_url('https://crawlee.dev', use_extended_unique_key=True),
            Request.from_url('https://apify.com', use_extended_unique_key=True),
        ],
    )
    print('Requests were added to the queue')

    is_empty = await rq.is_empty()
    is_finished = await rq.is_finished()
    print(f'Is empty: {is_empty}')
    print(f'Is finished: {is_finished}')

    request = await rq.fetch_next_request()
    print(f'Fetched request: {request}')

    await rq.add_request('https://facebook.com', forefront=True)
    request = await rq.fetch_next_request()
    print(f'Fetched request: {request}')

    rq_default = await RequestQueue.open(
        storage_client=FileSystemStorageClient(),
        configuration=Configuration(purge_on_start=True),
    )

    await rq_default.add_request('https://example.com/1')
    await rq_default.add_requests(
        [
            'https://example.com/priority-1',
            'https://example.com/priority-2',
            'https://example.com/priority-3',
        ]
    )
    await rq_default.add_request('https://example.com/2')


if __name__ == '__main__':
    asyncio.run(main())
```

### BaseDatasetClient

- Properties:
  - `metadata`
- Methods:
  - `open`
  - `purge`
  - `drop`
  - `push_data`
  - `get_data`
  - `iterate_items`

### BaseKeyValueStoreClient

- Properties:
  - `metadata`
- Methods:
  - `open`
  - `purge`
  - `drop`
  - `get_value`
  - `set_value`
  - `delete_value`
  - `iterate_keys`
  - `get_public_url`

### BaseRequestQueueClient

- Properties:
  - `metadata`
- Methods:
  - `open`
  - `purge`
  - `drop`
  - `add_requests_batch` -> `add_batch_of_requests` (one backend method for two frontend methods)
  - `get_request`
  - `fetch_next_request`
  - `mark_request_as_handled`
  - `reclaim_request`
  - `is_empty`
- Models:
  - `RequestQueueHeadWithLocks` -> `RequestQueueHead`
  - `BatchRequestsOperationResponse` -> `AddRequestsResponse`
- Notes:
  - The old file-system (memory) version didn't persist in-progress requests.
  - The old file-system (memory) version didn't persist forefront values (there is now an FS-specific `_sequence` field in the FS Request).
  - The methods for manipulating locks and listing heads are now internal to the Apify RQ client only.

## Issues

- Closes: #92
- Closes: #147
- Closes: #783
- Closes: #1247
- Relates: #1175
- Relates: #1191

## Testing

- The original tests were mostly removed and replaced with new ones.
- Each storage-client implementation now has its own dedicated tests at the client level (more targeted/edge-case coverage).
- On top of that, there are storage-level tests that use a parametrized fixture for each storage client (`file-system` and `memory`), ensuring every storage test runs against every client implementation (a sketch of such a fixture follows the checklist).

## Checklist

- [x] CI passed
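A rough sketch of the parametrized-fixture pattern mentioned in the Testing section; the fixture and test names are illustrative, not taken from the actual test suite, and an async-capable test runner such as pytest-asyncio or anyio is assumed:

```python
import pytest

from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient, StorageClient
from crawlee.storages import Dataset


@pytest.fixture(params=['memory', 'file_system'])
def storage_client(request: pytest.FixtureRequest) -> StorageClient:
    """Provide every storage client implementation to each storage-level test."""
    if request.param == 'memory':
        return MemoryStorageClient()
    return FileSystemStorageClient()


async def test_push_and_get_data(storage_client: StorageClient) -> None:
    # The same test body runs once per storage client implementation.
    dataset = await Dataset.open(storage_client=storage_client)
    await dataset.push_data({'key': 'value'})

    # get_data() is assumed to return a list page with an `items` attribute.
    items = await dataset.get_data()
    assert items.items == [{'key': 'value'}]
```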
1 parent f855a90 commit de1c03f

File tree: 105 files changed, +6831 -6967 lines

Note: large commits have some content hidden by default, so some file paths below are not shown.

docs/deployment/code_examples/google/cloud_run_example.py

Lines changed: 7 additions & 8 deletions

```diff
@@ -5,24 +5,23 @@
 import uvicorn
 from litestar import Litestar, get
 
-from crawlee import service_locator
 from crawlee.crawlers import PlaywrightCrawler, PlaywrightCrawlingContext
-
-# highlight-start
-# Disable writing storage data to the file system
-configuration = service_locator.get_configuration()
-configuration.persist_storage = False
-configuration.write_metadata = False
-# highlight-end
+from crawlee.storage_clients import MemoryStorageClient
 
 
 @get('/')
 async def main() -> str:
     """The crawler entry point that will be called when the HTTP endpoint is accessed."""
+    # highlight-start
+    # Disable writing storage data to the file system
+    storage_client = MemoryStorageClient()
+    # highlight-end
+
     crawler = PlaywrightCrawler(
         headless=True,
         max_requests_per_crawl=10,
         browser_type='firefox',
+        storage_client=storage_client,
     )
 
     @crawler.router.default_handler
```

docs/deployment/code_examples/google/google_example.py

Lines changed: 7 additions & 8 deletions

```diff
@@ -6,22 +6,21 @@
 import functions_framework
 from flask import Request, Response
 
-from crawlee import service_locator
 from crawlee.crawlers import (
     BeautifulSoupCrawler,
     BeautifulSoupCrawlingContext,
 )
-
-# highlight-start
-# Disable writing storage data to the file system
-configuration = service_locator.get_configuration()
-configuration.persist_storage = False
-configuration.write_metadata = False
-# highlight-end
+from crawlee.storage_clients import MemoryStorageClient
 
 
 async def main() -> str:
+    # highlight-start
+    # Disable writing storage data to the file system
+    storage_client = MemoryStorageClient()
+    # highlight-end
+
     crawler = BeautifulSoupCrawler(
+        storage_client=storage_client,
         max_request_retries=1,
         request_handler_timeout=timedelta(seconds=30),
         max_requests_per_crawl=10,
```

docs/examples/code_examples/export_entire_dataset_to_file_csv.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -30,7 +30,7 @@ async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
     await crawler.run(['https://crawlee.dev'])
 
     # Export the entire dataset to a CSV file.
-    await crawler.export_data_csv(path='results.csv')
+    await crawler.export_data(path='results.csv')
 
 
 if __name__ == '__main__':
```

docs/examples/code_examples/export_entire_dataset_to_file_json.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -30,7 +30,7 @@ async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
     await crawler.run(['https://crawlee.dev'])
 
     # Export the entire dataset to a JSON file.
-    await crawler.export_data_json(path='results.json')
+    await crawler.export_data(path='results.json')
 
 
 if __name__ == '__main__':
```

docs/examples/code_examples/parsel_crawler.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -40,7 +40,7 @@ async def some_hook(context: BasicCrawlingContext) -> None:
     await crawler.run(['https://github.com'])
 
     # Export the entire dataset to a JSON file.
-    await crawler.export_data_json(path='results.json')
+    await crawler.export_data(path='results.json')
 
 
 if __name__ == '__main__':
```
Lines changed: 65 additions & 0 deletions (new file; path hidden)

```python
from __future__ import annotations

from typing import TYPE_CHECKING

from crawlee.storage_clients import StorageClient
from crawlee.storage_clients._base import (
    DatasetClient,
    KeyValueStoreClient,
    RequestQueueClient,
)

if TYPE_CHECKING:
    from crawlee.configuration import Configuration

# Implement the storage type clients with your backend logic.


class CustomDatasetClient(DatasetClient):
    # Implement methods like push_data, get_data, iterate_items, etc.
    pass


class CustomKeyValueStoreClient(KeyValueStoreClient):
    # Implement methods like get_value, set_value, delete, etc.
    pass


class CustomRequestQueueClient(RequestQueueClient):
    # Implement methods like add_request, fetch_next_request, etc.
    pass


# Implement the storage client factory.


class CustomStorageClient(StorageClient):
    async def create_dataset_client(
        self,
        *,
        id: str | None = None,
        name: str | None = None,
        configuration: Configuration | None = None,
    ) -> CustomDatasetClient:
        # Create and return your custom dataset client.
        pass

    async def create_kvs_client(
        self,
        *,
        id: str | None = None,
        name: str | None = None,
        configuration: Configuration | None = None,
    ) -> CustomKeyValueStoreClient:
        # Create and return your custom key-value store client.
        pass

    async def create_rq_client(
        self,
        *,
        id: str | None = None,
        name: str | None = None,
        configuration: Configuration | None = None,
    ) -> CustomRequestQueueClient:
        # Create and return your custom request queue client.
        pass
```
Lines changed: 8 additions & 0 deletions (new file; path hidden)

```python
from crawlee.crawlers import ParselCrawler
from crawlee.storage_clients import FileSystemStorageClient

# Create a new instance of storage client.
storage_client = FileSystemStorageClient()

# And pass it to the crawler.
crawler = ParselCrawler(storage_client=storage_client)
```
Lines changed: 18 additions & 0 deletions (new file; path hidden)

```python
from crawlee.configuration import Configuration
from crawlee.crawlers import ParselCrawler
from crawlee.storage_clients import FileSystemStorageClient

# Create a new instance of storage client.
storage_client = FileSystemStorageClient()

# Create a configuration with custom settings.
configuration = Configuration(
    storage_dir='./my_storage',
    purge_on_start=False,
)

# And pass them to the crawler.
crawler = ParselCrawler(
    storage_client=storage_client,
    configuration=configuration,
)
```
Lines changed: 8 additions & 0 deletions (new file; path hidden)

```python
from crawlee.crawlers import ParselCrawler
from crawlee.storage_clients import MemoryStorageClient

# Create a new instance of storage client.
storage_client = MemoryStorageClient()

# And pass it to the crawler.
crawler = ParselCrawler(storage_client=storage_client)
```
Lines changed: 29 additions & 0 deletions (new file; path hidden)

```python
import asyncio

from crawlee import service_locator
from crawlee.crawlers import ParselCrawler
from crawlee.storage_clients import MemoryStorageClient
from crawlee.storages import Dataset


async def main() -> None:
    # Create custom storage client, MemoryStorageClient for example.
    storage_client = MemoryStorageClient()

    # Register it globally via the service locator.
    service_locator.set_storage_client(storage_client)

    # Or pass it directly to the crawler, it will be registered globally
    # to the service locator under the hood.
    crawler = ParselCrawler(storage_client=storage_client)

    # Or just provide it when opening a storage (e.g. dataset), it will be used
    # for this storage only, not globally.
    dataset = await Dataset.open(
        name='my_dataset',
        storage_client=storage_client,
    )


if __name__ == '__main__':
    asyncio.run(main())
```
