Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/aleph/cli/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,12 @@ def parse_args(args):
action="store_true",
default=False,
)
parser.add_argument(
"--repair-native-storage",
dest="repair_native_storage",
help="Scan local storage cache at startup and delete entries whose content "
"does not match their SHA-256 hash. Corrupt entries are refetched on demand.",
action="store_true",
default=False,
)
return parser.parse_args(args)
4 changes: 3 additions & 1 deletion src/aleph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ async def main(args: List[str]) -> None:
)

await repair_node(
storage_service=storage_service, session_factory=session_factory
storage_service=storage_service,
session_factory=session_factory,
repair_storage=args.repair_native_storage,
)

set_start_method("spawn")
Expand Down
46 changes: 45 additions & 1 deletion src/aleph/repair.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from aleph.db.models import StoredFileDb
from aleph.storage import StorageService
from aleph.types.db_session import DbSession, DbSessionFactory
from aleph.utils import get_sha256

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -46,10 +47,53 @@ async def _fix_file_sizes(
)


def _is_storage_hash(h: str) -> bool:
return len(h) == 64 and all(c in "0123456789abcdef" for c in h)


async def _fix_corrupt_storage_cache(
    session: DbSession, storage_service: StorageService
) -> int:
    """Delete locally cached storage files whose content does not match their SHA-256 hash.

    Only storage-type hashes (64-char lowercase hex) are checked. IPFS hashes
    are skipped — computing an IPFS hash requires a daemon round-trip. Corrupt
    entries are removed so they are refetched from the network on next access.

    Args:
        session: Open DB session used to list the hashes of known stored files.
        storage_service: Service whose ``storage_engine`` holds the local cache.

    Returns:
        The number of cache entries that were deleted.
    """
    # Only the hash column is needed: selecting it alone avoids building a
    # full ORM entity for every stored file just to read one attribute.
    storage_hashes = [
        file_hash
        for file_hash in session.execute(select(StoredFileDb.hash)).scalars()
        if _is_storage_hash(file_hash)
    ]

    LOGGER.info(
        "Checking %d storage-type cache entries for SHA-256 integrity",
        len(storage_hashes),
    )

    engine = storage_service.storage_engine
    removed = 0
    for file_hash in storage_hashes:
        content = await engine.read(filename=file_hash)
        if content is None:
            # Known to the DB but absent from the local cache: nothing to verify.
            continue
        if get_sha256(content) == file_hash:
            continue

        LOGGER.warning(
            "Corrupt cache entry '%s': SHA-256 mismatch, deleting", file_hash
        )
        await engine.delete(filename=file_hash)
        removed += 1

    LOGGER.info("Removed %d corrupt cache entries", removed)
    return removed


async def repair_node(
    storage_service: StorageService,
    session_factory: DbSessionFactory,
    repair_storage: bool = False,
) -> None:
    """Run the node repair routines inside a single DB session.

    File sizes are always fixed. When ``repair_storage`` is true, the local
    storage cache is additionally scanned and entries that fail their SHA-256
    check are deleted. The session is committed once every step has run.

    Args:
        storage_service: Storage service handed to each repair step.
        session_factory: Factory producing the DB session used for the repairs.
        repair_storage: Opt-in flag for the storage-cache integrity scan.
    """
    LOGGER.info("Fixing file sizes")
    with session_factory() as session:
        await _fix_file_sizes(session, storage_service, store_files=True)
        if repair_storage:
            await _fix_corrupt_storage_cache(session, storage_service)
        session.commit()
63 changes: 62 additions & 1 deletion src/aleph/services/storage/fileystem_engine.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio
import os
from pathlib import Path
from typing import AsyncIterable, Optional, Union

Expand Down Expand Up @@ -43,7 +45,66 @@ async def _read_iterator():

async def write(self, filename: str, content: bytes):
    """Durably store ``content`` under ``filename`` in the engine folder.

    The bytes are first written to a sibling ``<filename>.tmp`` file and then
    renamed over the target by ``_write_durably``. The target itself is never
    written in place, so a failed write cannot leave it half-written.
    """
    file_path = self.folder / filename
    temp_path = self.folder / f"{filename}.tmp"

    # Run blocking syscalls (os.open, os.fsync, os.replace) off the event loop.
    await asyncio.to_thread(self._write_durably, temp_path, file_path, content)

