Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openviking/storage/content_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ async def _prepare_temp_write(
mode: str,
ctx: RequestContext,
) -> tuple[str, str]:
temp_base = self._viking_fs.create_temp_uri()
temp_base = self._viking_fs.create_temp_uri(ctx=ctx)
await self._viking_fs.mkdir(temp_base, exist_ok=True, ctx=ctx)
root_name = root_uri.rstrip("/").split("/")[-1]
temp_root_uri = f"{temp_base.rstrip('/')}/{root_name}"
Expand Down
50 changes: 39 additions & 11 deletions openviking/storage/viking_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,13 @@ def _ensure_access(self, uri: str, ctx: Optional[RequestContext]) -> None:
if not self._is_accessible(normalized_uri, real_ctx):
raise PermissionError(f"Access denied for {uri}")

def _ensure_mutable_access(self, uri: str, ctx: Optional[RequestContext]) -> None:
self._ensure_access(uri, ctx)
real_ctx = self._ctx_or_default(ctx)
normalized_uri, _ = self._normalized_uri_parts(uri)
if real_ctx.role != Role.ROOT and normalized_uri.rstrip("/") == "viking://temp":
raise PermissionError("Temp root is read-only for non-root users")

# ========== AGFS Basic Commands ==========

async def read(
Expand Down Expand Up @@ -326,7 +333,7 @@ async def write(
ctx: Optional[RequestContext] = None,
) -> str:
"""Write file"""
self._ensure_access(uri, ctx)
self._ensure_mutable_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
if isinstance(data, str):
data = data.encode("utf-8")
Expand All @@ -342,7 +349,7 @@ async def mkdir(
ctx: Optional[RequestContext] = None,
) -> None:
"""Create directory."""
self._ensure_access(uri, ctx)
self._ensure_mutable_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
# Always ensure parent directories exist before creating this directory
await self._ensure_parent_dirs(path)
Expand Down Expand Up @@ -375,7 +382,7 @@ async def rm(
from openviking.storage.errors import LockAcquisitionError, ResourceBusyError
from openviking.storage.transaction import LockContext, get_lock_manager

self._ensure_access(uri, ctx)
self._ensure_mutable_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
target_uri = self._path_to_uri(path, ctx=ctx)

Expand Down Expand Up @@ -429,8 +436,8 @@ async def mv(
from openviking.pyagfs.helpers import cp as agfs_cp
from openviking.storage.transaction import LockContext, get_lock_manager

self._ensure_access(old_uri, ctx)
self._ensure_access(new_uri, ctx)
self._ensure_mutable_access(old_uri, ctx)
self._ensure_mutable_access(new_uri, ctx)
old_path = self._uri_to_path(old_uri, ctx=ctx)
new_path = self._uri_to_path(new_uri, ctx=ctx)
target_uri = self._path_to_uri(old_path, ctx=ctx)
Expand Down Expand Up @@ -1277,12 +1284,17 @@ def _path_to_uri(self, path: str, ctx: Optional[RequestContext] = None) -> str:
else:
return f"viking://{path}"

def _looks_like_legacy_temp_leaf(self, value: str) -> bool:
return bool(re.match(r"^\d{8}_[0-9a-f]{6}$", value or ""))

def _extract_space_from_uri(self, uri: str) -> Optional[str]:
"""Extract space segment from URI if present.

URIs are WYSIWYG: viking://{scope}/{space}/...
For user/agent, the second segment is space unless it's a known structure dir.
For session, the second segment is always space (when 3+ parts).
Legacy temp URIs keep the historical shape viking://temp/<temp-id> and therefore
intentionally have no space segment.
"""
_, parts = self._normalized_uri_parts(uri)
if len(parts) < 2:
Expand All @@ -1292,6 +1304,8 @@ def _extract_space_from_uri(self, uri: str) -> Optional[str]:
# Treat scope-root metadata files as not having a tenant space segment.
if len(parts) == 2 and second in {".abstract.md", ".overview.md"}:
return None
if scope == "temp" and self._looks_like_legacy_temp_leaf(second):
return None
if scope == "user" and second not in self._USER_STRUCTURE_DIRS:
return second
if scope == "agent" and second not in self._AGENT_STRUCTURE_DIRS:
Expand All @@ -1309,8 +1323,14 @@ def _is_accessible(self, uri: str, ctx: RequestContext) -> bool:
return True

scope = parts[0]
if scope in {"resources", "temp"}:
if scope == "resources":
return True
if scope == "temp":
if len(parts) == 1:
return True
if parts[1] == ctx.user.user_space_name():
return True
return bool(re.fullmatch(r"\d{8}_[0-9a-f]{6}", parts[1]))
if scope == "_system":
return False

Expand Down Expand Up @@ -1860,8 +1880,8 @@ async def move_file(
ctx: Optional[RequestContext] = None,
) -> None:
"""Move file."""
self._ensure_access(from_uri, ctx)
self._ensure_access(to_uri, ctx)
self._ensure_mutable_access(from_uri, ctx)
self._ensure_mutable_access(to_uri, ctx)
from_path = self._uri_to_path(from_uri, ctx=ctx)

content_bytes = await self.read_file_bytes(from_uri, ctx=ctx)
Expand All @@ -1870,12 +1890,20 @@ async def move_file(

# ========== Temp File Operations (backward compatible) ==========

def create_temp_uri(self) -> str:
"""Create temp directory URI."""
return VikingURI.create_temp_uri()
def create_temp_uri(self, ctx: Optional[RequestContext] = None) -> str:
"""Create a temp directory URI.

- explicit ctx or bound request context -> user-scoped temp URI
- no explicit/bound context -> legacy temp URI shape for backward compatibility
"""
real_ctx = ctx if ctx is not None else self._bound_ctx.get()
if real_ctx is None:
return VikingURI.create_temp_uri()
return VikingURI.create_temp_uri(space=real_ctx.user.user_space_name())

async def delete_temp(self, temp_uri: str, ctx: Optional[RequestContext] = None) -> None:
"""Delete temp directory and its contents."""
self._ensure_mutable_access(temp_uri, ctx)
path = self._uri_to_path(temp_uri, ctx=ctx)
try:
for entry in self._ls_entries(path):
Expand Down
19 changes: 16 additions & 3 deletions openviking_cli/utils/uri.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,23 @@ def normalize(uri: str) -> str:
return f"{VikingURI.SCHEME}://{uri}"

@classmethod
def create_temp_uri(cls) -> str:
"""Create temp directory URI like viking://temp/MMDDHHMM_XXXXXX"""
def create_temp_uri(cls, space: Optional[str] = None) -> str:
"""Create temp directory URI.

When ``space`` is provided, generate a user-scoped temp URI like
``viking://temp/<space>/MMDDHHMM_XXXXXX``. This preserves isolation
between users sharing the same account while keeping temp data in the
temp scope.

When ``space`` is omitted, fall back to the legacy shape
``viking://temp/MMDDHHMM_XXXXXX`` for compatibility with callers that
do not have a user context.
"""
import datetime
import uuid

temp_id = uuid.uuid4().hex[:6]
return f"viking://temp/{datetime.datetime.now().strftime('%m%d%H%M')}_{temp_id}"
temp_leaf = f"{datetime.datetime.now().strftime('%m%d%H%M')}_{temp_id}"
if space:
return f"viking://temp/{space}/{temp_leaf}"
return f"viking://temp/{temp_leaf}"
Loading