diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py index 2d2360dcd..83e564baf 100644 --- a/openviking/storage/viking_fs.py +++ b/openviking/storage/viking_fs.py @@ -17,7 +17,7 @@ import hashlib import json import re -from contextlib import contextmanager +from contextlib import asynccontextmanager, contextmanager from dataclasses import dataclass, field from datetime import datetime from pathlib import PurePath @@ -40,6 +40,17 @@ logger = get_logger(__name__) +@asynccontextmanager +async def _noop_async_lock() -> Any: + yield + + +@asynccontextmanager +async def _asyncio_lock_context(lock: asyncio.Lock) -> Any: + async with lock: + yield + + # ========== Dataclass ========== @@ -179,6 +190,19 @@ def __init__( self._bound_ctx: contextvars.ContextVar[Optional[RequestContext]] = contextvars.ContextVar( "vikingfs_bound_ctx", default=None ) + self._append_fallback_locks: Dict[str, asyncio.Lock] = {} + + def _append_lock_context(self, path: str): + try: + from openviking.storage.transaction import LockContext, get_lock_manager + + return LockContext(get_lock_manager(), [path], lock_mode="point") + except RuntimeError: + lock = self._append_fallback_locks.get(path) + if lock is None: + lock = asyncio.Lock() + self._append_fallback_locks[path] = lock + return _asyncio_lock_context(lock) @staticmethod def _default_ctx() -> RequestContext: @@ -1716,26 +1740,37 @@ async def append_file( ctx: Optional[RequestContext] = None, ) -> None: """Append content to file.""" + from openviking.storage.errors import LockAcquisitionError, ResourceBusyError + self._ensure_access(uri, ctx) path = self._uri_to_path(uri, ctx=ctx) + lock_path = path.rsplit("/", 1)[0] or path try: - existing = "" - try: - existing_bytes = self._handle_agfs_read(self.agfs.read(path)) - existing_bytes = await self._decrypt_content(existing_bytes, ctx=ctx) - existing = self._decode_bytes(existing_bytes) - except AGFSHTTPError as e: - if e.status_code != 404: + async with self._append_lock_context(lock_path): + existing = "" + try: + existing_bytes = self._handle_agfs_read(self.agfs.read(path)) + existing_bytes = await self._decrypt_content(existing_bytes, ctx=ctx) + existing = self._decode_bytes(existing_bytes) + except AGFSHTTPError as e: + if e.status_code != 404: + raise + except (FileNotFoundError, RuntimeError) as e: + if not any( + msg in str(e).lower() for msg in ["not found", "no such file or directory"] + ): + raise + except AGFSClientError: raise - except AGFSClientError: - raise - await self._ensure_parent_dirs(path) - final_content = (existing + content).encode("utf-8") - final_content = await self._encrypt_content(final_content, ctx=ctx) - self.agfs.write(path, final_content) + await self._ensure_parent_dirs(path) + final_content = (existing + content).encode("utf-8") + final_content = await self._encrypt_content(final_content, ctx=ctx) + self.agfs.write(path, final_content) + except LockAcquisitionError as e: + raise ResourceBusyError(f"Resource is being processed: {uri}") from e except Exception as e: logger.error(f"[VikingFS] Failed to append to file {uri}: {e}") raise IOError(f"Failed to append to file {uri}: {e}") diff --git a/tests/storage/test_viking_fs_append.py b/tests/storage/test_viking_fs_append.py new file mode 100644 index 000000000..fe94c58b5 --- /dev/null +++ b/tests/storage/test_viking_fs_append.py @@ -0,0 +1,74 @@ +import asyncio +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from openviking.storage.viking_fs import VikingFS + + +class _RaceAGFS: + def __init__(self) -> None: + self.contents: dict[str, bytes] = {} + + def read(self, path: str) -> bytes: + if path not in self.contents: + raise RuntimeError(f"not found: {path}") + return self.contents[path] + + def write(self, path: str, data: bytes) -> None: + self.contents[path] = data + + +@pytest.mark.asyncio +async def test_append_file_treats_runtime_not_found_as_missing() -> None: + agfs = MagicMock() + agfs.read.side_effect = RuntimeError( + "not found: /local/default/session/default/session-1/messages.jsonl" + ) + fs = VikingFS(agfs=agfs) + fs._ensure_parent_dirs = AsyncMock() # type: ignore[method-assign] + + await fs.append_file( + "viking://session/default/session-1/messages.jsonl", + '{"role":"user"}\n', + ) + + agfs.write.assert_called_once_with( + "/local/default/session/default/session-1/messages.jsonl", + b'{"role":"user"}\n', + ) + + +@pytest.mark.asyncio +async def test_append_file_still_raises_unrelated_runtime_errors() -> None: + agfs = MagicMock() + agfs.read.side_effect = RuntimeError("backend unavailable") + fs = VikingFS(agfs=agfs) + fs._ensure_parent_dirs = AsyncMock() # type: ignore[method-assign] + + with pytest.raises(IOError, match="backend unavailable"): + await fs.append_file( + "viking://session/default/session-1/messages.jsonl", + '{"role":"user"}\n', + ) + + +@pytest.mark.asyncio +async def test_append_file_serializes_concurrent_first_writes() -> None: + agfs = _RaceAGFS() + fs = VikingFS(agfs=agfs) + + async def slow_ensure_parent_dirs(_path: str) -> None: + await asyncio.sleep(0.01) + + fs._ensure_parent_dirs = slow_ensure_parent_dirs # type: ignore[method-assign] + + await asyncio.gather( + fs.append_file("viking://session/default/session-1/messages.jsonl", "first\n"), + fs.append_file("viking://session/default/session-1/messages.jsonl", "second\n"), + ) + + assert agfs.contents["/local/default/session/default/session-1/messages.jsonl"] in { + b"first\nsecond\n", + b"second\nfirst\n", + }