Skip to content
1 change: 1 addition & 0 deletions changes/6222.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement periodic polling and synchronization for remote reservoir registries
12 changes: 12 additions & 0 deletions docs/manager/graphql-reference/supergraph.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ enum ArtifactRemoteStatus
{
SCANNED @join__enumValue(graph: STRAWBERRY)
AVAILABLE @join__enumValue(graph: STRAWBERRY)
FAILED @join__enumValue(graph: STRAWBERRY)
}

"""Added in 25.14.0"""
Expand Down Expand Up @@ -414,6 +415,9 @@ input ArtifactRevisionFilter
@join__type(graph: STRAWBERRY)
{
status: ArtifactRevisionStatusFilter = null

"""Added in 25.16.0"""
remoteStatus: ArtifactRevisionRemoteStatusFilter = null
version: StringFilter = null
artifactId: ID = null
size: IntFilter = null
Expand Down Expand Up @@ -448,6 +452,14 @@ enum ArtifactRevisionOrderField
STATUS @join__enumValue(graph: STRAWBERRY)
}

"""Added in 25.16.0"""
input ArtifactRevisionRemoteStatusFilter
@join__type(graph: STRAWBERRY)
{
# Filter on an artifact revision's remote status.
# The manager's ArtifactRevisionFilter.to_repo_filter checks `in` first and
# only falls back to `equals` when `in` is null or empty.
# `in`: translated into an IN-type remote-status filter over these values.
in: [ArtifactRemoteStatus!] = null
# `equals`: translated into an EQUALS-type remote-status filter on this value.
equals: ArtifactRemoteStatus = null
}