@staticmethod
def _write_durably(temp_path: Path, file_path: Path, content: bytes) -> None:
    """Atomically and durably write ``content`` to ``file_path``.

    Steps:
    1. Write bytes to ``temp_path`` (same directory as ``file_path``).
    2. fsync the file descriptor so data and file metadata hit the disk.
    3. Atomically rename via ``os.replace`` (POSIX-atomic on same FS).
    4. Best-effort fsync of the parent directory so the rename is durable
       across kernel crashes (POSIX-only; silently skipped on Windows).

    On any exception (including post-rename errors in the directory-fsync
    section), the temp file is removed best-effort and the exception is
    re-raised. The target file is never touched until the rename succeeds,
    so crashes leave either the old content or none.
    """
    # O_BINARY exists only on Windows, where a raw os.open() descriptor is
    # otherwise in text mode and newline bytes get translated on write,
    # which would make the stored bytes differ from ``content``.
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(temp_path, open_flags, 0o644)
    try:
        try:
            # os.write may write fewer bytes than requested; loop until done.
            view = memoryview(content)
            written = 0
            while written < len(view):
                written += os.write(fd, view[written:])
            os.fsync(fd)
        finally:
            os.close(fd)

        os.replace(temp_path, file_path)

        # Best-effort directory fsync — makes the rename durable.
        # os.O_DIRECTORY is POSIX-only (AttributeError on Windows);
        # some filesystems/VMs also raise OSError — both are silently skipped.
        try:
            dir_fd = os.open(file_path.parent, os.O_DIRECTORY)
        except (AttributeError, OSError):
            return
        try:
            os.fsync(dir_fd)
        except OSError:
            pass
        finally:
            os.close(dir_fd)

    except Exception:
        try:
            # temp_path may already be gone if os.replace succeeded before
            # the exception (e.g. an error in the dir-fsync section).
            temp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
Comment on lines +100 to +107
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup of the temporary file should be handled by whoever creates it: either create the temporary file inside this function, or perform the cleanup in the caller.


async def delete(self, filename: str):
file_path = self.folder / filename
Expand Down
44 changes: 38 additions & 6 deletions src/aleph/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""

