Skip to content

Commit dacdba5

Browse files
committed
feat: Add Privileged Storage Worker
1 parent 0650b00 commit dacdba5

File tree

22 files changed

+1102
-0
lines changed

22 files changed

+1102
-0
lines changed

changes/5779.feature.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add Privileged Storage Worker

src/ai/backend/storage/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ python_distribution(
3636
"backendai_cli_v10": {
3737
"storage": "ai.backend.storage.cli.__main__:main",
3838
"storage.start-server": "ai.backend.storage.server:main",
39+
"storage.start-privileged-worker": "ai.backend.storage.privileged.server:main",
3940
},
4041
},
4142
generate_setup=True,

src/ai/backend/storage/privileged/__init__.py

Whitespace-only changes.

src/ai/backend/storage/privileged/bgtask/__init__.py

Whitespace-only changes.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from ai.backend.common.bgtask.task.registry import BackgroundTaskHandlerRegistry
2+
from ai.backend.common.events.dispatcher import EventProducer
3+
4+
from ...bgtask.tasks.delete import VFolderDeleteTaskHandler
5+
from ...volumes.pool import VolumePool
6+
7+
8+
class BgtaskHandlerRegistryCreator:
9+
def __init__(self, volume_pool: VolumePool, event_producer: EventProducer) -> None:
10+
self._volume_pool = volume_pool
11+
self._event_producer = event_producer
12+
13+
def create(self) -> BackgroundTaskHandlerRegistry:
14+
registry = BackgroundTaskHandlerRegistry()
15+
registry.register(VFolderDeleteTaskHandler(self._volume_pool, self._event_producer))
16+
17+
return registry

src/ai/backend/storage/privileged/bootstrap/__init__.py

