Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions deployment/scripts/reject_processed_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
#!/usr/bin/env python3
"""Mark processed messages as rejected.

Use when a message was accepted under permissive validation rules that have
since become stricter (ex: BaseExecutableContent.metadata now requires a dict
and rejects lists, but some nodes accepted such messages historically). The
API returns 500 on those messages because parsed_content access raises; moving
them to the rejected state matches what nodes that rejected them in the first
place expose to clients.

The actual rejection logic lives in `aleph.repair.mark_processed_message_as_rejected`
and is also wired into `repair_node`, which runs at startup. This script
exists for ad-hoc cleanups when you have a known list of hashes and don't
want to wait for the next restart.

Runs as a dry-run by default. Pass --commit to actually persist changes.
Hashes can be provided via repeated --hash flags or via --hashes-file (one
hash per line, lines starting with # are skipped).
"""

import argparse
import logging
import sys
from pathlib import Path
from typing import Iterable, List

# Put the ``src`` directory located two levels above this script's own
# directory at the front of the import path, so the ``aleph`` imports below
# resolve when the script is run directly (presumably from a source checkout
# where the package is not installed -- confirm).
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent / "src"))

from sqlalchemy import select # noqa: E402

import aleph.config # noqa: E402
from aleph.db.connection import make_engine, make_session_factory # noqa: E402
from aleph.db.models.messages import MessageDb # noqa: E402
from aleph.repair import mark_processed_message_as_rejected # noqa: E402
from aleph.types.db_session import DbSession # noqa: E402
from aleph.types.message_status import ErrorCode, MessageStatus # noqa: E402

# Script-level logger; its level and format are set by ``logging.basicConfig``
# inside ``main``.
LOGGER = logging.getLogger("reject_processed_messages")


def reject_processed_message(
    session: DbSession,
    item_hash: str,
    error_code: ErrorCode,
    reason: str,
) -> bool:
    """Reject one message by hash if it is currently in the PROCESSED state.

    Looks the message up by ``item_hash`` and, when it exists with status
    PROCESSED, hands it to ``mark_processed_message_as_rejected``. Nothing is
    committed here; the caller decides whether to commit or roll back.

    Args:
        session: Open database session used for the lookup and the rejection.
        item_hash: Hash of the message to reject.
        error_code: Error code to record with the rejection.
        reason: Free-text reason to record with the rejection.

    Returns:
        ``True`` when the rejection was applied to the session, ``False`` when
        the message was skipped (not found, already rejected, or in any other
        status than PROCESSED).
    """
    lookup = select(MessageDb).where(MessageDb.item_hash == item_hash)
    row = session.execute(lookup).scalar_one_or_none()
    if row is None:
        LOGGER.warning("%s: not found in messages, skipping", item_hash)
        return False

    current_status = row.status_value
    if current_status == MessageStatus.REJECTED:
        LOGGER.info("%s: already rejected, skipping", item_hash)
        return False
    if current_status != MessageStatus.PROCESSED:
        LOGGER.warning(
            "%s: unexpected status %s, skipping",
            item_hash,
            current_status,
        )
        return False

    # Read the type before the rejection runs: the rejection deletes the
    # ``messages`` row, and the value is still needed for the log line below.
    kind = row.type
    mark_processed_message_as_rejected(
        session=session,
        message=row,
        error_code=error_code,
        reason=reason,
    )
    LOGGER.info(
        "%s: rejected (type=%s, error_code=%s)",
        item_hash,
        kind,
        error_code.name,
    )
    return True