"""Added in 25.14.0"""
input ArtifactRevisionStatusFilter
@join__type(graph: STRAWBERRY)
Expand Down
10 changes: 10 additions & 0 deletions docs/manager/graphql-reference/v2-schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ enum ArtifactRegistryType {
enum ArtifactRemoteStatus {
# Values mirror the manager's Python `ArtifactRemoteStatus` StrEnum
# (SCANNED / AVAILABLE / FAILED); keep both in sync.
SCANNED
AVAILABLE
FAILED
}

"""Added in 25.14.0"""
Expand Down Expand Up @@ -191,6 +192,9 @@ type ArtifactRevisionEdge {
"""Added in 25.14.0"""
input ArtifactRevisionFilter {
status: ArtifactRevisionStatusFilter = null

"""Added in 25.16.0"""
remoteStatus: ArtifactRevisionRemoteStatusFilter = null
version: StringFilter = null
artifactId: ID = null
size: IntFilter = null
Expand Down Expand Up @@ -219,6 +223,12 @@ enum ArtifactRevisionOrderField {
STATUS
}

"""Added in 25.16.0"""
input ArtifactRevisionRemoteStatusFilter {
# Filter on an artifact revision's remote status.
# The manager's ArtifactRevisionFilter.to_repo_filter checks `in` first and
# only falls back to `equals` when `in` is null or empty.
# `in`: translated into an IN-type remote-status filter over these values.
in: [ArtifactRemoteStatus!] = null
# `equals`: translated into an EQUALS-type remote-status filter on this value.
equals: ArtifactRemoteStatus = null
}

"""Added in 25.14.0"""
input ArtifactRevisionStatusFilter {
in: [ArtifactStatus!] = null
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/common/events/event_types/artifact/anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ class ModelImportDoneEvent(BaseArtifactEvent):
revision: str
registry_type: ArtifactRegistryType
registry_name: str
success: bool

@classmethod
@override
def event_name(cls) -> str:
return "model_import_done"

def serialize(self) -> tuple:
return (self.model_id, self.revision, self.registry_type, self.registry_name)
return (self.model_id, self.revision, self.registry_type, self.registry_name, self.success)

@classmethod
def deserialize(cls, value: tuple):
Expand All @@ -50,6 +51,7 @@ def deserialize(cls, value: tuple):
revision=value[1],
registry_type=value[2],
registry_name=value[3],
success=value[4],
)

@override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,37 @@ def domain_id(self) -> Optional[str]:
@override
def user_event(self) -> Optional[UserEvent]:
return None


class DoPullReservoirRegistryEvent(AbstractAnycastEvent):
    """Anycast trigger for pulling reservoir registry artifacts from remote storage.

    The event carries no payload: it serializes to an empty tuple and
    deserialization ignores whatever tuple it is given.
    """

    def __init__(self) -> None:
        # Nothing to store; the event is a bare trigger.
        return None

    # --- identity -------------------------------------------------------

    @classmethod
    @override
    def event_name(cls) -> str:
        return "do_pull_reservoir_registry"

    @classmethod
    @override
    def event_domain(cls) -> EventDomain:
        return EventDomain.ARTIFACT

    # --- wire format ----------------------------------------------------

    @override
    def serialize(self) -> tuple:
        # No fields, hence an empty tuple.
        return tuple()

    @classmethod
    @override
    def deserialize(cls, value: tuple) -> Self:
        # `value` is intentionally unused: there is nothing to restore.
        return cls()

    # --- routing metadata -----------------------------------------------

    @override
    def domain_id(self) -> Optional[str]:
        return None

    @override
    def user_event(self) -> Optional[UserEvent]:
        return None
22 changes: 22 additions & 0 deletions src/ai/backend/manager/api/gql/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
from ai.backend.manager.repositories.artifact.types import (
ArtifactFilterOptions,
ArtifactOrderingOptions,
ArtifactRemoteStatusFilter,
ArtifactRemoteStatusFilterType,
ArtifactRevisionFilterOptions,
ArtifactRevisionOrderingOptions,
ArtifactStatusFilter,
Expand Down Expand Up @@ -123,9 +125,18 @@ class ArtifactRevisionStatusFilter:
equals: Optional[ArtifactStatus] = None


@strawberry.input(description="Added in 25.16.0")
class ArtifactRevisionRemoteStatusFilter:
    # GraphQL input for filtering artifact revisions by remote status.
    # ArtifactRevisionFilter.to_repo_filter checks `in_` first and only falls
    # back to `equals` when `in_` is None or empty.

    # Exposed as `in` in the schema; `in_` avoids the Python keyword.
    in_: Optional[list[ArtifactRemoteStatus]] = strawberry.field(
        default=None,
        name="in",
    )
    # Single-value variant of the filter.
    equals: Optional[ArtifactRemoteStatus] = None


@strawberry.input(description="Added in 25.14.0")
class ArtifactRevisionFilter:
status: Optional[ArtifactRevisionStatusFilter] = None
remote_status: Optional[ArtifactRevisionRemoteStatusFilter] = strawberry.field(
default=None, description="Added in 25.16.0"
)
version: Optional[StringFilter] = None
artifact_id: Optional[ID] = None
size: Optional[IntFilter] = None
Expand All @@ -152,6 +163,17 @@ def to_repo_filter(self) -> ArtifactRevisionFilterOptions:
type=ArtifactStatusFilterType.EQUALS, values=[self.status.equals]
)

# Handle remote_status filter using ArtifactRevisionRemoteStatusFilter
if self.remote_status:
if self.remote_status.in_:
repo_filter.remote_status_filter = ArtifactRemoteStatusFilter(
type=ArtifactRemoteStatusFilterType.IN, values=self.remote_status.in_
)
elif self.remote_status.equals:
repo_filter.remote_status_filter = ArtifactRemoteStatusFilter(
type=ArtifactRemoteStatusFilterType.EQUALS, values=[self.remote_status.equals]
)

# Pass StringFilter directly for processing in repository
repo_filter.version_filter = self.version

Expand Down
3 changes: 3 additions & 0 deletions src/ai/backend/manager/data/artifact/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class ArtifactRemoteStatus(enum.StrEnum):

SCANNED = "SCANNED"
AVAILABLE = "AVAILABLE"
FAILED = "FAILED"


class ArtifactAvailability(enum.StrEnum):
Expand Down Expand Up @@ -86,6 +87,7 @@ class ArtifactRevisionResponseData:
version: str
size: Optional[int]
status: ArtifactStatus
remote_status: Optional[ArtifactRemoteStatus]
created_at: Optional[datetime]
updated_at: Optional[datetime]

Expand All @@ -97,6 +99,7 @@ def from_revision_data(cls, data: ArtifactRevisionData) -> Self:
version=data.version,
size=data.size,
status=data.status,
remote_status=data.remote_status,
created_at=data.created_at,
updated_at=data.updated_at,
)
Expand Down
7 changes: 7 additions & 0 deletions src/ai/backend/manager/event_dispatcher/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ModelMetadataFetchDoneEvent,
)
from ai.backend.common.events.event_types.artifact_registry.anycast import (
DoPullReservoirRegistryEvent,
DoScanReservoirRegistryEvent,
)
from ai.backend.common.events.event_types.bgtask.broadcast import (
Expand Down Expand Up @@ -218,6 +219,7 @@ def __init__(self, args: DispatcherArgs) -> None:
args.repositories.reservoir_registry.repository,
args.repositories.object_storage.repository,
args.storage_manager,
args.config_provider,
)

def dispatch(self, event_dispatcher: EventDispatcher) -> None:
Expand Down Expand Up @@ -589,6 +591,11 @@ def _dispatch_artifact_registry_events(self, event_dispatcher: EventDispatcher)
None,
self._artifact_registry_event_handler.handle_artifact_registry_scan,
)
evd.consume(
DoPullReservoirRegistryEvent,
None,
self._artifact_registry_event_handler.handle_artifact_registry_pull,
)

def _dispatch_idle_check_events(
self,
Expand Down
7 changes: 7 additions & 0 deletions src/ai/backend/manager/event_dispatcher/handlers/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ async def handle_model_import_done(
artifact.id, revision=event.revision
)

if event.success is False:
log.warning("Model import failed: {} revision: {}", event.model_id, event.revision)
await self._artifact_repository.update_artifact_revision_status(
revision.id, ArtifactStatus.FAILED
)
return
Comment on lines +79 to +84
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not event.success:


try:
if self._config_provider.config.reservoir.enable_approve_process:
await self._artifact_repository.update_artifact_revision_status(
Expand Down
Loading
Loading