Skip to content

Commit 770394b

Browse files
committed
Add init version of new Apify storage clients
1 parent 3a173da commit 770394b

File tree

6 files changed

+1086
-0
lines changed

6 files changed

+1086
-0
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from ._dataset_client import ApifyDatasetClient
2+
from ._key_value_store_client import ApifyKeyValueStoreClient
3+
from ._request_queue_client import ApifyRequestQueueClient
4+
from ._storage_client import ApifyStorageClient
5+
6+
__all__ = [
7+
'ApifyDatasetClient',
8+
'ApifyKeyValueStoreClient',
9+
'ApifyRequestQueueClient',
10+
'ApifyStorageClient',
11+
]
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from logging import getLogger
5+
from typing import TYPE_CHECKING, Any
6+
7+
from typing_extensions import override
8+
9+
from apify_client import ApifyClientAsync
10+
from crawlee.storage_clients._base import DatasetClient
11+
from crawlee.storage_clients.models import DatasetItemsListPage, DatasetMetadata
12+
13+
if TYPE_CHECKING:
14+
from collections.abc import AsyncIterator
15+
from datetime import datetime
16+
17+
from apify_client.clients import DatasetClientAsync
18+
19+
from apify import Configuration
20+
21+
logger = getLogger(__name__)
22+
23+
24+
class ApifyDatasetClient(DatasetClient):
25+
"""An Apify platform implementation of the dataset client."""
26+
27+
def __init__(
28+
self,
29+
*,
30+
id: str,
31+
name: str | None,
32+
created_at: datetime,
33+
accessed_at: datetime,
34+
modified_at: datetime,
35+
item_count: int,
36+
api_client: DatasetClientAsync,
37+
) -> None:
38+
"""Initialize a new instance.
39+
40+
Preferably use the `ApifyDatasetClient.open` class method to create a new instance.
41+
"""
42+
self._metadata = DatasetMetadata(
43+
id=id,
44+
name=name,
45+
created_at=created_at,
46+
accessed_at=accessed_at,
47+
modified_at=modified_at,
48+
item_count=item_count,
49+
)
50+
51+
self._api_client = api_client
52+
"""The Apify dataset client for API operations."""
53+
54+
self._lock = asyncio.Lock()
55+
"""A lock to ensure that only one operation is performed at a time."""
56+
57+
@override
58+
@property
59+
def metadata(self) -> DatasetMetadata:
60+
return self._metadata
61+
62+
@override
63+
@classmethod
64+
async def open(
65+
cls,
66+
*,
67+
id: str | None,
68+
name: str | None,
69+
configuration: Configuration,
70+
) -> ApifyDatasetClient:
71+
token = configuration.token
72+
api_url = configuration.api_base_url
73+
74+
# Otherwise, create a new one.
75+
apify_client_async = ApifyClientAsync(
76+
token=token,
77+
api_url=api_url,
78+
max_retries=8,
79+
min_delay_between_retries_millis=500,
80+
timeout_secs=360,
81+
)
82+
83+
apify_datasets_client = apify_client_async.datasets()
84+
85+
metadata = DatasetMetadata.model_validate(
86+
await apify_datasets_client.get_or_create(name=id if id is not None else name),
87+
)
88+
89+
apify_dataset_client = apify_client_async.dataset(dataset_id=metadata.id)
90+
91+
return cls(
92+
id=metadata.id,
93+
name=metadata.name,
94+
created_at=metadata.created_at,
95+
accessed_at=metadata.accessed_at,
96+
modified_at=metadata.modified_at,
97+
item_count=metadata.item_count,
98+
api_client=apify_dataset_client,
99+
)
100+
101+
@override
102+
async def purge(self) -> None:
103+
# TODO: better
104+
async with self._lock:
105+
await self._api_client.delete()
106+
107+
@override
108+
async def drop(self) -> None:
109+
async with self._lock:
110+
await self._api_client.delete()
111+
112+
@override
113+
async def push_data(self, data: list[Any] | dict[str, Any]) -> None:
114+
async with self._lock:
115+
await self._api_client.push_items(items=data)
116+
await self._update_metadata()
117+
118+
@override
119+
async def get_data(
120+
self,
121+
*,
122+
offset: int = 0,
123+
limit: int | None = 999_999_999_999,
124+
clean: bool = False,
125+
desc: bool = False,
126+
fields: list[str] | None = None,
127+
omit: list[str] | None = None,
128+
unwind: str | None = None,
129+
skip_empty: bool = False,
130+
skip_hidden: bool = False,
131+
flatten: list[str] | None = None,
132+
view: str | None = None,
133+
) -> DatasetItemsListPage:
134+
response = await self._api_client.list_items(
135+
offset=offset,
136+
limit=limit,
137+
clean=clean,
138+
desc=desc,
139+
fields=fields,
140+
omit=omit,
141+
unwind=unwind,
142+
skip_empty=skip_empty,
143+
skip_hidden=skip_hidden,
144+
flatten=flatten,
145+
view=view,
146+
)
147+
result = DatasetItemsListPage.model_validate(vars(response))
148+
await self._update_metadata()
149+
return result
150+
151+
@override
152+
async def iterate_items(
153+
self,
154+
*,
155+
offset: int = 0,
156+
limit: int | None = None,
157+
clean: bool = False,
158+
desc: bool = False,
159+
fields: list[str] | None = None,
160+
omit: list[str] | None = None,
161+
unwind: str | None = None,
162+
skip_empty: bool = False,
163+
skip_hidden: bool = False,
164+
) -> AsyncIterator[dict]:
165+
async for item in self._api_client.iterate_items(
166+
offset=offset,
167+
limit=limit,
168+
clean=clean,
169+
desc=desc,
170+
fields=fields,
171+
omit=omit,
172+
unwind=unwind,
173+
skip_empty=skip_empty,
174+
skip_hidden=skip_hidden,
175+
):
176+
yield item
177+
178+
await self._update_metadata()
179+
180+
async def _update_metadata(self) -> None:
181+
"""Update the dataset metadata file with current information."""
182+
metadata = await self._api_client.get()
183+
self._metadata = DatasetMetadata.model_validate(metadata)
Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from logging import getLogger
5+
from typing import TYPE_CHECKING, Any
6+
7+
from typing_extensions import override
8+
from yarl import URL
9+
10+
from apify_client import ApifyClientAsync
11+
from crawlee.storage_clients._base import KeyValueStoreClient
12+
from crawlee.storage_clients.models import (
13+
KeyValueStoreListKeysPage,
14+
KeyValueStoreMetadata,
15+
KeyValueStoreRecord,
16+
KeyValueStoreRecordMetadata,
17+
)
18+
19+
from apify._crypto import create_hmac_signature
20+
21+
if TYPE_CHECKING:
22+
from collections.abc import AsyncIterator
23+
from datetime import datetime
24+
25+
from apify_client.clients import KeyValueStoreClientAsync
26+
27+
from apify import Configuration
28+
29+
logger = getLogger(__name__)
30+
31+
32+
class ApifyKeyValueStoreClient(KeyValueStoreClient):
33+
"""An Apify platform implementation of the key-value store client."""
34+
35+
def __init__(
36+
self,
37+
*,
38+
id: str,
39+
name: str | None,
40+
created_at: datetime,
41+
accessed_at: datetime,
42+
modified_at: datetime,
43+
api_client: KeyValueStoreClientAsync,
44+
) -> None:
45+
"""Initialize a new instance.
46+
47+
Preferably use the `ApifyKeyValueStoreClient.open` class method to create a new instance.
48+
"""
49+
self._metadata = KeyValueStoreMetadata(
50+
id=id,
51+
name=name,
52+
created_at=created_at,
53+
accessed_at=accessed_at,
54+
modified_at=modified_at,
55+
)
56+
57+
self._api_client = api_client
58+
"""The Apify key-value store client for API operations."""
59+
60+
self._lock = asyncio.Lock()
61+
"""A lock to ensure that only one operation is performed at a time."""
62+
63+
@override
64+
@property
65+
def metadata(self) -> KeyValueStoreMetadata:
66+
return self._metadata
67+
68+
@override
69+
@classmethod
70+
async def open(
71+
cls,
72+
*,
73+
id: str | None,
74+
name: str | None,
75+
configuration: Configuration,
76+
) -> ApifyKeyValueStoreClient:
77+
token = configuration.token
78+
api_url = configuration.api_base_url
79+
80+
# Otherwise, create a new one.
81+
apify_client_async = ApifyClientAsync(
82+
token=token,
83+
api_url=api_url,
84+
max_retries=8,
85+
min_delay_between_retries_millis=500,
86+
timeout_secs=360,
87+
)
88+
89+
apify_kvss_client = apify_client_async.key_value_stores()
90+
91+
metadata = KeyValueStoreMetadata.model_validate(
92+
await apify_kvss_client.get_or_create(name=id if id is not None else name),
93+
)
94+
95+
apify_kvs_client = apify_client_async.key_value_store(key_value_store_id=metadata.id)
96+
97+
return cls(
98+
id=metadata.id,
99+
name=metadata.name,
100+
created_at=metadata.created_at,
101+
accessed_at=metadata.accessed_at,
102+
modified_at=metadata.modified_at,
103+
api_client=apify_kvs_client,
104+
)
105+
106+
@override
107+
async def purge(self) -> None:
108+
# TODO: better
109+
async with self._lock:
110+
await self._api_client.delete()
111+
112+
@override
113+
async def drop(self) -> None:
114+
async with self._lock:
115+
await self._api_client.delete()
116+
117+
@override
118+
async def get_value(self, key: str) -> KeyValueStoreRecord | None:
119+
response = await self._api_client.get_record(key)
120+
record = KeyValueStoreRecord.model_validate(response) if response else None
121+
await self._update_metadata()
122+
return record
123+
124+
@override
125+
async def set_value(self, key: str, value: Any, content_type: str | None = None) -> None:
126+
async with self._lock:
127+
await self._api_client.set_record(
128+
key=key,
129+
value=value,
130+
content_type=content_type,
131+
)
132+
await self._update_metadata()
133+
134+
@override
135+
async def delete_value(self, key: str) -> None:
136+
async with self._lock:
137+
await self._api_client.delete_record(key=key)
138+
await self._update_metadata()
139+
140+
@override
141+
async def iterate_keys(
142+
self,
143+
*,
144+
exclusive_start_key: str | None = None,
145+
limit: int | None = None,
146+
) -> AsyncIterator[KeyValueStoreRecordMetadata]:
147+
count = 0
148+
149+
while True:
150+
response = await self._api_client.list_keys(exclusive_start_key=exclusive_start_key)
151+
list_key_page = KeyValueStoreListKeysPage.model_validate(response)
152+
153+
for item in list_key_page.items:
154+
yield item
155+
count += 1
156+
157+
# If we've reached the limit, stop yielding
158+
if limit and count >= limit:
159+
break
160+
161+
# If we've reached the limit or there are no more pages, exit the loop
162+
if (limit and count >= limit) or not list_key_page.is_truncated:
163+
break
164+
165+
exclusive_start_key = list_key_page.next_exclusive_start_key
166+
167+
await self._update_metadata()
168+
169+
async def get_public_url(self, key: str) -> str:
170+
"""Get a URL for the given key that may be used to publicly access the value in the remote key-value store.
171+
172+
Args:
173+
key: The key for which the URL should be generated.
174+
"""
175+
if self._api_client.resource_id is None:
176+
raise ValueError('resource_id cannot be None when generating a public URL')
177+
178+
public_url = (
179+
URL(self._api_client.base_url) / 'v2' / 'key-value-stores' / self._api_client.resource_id / 'records' / key
180+
)
181+
182+
key_value_store = self.metadata
183+
184+
if key_value_store and key_value_store.model_extra:
185+
url_signing_secret_key = key_value_store.model_extra.get('urlSigningSecretKey')
186+
if url_signing_secret_key:
187+
public_url = public_url.with_query(signature=create_hmac_signature(url_signing_secret_key, key))
188+
189+
return str(public_url)
190+
191+
async def _update_metadata(self) -> None:
192+
"""Update the key-value store metadata with current information."""
193+
metadata = await self._api_client.get()
194+
self._metadata = KeyValueStoreMetadata.model_validate(metadata)

0 commit comments

Comments
 (0)