def _read_hashes(args: argparse.Namespace) -> List[str]:
    """Collect the item hashes requested on the command line.

    Hashes given through ``--hash`` come first, followed by those read from
    ``--hashes-file``. In the file, surrounding whitespace is stripped and
    blank lines and lines starting with ``#`` are ignored.

    Duplicates are dropped (first occurrence wins, order preserved) so a hash
    named twice is not processed twice. Processing it twice would double-count
    it in a dry-run and log a spurious "not found" warning after a commit.

    Args:
        args: Parsed arguments exposing ``hash`` (list of strings or ``None``)
            and ``hashes_file`` (path or ``None``).

    Returns:
        The unique hashes, in first-seen order. Empty when none were given.
    """
    hashes: List[str] = list(args.hash or [])
    if args.hashes_file:
        with open(args.hashes_file, encoding="utf-8") as f:
            stripped = (raw.strip() for raw in f)
            hashes.extend(
                line for line in stripped if line and not line.startswith("#")
            )
    # dict preserves insertion order, so this de-duplicates without reordering.
    return list(dict.fromkeys(hashes))


def _parse_error_code(value: str) -> ErrorCode:
    """Convert a ``--error-code`` argument into an ``ErrorCode`` member.

    Accepts either the integer value (optionally negative) or the member name.

    Args:
        value: Raw command-line string.

    Returns:
        The matching ``ErrorCode`` member.

    Raises:
        argparse.ArgumentTypeError: If ``value`` is neither a known integer
            value nor a known member name. argparse only turns
            ``ArgumentTypeError``/``TypeError``/``ValueError`` into a usage
            error, so the ``KeyError`` from a failed name lookup must be
            converted or it escapes as a traceback.
    """
    try:
        if value.lstrip("-").isdigit():
            return ErrorCode(int(value))
        return ErrorCode[value]
    except (KeyError, ValueError):
        valid = ", ".join(code.name for code in ErrorCode)
        raise argparse.ArgumentTypeError(
            f"unknown error code {value!r} (expected one of: {valid})"
        ) from None


def main(argv: Iterable[str]) -> int:
    """Parse the command line and reject each requested message.

    Every hash is handled in its own session. The session is committed only
    when ``--commit`` is given and the rejection was applied; in every other
    case it is rolled back. A hash whose rejection raises is logged and
    counted as an error, and processing moves on to the next hash.

    Args:
        argv: Command-line arguments, without the program name.

    Returns:
        ``1`` if at least one hash raised during rejection, otherwise ``0``.
    """
    cli = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    cli.add_argument(
        "-c",
        "--config",
        dest="config_file",
        default=None,
        help="Config file path",
    )
    cli.add_argument(
        "--hash",
        action="append",
        default=[],
        help="Message item hash to reject. Pass multiple times for several hashes.",
    )
    cli.add_argument(
        "--hashes-file",
        default=None,
        help="Path to a file with one item hash per line.",
    )
    cli.add_argument(
        "--error-code",
        type=_parse_error_code,
        default=ErrorCode.INVALID_FORMAT,
        help="ErrorCode to record (name or integer, default: INVALID_FORMAT).",
    )
    cli.add_argument(
        "--reason",
        default=(
            "Marked rejected by reject_processed_messages.py: content fails "
            "validation under current rules."
        ),
        help="Free-text reason stored on the rejected_messages row.",
    )
    cli.add_argument(
        "--commit",
        action="store_true",
        help="Persist changes. Without this flag the script runs as a dry-run.",
    )
    cli.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Enable debug logging.",
    )
    options = cli.parse_args(list(argv))

    log_level = logging.DEBUG if options.verbose else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    item_hashes = _read_hashes(options)
    if not item_hashes:
        # ``error`` prints the usage message and exits the process.
        cli.error("Provide --hash and/or --hashes-file with at least one hash")

    app_config = aleph.config.app_config
    if options.config_file is not None:
        app_config.yaml.load(options.config_file)

    engine = make_engine(
        config=app_config, application_name="reject-processed-messages"
    )
    open_session = make_session_factory(engine)

    tally = {"changed": 0, "skipped": 0, "errors": 0}
    for item_hash in item_hashes:
        # One session per hash, so a failure on one hash never affects another.
        with open_session() as session:
            try:
                applied = reject_processed_message(
                    session=session,
                    item_hash=item_hash,
                    error_code=options.error_code,
                    reason=options.reason,
                )
            except Exception:
                LOGGER.exception("%s: failed to reject", item_hash)
                session.rollback()
                tally["errors"] += 1
                continue

            if not applied:
                session.rollback()
                tally["skipped"] += 1
            elif options.commit:
                session.commit()
                tally["changed"] += 1
            else:
                # Dry-run: the rejection ran against the session but is undone.
                session.rollback()
                LOGGER.info("%s: dry-run, rolled back", item_hash)
                tally["changed"] += 1

    LOGGER.info(
        "Done [%s]: %d changed, %d skipped, %d errors",
        "commit" if options.commit else "dry-run",
        tally["changed"],
        tally["skipped"],
        tally["errors"],
    )
    return int(bool(tally["errors"]))


