Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions openviking/resource/watch_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@

from pydantic import BaseModel, Field

from openviking.resource.watch_storage import (
WATCH_TASK_STORAGE_BAK_URI,
WATCH_TASK_STORAGE_TMP_URI,
WATCH_TASK_STORAGE_URI,
)
from openviking_cli.exceptions import ConflictError, NotFoundError
from openviking_cli.utils.logger import get_logger

Expand Down Expand Up @@ -110,9 +115,9 @@ class WatchManager:
Supports multi-tenant authorization.
"""

STORAGE_URI = "viking://resources/.watch_tasks.json"
STORAGE_BAK_URI = "viking://resources/.watch_tasks.json.bak"
STORAGE_TMP_URI = "viking://resources/.watch_tasks.json.tmp"
STORAGE_URI = WATCH_TASK_STORAGE_URI
STORAGE_BAK_URI = WATCH_TASK_STORAGE_BAK_URI
STORAGE_TMP_URI = WATCH_TASK_STORAGE_TMP_URI

def __init__(self, viking_fs: Optional[Any] = None):
"""Initialize WatchManager.
Expand Down
26 changes: 26 additions & 0 deletions openviking/resource/watch_storage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0
"""Shared constants and helpers for watch-task persistence storage."""

from __future__ import annotations


WATCH_TASK_STORAGE_URI = "viking://resources/.watch_tasks.json"
WATCH_TASK_STORAGE_BAK_URI = "viking://resources/.watch_tasks.json.bak"
WATCH_TASK_STORAGE_TMP_URI = "viking://resources/.watch_tasks.json.tmp"

WATCH_TASK_CONTROL_URIS = frozenset(
{
WATCH_TASK_STORAGE_URI,
WATCH_TASK_STORAGE_BAK_URI,
WATCH_TASK_STORAGE_TMP_URI,
}
)


def is_watch_task_control_uri(uri: str) -> bool:
"""Return True when a URI points at internal watch-task control state."""
if not isinstance(uri, str):
return False
normalized = uri.rstrip("/")
return normalized in WATCH_TASK_CONTROL_URIS
3 changes: 3 additions & 0 deletions openviking/storage/content_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
from typing import Any, Dict, Optional

from openviking.resource.watch_storage import is_watch_task_control_uri
from openviking.server.identity import RequestContext
from openviking.session.memory.utils.content import deserialize_full, serialize_with_metadata
from openviking.storage.queuefs import SemanticMsg, get_queue_manager
Expand Down Expand Up @@ -135,6 +136,8 @@ def _validate_target_uri(self, uri: str) -> None:
name = uri.rstrip("/").split("/")[-1]
if name in _DERIVED_FILENAMES:
raise InvalidArgumentError(f"cannot write derived semantic file directly: {uri}")
if is_watch_task_control_uri(uri):
raise InvalidArgumentError(f"cannot write watch task control file directly: {uri}")

parsed = VikingURI(uri)
if parsed.scope not in {"resources", "user", "agent"}:
Expand Down
3 changes: 3 additions & 0 deletions openviking/storage/viking_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

from openviking.pyagfs.exceptions import AGFSClientError, AGFSHTTPError
from openviking.resource.watch_storage import is_watch_task_control_uri
from openviking.server.identity import RequestContext, Role
from openviking.telemetry import get_current_telemetry
from openviking.utils.time_utils import format_simplified, get_current_timestamp, parse_iso_datetime
Expand Down Expand Up @@ -1307,6 +1308,8 @@ def _is_accessible(self, uri: str, ctx: RequestContext) -> bool:
return True
if not parts:
return True
if is_watch_task_control_uri(normalized_uri):
return False

scope = parts[0]
if scope in {"resources", "temp"}:
Expand Down
106 changes: 106 additions & 0 deletions tests/server/test_watch_task_acl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0

"""Regression tests for watch-task control file access boundaries."""

import contextvars

import pytest

