5 changes: 5 additions & 0 deletions src/apify/_actor.py
@@ -36,6 +36,7 @@
from apify.events import ApifyEventManager, EventManager, LocalEventManager
from apify.log import _configure_logging, logger
from apify.storage_clients import ApifyStorageClient, SmartApifyStorageClient
from apify.storage_clients._apify._alias_resolving import AliasResolver
from apify.storage_clients._file_system import ApifyFileSystemStorageClient
from apify.storages import Dataset, KeyValueStore, RequestQueue

@@ -203,6 +204,10 @@ async def __aenter__(self) -> Self:
        if not Actor.is_at_home():
            # Make sure that the input related KVS is initialized to ensure that the input aware client is used
            await self.open_key_value_store()
        else:
            # Load pre-existing non-default aliased storages from configuration.
            # Supported only on the Apify platform, where those storages are pre-created by the platform.
            await AliasResolver.register_aliases(configuration=self.configuration)
        return self

    async def __aexit__(
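For orientation, here is a minimal sketch of what this hook enables from user code. It assumes a run on the Apify platform, where the platform pre-creates the aliased storages and sets ACTOR_STORAGES_JSON, and that a 'custom' alias is declared in .actor/actor.json (both appear in the tests later in this diff); it is an illustration, not part of the diff.

from apify import Actor


async def main() -> None:
    # Entering the context registers the platform-provided aliases (the else branch above).
    async with Actor:
        # The alias now resolves to the storage ID the platform pre-created.
        dataset = await Actor.open_dataset(alias='custom')
        await dataset.push_data({'id': 'example'})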
33 changes: 33 additions & 0 deletions src/apify/_configuration.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import dataclasses
import json
from datetime import datetime, timedelta
from decimal import Decimal
@@ -34,6 +35,29 @@ def _transform_to_list(value: Any) -> list[str] | None:
    return value if isinstance(value, list) else str(value).split(',')


@dataclasses.dataclass
class ActorStorages:
    """Storage IDs for different storage types used by an Actor."""

    key_value_stores: dict[str, str]
    datasets: dict[str, str]
    request_queues: dict[str, str]


def _load_storage_keys(data: None | str | dict | ActorStorages) -> ActorStorages | None:
    """Load storage keys from environment."""
    if data is None:
        return None
    if isinstance(data, ActorStorages):
        return data
    storage_mapping = data if isinstance(data, dict) else json.loads(data)
    return ActorStorages(
        key_value_stores=storage_mapping.get('keyValueStores', {}),
        datasets=storage_mapping.get('datasets', {}),
        request_queues=storage_mapping.get('requestQueues', {}),
    )


@docs_group('Configuration')
class Configuration(CrawleeConfiguration):
"""A class for specifying the configuration of an Actor.
Expand Down Expand Up @@ -446,6 +470,15 @@ class Configuration(CrawleeConfiguration):
        BeforeValidator(lambda data: json.loads(data) if isinstance(data, str) else data or None),
    ] = None

    actor_storages: Annotated[
        ActorStorages | None,
        Field(
            alias='actor_storages_json',
            description='Storage IDs for the actor',
        ),
        BeforeValidator(_load_storage_keys),
    ] = None

A contributor commented on the actor_storages field:
I'm not sure about the naming here - we usually strip the actor_ prefix. But I do agree that a plain storages would look odd. So let's keep it this way, I guess.

    @model_validator(mode='after')
    def disable_browser_sandbox_on_platform(self) -> Self:
        """Disable the browser sandbox mode when running on the Apify platform.
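To make the parsing above concrete, here is a short sketch of how the new field gets populated. The env-var name ACTOR_STORAGES_JSON matches the unit test at the end of this diff; the storage IDs are invented placeholders.

import json
import os

# Invented placeholder IDs; the camelCase keys are the ones _load_storage_keys expects.
os.environ['ACTOR_STORAGES_JSON'] = json.dumps(
    {
        'keyValueStores': {'default': 'kvs-id-1'},
        'datasets': {'default': 'ds-id-1', 'custom': 'ds-id-2'},
        'requestQueues': {'default': 'rq-id-1'},
    }
)

from apify import Configuration

config = Configuration()  # the actor_storages_json alias picks the value up from the environment
assert config.actor_storages is not None
assert config.actor_storages.datasets['custom'] == 'ds-id-2'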
38 changes: 38 additions & 0 deletions src/apify/storage_clients/_apify/_alias_resolving.py
@@ -262,3 +262,41 @@ async def _get_default_kvs_client(configuration: Configuration) -> KeyValueStore
            raise ValueError("'Configuration.default_key_value_store_id' must be set.")

        return apify_client_async.key_value_store(key_value_store_id=configuration.default_key_value_store_id)

    @classmethod
    async def register_aliases(cls, configuration: Configuration) -> None:
        """Load the alias mapping from the configuration into the default KVS."""
        if configuration.actor_storages is None:
            return

        configuration_mapping = {}

        if configuration.default_dataset_id != configuration.actor_storages.datasets.get('default'):
            logger.warning(
                f'Conflicting default dataset ids: {configuration.default_dataset_id=},'
                f" {configuration.actor_storages.datasets.get('default')=}"
            )

        for mapping, storage_type in (
            (configuration.actor_storages.key_value_stores, 'KeyValueStore'),
            (configuration.actor_storages.datasets, 'Dataset'),
            (configuration.actor_storages.request_queues, 'RequestQueue'),
        ):
            for storage_alias, storage_id in mapping.items():
                configuration_mapping[
                    cls(  # noqa: SLF001  # Private access is fine in the class's own classmethod.
                        storage_type=storage_type,
                        alias='__default__' if storage_alias == 'default' else storage_alias,
                        configuration=configuration,
                    )._storage_key
                ] = storage_id

        # Bulk update the mapping in the default KVS with the configuration mapping.
        client = await cls._get_default_kvs_client(configuration=configuration)
        existing_mapping = ((await client.get_record(cls._ALIAS_MAPPING_KEY)) or {'value': {}}).get('value', {})

        # Update the existing mapping with the configuration mapping.
        existing_mapping.update(configuration_mapping)
        # Store the updated mapping back in the KVS and in memory.
        await client.set_record(cls._ALIAS_MAPPING_KEY, existing_mapping)
        cls._alias_map.update(existing_mapping)

Copilot AI commented on Feb 18, 2026, on the get_record line above:
existing_mapping = ((await client.get_record(...)) or {'value': {}}).get('value', {}) assumes the record always has a value key containing a dict. However, this module already documents/handles get_record sometimes returning the mapping dict directly (without value). In that case, this code will treat the mapping as missing and overwrite it with only configuration_mapping. Also, if value is present but not a dict (e.g. None), existing_mapping.update(...) will raise. Please mirror the normalization logic used in _get_alias_map/store_mapping: normalize the record into a dict[str, str], whether it comes wrapped in {key, value} or as a raw mapping, otherwise defaulting to {}.
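A minimal sketch of the normalization the reviewer asks for, mirroring what the comment describes; the helper name _normalize_record is hypothetical and not part of the PR.

from typing import Any


def _normalize_record(record: Any) -> dict[str, str]:
    # Hypothetical helper: accept a record that is either wrapped in a
    # {'key': ..., 'value': ...} envelope or is the raw mapping itself.
    if isinstance(record, dict) and 'value' in record:
        record = record['value']
    if isinstance(record, dict):
        return {str(k): str(v) for k, v in record.items()}
    # None, or any non-dict value, counts as a missing mapping.
    return {}


# register_aliases would then read the existing mapping as (sketch):
#     existing_mapping = _normalize_record(await client.get_record(cls._ALIAS_MAPPING_KEY))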
24 changes: 24 additions & 0 deletions tests/e2e/test_schema_storages/actor_source/actor.json
@@ -0,0 +1,24 @@
{
  "actorSpecification": 1,
  "version": "0.0",
  "storages": {
    "datasets": {
      "default": {
        "actorSpecification": 1,
        "fields": {
          "properties": {
            "id": { "type": "string" }
          }
        }
      },
      "custom": {
        "actorSpecification": 1,
        "fields": {
          "properties": {
            "id": { "type": "string" }
          }
        }
      }
    }
  }
}
7 changes: 7 additions & 0 deletions tests/e2e/test_schema_storages/actor_source/main.py
@@ -0,0 +1,7 @@
from apify import Actor


async def main() -> None:
    async with Actor:
        assert Actor.configuration.actor_storages
        assert (await Actor.open_dataset(alias='custom')).id == Actor.configuration.actor_storages.datasets['custom']
26 changes: 26 additions & 0 deletions tests/e2e/test_schema_storages/test_schema_storages.py
@@ -0,0 +1,26 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from ..conftest import MakeActorFunction, RunActorFunction

_ACTOR_SOURCE_DIR = Path(__file__).parent / 'actor_source'


def read_actor_source(filename: str) -> str:
    return (_ACTOR_SOURCE_DIR / filename).read_text()


async def test_configuration_storages(make_actor: MakeActorFunction, run_actor: RunActorFunction) -> None:
    actor = await make_actor(
        label='schema_storages',
        source_files={
            'src/main.py': read_actor_source('main.py'),
            '.actor/actor.json': read_actor_source('actor.json'),
        },
    )
    run_result = await run_actor(actor)

    assert run_result.status == 'SUCCEEDED'
53 changes: 53 additions & 0 deletions tests/integration/test_storages.py
@@ -1,14 +1,17 @@
from __future__ import annotations

import asyncio
from typing import cast

import pytest

from crawlee import service_locator
from crawlee.storages import Dataset, KeyValueStore, RequestQueue

from apify import Actor, Configuration
from apify._configuration import ActorStorages
from apify.storage_clients import ApifyStorageClient, MemoryStorageClient, SmartApifyStorageClient
from apify.storage_clients._apify._alias_resolving import AliasResolver


@pytest.mark.parametrize(

@@ -125,3 +128,53 @@ async def test_actor_implicit_storage_init(apify_token: str) -> None:
    assert await Actor.open_dataset() is not await Actor.open_dataset(force_cloud=True)
    assert await Actor.open_key_value_store() is not await Actor.open_key_value_store(force_cloud=True)
    assert await Actor.open_request_queue() is not await Actor.open_request_queue(force_cloud=True)


async def test_actor_storages_alias_resolving(apify_token: str) -> None:
    """Test that `AliasResolver.register_aliases` correctly updates the default KVS with Actor storages."""

    # Actor storages
    datasets = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}
    request_queues = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}
    key_value_stores = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}

    # Set up the configuration and storage client for the test
    configuration = Configuration(
        default_key_value_store_id='default_kvs_id',
        token=apify_token,
        actor_storages=ActorStorages(
            datasets=datasets, request_queues=request_queues, key_value_stores=key_value_stores
        ),
    )
    storage_client = ApifyStorageClient()
    service_locator.set_configuration(configuration)
    service_locator.set_storage_client(storage_client)

    client_cache_key = cast('tuple', storage_client.get_storage_client_cache_key(configuration))[-1]
    # Add some unrelated pre-existing alias mapping (it should be preserved after registering aliases)
    pre_existing_mapping = {f'KeyValueStore,pre_existing_alias,{client_cache_key}': 'pre_existing_id'}

    default_kvs = await KeyValueStore.open(configuration=configuration, storage_client=storage_client)
    await default_kvs.set_value(AliasResolver._ALIAS_MAPPING_KEY, pre_existing_mapping)

    # Construct the expected mapping
    expected_mapping = {}
    for storage_type, storage_map in (
        ('Dataset', datasets),
        ('KeyValueStore', key_value_stores),
        ('RequestQueue', request_queues),
    ):
        for storage_alias, storage_id in storage_map.items():
            expected_mapping[
                ','.join(
                    (storage_type, '__default__' if storage_alias == 'default' else storage_alias, client_cache_key)
                )
            ] = storage_id
    expected_mapping.update(pre_existing_mapping)

    try:
        configuration.default_key_value_store_id = default_kvs.id
        await AliasResolver.register_aliases(configuration=configuration)
        assert await default_kvs.get_value(AliasResolver._ALIAS_MAPPING_KEY) == expected_mapping
    finally:
        await default_kvs.drop()
23 changes: 23 additions & 0 deletions tests/unit/actor/test_configuration.py
@@ -300,3 +300,26 @@ def test_actor_pricing_info_from_json_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
    config = ApifyConfiguration()
    assert config.actor_pricing_info is not None
    assert config.actor_pricing_info.pricing_model == 'PAY_PER_EVENT'


def test_actor_storage_json_env_var(monkeypatch: pytest.MonkeyPatch) -> None:
    """Test that actor_storages_json is parsed from JSON env var."""
    import json

    datasets = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}
    request_queues = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}
    key_value_stores = {'default': 'default_dataset_id', 'custom': 'custom_dataset_id'}

    actor_storages_json = json.dumps(
        {
            'datasets': datasets,
            'requestQueues': request_queues,
            'keyValueStores': key_value_stores,
        }
    )
    monkeypatch.setenv('ACTOR_STORAGES_JSON', actor_storages_json)
    config = ApifyConfiguration()
    assert config.actor_storages
    assert config.actor_storages.datasets == datasets
    assert config.actor_storages.request_queues == request_queues
    assert config.actor_storages.key_value_stores == key_value_stores