# Script entry point: pass the command-line arguments (without the program
# name) to ``main`` and use its return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
150 changes: 147 additions & 3 deletions src/aleph/repair.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,96 @@
import logging
from typing import Any, Dict

from aleph_message.models import ItemHash
from sqlalchemy import delete, select
from aleph_message.models import ItemHash, MessageType
from sqlalchemy import delete, func, select
from sqlalchemy.dialects.postgresql import insert as pg_insert

from aleph.db.accessors.files import upsert_file
from aleph.db.models import AlephCreditBalanceDb, AlephCreditHistoryDb, StoredFileDb
from aleph.db.accessors.messages import (
make_message_status_upsert_query,
make_upsert_rejected_message_statement,
)
from aleph.db.accessors.vms import delete_vm, delete_vm_updates
from aleph.db.models import (
AlephCreditBalanceDb,
AlephCreditHistoryDb,
MessageDb,
MessageStatusDb,
StoredFileDb,
)
from aleph.storage import StorageService
from aleph.toolkit.timestamp import utc_now
from aleph.types.db_session import DbSession, DbSessionFactory
from aleph.types.message_status import ErrorCode, MessageStatus

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)


def _wire_message_dict(message: MessageDb) -> Dict[str, Any]:
    """Build the dict stored in the ``rejected_messages.message`` JSONB column.

    Starts from ``message.to_dict`` without the columns listed in
    ``MessageDb.DENORMALIZED_COLUMNS``, then replaces ``time`` with the result
    of its ``timestamp()`` call and replaces ``chain``/``type``/``item_type``
    with their ``.value`` when they carry one.
    """
    excluded = set(MessageDb.DENORMALIZED_COLUMNS)
    snapshot = message.to_dict(exclude=excluded)

    sent_at = snapshot.get("time")
    if sent_at is not None:
        snapshot["time"] = sent_at.timestamp()

    for field in ("chain", "type", "item_type"):
        raw = snapshot.get(field)
        # Leave missing entries and values without ``.value`` untouched.
        if raw is None or not hasattr(raw, "value"):
            continue
        snapshot[field] = raw.value

    return snapshot


def mark_processed_message_as_rejected(
    session: DbSession,
    message: MessageDb,
    error_code: ErrorCode,
    reason: str,
) -> None:
    """Move an already-processed message to the REJECTED state.

    Intended for messages that were accepted under older, more permissive
    validation rules and no longer validate (ex: ExecutableContent.metadata
    used to accept lists, now requires a dict). The steps run in this order:

    1. Snapshot the row with ``_wire_message_dict``.
    2. For program/instance messages, call ``delete_vm`` and consume
       ``delete_vm_updates`` for the message hash.
    3. Upsert the snapshot into the rejected messages, with ``reason`` stored
       both as the single entry of ``details["errors"]`` and as the traceback.
    4. Upsert the message status to REJECTED, restricted to rows that are not
       already REJECTED.
    5. Delete the ``messages`` row.

    Per the original author's notes, a database trigger keeps
    ``message_counts`` consistent and FK cascades clean
    ``message_confirmations`` and ``account_costs``; neither is visible here.

    Does not commit. The caller must check beforehand that
    ``message.status_value == MessageStatus.PROCESSED``.
    """
    # Snapshot first, while the row is still intact.
    snapshot = _wire_message_dict(message)
    item_hash = message.item_hash

    vm_types = (MessageType.program, MessageType.instance)
    if message.type in vm_types:
        delete_vm(session=session, vm_hash=item_hash)
        # Wrap in list() so the returned iterable is fully consumed.
        list(delete_vm_updates(session=session, vm_hash=item_hash))

    rejected_upsert = make_upsert_rejected_message_statement(
        item_hash=item_hash,
        pending_message_dict=snapshot,
        error_code=int(error_code),
        details={"errors": [reason]},
        exc_traceback=reason,
        tx_hash=None,
    )
    session.execute(rejected_upsert)

    status_upsert = make_message_status_upsert_query(
        item_hash=item_hash,
        new_status=MessageStatus.REJECTED,
        reception_time=utc_now(),
        where=MessageStatusDb.status != MessageStatus.REJECTED,
    )
    session.execute(status_upsert)

    drop_message = delete(MessageDb).where(MessageDb.item_hash == item_hash)
    session.execute(drop_message)