Whitespace-only changes.
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from typing import override
4+
5+
from ai.backend.common.stage.types import (
6+
ArgsSpecGenerator,
7+
Provisioner,
8+
ProvisionStage,
9+
)
10+
11+
from ...bgtask.tags import ROOT_PRIVILEGED_TAG
12+
from ..config import StorageProxyPrivilegedWorkerConfig
13+
from .stage.bgtask import (
14+
BgtaskManagerProvisioner,
15+
BgtaskManagerResult,
16+
BgtaskManagerSpec,
17+
BgtaskManagerSpecGenerator,
18+
BgtaskManagerStage,
19+
)
20+
from .stage.etcd import (
21+
EtcdProvisioner,
22+
EtcdResult,
23+
EtcdSpec,
24+
EtcdSpecGenerator,
25+
EtcdStage,
26+
)
27+
from .stage.event_dispatcher import (
28+
EventDispatcherProvisioner,
29+
EventDispatcherResult,
30+
EventDispatcherSpec,
31+
EventDispatcherSpecGenerator,
32+
EventDispatcherStage,
33+
)
34+
from .stage.logger import (
35+
LoggerProvisioner,
36+
LoggerResult,
37+
LoggerSpec,
38+
LoggerSpecGenerator,
39+
LoggerStage,
40+
)
41+
from .stage.message_queue import (
42+
MessageQueueProvisioner,
43+
MessageQueueResult,
44+
MessageQueueSpec,
45+
MessageQueueSpecGenerator,
46+
MessageQueueStage,
47+
)
48+
from .stage.monitor import (
49+
MonitorProvisioner,
50+
MonitorResult,
51+
MonitorSpec,
52+
MonitorSpecGenerator,
53+
MonitorStage,
54+
)
55+
from .stage.redis_config import (
56+
RedisConfigProvisioner,
57+
RedisConfigResult,
58+
RedisConfigSpec,
59+
RedisConfigSpecGenerator,
60+
RedisConfigStage,
61+
)
62+
from .stage.valkey_client import (
63+
ValkeyClientProvisioner,
64+
ValkeyClientResult,
65+
ValkeyClientSpec,
66+
ValkeyClientSpecGenerator,
67+
ValkeyClientStage,
68+
)
69+
from .stage.volume_pool import (
70+
VolumePoolProvisioner,
71+
VolumePoolResult,
72+
VolumePoolSpec,
73+
VolumePoolSpecGenerator,
74+
VolumePoolStage,
75+
)
76+
77+
78+
@dataclass
79+
class BootstrapSpec:
80+
loop: asyncio.AbstractEventLoop
81+
local_config: StorageProxyPrivilegedWorkerConfig
82+
pidx: int
83+
84+
85+
class BootstrapSpecGenerator(ArgsSpecGenerator[BootstrapSpec]):
86+
pass
87+
88+
89+
@dataclass
90+
class BootstrapResult:
91+
logger: LoggerResult
92+
monitor: MonitorResult
93+
etcd: EtcdResult
94+
redis_config: RedisConfigResult
95+
message_queue: MessageQueueResult
96+
event_dispatcher: EventDispatcherResult
97+
valkey_client: ValkeyClientResult
98+
volume_pool: VolumePoolResult
99+
bgtask_manager: BgtaskManagerResult
100+
101+
102+
class BootstrapProvisioner(Provisioner[BootstrapSpec, BootstrapResult]):
103+
@property
104+
@override
105+
def name(self) -> str:
106+
return "storage-worker-bootstrap"
107+
108+
@override
109+
async def setup(self, spec: BootstrapSpec) -> BootstrapResult:
110+
local_config = spec.local_config
111+
sub_logger_stage = LoggerStage(LoggerProvisioner())
112+
sub_logger_spec = LoggerSpec(
113+
is_master=False,
114+
ipc_base_path=local_config.storage_proxy.ipc_base_path,
115+
config=spec.local_config.logging,
116+
)
117+
await sub_logger_stage.setup(LoggerSpecGenerator(sub_logger_spec))
118+
logger_result = await sub_logger_stage.wait_for_resource()
119+
120+
monitor_stage = MonitorStage(MonitorProvisioner())
121+
monitor_spec = MonitorSpec(loop=spec.loop, pidx=spec.pidx, local_config=local_config)
122+
await monitor_stage.setup(MonitorSpecGenerator(monitor_spec))
123+
monitor_result = await monitor_stage.wait_for_resource()
124+
125+
etcd_stage = EtcdStage(EtcdProvisioner())
126+
etcd_spec = EtcdSpec(local_config=local_config)
127+
await etcd_stage.setup(EtcdSpecGenerator(etcd_spec))
128+
etcd_result = await etcd_stage.wait_for_resource()
129+
130+
redis_config_stage = RedisConfigStage(RedisConfigProvisioner())
131+
redis_config_spec = RedisConfigSpec(etcd=etcd_result.etcd)
132+
await redis_config_stage.setup(RedisConfigSpecGenerator(redis_config_spec))
133+
redis_config_result = await redis_config_stage.wait_for_resource()
134+
redis_profile_target = redis_config_result.redis_config.to_redis_profile_target()
135+
136+
mq_stage = MessageQueueStage(MessageQueueProvisioner())
137+
mq_spec = MessageQueueSpec(
138+
local_config=local_config, redis_profile_target=redis_profile_target
139+
)
140+
await mq_stage.setup(MessageQueueSpecGenerator(mq_spec))
141+
mq_result = await mq_stage.wait_for_resource()
142+
143+
event_dispatcher_stage = EventDispatcherStage(EventDispatcherProvisioner())
144+
event_dispatcher_spec = EventDispatcherSpec(
145+
message_queue=mq_result.message_queue,
146+
log_events=local_config.debug.log_events,
147+
event_observer=monitor_result.metric_registry.event,
148+
source_id=None,
149+
)
150+
await event_dispatcher_stage.setup(EventDispatcherSpecGenerator(event_dispatcher_spec))
151+
event_dispatcher_result = await event_dispatcher_stage.wait_for_resource()
152+
153+
valkey_client_stage = ValkeyClientStage(ValkeyClientProvisioner())
154+
valkey_client_spec = ValkeyClientSpec(redis_profile_target)
155+
await valkey_client_stage.setup(ValkeyClientSpecGenerator(valkey_client_spec))
156+
valkey_client_result = await valkey_client_stage.wait_for_resource()
157+
158+
volume_pool_stage = VolumePoolStage(VolumePoolProvisioner())
159+
volume_pool_spec = VolumePoolSpec(
160+
local_config=local_config,
161+
etcd=etcd_result.etcd,
162+
event_dispatcher=event_dispatcher_result.event_dispatcher,
163+
event_producer=event_dispatcher_result.event_producer,
164+
)
165+
await volume_pool_stage.setup(VolumePoolSpecGenerator(volume_pool_spec))
166+
volume_pool_result = await volume_pool_stage.wait_for_resource()
167+
168+
bgtask_manager_stage = BgtaskManagerStage(BgtaskManagerProvisioner())
169+
bgtask_manager_spec = BgtaskManagerSpec(
170+
volume_pool=volume_pool_result.volume_pool,
171+
valkey_client=valkey_client_result.bgtask_client,
172+
event_producer=event_dispatcher_result.event_producer,
173+
node_id=local_config.storage_proxy.node_id,
174+
tags=[ROOT_PRIVILEGED_TAG],
175+
)
176+
await bgtask_manager_stage.setup(BgtaskManagerSpecGenerator(bgtask_manager_spec))
177+
bgtask_manager_result = await bgtask_manager_stage.wait_for_resource()
178+
179+
return BootstrapResult(
180+
logger=logger_result,
181+
monitor=monitor_result,
182+
etcd=etcd_result,
183+
redis_config=redis_config_result,
184+
message_queue=mq_result,
185+
event_dispatcher=event_dispatcher_result,
186+
valkey_client=valkey_client_result,
187+
volume_pool=volume_pool_result,
188+
bgtask_manager=bgtask_manager_result,
189+
)
190+
191+
@override
192+
async def teardown(self, resource: BootstrapResult) -> None:
193+
pass
194+
195+
196+
class BootstrapStage(ProvisionStage[BootstrapSpec, BootstrapResult]):
197+
pass

