diff --git a/deployment/scripts/reject_processed_messages.py b/deployment/scripts/reject_processed_messages.py new file mode 100644 index 000000000..149e5b850 --- /dev/null +++ b/deployment/scripts/reject_processed_messages.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 +"""Mark processed messages as rejected. + +Use when a message was accepted under permissive validation rules that have +since become stricter (ex: BaseExecutableContent.metadata now requires a dict +and rejects lists, but some nodes accepted such messages historically). The +API returns 500 on those messages because parsed_content access raises; moving +them to the rejected state matches what nodes that rejected them in the first +place expose to clients. + +The actual rejection logic lives in `aleph.repair.mark_processed_message_as_rejected` +and is also wired into `repair_node`, which runs at startup. This script +exists for ad-hoc cleanups when you have a known list of hashes and don't +want to wait for the next restart. + +Runs as a dry-run by default. Pass --commit to actually persist changes. +Hashes can be provided via repeated --hash flags or via --hashes-file (one +hash per line, lines starting with # are skipped). +""" + +import argparse +import logging +import sys +from pathlib import Path +from typing import Iterable, List + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "src")) + +from sqlalchemy import select # noqa: E402 + +import aleph.config # noqa: E402 +from aleph.db.connection import make_engine, make_session_factory # noqa: E402 +from aleph.db.models.messages import MessageDb # noqa: E402 +from aleph.repair import mark_processed_message_as_rejected # noqa: E402 +from aleph.types.db_session import DbSession # noqa: E402 +from aleph.types.message_status import ErrorCode, MessageStatus # noqa: E402 + +LOGGER = logging.getLogger("reject_processed_messages") + + +def reject_processed_message( + session: DbSession, + item_hash: str, + error_code: ErrorCode, + reason: str, +) -> bool: + message = session.execute( + select(MessageDb).where(MessageDb.item_hash == item_hash) + ).scalar_one_or_none() + + if message is None: + LOGGER.warning("%s: not found in messages, skipping", item_hash) + return False + + if message.status_value == MessageStatus.REJECTED: + LOGGER.info("%s: already rejected, skipping", item_hash) + return False + + if message.status_value != MessageStatus.PROCESSED: + LOGGER.warning( + "%s: unexpected status %s, skipping", + item_hash, + message.status_value, + ) + return False + + message_type = message.type + mark_processed_message_as_rejected( + session=session, + message=message, + error_code=error_code, + reason=reason, + ) + + LOGGER.info( + "%s: rejected (type=%s, error_code=%s)", + item_hash, + message_type, + error_code.name, + ) + return True + + +def _read_hashes(args: argparse.Namespace) -> List[str]: + hashes: List[str] = list(args.hash or []) + if args.hashes_file: + with open(args.hashes_file, encoding="utf-8") as f: + for raw in f: + line = raw.strip() + if line and not line.startswith("#"): + hashes.append(line) + return hashes + + +def _parse_error_code(value: str) -> ErrorCode: + if value.lstrip("-").isdigit(): + return ErrorCode(int(value)) + return ErrorCode[value] + + +def main(argv: Iterable[str]) -> int: + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "-c", "--config", dest="config_file", default=None, help="Config file path" + ) + parser.add_argument( + "--hash", + action="append", + default=[], + help="Message item hash to reject. Pass multiple times for several hashes.", + ) + parser.add_argument( + "--hashes-file", + default=None, + help="Path to a file with one item hash per line.", + ) + parser.add_argument( + "--error-code", + type=_parse_error_code, + default=ErrorCode.INVALID_FORMAT, + help="ErrorCode to record (name or integer, default: INVALID_FORMAT).", + ) + parser.add_argument( + "--reason", + default=( + "Marked rejected by reject_processed_messages.py: content fails " + "validation under current rules." + ), + help="Free-text reason stored on the rejected_messages row.", + ) + parser.add_argument( + "--commit", + action="store_true", + help="Persist changes. Without this flag the script runs as a dry-run.", + ) + parser.add_argument( + "-v", "--verbose", action="store_true", help="Enable debug logging." + ) + + args = parser.parse_args(list(argv)) + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + ) + + hashes = _read_hashes(args) + if not hashes: + parser.error("Provide --hash and/or --hashes-file with at least one hash") + + config = aleph.config.app_config + if args.config_file is not None: + config.yaml.load(args.config_file) + + engine = make_engine(config=config, application_name="reject-processed-messages") + session_factory = make_session_factory(engine) + + changed = 0 + skipped = 0 + errors = 0 + + for item_hash in hashes: + with session_factory() as session: + try: + applied = reject_processed_message( + session=session, + item_hash=item_hash, + error_code=args.error_code, + reason=args.reason, + ) + except Exception: + LOGGER.exception("%s: failed to reject", item_hash) + session.rollback() + errors += 1 + continue + + if not applied: + session.rollback() + skipped += 1 + continue + + if args.commit: + session.commit() + changed += 1 + else: + session.rollback() + LOGGER.info("%s: dry-run, rolled back", item_hash) + changed += 1 + + mode = "commit" if args.commit else "dry-run" + LOGGER.info( + "Done [%s]: %d changed, %d skipped, %d errors", + mode, + changed, + skipped, + errors, + ) + return 1 if errors else 0 + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) diff --git a/src/aleph/repair.py b/src/aleph/repair.py index ac2b67f7d..ed7156fbd 100644 --- a/src/aleph/repair.py +++ b/src/aleph/repair.py @@ -1,17 +1,96 @@ import logging +from typing import Any, Dict -from aleph_message.models import ItemHash -from sqlalchemy import delete, select +from aleph_message.models import ItemHash, MessageType +from sqlalchemy import delete, func, select from sqlalchemy.dialects.postgresql import insert as pg_insert from aleph.db.accessors.files import upsert_file -from aleph.db.models import AlephCreditBalanceDb, AlephCreditHistoryDb, StoredFileDb +from aleph.db.accessors.messages import ( + make_message_status_upsert_query, + make_upsert_rejected_message_statement, +) +from aleph.db.accessors.vms import delete_vm, delete_vm_updates +from aleph.db.models import ( + AlephCreditBalanceDb, + AlephCreditHistoryDb, + MessageDb, + MessageStatusDb, + StoredFileDb, +) from aleph.storage import StorageService +from aleph.toolkit.timestamp import utc_now from aleph.types.db_session import DbSession, DbSessionFactory +from aleph.types.message_status import ErrorCode, MessageStatus LOGGER = logging.getLogger(__name__) +def _wire_message_dict(message: MessageDb) -> Dict[str, Any]: + """Snapshot a ``MessageDb`` row as a JSON-serializable wire-format dict + suitable for the ``rejected_messages.message`` JSONB column.""" + data = message.to_dict(exclude=set(MessageDb.DENORMALIZED_COLUMNS)) + + if data.get("time") is not None: + data["time"] = data["time"].timestamp() + + for key in ("chain", "type", "item_type"): + value = data.get(key) + if value is not None and hasattr(value, "value"): + data[key] = value.value + + return data + + +def mark_processed_message_as_rejected( + session: DbSession, + message: MessageDb, + error_code: ErrorCode, + reason: str, +) -> None: + """Transition a processed message into the REJECTED state. + + Mirrors ``mark_pending_message_as_rejected`` for messages that already + cleared the pipeline under permissive rules but are no longer valid under + current ones (ex: ExecutableContent.metadata used to accept lists, now + requires a dict). Cleans up type-specific state (VM rows for + program/instance), snapshots the row into ``rejected_messages``, flips + ``message_status`` to REJECTED, and deletes the ``messages`` row. The + trigger keeps ``message_counts`` consistent; FK cascades clean + ``message_confirmations`` and ``account_costs``. + + Does not commit. Caller is responsible for state checks (in particular, + that ``message.status_value == MessageStatus.PROCESSED``). + """ + snapshot = _wire_message_dict(message) + + if message.type in (MessageType.program, MessageType.instance): + delete_vm(session=session, vm_hash=message.item_hash) + _ = list(delete_vm_updates(session=session, vm_hash=message.item_hash)) + + session.execute( + make_upsert_rejected_message_statement( + item_hash=message.item_hash, + pending_message_dict=snapshot, + error_code=int(error_code), + details={"errors": [reason]}, + exc_traceback=reason, + tx_hash=None, + ) + ) + + session.execute( + make_message_status_upsert_query( + item_hash=message.item_hash, + new_status=MessageStatus.REJECTED, + reception_time=utc_now(), + where=MessageStatusDb.status != MessageStatus.REJECTED, + ) + ) + + session.execute(delete(MessageDb).where(MessageDb.item_hash == message.item_hash)) + + async def _fix_file_sizes( session: DbSession, storage_service: StorageService, store_files: bool ): @@ -137,6 +216,68 @@ def _repair_credit_balances(session_factory: DbSessionFactory) -> None: LOGGER.info("Credit balances repair complete (%d address(es))", len(addresses)) +_INVALID_METADATA_REASON = ( + "ExecutableContent.metadata must be a dict; legacy rows with a list value " + "no longer parse and surfaced as 500s at the API." +) + + +def _reject_invalid_program_metadata(session_factory: DbSessionFactory) -> None: + """Reject PROGRAM messages whose ``content.metadata`` is a JSON array. + + aleph-message historically accepted ``ExecutableContent.metadata`` as + either a dict or a list. The current validator requires a dict, so rows + accepted under the old rules trip ``parsed_content`` access and surface as + 500s on ``GET /api/v0/messages/``. Moves them to the rejected state + so the API can render them the same way nodes that rejected them in the + first place do. + + Per-message commits so a single bad row does not roll back the rest. + """ + with session_factory() as session: + select_stmt = ( + select(MessageDb.item_hash) + .where(MessageDb.type == MessageType.program) + .where(MessageDb.status_value == MessageStatus.PROCESSED) + .where(func.jsonb_typeof(MessageDb.content["metadata"]) == "array") + ) + item_hashes = list(session.execute(select_stmt).scalars()) + + if not item_hashes: + return + + LOGGER.info( + "Rejecting %d PROGRAM message(s) with non-dict metadata", len(item_hashes) + ) + + rejected = 0 + for item_hash in item_hashes: + with session_factory() as session: + try: + message = session.execute( + select(MessageDb).where(MessageDb.item_hash == item_hash) + ).scalar_one_or_none() + if message is None or message.status_value != MessageStatus.PROCESSED: + continue + mark_processed_message_as_rejected( + session=session, + message=message, + error_code=ErrorCode.INVALID_FORMAT, + reason=_INVALID_METADATA_REASON, + ) + session.commit() + rejected += 1 + except Exception: + LOGGER.exception("Failed to reject program %s", item_hash) + session.rollback() + + LOGGER.info( + "Done: rejected %d / %d PROGRAM message(s) with non-dict metadata", + rejected, + len(item_hashes), + ) + + async def repair_node( storage_service: StorageService, session_factory: DbSessionFactory ): @@ -147,3 +288,6 @@ async def repair_node( LOGGER.info("Repairing credit balances") _repair_credit_balances(session_factory) + + LOGGER.info("Rejecting PROGRAM messages with invalid metadata") + _reject_invalid_program_metadata(session_factory) diff --git a/tests/test_repair.py b/tests/test_repair.py new file mode 100644 index 000000000..f14b5444e --- /dev/null +++ b/tests/test_repair.py @@ -0,0 +1,180 @@ +import datetime as dt + +import pytest +from aleph_message.models import Chain, ItemHash, ItemType, MessageType +from sqlalchemy import select + +from aleph.db.accessors.messages import get_message_status, get_rejected_message +from aleph.db.models import MessageDb, MessageStatusDb +from aleph.repair import _reject_invalid_program_metadata +from aleph.toolkit.timestamp import timestamp_to_datetime +from aleph.types.channel import Channel +from aleph.types.db_session import DbSessionFactory +from aleph.types.message_status import ErrorCode, MessageStatus + + +def _program_message(item_hash: str, content: dict) -> MessageDb: + return MessageDb( + item_hash=item_hash, + chain=Chain.ETH, + sender="0x0000000000000000000000000000000000000001", + signature="0xsig", + item_type=ItemType.inline, + type=MessageType.program, + content=content, + size=100, + time=timestamp_to_datetime(1700000000.0), + channel=Channel("TEST"), + ) + + +def _seed(session, message: MessageDb) -> None: + session.add(message) + session.add( + MessageStatusDb( + item_hash=message.item_hash, + status=MessageStatus.PROCESSED, + reception_time=dt.datetime(2026, 1, 1, tzinfo=dt.timezone.utc), + ) + ) + + +def test_reject_invalid_program_metadata_rejects_list_metadata( + session_factory: DbSessionFactory, +): + """PROGRAM rows whose content.metadata is a JSON array should be moved to + REJECTED so the API stops 500ing on them.""" + bad_hash = "bad" + "0" * 61 + good_hash = "dad" + "0" * 61 + + with session_factory() as session: + _seed( + session, + _program_message( + bad_hash, + {"address": "0xabc", "metadata": ["legacy-list-value"]}, + ), + ) + _seed( + session, + _program_message( + good_hash, + {"address": "0xdef", "metadata": {"name": "good"}}, + ), + ) + session.commit() + + _reject_invalid_program_metadata(session_factory) + + with session_factory() as session: + # Bad message: gone from `messages`, present in `rejected_messages`, + # status flipped to REJECTED. + assert ( + session.execute( + select(MessageDb).where(MessageDb.item_hash == bad_hash) + ).scalar_one_or_none() + is None + ) + + rejected = get_rejected_message(session=session, item_hash=bad_hash) + assert rejected is not None + assert rejected.error_code == ErrorCode.INVALID_FORMAT + assert rejected.message["item_hash"] == bad_hash + assert rejected.message["content"]["metadata"] == ["legacy-list-value"] + + status = get_message_status(session=session, item_hash=ItemHash(bad_hash)) + assert status is not None + assert status.status == MessageStatus.REJECTED + + # Good message: untouched. + good = session.execute( + select(MessageDb).where(MessageDb.item_hash == good_hash) + ).scalar_one_or_none() + assert good is not None + assert good.status_value == MessageStatus.PROCESSED + + good_status = get_message_status(session=session, item_hash=ItemHash(good_hash)) + assert good_status is not None + assert good_status.status == MessageStatus.PROCESSED + + assert get_rejected_message(session=session, item_hash=good_hash) is None + + +def test_reject_invalid_program_metadata_ignores_other_types( + session_factory: DbSessionFactory, +): + """The repair targets PROGRAM only. Other message types stay put even if + they happen to carry a `metadata` array (the schema differs).""" + aggregate_hash = "agg" + "0" * 61 + + with session_factory() as session: + message = MessageDb( + item_hash=aggregate_hash, + chain=Chain.ETH, + sender="0x0000000000000000000000000000000000000002", + signature="0xsig", + item_type=ItemType.inline, + type=MessageType.aggregate, + content={"address": "0xabc", "metadata": ["whatever"]}, + size=100, + time=timestamp_to_datetime(1700000000.0), + channel=Channel("TEST"), + ) + session.add(message) + session.add( + MessageStatusDb( + item_hash=aggregate_hash, + status=MessageStatus.PROCESSED, + reception_time=dt.datetime(2026, 1, 1, tzinfo=dt.timezone.utc), + ) + ) + session.commit() + + _reject_invalid_program_metadata(session_factory) + + with session_factory() as session: + msg = session.execute( + select(MessageDb).where(MessageDb.item_hash == aggregate_hash) + ).scalar_one_or_none() + assert msg is not None + assert get_rejected_message(session=session, item_hash=aggregate_hash) is None + + +def test_reject_invalid_program_metadata_no_op_when_empty( + session_factory: DbSessionFactory, +): + """Empty DB should be a no-op rather than an error.""" + _reject_invalid_program_metadata(session_factory) + + +@pytest.mark.parametrize( + "metadata_value", + [ + {"name": "valid"}, + None, + ], +) +def test_reject_invalid_program_metadata_skips_valid_programs( + session_factory: DbSessionFactory, metadata_value +): + """Programs with a dict metadata (or none at all) should not be touched.""" + item_hash = "ok" + "0" * 62 + + content: dict = {"address": "0xabc"} + if metadata_value is not None: + content["metadata"] = metadata_value + + with session_factory() as session: + _seed(session, _program_message(item_hash, content)) + session.commit() + + _reject_invalid_program_metadata(session_factory) + + with session_factory() as session: + assert ( + session.execute( + select(MessageDb).where(MessageDb.item_hash == item_hash) + ).scalar_one_or_none() + is not None + ) + assert get_rejected_message(session=session, item_hash=item_hash) is None