async def _fix_file_sizes(
session: DbSession, storage_service: StorageService, store_files: bool
):
Expand Down Expand Up @@ -137,6 +216,68 @@ def _repair_credit_balances(session_factory: DbSessionFactory) -> None:
LOGGER.info("Credit balances repair complete (%d address(es))", len(addresses))


# Reason text passed by ``_reject_invalid_program_metadata`` when it rejects a
# PROGRAM message whose ``content.metadata`` is a JSON array.
_INVALID_METADATA_REASON = (
    "ExecutableContent.metadata must be a dict; legacy rows with a list value "
    "no longer parse and surfaced as 500s at the API."
)


def _reject_invalid_program_metadata(session_factory: DbSessionFactory) -> None:
    """Reject processed PROGRAM messages whose ``content.metadata`` is an array.

    Per the original author's notes, aleph-message used to accept a dict or a
    list for ``ExecutableContent.metadata`` and now requires a dict, so such
    rows fail ``parsed_content`` access and surface as 500s on
    ``GET /api/v0/messages/<hash>``. Rejecting them lets the API render them
    as rejected messages.

    The candidate hashes are collected in one session. Each candidate is then
    re-read and rejected in its own session with its own commit, so a failure
    on one row is logged and rolled back without undoing the others.
    """
    with session_factory() as session:
        candidates_query = select(MessageDb.item_hash).where(
            MessageDb.type == MessageType.program,
            MessageDb.status_value == MessageStatus.PROCESSED,
            func.jsonb_typeof(MessageDb.content["metadata"]) == "array",
        )
        candidates = session.execute(candidates_query).scalars().all()

    total = len(candidates)
    if total == 0:
        return

    LOGGER.info("Rejecting %d PROGRAM message(s) with non-dict metadata", total)

    done = 0
    for item_hash in candidates:
        with session_factory() as session:
            try:
                row = session.execute(
                    select(MessageDb).where(MessageDb.item_hash == item_hash)
                ).scalar_one_or_none()
                # Re-check the state: the row may have changed since the scan.
                still_processed = (
                    row is not None and row.status_value == MessageStatus.PROCESSED
                )
                if still_processed:
                    mark_processed_message_as_rejected(
                        session=session,
                        message=row,
                        error_code=ErrorCode.INVALID_FORMAT,
                        reason=_INVALID_METADATA_REASON,
                    )
                    session.commit()
                    done += 1
            except Exception:
                LOGGER.exception("Failed to reject program %s", item_hash)
                session.rollback()

    LOGGER.info(
        "Done: rejected %d / %d PROGRAM message(s) with non-dict metadata",
        done,
        total,
    )


async def repair_node(
storage_service: StorageService, session_factory: DbSessionFactory
):
Expand All @@ -147,3 +288,6 @@ async def repair_node(

LOGGER.info("Repairing credit balances")
_repair_credit_balances(session_factory)

LOGGER.info("Rejecting PROGRAM messages with invalid metadata")
_reject_invalid_program_metadata(session_factory)
Loading
Loading