import asyncio
import json
import logging
from hashlib import sha256
from typing import Any, Final, Optional, cast
Expand Down Expand Up @@ -85,18 +84,31 @@ async def get_message_content(
# unknown, could retry later? shouldn't have arrived this far though.
raise ValueError(f"Unknown item type: '{item_type}'.")

# NUL character (U+0000) is a schema violation, not corruption — raise without recovery.
check_for_u0000(item_content)

try:
content = aleph_json.loads(item_content)
except aleph_json.DecodeError as e:
error_msg = f"Can't decode JSON: {e}"
LOGGER.warning(error_msg)
raise InvalidContent(error_msg)
except json.decoder.JSONDecodeError as e:
error_msg = f"Can't decode JSON: {e}"
LOGGER.warning(error_msg)
raise InvalidContent(error_msg)
if source == ContentSource.DB and item_type in (
ItemType.ipfs,
ItemType.storage,
):
recovery_content = await self._recover_cached_content(
item_hash, ItemType(item_type)
)
item_content = recovery_content.value
source = recovery_content.source
try:
content = aleph_json.loads(item_content)
except aleph_json.DecodeError as retry_e:
raise InvalidContent(
f"Content still invalid after retry: {retry_e}"
) from retry_e
else:
raise InvalidContent(error_msg)

return MessageContent(
hash=item_hash,
Expand All @@ -105,6 +117,26 @@ async def get_message_content(
raw_value=item_content,
)

async def _recover_cached_content(
    self, item_hash: str, engine: ItemType
) -> RawContent:
    """Drop the cached copy of ``item_hash`` and fetch its raw bytes again.

    A warning is logged, the entry is deleted from the local storage engine,
    and the content is requested through ``get_hash_content`` with both
    network and IPFS lookups enabled. The bytes are returned uninterpreted;
    validating them is the caller's job.

    NOTE(review): the previous docstring states this raises
    ContentCurrentlyUnavailable when the content cannot be fetched; that
    comes from ``get_hash_content``, which is not visible here — confirm.
    """
    LOGGER.warning(
        "Corrupted cached content for %s, deleting and retrying from network",
        item_hash,
    )

    # Remove the bad local copy before refetching.
    await self.storage_engine.delete(filename=item_hash)

    refetched = await self.get_hash_content(
        item_hash, engine=engine, use_network=True, use_ipfs=True
    )
    return refetched

async def _fetch_content_from_network(
self, content_hash: str, engine: ItemType, timeout: int
) -> Optional[bytes]:
Expand Down
123 changes: 123 additions & 0 deletions tests/services/test_fileystem_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Tests for FileSystemStorageEngine's durable atomic write.

These tests cover the invariants provided by ``write()``:
- Content on disk matches what was written (happy path).
- A temp file is used and atomically renamed.
- ``os.fsync`` is called on the file fd before ``os.replace``.
- Failures during rename leave the target unchanged and clean up the temp file.
"""

import os
from pathlib import Path
from unittest.mock import patch

import pytest

from aleph.services.storage.fileystem_engine import FileSystemStorageEngine


@pytest.mark.asyncio
async def test_write_produces_final_file(tmp_path: Path):
    """A successful write leaves the final file with the exact bytes and no temp file."""
    storage = FileSystemStorageEngine(folder=tmp_path)
    destination = tmp_path / "abc"
    leftover = tmp_path / "abc.tmp"

    await storage.write(filename="abc", content=b"hello")

    assert destination.is_file()
    assert destination.read_bytes() == b"hello"
    assert not leftover.exists()


@pytest.mark.asyncio
async def test_write_uses_temp_file_then_rename(tmp_path: Path):
    """The write goes through ``abc.tmp`` and only then is renamed onto the target."""
    storage = FileSystemStorageEngine(folder=tmp_path)
    original_replace = os.replace
    seen = {}

    def recording_replace(src, dst):
        # Snapshot the filesystem state at the instant of the rename,
        # then delegate to the real os.replace.
        seen.update(
            src_name=os.path.basename(src),
            src_exists_before=os.path.exists(src),
            dst_missing_before=not os.path.exists(dst),
        )
        return original_replace(src, dst)

    with patch(
        "aleph.services.storage.fileystem_engine.os.replace",
        side_effect=recording_replace,
    ):
        await storage.write(filename="abc", content=b"hello")

    assert seen["src_name"] == "abc.tmp"
    assert seen["src_exists_before"] is True
    assert seen["dst_missing_before"] is True
    assert (tmp_path / "abc").read_bytes() == b"hello"


@pytest.mark.asyncio
async def test_write_fsyncs_before_rename(tmp_path: Path):
    """At least one ``os.fsync`` call happens before the first ``os.replace``."""
    storage = FileSystemStorageEngine(folder=tmp_path)

    events = []
    original_fsync, original_replace = os.fsync, os.replace

    def recording_fsync(fd):
        events.append("fsync")
        return original_fsync(fd)

    def recording_replace(src, dst):
        events.append("replace")
        return original_replace(src, dst)

    fsync_patch = patch(
        "aleph.services.storage.fileystem_engine.os.fsync",
        side_effect=recording_fsync,
    )
    replace_patch = patch(
        "aleph.services.storage.fileystem_engine.os.replace",
        side_effect=recording_replace,
    )
    with fsync_patch, replace_patch:
        await storage.write(filename="abc", content=b"hello")

    # The file-fd fsync must occur before the atomic rename.
    before_rename = events[: events.index("replace")]
    assert "fsync" in before_rename


@pytest.mark.asyncio
async def test_write_failure_cleans_up_temp_and_preserves_target(tmp_path: Path):
    """A failing rename surfaces the error, keeps the old target and removes the temp file."""
    storage = FileSystemStorageEngine(folder=tmp_path)

    # Pre-seed the target to confirm it survives a failed write.
    existing = tmp_path / "abc"
    existing.write_bytes(b"original")

    def failing_replace(src, dst):
        raise OSError("simulated rename failure")

    rename_patch = patch(
        "aleph.services.storage.fileystem_engine.os.replace",
        side_effect=failing_replace,
    )
    with rename_patch, pytest.raises(OSError):
        await storage.write(filename="abc", content=b"new-content")

    assert existing.read_bytes() == b"original"
    assert not (tmp_path / "abc.tmp").exists()


@pytest.mark.asyncio
async def test_read_missing_returns_none(tmp_path: Path):
    """Reading a filename that was never written yields ``None``."""
    storage = FileSystemStorageEngine(folder=tmp_path)
    result = await storage.read("missing")
    assert result is None


@pytest.mark.asyncio
async def test_overwrite_replaces_content(tmp_path: Path):
    """Writing the same filename twice keeps only the latest bytes and no temp file."""
    storage = FileSystemStorageEngine(folder=tmp_path)

    for payload in (b"v1", b"v2"):
        await storage.write(filename="abc", content=payload)

    destination = tmp_path / "abc"
    assert destination.read_bytes() == b"v2"
    assert not (tmp_path / "abc.tmp").exists()
Loading
Loading