Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions openviking/server/routers/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,16 @@ async def observer_retrieval(
return Response(status="ok", result=_component_to_dict(component))


@router.get("/usage")
async def observer_usage(
    ctx: RequestContext = Depends(get_request_context),
):
    """Return the "usage" observer component for the requesting context.

    Args:
        ctx: Request context resolved by the ``get_request_context``
            dependency; forwarded unchanged to the usage observer.

    Returns:
        A ``Response`` with ``status="ok"`` whose ``result`` is the usage
        component converted by ``_component_to_dict``.
    """
    usage_component = get_service().debug.observer.usage(ctx=ctx)
    payload = _component_to_dict(usage_component)
    return Response(status="ok", result=payload)


@router.get("/system")
async def observer_system(
ctx: RequestContext = Depends(get_request_context),
Expand Down
11 changes: 11 additions & 0 deletions openviking/service/debug_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ModelsObserver,
QueueObserver,
RetrievalObserver,
UsageObserver,
VikingDBObserver,
)
from openviking.storage.queuefs import get_queue_manager
Expand Down Expand Up @@ -185,6 +186,16 @@ def retrieval(self) -> ComponentStatus:
status=observer.get_status_table(),
)

def usage(self, ctx: Optional[RequestContext] = None) -> ComponentStatus:
    """Get context usage metrics.

    Args:
        ctx: Optional request context, forwarded to the usage observer
            when the status table is rendered.

    Returns:
        A ``ComponentStatus`` named ``"usage"`` carrying the observer's
        health flag, error flag, and rendered status table.
    """
    observer = UsageObserver(self._vikingdb)
    # Render the table BEFORE reading the flags: the observer is freshly
    # constructed, and has_errors() only reflects the query performed by
    # get_status_table(). Call arguments are evaluated left to right, so
    # calling has_errors() inline ahead of get_status_table() would always
    # report False, even when the query failed.
    status_table = observer.get_status_table(ctx=ctx)
    return ComponentStatus(
        name="usage",
        is_healthy=observer.is_healthy(),
        has_errors=observer.has_errors(),
        status=status_table,
    )

def system(self, ctx: Optional[RequestContext] = None) -> SystemStatus:
"""Get system overall status."""
components = {
Expand Down
2 changes: 2 additions & 0 deletions openviking/storage/observers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)
from .queue_observer import QueueObserver
from .retrieval_observer import RetrievalObserver
from .usage_observer import UsageObserver
from .vikingdb_observer import VikingDBObserver

__all__ = [
Expand All @@ -21,5 +22,6 @@
"set_prometheus_observer",
"QueueObserver",
"RetrievalObserver",
"UsageObserver",
"VikingDBObserver",
]
72 changes: 72 additions & 0 deletions openviking/storage/observers/usage_observer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0
"""
UsageObserver: Context usage metrics observer.

Currently provides the total vector count for the observer API; memory,
resource, and session count breakdowns are not implemented in this module yet.
"""

from typing import Any, Dict, Optional

from openviking.server.identity import RequestContext
from openviking.storage.observers.base_observer import BaseObserver
from openviking.storage.vikingdb_manager import VikingDBManager
from openviking_cli.utils import run_async
from openviking_cli.utils.logger import get_logger

logger = get_logger(__name__)


class UsageObserver(BaseObserver):
    """Observer for context usage metrics.

    The current implementation reports a single metric, ``total_vectors``,
    obtained from the VikingDB manager passed to the constructor.
    """

    def __init__(self, vikingdb_manager: Optional[VikingDBManager] = None):
        # Metrics from the most recent query; empty until a query has run.
        self._last_usage: Dict[str, Any] = {}
        self._vikingdb = vikingdb_manager

    async def get_usage_async(self, ctx: Optional[RequestContext] = None) -> Dict[str, Any]:
        """Query the vector store and return the usage metrics.

        Args:
            ctx: Optional request context. Accepted for interface symmetry;
                the current implementation does not read it.

        Returns:
            A dict with one key, ``"total_vectors"``: the count reported by
            the manager, ``0`` when no manager is configured or the
            collection does not exist, or ``-1`` when the query raised.
            The same dict is also remembered as the last usage result.
        """
        vector_total = 0
        manager = self._vikingdb

        if manager is not None:
            try:
                collection_present = await manager.collection_exists()
                if collection_present:
                    vector_total = await manager.count()
            except Exception as exc:
                # Best-effort metric: log the failure and flag it with -1
                # so has_errors() can pick it up afterwards.
                logger.warning(f"Failed to get vector count: {exc}")
                vector_total = -1

        metrics: Dict[str, Any] = {"total_vectors": vector_total}
        self._last_usage = metrics
        return metrics

    def get_usage(self, ctx: Optional[RequestContext] = None) -> Dict[str, Any]:
        """Run ``get_usage_async`` through ``run_async`` and return its result."""
        pending = self.get_usage_async(ctx=ctx)
        return run_async(pending)

    def get_status_table(self, ctx: Optional[RequestContext] = None) -> str:
        """Collect fresh usage metrics and render them as a text table.

        Calling this performs a new usage query, which also updates the
        state that ``has_errors`` inspects.
        """
        from tabulate import tabulate

        metrics = self.get_usage(ctx=ctx)
        total_row = {
            "Metric": "Total Vectors",
            "Value": metrics.get("total_vectors", 0),
        }
        rows = [total_row]

        return tabulate(rows, headers="keys", tablefmt="pretty")

    def is_healthy(self) -> bool:
        """Return True when a VikingDB manager was supplied to this observer."""
        backend = self._vikingdb
        return backend is not None

    def has_errors(self) -> bool:
        """Return True when the most recent usage query recorded a failure.

        Reports False if no query has been performed yet on this instance.
        """
        last_total = self._last_usage.get("total_vectors", 0)
        return last_total == -1
Loading