diff --git a/openviking/server/routers/observer.py b/openviking/server/routers/observer.py index e2c626c25..78ecd3764 100644 --- a/openviking/server/routers/observer.py +++ b/openviking/server/routers/observer.py @@ -92,6 +92,16 @@ async def observer_retrieval( return Response(status="ok", result=_component_to_dict(component)) +@router.get("/usage") +async def observer_usage( + ctx: RequestContext = Depends(get_request_context), +): + """Get context usage metrics (vector count, breakdowns).""" + service = get_service() + component = service.debug.observer.usage(ctx=ctx) + return Response(status="ok", result=_component_to_dict(component)) + + @router.get("/system") async def observer_system( ctx: RequestContext = Depends(get_request_context), diff --git a/openviking/service/debug_service.py b/openviking/service/debug_service.py index 955f97d61..229eeeee4 100644 --- a/openviking/service/debug_service.py +++ b/openviking/service/debug_service.py @@ -14,6 +14,7 @@ ModelsObserver, QueueObserver, RetrievalObserver, + UsageObserver, VikingDBObserver, ) from openviking.storage.queuefs import get_queue_manager @@ -185,6 +186,16 @@ def retrieval(self) -> ComponentStatus: status=observer.get_status_table(), ) + def usage(self, ctx: Optional[RequestContext] = None) -> ComponentStatus: + """Get context usage metrics.""" + observer = UsageObserver(self._vikingdb) + return ComponentStatus( + name="usage", + is_healthy=observer.is_healthy(), + has_errors=observer.has_errors(), + status=observer.get_status_table(ctx=ctx), + ) + def system(self, ctx: Optional[RequestContext] = None) -> SystemStatus: """Get system overall status.""" components = { diff --git a/openviking/storage/observers/__init__.py b/openviking/storage/observers/__init__.py index c8b643c6b..b953ff309 100644 --- a/openviking/storage/observers/__init__.py +++ b/openviking/storage/observers/__init__.py @@ -10,6 +10,7 @@ ) from .queue_observer import QueueObserver from .retrieval_observer import RetrievalObserver +from .usage_observer import UsageObserver from .vikingdb_observer import VikingDBObserver __all__ = [ @@ -21,5 +22,6 @@ "set_prometheus_observer", "QueueObserver", "RetrievalObserver", + "UsageObserver", "VikingDBObserver", ] diff --git a/openviking/storage/observers/usage_observer.py b/openviking/storage/observers/usage_observer.py new file mode 100644 index 000000000..4ed172ab9 --- /dev/null +++ b/openviking/storage/observers/usage_observer.py @@ -0,0 +1,72 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +""" +UsageObserver: Context usage metrics observer. + +Provides vector count, memory count, resource count, and session count +breakdowns for the observer API. +""" + +from typing import Any, Dict, Optional + +from openviking.server.identity import RequestContext +from openviking.storage.observers.base_observer import BaseObserver +from openviking.storage.vikingdb_manager import VikingDBManager +from openviking_cli.utils import run_async +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class UsageObserver(BaseObserver): + """Observes context usage metrics: vectors, memories, resources, sessions.""" + + def __init__(self, vikingdb_manager: Optional[VikingDBManager] = None): + self._vikingdb = vikingdb_manager + self._last_usage: Dict[str, Any] = {} + + async def get_usage_async(self, ctx: Optional[RequestContext] = None) -> Dict[str, Any]: + """Collect usage metrics. + + Returns: + Dict with total_vectors and breakdown by context type. + """ + result: Dict[str, Any] = { + "total_vectors": 0, + } + + # Vector count from VikingDB + if self._vikingdb is not None: + try: + if await self._vikingdb.collection_exists(): + result["total_vectors"] = await self._vikingdb.count() + except Exception as e: + logger.warning(f"Failed to get vector count: {e}") + result["total_vectors"] = -1 + + self._last_usage = result + return result + + def get_usage(self, ctx: Optional[RequestContext] = None) -> Dict[str, Any]: + """Synchronous wrapper for get_usage_async.""" + return run_async(self.get_usage_async(ctx=ctx)) + + def get_status_table(self, ctx: Optional[RequestContext] = None) -> str: + """Format usage metrics as a table.""" + from tabulate import tabulate + + usage = self.get_usage(ctx=ctx) + + data = [ + {"Metric": "Total Vectors", "Value": usage.get("total_vectors", 0)}, + ] + + return tabulate(data, headers="keys", tablefmt="pretty") + + def is_healthy(self) -> bool: + """Usage observer is healthy if VikingDB is available.""" + return self._vikingdb is not None + + def has_errors(self) -> bool: + """Check if the last usage query had errors.""" + return self._last_usage.get("total_vectors", 0) == -1