from openviking.resource.watch_storage import (
WATCH_TASK_STORAGE_BAK_URI,
WATCH_TASK_STORAGE_TMP_URI,
WATCH_TASK_STORAGE_URI,
)
from openviking.server.identity import RequestContext, Role
from openviking.storage.content_write import ContentWriteCoordinator
from openviking.storage.viking_fs import VikingFS
from openviking_cli.exceptions import InvalidArgumentError
from openviking_cli.session.user_id import UserIdentifier


@pytest.fixture
def root_ctx() -> RequestContext:
return RequestContext(user=UserIdentifier.the_default_user(), role=Role.ROOT)


@pytest.fixture
def user_ctx() -> RequestContext:
return RequestContext(user=UserIdentifier("default", "alice", "default"), role=Role.USER)


@pytest.fixture
def bare_viking_fs() -> VikingFS:
fs = object.__new__(VikingFS)
fs._bound_ctx = contextvars.ContextVar("vikingfs_bound_ctx", default=None)
return fs


@pytest.mark.parametrize(
"uri",
[
WATCH_TASK_STORAGE_URI,
WATCH_TASK_STORAGE_BAK_URI,
WATCH_TASK_STORAGE_TMP_URI,
],
)
def test_watch_task_control_files_are_root_only(bare_viking_fs, root_ctx, user_ctx, uri):
assert bare_viking_fs._is_accessible(uri, root_ctx) is True
assert bare_viking_fs._is_accessible(uri, user_ctx) is False

with pytest.raises(PermissionError):
bare_viking_fs._ensure_access(uri, user_ctx)


@pytest.mark.asyncio
async def test_hidden_listing_filters_watch_task_control_files_for_non_root(
bare_viking_fs, root_ctx, user_ctx
):
bare_viking_fs._uri_to_path = lambda uri, ctx=None: "/fake/resources"
bare_viking_fs._ctx_or_default = lambda ctx=None: ctx
bare_viking_fs._ls_entries = lambda path: [
{"name": ".watch_tasks.json", "isDir": False, "size": 10, "modTime": "2026-01-01T00:00:00+00:00"},
{"name": ".watch_tasks.json.bak", "isDir": False, "size": 10, "modTime": "2026-01-01T00:00:00+00:00"},
{"name": ".watch_tasks.json.tmp", "isDir": False, "size": 10, "modTime": "2026-01-01T00:00:00+00:00"},
{"name": "public.txt", "isDir": False, "size": 5, "modTime": "2026-01-01T00:00:00+00:00"},
]
bare_viking_fs._path_to_uri = lambda path, ctx=None: f"viking://resources/{path.split('/')[-1]}"

root_entries = await bare_viking_fs._ls_original(
"viking://resources",
show_all_hidden=True,
ctx=root_ctx,
)
root_uris = {entry["uri"] for entry in root_entries}
assert root_uris >= {
WATCH_TASK_STORAGE_URI,
WATCH_TASK_STORAGE_BAK_URI,
WATCH_TASK_STORAGE_TMP_URI,
"viking://resources/public.txt",
}

user_entries = await bare_viking_fs._ls_original(
"viking://resources",
show_all_hidden=True,
ctx=user_ctx,
)
user_uris = {entry["uri"] for entry in user_entries}
assert "viking://resources/public.txt" in user_uris
assert WATCH_TASK_STORAGE_URI not in user_uris
assert WATCH_TASK_STORAGE_BAK_URI not in user_uris
assert WATCH_TASK_STORAGE_TMP_URI not in user_uris


@pytest.mark.parametrize(
"uri",
[
WATCH_TASK_STORAGE_URI,
WATCH_TASK_STORAGE_BAK_URI,
WATCH_TASK_STORAGE_TMP_URI,
],
)
def test_content_write_rejects_watch_task_control_files(uri):
coordinator = object.__new__(ContentWriteCoordinator)

with pytest.raises(InvalidArgumentError, match="watch task control file"):
coordinator._validate_target_uri(uri)