Skip to content

Commit 3336b2d

Browse files
committed
KVS update
1 parent 67d706b commit 3336b2d

File tree

4 files changed

+118
-53
lines changed

4 files changed

+118
-53
lines changed

src/crawlee/storage_clients/_base/_key_value_store_client.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,15 @@
66
from crawlee._utils.docs import docs_group
77

88
if TYPE_CHECKING:
9+
from collections.abc import AsyncIterator
910
from datetime import datetime
1011
from pathlib import Path
1112

12-
from crawlee.storage_clients.models import KeyValueStoreListKeysPage, KeyValueStoreRecord
13+
from crawlee.storage_clients.models import (
14+
KeyValueStoreListKeysPage,
15+
KeyValueStoreRecord,
16+
KeyValueStoreRecordMetadata,
17+
)
1318

1419
# Properties:
1520
# - id
@@ -126,6 +131,23 @@ async def iterate_keys(
126131
*,
127132
exclusive_start_key: str | None = None,
128133
limit: int = 1000,
134+
) -> AsyncIterator[KeyValueStoreRecordMetadata]:
135+
"""Iterate over the existing keys in the key-value store.
136+
137+
The backend method for the `KeyValueStore.iterate_keys` call.
138+
"""
139+
# This syntax is needed to make mypy work properly with an abstract AsyncIterator.
140+
# https://mypy.readthedocs.io/en/stable/more_types.html#asynchronous-iterators
141+
raise NotImplementedError
142+
if False: # type: ignore[unreachable]
143+
yield 0
144+
145+
@abstractmethod
146+
async def list_keys(
147+
self,
148+
*,
149+
exclusive_start_key: str | None = None,
150+
limit: int = 1000,
129151
) -> KeyValueStoreListKeysPage:
130152
"""List the keys in the key-value store.
131153

src/crawlee/storage_clients/_file_system/_key_value_store_client.py

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from crawlee._utils.crypto import crypto_random_object_id
1515
from crawlee.storage_clients._base import KeyValueStoreClient
1616
from crawlee.storage_clients.models import (
17-
KeyValueStoreKeyInfo,
1817
KeyValueStoreListKeysPage,
1918
KeyValueStoreMetadata,
2019
KeyValueStoreRecord,
@@ -245,11 +244,15 @@ async def get_value(self, *, key: str) -> KeyValueStoreRecord | None:
245244
# Update the metadata to record access
246245
await self._update_metadata(update_accessed_at=True)
247246

247+
# Calculate the size of the value in bytes
248+
size = len(value_bytes)
249+
248250
return KeyValueStoreRecord(
249251
key=metadata.key,
250252
value=value,
251253
content_type=metadata.content_type,
252254
filename=filename,
255+
size=size,
253256
)
254257

255258
@override
@@ -271,7 +274,9 @@ async def set_value(self, *, key: str, value: Any, content_type: str | None = No
271274
record_path = self._path_to_kvs / filename
272275

273276
# Get the metadata.
274-
record_metadata = KeyValueStoreRecordMetadata(key=key, content_type=content_type)
277+
# Calculate the size of the value in bytes
278+
size = len(value_bytes)
279+
record_metadata = KeyValueStoreRecordMetadata(key=key, content_type=content_type, size=size)
275280
record_metadata_filepath = record_path.with_name(f'{record_path.name}.{METADATA_FILENAME}')
276281
record_metadata_content = await json_dumps(record_metadata.model_dump())
277282

@@ -330,61 +335,87 @@ async def delete_value(self, *, key: str) -> None:
330335
if deleted:
331336
await self._update_metadata(update_accessed_at=True, update_modified_at=True)
332337

338+
@override
333339
async def iterate_keys(
334340
self,
335341
*,
336342
exclusive_start_key: str | None = None,
337343
limit: int = 1000,
338-
) -> AsyncIterator[KeyValueStoreKeyInfo]:
339-
keys = []
340-
has_next = False
341-
344+
) -> AsyncIterator[KeyValueStoreRecordMetadata]:
342345
# Check if the KVS directory exists
343346
if not self._path_to_kvs.exists():
344347
return
345348

349+
count = 0
346350
async with self._lock:
347351
# Get all files in the KVS directory
348-
files = await asyncio.to_thread(self._path_to_kvs.glob, '*')
352+
files = sorted(await asyncio.to_thread(list, self._path_to_kvs.glob('*')))
349353

350-
# Filter out metadata files and get unique key names
351-
key_files = {}
352354
for file_path in files:
355+
# Skip the main metadata file
353356
if file_path.name == METADATA_FILENAME:
354357
continue
355358

356-
# Skip metadata files for records
357-
if file_path.name.endswith(f'.{METADATA_FILENAME}'):
359+
# Only process metadata files for records
360+
if not file_path.name.endswith(f'.{METADATA_FILENAME}'):
358361
continue
359362

360-
# Extract the base key name
361-
key = file_path.name
362-
key_files[key] = file_path
363+
# Extract the base key name from the metadata filename
364+
key_name = file_path.name[: -len(f'.{METADATA_FILENAME}')]
363365

364-
# Sort keys for consistent ordering
365-
all_keys = sorted(key_files.keys())
366+
# Apply exclusive_start_key filter if provided
367+
if exclusive_start_key is not None and key_name <= exclusive_start_key:
368+
continue
366369

367-
# Apply exclusive_start_key if provided
368-
if exclusive_start_key is not None:
369-
start_idx = 0
370-
for idx, key in enumerate(all_keys):
371-
if key > exclusive_start_key: # exclusive start
372-
start_idx = idx
373-
break
374-
all_keys = all_keys[start_idx:]
370+
# Try to read and parse the metadata file
371+
try:
372+
metadata_content = await asyncio.to_thread(file_path.read_text, encoding='utf-8')
373+
metadata_dict = json.loads(metadata_content)
374+
record_metadata = KeyValueStoreRecordMetadata(**metadata_dict)
375375

376-
# Apply limit
377-
if len(all_keys) > limit:
378-
keys = all_keys[:limit]
379-
has_next = True
380-
else:
381-
keys = all_keys
382-
has_next = False
376+
yield record_metadata
377+
378+
count += 1
379+
if count >= limit:
380+
break
381+
except (json.JSONDecodeError, ValidationError) as e:
382+
logger.warning(f'Failed to parse metadata file {file_path}: {e}')
383383

384384
# Update accessed_at timestamp
385385
await self._update_metadata(update_accessed_at=True)
386386

387-
return KeyValueStoreListKeysPage(keys=keys, has_next=has_next)
387+
@override
388+
async def list_keys(
389+
self,
390+
*,
391+
exclusive_start_key: str | None = None,
392+
limit: int = 1000,
393+
) -> KeyValueStoreListKeysPage:
394+
keys = []
395+
had_more = False
396+
next_exclusive_start_key = None
397+
398+
# Use the iterate_keys method to get all keys
399+
async for metadata in self.iterate_keys(exclusive_start_key=exclusive_start_key, limit=limit + 1):
400+
keys.append(metadata.key)
401+
# If we've collected more than the limit, we know there are more keys
402+
if len(keys) > limit:
403+
had_more = True
404+
next_exclusive_start_key = metadata.key
405+
keys.pop() # Remove the extra key
406+
break
407+
408+
# Updating the accessed_at timestamp is already handled by iterate_keys
409+
410+
return KeyValueStoreListKeysPage(
411+
count=len(keys),
412+
items=keys,
413+
had_more=had_more,
414+
is_truncated=had_more,
415+
limit=limit,
416+
exclusive_start_key=exclusive_start_key,
417+
next_exclusive_start_key=next_exclusive_start_key,
418+
)
388419

389420
@override
390421
async def get_public_url(self, *, key: str) -> str:

src/crawlee/storage_clients/models.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,35 +63,35 @@ class RequestQueueMetadata(StorageMetadata):
6363

6464

6565
@docs_group('Data structures')
66-
class KeyValueStoreRecord(BaseModel, Generic[KvsValueType]):
67-
"""Model for a key-value store record."""
66+
class KeyValueStoreRecordMetadata(BaseModel):
67+
"""Model for a key-value store record metadata."""
6868

6969
model_config = ConfigDict(populate_by_name=True)
7070

7171
key: Annotated[str, Field(alias='key')]
72-
value: Annotated[KvsValueType, Field(alias='value')]
73-
content_type: Annotated[str | None, Field(alias='contentType', default=None)]
74-
filename: Annotated[str | None, Field(alias='filename', default=None)]
72+
"""The key of the record.
7573
74+
A unique identifier for the record in the key-value store.
75+
"""
7676

77-
@docs_group('Data structures')
78-
class KeyValueStoreRecordMetadata(BaseModel):
79-
"""Model for a key-value store record metadata."""
77+
content_type: Annotated[str, Field(alias='contentType')]
78+
"""The MIME type of the record.
8079
81-
model_config = ConfigDict(populate_by_name=True)
80+
Describes the format and type of data stored in the record, following the MIME specification.
81+
"""
8282

83-
key: Annotated[str, Field(alias='key')]
84-
content_type: Annotated[str, Field(alias='contentType')]
83+
size: Annotated[int, Field(alias='size')]
84+
"""The size of the record in bytes."""
8585

8686

8787
@docs_group('Data structures')
88-
class KeyValueStoreKeyInfo(BaseModel):
89-
"""Model for a key-value store key info."""
88+
class KeyValueStoreRecord(KeyValueStoreRecordMetadata, Generic[KvsValueType]):
89+
"""Model for a key-value store record."""
9090

9191
model_config = ConfigDict(populate_by_name=True)
9292

93-
key: Annotated[str, Field(alias='key')]
94-
size: Annotated[int, Field(alias='size')]
93+
value: Annotated[KvsValueType, Field(alias='value')]
94+
"""The value of the record."""
9595

9696

9797
@docs_group('Data structures')
@@ -103,9 +103,9 @@ class KeyValueStoreListKeysPage(BaseModel):
103103
count: Annotated[int, Field(alias='count')]
104104
limit: Annotated[int, Field(alias='limit')]
105105
is_truncated: Annotated[bool, Field(alias='isTruncated')]
106-
items: Annotated[list[KeyValueStoreKeyInfo], Field(alias='items', default_factory=list)]
107106
exclusive_start_key: Annotated[str | None, Field(alias='exclusiveStartKey', default=None)]
108107
next_exclusive_start_key: Annotated[str | None, Field(alias='nextExclusiveStartKey', default=None)]
108+
items: Annotated[list[KeyValueStoreRecordMetadata], Field(alias='items', default_factory=list)]
109109

110110

111111
@docs_group('Data structures')

src/crawlee/storages/_key_value_store.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
from __future__ import annotations
22

3-
from collections.abc import AsyncIterator
43
from pathlib import Path
54
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar
65

76
from crawlee import service_locator
87
from crawlee._utils.docs import docs_group
98
from crawlee.events._types import Event, EventPersistStateData
10-
from crawlee.storage_clients.models import KeyValueStoreKeyInfo, KeyValueStoreMetadata
9+
from crawlee.storage_clients.models import KeyValueStoreMetadata, KeyValueStoreRecordMetadata
1110

1211
if TYPE_CHECKING:
1312
from collections.abc import AsyncIterator
@@ -21,14 +20,27 @@
2120

2221
# TODO:
2322
# - inherit from storage class
24-
# - caching / memoization of both datasets & dataset clients
23+
# - caching / memoization of both KVS & KVS clients
2524

2625
# Suggested KVS breaking changes:
2726
# - from_storage_object method has been removed - Use the open method with name and/or id instead.
2827
# - get_info -> metadata property
2928
# - storage_object -> metadata property
3029
# - set_metadata method has been removed - Do we want to support it (e.g. for renaming)?
3130

31+
# Properties:
32+
# - id
33+
# - name
34+
# - metadata
35+
36+
# Methods:
37+
# - open
38+
# - drop
39+
# - get_value
40+
# - set_value
41+
# - iterate_keys
42+
# - list_keys (new method)
43+
# - get_public_url
3244

3345
@docs_group('Classes')
3446
class KeyValueStore:
@@ -163,7 +175,7 @@ async def iterate_keys(
163175
self,
164176
exclusive_start_key: str | None = None,
165177
limit: int = 1000,
166-
) -> AsyncIterator[KeyValueStoreKeyInfo]:
178+
) -> AsyncIterator[KeyValueStoreRecordMetadata]:
167179
"""Iterate over the existing keys in the KVS.
168180
169181
Args:

0 commit comments

Comments
 (0)