Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 49 additions & 14 deletions openviking/storage/viking_fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import hashlib
import json
import re
from contextlib import contextmanager
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import PurePath
Expand All @@ -40,6 +40,17 @@
logger = get_logger(__name__)


@asynccontextmanager
async def _noop_async_lock() -> Any:
yield


@asynccontextmanager
async def _asyncio_lock_context(lock: asyncio.Lock) -> Any:
async with lock:
yield


# ========== Dataclass ==========


Expand Down Expand Up @@ -179,6 +190,19 @@ def __init__(
self._bound_ctx: contextvars.ContextVar[Optional[RequestContext]] = contextvars.ContextVar(
"vikingfs_bound_ctx", default=None
)
self._append_fallback_locks: Dict[str, asyncio.Lock] = {}

def _append_lock_context(self, path: str):
try:
from openviking.storage.transaction import LockContext, get_lock_manager

return LockContext(get_lock_manager(), [path], lock_mode="point")
except RuntimeError:
lock = self._append_fallback_locks.get(path)
if lock is None:
lock = asyncio.Lock()
self._append_fallback_locks[path] = lock
return _asyncio_lock_context(lock)

@staticmethod
def _default_ctx() -> RequestContext:
Expand Down Expand Up @@ -1716,26 +1740,37 @@ async def append_file(
ctx: Optional[RequestContext] = None,
) -> None:
"""Append content to file."""
from openviking.storage.errors import LockAcquisitionError, ResourceBusyError

self._ensure_access(uri, ctx)
path = self._uri_to_path(uri, ctx=ctx)
lock_path = path.rsplit("/", 1)[0] or path

try:
existing = ""
try:
existing_bytes = self._handle_agfs_read(self.agfs.read(path))
existing_bytes = await self._decrypt_content(existing_bytes, ctx=ctx)
existing = self._decode_bytes(existing_bytes)
except AGFSHTTPError as e:
if e.status_code != 404:
async with self._append_lock_context(lock_path):
existing = ""
try:
existing_bytes = self._handle_agfs_read(self.agfs.read(path))
existing_bytes = await self._decrypt_content(existing_bytes, ctx=ctx)
existing = self._decode_bytes(existing_bytes)
except AGFSHTTPError as e:
if e.status_code != 404:
raise
except (FileNotFoundError, RuntimeError) as e:
if not any(
msg in str(e).lower() for msg in ["not found", "no such file or directory"]
):
raise
except AGFSClientError:
raise
except AGFSClientError:
raise

await self._ensure_parent_dirs(path)
final_content = (existing + content).encode("utf-8")
final_content = await self._encrypt_content(final_content, ctx=ctx)
self.agfs.write(path, final_content)
await self._ensure_parent_dirs(path)
final_content = (existing + content).encode("utf-8")
final_content = await self._encrypt_content(final_content, ctx=ctx)
self.agfs.write(path, final_content)

except LockAcquisitionError as e:
raise ResourceBusyError(f"Resource is being processed: {uri}") from e
except Exception as e:
logger.error(f"[VikingFS] Failed to append to file {uri}: {e}")
raise IOError(f"Failed to append to file {uri}: {e}")
Expand Down
74 changes: 74 additions & 0 deletions tests/storage/test_viking_fs_append.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock

import pytest

from openviking.storage.viking_fs import VikingFS


class _RaceAGFS:
def __init__(self) -> None:
self.contents: dict[str, bytes] = {}

def read(self, path: str) -> bytes:
if path not in self.contents:
raise RuntimeError(f"not found: {path}")
return self.contents[path]

def write(self, path: str, data: bytes) -> None:
self.contents[path] = data


@pytest.mark.asyncio
async def test_append_file_treats_runtime_not_found_as_missing() -> None:
agfs = MagicMock()
agfs.read.side_effect = RuntimeError(
"not found: /local/default/session/default/session-1/messages.jsonl"
)
fs = VikingFS(agfs=agfs)
fs._ensure_parent_dirs = AsyncMock() # type: ignore[method-assign]

await fs.append_file(
"viking://session/default/session-1/messages.jsonl",
'{"role":"user"}\n',
)

agfs.write.assert_called_once_with(
"/local/default/session/default/session-1/messages.jsonl",
b'{"role":"user"}\n',
)


@pytest.mark.asyncio
async def test_append_file_still_raises_unrelated_runtime_errors() -> None:
agfs = MagicMock()
agfs.read.side_effect = RuntimeError("backend unavailable")
fs = VikingFS(agfs=agfs)
fs._ensure_parent_dirs = AsyncMock() # type: ignore[method-assign]

with pytest.raises(IOError, match="backend unavailable"):
await fs.append_file(
"viking://session/default/session-1/messages.jsonl",
'{"role":"user"}\n',
)


@pytest.mark.asyncio
async def test_append_file_serializes_concurrent_first_writes() -> None:
agfs = _RaceAGFS()
fs = VikingFS(agfs=agfs)

async def slow_ensure_parent_dirs(_path: str) -> None:
await asyncio.sleep(0.01)

fs._ensure_parent_dirs = slow_ensure_parent_dirs # type: ignore[method-assign]

await asyncio.gather(
fs.append_file("viking://session/default/session-1/messages.jsonl", "first\n"),
fs.append_file("viking://session/default/session-1/messages.jsonl", "second\n"),
)

assert agfs.contents["/local/default/session/default/session-1/messages.jsonl"] in {
b"first\nsecond\n",
b"second\nfirst\n",
}
Loading