src/ai/backend/storage/privileged/bootstrap/stage/__init__.py

Whitespace-only changes.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from collections.abc import Iterable
2+
from dataclasses import dataclass
3+
from typing import override
4+
5+
from ai.backend.common.bgtask.bgtask import BackgroundTaskManager
6+
from ai.backend.common.bgtask.task.registry import BackgroundTaskHandlerRegistry
7+
from ai.backend.common.clients.valkey_client.valkey_bgtask.client import ValkeyBgtaskClient
8+
from ai.backend.common.events.dispatcher import EventProducer
9+
from ai.backend.common.stage.types import (
10+
ArgsSpecGenerator,
11+
Provisioner,
12+
ProvisionStage,
13+
)
14+
15+
from ....bgtask.tasks.delete import VFolderDeleteTaskHandler
16+
from ....volumes.pool import VolumePool
17+
18+
19+
@dataclass
20+
class BgtaskManagerSpec:
21+
volume_pool: VolumePool
22+
valkey_client: ValkeyBgtaskClient
23+
event_producer: EventProducer
24+
node_id: str
25+
tags: Iterable[str]
26+
27+
28+
class BgtaskManagerSpecGenerator(ArgsSpecGenerator[BgtaskManagerSpec]):
29+
pass
30+
31+
32+
@dataclass
33+
class BgtaskManagerResult:
34+
bgtask_mgr: BackgroundTaskManager
35+
36+
37+
class BgtaskManagerProvisioner(Provisioner[BgtaskManagerSpec, BgtaskManagerResult]):
38+
@property
39+
@override
40+
def name(self) -> str:
41+
return "storage-worker-bgtask"
42+
43+
@override
44+
async def setup(self, spec: BgtaskManagerSpec) -> BgtaskManagerResult:
45+
registry = self._register_bgtask_handlers(spec.volume_pool, spec.event_producer)
46+
bgtask_mgr = BackgroundTaskManager(
47+
event_producer=spec.event_producer,
48+
task_registry=registry,
49+
valkey_client=spec.valkey_client,
50+
server_id=spec.node_id,
51+
tags=spec.tags,
52+
)
53+
return BgtaskManagerResult(bgtask_mgr)
54+
55+
def _register_bgtask_handlers(
56+
self, volume_pool: VolumePool, event_producer: EventProducer
57+
) -> BackgroundTaskHandlerRegistry:
58+
registry = BackgroundTaskHandlerRegistry()
59+
registry.register(VFolderDeleteTaskHandler(volume_pool, event_producer))
60+
return registry
61+
62+
@override
63+
async def teardown(self, resource: BgtaskManagerResult) -> None:
64+
pass
65+
66+
67+
class BgtaskManagerStage(ProvisionStage[BgtaskManagerSpec, BgtaskManagerResult]):
68+
pass
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from dataclasses import dataclass
2+
from typing import override
3+
4+
from ai.backend.common.etcd import AsyncEtcd
5+
from ai.backend.common.stage.types import (
6+
ArgsSpecGenerator,
7+
Provisioner,
8+
ProvisionStage,
9+
)
10+
11+
from ....config.loaders import make_etcd
12+
from ...config import StorageProxyPrivilegedWorkerConfig
13+
14+
15+
@dataclass
16+
class EtcdSpec:
17+
local_config: StorageProxyPrivilegedWorkerConfig
18+
19+
20+
class EtcdSpecGenerator(ArgsSpecGenerator[EtcdSpec]):
21+
pass
22+
23+
24+
@dataclass
25+
class EtcdResult:
26+
etcd: AsyncEtcd
27+
28+
29+
class EtcdProvisioner(Provisioner[EtcdSpec, EtcdResult]):
30+
@property
31+
@override
32+
def name(self) -> str:
33+
return "storage-worker-etcd"
34+
35+
@override
36+
async def setup(self, spec: EtcdSpec) -> EtcdResult:
37+
etcd = make_etcd(spec.local_config)
38+
return EtcdResult(etcd)
39+
40+
@override
41+
async def teardown(self, resource: EtcdResult) -> None:
42+
pass
43+
44+
45+
class EtcdStage(ProvisionStage[EtcdSpec, EtcdResult]):
46+
pass

0 commit comments

Comments
 (0)