
refactor!: Introduce new storage clients [WIP] #1107


Closed
wants to merge 42 commits

Conversation


@vdusek vdusek commented Mar 19, 2025

Draft version

  • This is a draft PR; currently, only Dataset and KVS-related components are implemented.
  • Let's discuss the current state before proceeding with further implementation.

Description

  • In this first iteration, the following have been updated or implemented:
    • DatasetClient,
    • FileSystemDatasetClient,
    • MemoryDatasetClient,
    • and the Dataset has been updated accordingly.
  • Much of the Dataset's functionality has been removed and will be implemented in the specific storage clients instead.
  • The memory client has been split into separate file system and memory implementations, eliminating the need for a persist_storage flag (see the sketch after this list).
  • All collection clients have been removed.
  • Storage client method names have been aligned with storage naming.
  • Users are now warned when using method arguments that are not supported.
  • Creation management in the storage clients has been removed; creation management in the storages/ module will be removed later.
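
A minimal sketch of the split in practice: previously a single memory client plus a persist_storage flag, now you pick the implementation explicitly. The client class names come from the list above, but the import path and exact signatures are assumptions, not final API.

```python
import asyncio

# Assumed import path; the draft lists FileSystemDatasetClient and
# MemoryDatasetClient but does not pin down where they are exported from.
from crawlee.storage_clients import FileSystemDatasetClient, MemoryDatasetClient
from crawlee.storages import Dataset


async def main() -> None:
    # Persisted to disk (replaces persist_storage=True):
    persistent = await Dataset.open(
        name='persistent_ds',
        storage_client=FileSystemDatasetClient(),
    )
    # Kept purely in memory (replaces persist_storage=False):
    ephemeral = await Dataset.open(
        name='ephemeral_ds',
        storage_client=MemoryDatasetClient(),
    )
    print(persistent.id, ephemeral.id)


if __name__ == '__main__':
    asyncio.run(main())
```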

Suggested Dataset breaking changes

  • storage_object -> metadata property
  • get_info -> metadata property
  • set_metadata method has been removed - Do we want to support it (e.g. for renaming)?
  • from_storage_object method has been removed - Use the open method with name and/or id instead.
  • check_and_serialize method has been removed - The clients should handle serialization.
  • iterate_items -> iterate method
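
Taken together, a migration of existing Dataset code might look like this sketch (assuming the renames above; everything else is illustrative):

```python
import asyncio

from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open()

    # Old API (removed):
    #   info = await dataset.get_info()
    #   obj = dataset.storage_object
    #   async for item in dataset.iterate_items(): ...

    # New API:
    print(dataset.metadata)  # replaces both get_info() and storage_object
    async for item in dataset.iterate():  # renamed from iterate_items()
        print(item)


if __name__ == '__main__':
    asyncio.run(main())
```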

Suggested KeyValueStore breaking changes

  • storage_object -> metadata property
  • get_info -> metadata property
  • set_metadata method has been removed - Do we want to support it (e.g. for renaming)?
  • from_storage_object method has been removed - Use the open method with name and/or id instead.
  • persist_autosaved_values - It should be managed by the underlying client.
  • get_auto_saved_value - It should be managed by the underlying client.

Issues

Dataset usage example

```python
import asyncio

from crawlee.storage_clients import DatasetStorageClient
from crawlee.storages import Dataset


async def main() -> None:
    storage_client = DatasetStorageClient()

    dataset = await Dataset.open(
        purge_on_start=False,
        storage_client=storage_client,
    )
    print(f'default dataset - ID: {dataset.id}, name: {dataset.name}')

    await dataset.push_data({'name': 'John'})
    await dataset.push_data({'name': 'John', 'age': 20})
    await dataset.push_data({})

    dataset_with_name = await Dataset.open(
        name='my_dataset',
        storage_client=storage_client,
    )
    print(f'named dataset - ID: {dataset_with_name.id}, name: {dataset_with_name.name}')

    await dataset_with_name.push_data([{'age': 30}, {'age': 25}])

    print('Default dataset items:')
    async for item in dataset.iterate(skip_empty=True):
        print(item)

    print('Named dataset items:')
    async for item in dataset_with_name.iterate():
        print(item)

    items = await dataset.get_data()
    print(items)


if __name__ == '__main__':
    asyncio.run(main())
```

KVS usage example

```python
import asyncio

import requests

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import KeyValueStore


async def main() -> None:
    print('Opening key-value store "my_kvs"...')
    storage_client = FileSystemStorageClient()
    kvs = await KeyValueStore.open(name='my_kvs', storage_client=storage_client)

    print('Setting value to "file.json"...')
    await kvs.set_value('file.json', {'key': 'value'})

    print('Setting value to "file.jpg"...')
    response = requests.get('https://avatars.githubusercontent.com/u/25082181')
    await kvs.set_value('file.jpg', response.content)

    print('Iterating over keys:')
    async for key in kvs.iterate_keys():
        print(f'Key: {key}')

    print('Listing keys:')
    keys = [key.key for key in await kvs.list_keys()]
    print(f'Keys: {keys}')

    for key in keys:
        print(f'Getting value of {key}...')
        value = await kvs.get_value(key)
        print(f'Value: {str(value)[:100]}')

    print('Deleting value of "file.json"...')
    await kvs.delete_value('file.json')


if __name__ == '__main__':
    asyncio.run(main())
```

Testing

  • Adjust existing tests and add new ones if necessary once there is agreement on the final form.

Checklist

  • CI passed

@github-actions github-actions bot added this to the 110th sprint - Tooling team milestone Mar 19, 2025
@github-actions github-actions bot added the t-tooling label (Issues with this label are in the ownership of the tooling team.) Mar 19, 2025
@vdusek vdusek marked this pull request as draft March 19, 2025 16:54
@vdusek vdusek changed the title Memory storage refactor refactor!: Introduce new storage clients Mar 19, 2025
@vdusek vdusek added the enhancement (New feature or request.) and debt (Code quality improvement or decrease of technical debt.) labels, and removed the enhancement label Mar 19, 2025

Args:
kwargs: Keyword arguments for the storage client method.
offset: Skips the specified number of items at the start.

It would be good to explicitly say in what order some arguments take effect to avoid misinterpretations.

For example: offset + desc

  • reverse and then offset
    or
  • offset and then reverse?

@vdusek vdusek force-pushed the memory-storage-refactor branch from 1f433c8 to ec32ec9 March 20, 2025 16:12
@vdusek vdusek force-pushed the memory-storage-refactor branch 3 times, most recently from 2a9a840 to bdadd43 April 11, 2025 07:23
@vdusek vdusek force-pushed the memory-storage-refactor branch from 8df5054 to ce2bf85 April 17, 2025 09:49
@vdusek vdusek force-pushed the memory-storage-refactor branch from 88154fe to e84b978 May 9, 2025 13:16
@vdusek vdusek changed the title refactor!: Introduce new storage clients refactor!: Introduce new storage clients [WIP] May 10, 2025
@vdusek
Collaborator Author

vdusek commented May 10, 2025

Closing in favor of #1194.

@vdusek vdusek closed this May 10, 2025
@vdusek vdusek removed request for janbuchar and Mantisus May 10, 2025 09:02
@vdusek vdusek deleted the memory-storage-refactor branch June 17, 2025 07:10
vdusek added a commit that referenced this pull request Jul 1, 2025
## Description

- I consolidated all commits from
#1107 into this new PR.
- The previous storage-clients implementation was completely replaced
with redesigned clients, including:
    - new storage-client interface,
    - in-memory storage client,
    - file-system storage client,
    - Apify storage client (implemented in the SDK; see
apify/apify-sdk-python#470),
    - and various adjustments elsewhere in the codebase.
- The old "memory plus persist" client has been split into separate
memory and file-system implementations.
- The `Configuration.persist_storage` and
`Configuration.persist_metadata` options were removed.
- All old collection clients have been removed; they're no longer
needed.
- Each storage client now prints warnings if you pass method arguments
it does not support.
- The creation management modules in the storage clients and storages
were removed.
- Storage client parameters (e.g. `purge_on_start`, or `token` and
`base_api_url` for the Apify client) are configured via the
`Configuration`.
- Every storage, and its corresponding client, now provides both a
`purge` method (which clears all items but preserves the storage and
metadata) and a `drop` method (which removes the entire storage,
metadata included); see the sketch after this list.
- All unused types, models, and helper utilities have been removed.
- The detailed, per-storage/client changes are listed below...
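
A minimal sketch of the `purge` vs `drop` distinction, using the `Dataset` API from the sections below (the item payloads are illustrative):

```python
import asyncio

from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open(name='my_dataset')
    await dataset.push_data({'name': 'John'})

    # purge(): clears all items but preserves the storage and its metadata,
    # so the same dataset can be reused right away.
    await dataset.purge()
    await dataset.push_data({'name': 'Jane'})

    # drop(): removes the entire storage, metadata included; reopening the
    # same name afterwards creates a brand-new storage.
    await dataset.drop()


if __name__ == '__main__':
    asyncio.run(main())
```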

### Dataset

- Properties:
    - `id`
    - `name`
    - `metadata`
- Methods:
    - `open`
    - `purge` (new method)
    - `drop`
    - `push_data`
    - `get_data`
    - `iterate_items`
    - `list_items` (new method)
    - `export_to`
- Breaking changes:
- `from_storage_object` method has been removed - Use the `open` method
with `name` or `id` instead.
    - `get_info` -> `metadata` property
    - `storage_object` -> `metadata` property
- `set_metadata` method has been removed (it wasn't propagated to the
clients)
        - Do we want to support it (e.g. for renaming)?
    - `write_to_json` method has been removed - use `export_to` instead
    - `write_to_csv` method has been removed - use `export_to` instead

```python
import asyncio

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import Dataset


async def main() -> None:
    dataset = await Dataset.open(storage_client=FileSystemStorageClient())
    print(f'default dataset - ID: {dataset.id}, name: {dataset.name}')

    await dataset.push_data({'name': 'John'})
    await dataset.push_data({'name': 'John', 'age': 20})
    await dataset.push_data({})

    dataset_with_name = await Dataset.open(
        name='my_dataset',
        storage_client=FileSystemStorageClient(),
    )
    print(f'named dataset - ID: {dataset_with_name.id}, name: {dataset_with_name.name}')

    await dataset_with_name.push_data([{'age': 30}, {'age': 25}])

    print('Default dataset items:')
    async for item in dataset.iterate_items(skip_empty=True):
        print(item)

    print('Named dataset items:')
    async for item in dataset_with_name.iterate_items():
        print(item)

    items = await dataset.get_data()
    print(items)

    dataset_by_id = await Dataset.open(id=dataset_with_name.id)
    print(f'dataset by ID - ID: {dataset_by_id.id}, name: {dataset_by_id.name}')


if __name__ == '__main__':
    asyncio.run(main())
```

### Key-value store

- Properties:
    - `id`
    - `name`
    - `metadata`
- Methods:
    - `open`
    - `purge` (new method)
    - `drop`
    - `get_value`
    - `set_value`
- `delete_value` (new method; the Apify platform's `set_value` supports
setting an empty value for a key, so a separate method for deleting is
useful)
    - `iterate_keys`
    - `list_keys` (new method)
    - `get_public_url`
    - `get_auto_saved_value`
    - `persist_autosaved_values`
- Breaking changes:
- `from_storage_object` method has been removed - Use the `open` method
with `name` or `id` instead.
    - `get_info` -> `metadata` property
    - `storage_object` -> `metadata` property
- `set_metadata` method has been removed (it wasn't propagated to the
clients)
        - Do we want to support it (e.g. for renaming)?

```python
import asyncio

import requests

from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import KeyValueStore


async def main() -> None:
    print('Opening key-value store "my_kvs"...')
    storage_client = FileSystemStorageClient()
    kvs = await KeyValueStore.open(name='my_kvs', storage_client=storage_client)

    print('Setting value to "file.json"...')
    await kvs.set_value('file.json', {'key': 'value'})

    print('Setting value to "file.jpg"...')
    response = requests.get('https://avatars.githubusercontent.com/u/25082181')
    await kvs.set_value('file.jpg', response.content)

    print('Iterating over keys:')
    async for key in kvs.iterate_keys():
        print(f'Key: {key}')

    print('Listing keys:')
    keys = [key.key for key in await kvs.list_keys()]
    print(f'Keys: {keys}')

    for key in keys:
        print(f'Getting value of {key}...')
        value = await kvs.get_value(key)
        print(f'Value: {str(value)[:100]}')

    print('Deleting value of "file.json"...')
    await kvs.delete_value('file.json')

    kvs_default = await KeyValueStore.open(storage_client=storage_client)

    special_key = 'key with spaces/and/slashes!@#$%^&*()'
    test_value = 'Special key value'

    await kvs_default.set_value(key=special_key, value=test_value)

    record = await kvs_default.get_value(key=special_key)
    assert record is not None
    assert record == test_value

    result = await kvs_default.list_keys()
    print(f'kvs_default list keys = {result}')

    kvs_2 = await KeyValueStore.open()
    result = await kvs_2.list_keys()
    print(f'kvs_2 list keys = {result}')


if __name__ == '__main__':
    asyncio.run(main())
```

### Request queue

- Properties:
    - `id`
    - `name`
    - `metadata`
- Methods:
    - `open`
    - `purge` (new method)
    - `drop`
    - `add_request`
    - `add_requests_batched` -> `add_requests`
    - `fetch_next_request`
    - `get_request`
    - `mark_request_as_handled`
    - `reclaim_request`
    - `is_empty`
    - `is_finished`
- Breaking changes:
- `from_storage_object` method has been removed - Use the `open` method
with `name` or `id` instead.
    - `get_info` -> `metadata` property
    - `storage_object` -> `metadata` property
- `set_metadata` method has been removed (it wasn't propagated to the
clients)
        - Do we want to support it (e.g. for renaming)?
- `get_handled_count` method has been removed - Use
`metadata.handled_request_count` instead.
- `get_total_count` method has been removed - Use
`metadata.total_request_count` instead.
- `resource_directory` from the `RequestQueueMetadata` was removed; use
the `path_to...` property instead.
- `RequestQueueHead` model has been removed - Use
`RequestQueueHeadWithLocks` instead.
- Notes:
- New RQ `add_requests` contains a `forefront` arg (the Apify API supports it)

```python
import asyncio

from crawlee import Request
from crawlee.configuration import Configuration
from crawlee.storage_clients import FileSystemStorageClient
from crawlee.storages import RequestQueue


async def main() -> None:
    rq = await RequestQueue.open(
        name='my-queue',
        storage_client=FileSystemStorageClient(),
        configuration=Configuration(purge_on_start=True),
    )

    print(f'RequestQueue: {rq}')
    print(f'RequestQueue client: {rq._client}')

    await rq.add_requests(
        requests=[
            Request.from_url('https://example.com', use_extended_unique_key=True),
            Request.from_url('https://crawlee.dev', use_extended_unique_key=True),
            Request.from_url('https://apify.com', use_extended_unique_key=True),
        ],
    )

    print('Requests were added to the queue')

    is_empty = await rq.is_empty()
    is_finished = await rq.is_finished()

    print(f'Is empty: {is_empty}')
    print(f'Is finished: {is_finished}')

    request = await rq.fetch_next_request()
    print(f'Fetched request: {request}')

    await rq.add_request('https://facebook.com', forefront=True)

    request = await rq.fetch_next_request()
    print(f'Fetched request: {request}')

    rq_default = await RequestQueue.open(
        storage_client=FileSystemStorageClient(),
        configuration=Configuration(purge_on_start=True),
    )

    await rq_default.add_request('https://example.com/1')
    await rq_default.add_requests(
        [
            'https://example.com/priority-1',
            'https://example.com/priority-2',
            'https://example.com/priority-3',
        ]
    )
    await rq_default.add_request('https://example.com/2')


if __name__ == '__main__':
    asyncio.run(main())
```

### BaseDatasetClient

- Properties:
    - `metadata`
- Methods:
    - `open`
    - `purge`
    - `drop`
    - `push_data`
    - `get_data`
    - `iterate_items`
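
For illustration only, here is a free-standing skeleton that mirrors this method set (`open` is omitted for brevity). Real clients subclass the base class; every signature below is an assumption, not the actual interface:

```python
from __future__ import annotations

from collections.abc import AsyncIterator
from typing import Any


class DatasetClientSketch:
    """Hypothetical in-memory client mirroring the BaseDatasetClient surface."""

    def __init__(self) -> None:
        self._items: list[dict[str, Any]] = []

    @property
    def metadata(self) -> dict[str, Any]:
        # A real client returns a metadata model rather than a plain dict.
        return {'item_count': len(self._items)}

    async def purge(self) -> None:
        self._items.clear()  # keep the storage, discard its items

    async def drop(self) -> None:
        self._items.clear()  # a real client also removes the metadata

    async def push_data(self, data: dict[str, Any] | list[dict[str, Any]]) -> None:
        self._items.extend(data if isinstance(data, list) else [data])

    async def get_data(self) -> list[dict[str, Any]]:
        return list(self._items)

    async def iterate_items(self) -> AsyncIterator[dict[str, Any]]:
        for item in self._items:
            yield item
```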

### BaseKeyValueStoreClient

- Properties:
    - `metadata`
- Methods:
    - `open`
    - `purge`
    - `drop`
    - `get_value`
    - `set_value`
    - `delete_value`
    - `iterate_keys`
    - `get_public_url`

### BaseRequestQueueClient

- Properties:
    - `metadata`
- Methods:
    - `open`
    - `purge`
    - `drop`
- `add_requests_batch` -> `add_batch_of_requests` (one backend method
serving the two frontend methods)
    - `get_request`
    - `fetch_next_request`
    - `mark_request_as_handled`
    - `reclaim_request`
    - `is_empty`
- Models
    - `RequestQueueHeadWithLocks` -> `RequestQueueHead`
    - `BatchRequestsOperationResponse` -> `AddRequestsResponse`
- Notes:
- The old file-system (memory) version didn't persist in-progress
requests
- The old file-system (memory) version didn't persist forefront values
(there is now an FS-specific `_sequence` field in the FS Request)
- The methods manipulating locks and listing heads are now internal
methods of the Apify RQ client.

## Issues

- Closes: #92
- Closes: #147
- Closes: #783
- Closes: #1247
- Relates: #1175
- Relates: #1191

## Testing

- The original tests were mostly removed and replaced with new ones.
- Each storage-client implementation now has its own dedicated tests at
the client level (more targeted/edge-case coverage).
- On top of that, there are storage-level tests that use a parametrized
fixture for each storage client (`file-system` and `memory`), ensuring
every storage test runs against every client implementation (see the
sketch below).
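
The parametrized fixture works roughly like this sketch (the fixture name and the `MemoryStorageClient` class name are assumptions, and the async test assumes pytest-asyncio in auto mode):

```python
import pytest

from crawlee.storage_clients import FileSystemStorageClient, MemoryStorageClient
from crawlee.storages import Dataset


@pytest.fixture(params=['memory', 'file-system'])
def storage_client(request: pytest.FixtureRequest):
    """Build each client implementation so every test runs against both."""
    if request.param == 'memory':
        return MemoryStorageClient()
    return FileSystemStorageClient()


async def test_push_data_roundtrip(storage_client) -> None:
    dataset = await Dataset.open(storage_client=storage_client)
    await dataset.push_data({'key': 'value'})
    items = await dataset.get_data()
    assert items is not None  # both clients must behave identically here
```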

## Checklist

- [x] CI passed
Labels
debt Code quality improvement or decrease of technical debt. t-tooling Issues with this label are in the ownership of the tooling team.