From 036b8bc57e0a70f76b865fb381fefe8bbcb83d53 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 18 May 2026 23:33:11 +0200 Subject: [PATCH 01/12] config: add scoring block for node metrics ingestion --- src/aleph/config.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/aleph/config.py b/src/aleph/config.py index 9dd818eaa..dd3f08588 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -59,6 +59,16 @@ def get_defaults(): # Allowed channels for credit balance messages. "channels": ["ALEPH_CREDIT"], }, + "scoring": { + # Addresses allowed to publish node scoring metrics. + "addresses": [ + "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + ], + # Channel scoring messages are published on. + "channel": "aleph-scoring", + # POST message type that carries the node metrics payload. + "metrics_post_type": "aleph-network-metrics", + }, "jobs": { "pending_messages": { # Maximum number of retries for a message. From 901a8a7ee0d48d80ade60d796de341f63fc44f4e Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 18 May 2026 23:35:41 +0200 Subject: [PATCH 02/12] db: add CrnMetricDb and CcnMetricDb models --- src/aleph/db/models/__init__.py | 1 + src/aleph/db/models/metrics.py | 43 +++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) create mode 100644 src/aleph/db/models/metrics.py diff --git a/src/aleph/db/models/__init__.py b/src/aleph/db/models/__init__.py index 4b099ca1b..220e6e02f 100644 --- a/src/aleph/db/models/__init__.py +++ b/src/aleph/db/models/__init__.py @@ -5,6 +5,7 @@ from .files import * # noqa from .message_counts import * # noqa from .messages import * # noqa +from .metrics import * # noqa from .peers import * # noqa from .pending_messages import * # noqa from .pending_txs import * # noqa diff --git a/src/aleph/db/models/metrics.py b/src/aleph/db/models/metrics.py new file mode 100644 index 000000000..722e2b035 --- /dev/null +++ b/src/aleph/db/models/metrics.py @@ -0,0 +1,43 @@ +from typing import Optional + +from sqlalchemy import BigInteger, Float, ForeignKey, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from .base import Base + + +class CrnMetricDb(Base): + __tablename__ = "crn_metrics" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + item_hash: Mapped[str] = mapped_column( + ForeignKey("messages.item_hash", ondelete="CASCADE"), + nullable=False, + index=True, + ) + node_id: Mapped[str] = mapped_column(String, nullable=False) + measured_at: Mapped[float] = mapped_column(Float, nullable=False) + base_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + base_latency_ipv4: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + full_check_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + diagnostic_vm_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + + +class CcnMetricDb(Base): + __tablename__ = "ccn_metrics" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + item_hash: Mapped[str] = mapped_column( + ForeignKey("messages.item_hash", ondelete="CASCADE"), + nullable=False, + index=True, + ) + node_id: Mapped[str] = mapped_column(String, nullable=False) + measured_at: Mapped[float] = mapped_column(Float, nullable=False) + base_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + base_latency_ipv4: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + metrics_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + aggregate_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + file_download_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + pending_messages: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + eth_height_remaining: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) From 6a0a31cb264326e77edc246b7e9a688ed60bf3c3 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Mon, 18 May 2026 23:41:37 +0200 Subject: [PATCH 03/12] migration: create crn_metrics and ccn_metrics, backfill, drop views --- .../0059_b9c4f1e6a2d7_persist_node_metrics.py | 165 ++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py diff --git a/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py new file mode 100644 index 000000000..1d4321b1e --- /dev/null +++ b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py @@ -0,0 +1,165 @@ +"""Persist node scoring metrics + +Revision ID: b9c4f1e6a2d7 +Revises: 4f1e8d2a6c3b +Create Date: 2026-05-18 + +Creates crn_metrics and ccn_metrics tables, backfills them from the +existing JSON-unnesting views, then drops the views. +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import text + +revision = "b9c4f1e6a2d7" +down_revision = "4f1e8d2a6c3b" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "crn_metrics", + sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True), + sa.Column( + "item_hash", + sa.String, + sa.ForeignKey("messages.item_hash", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("node_id", sa.String, nullable=False), + sa.Column("measured_at", sa.Float, nullable=False), + sa.Column("base_latency", sa.Float), + sa.Column("base_latency_ipv4", sa.Float), + sa.Column("full_check_latency", sa.Float), + sa.Column("diagnostic_vm_latency", sa.Float), + ) + op.create_index( + "ix_crn_metrics_item_hash", "crn_metrics", ["item_hash"] + ) + op.create_index( + "ix_crn_metrics_node_id_measured_at", + "crn_metrics", + ["node_id", sa.text("measured_at DESC")], + ) + + op.create_table( + "ccn_metrics", + sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True), + sa.Column( + "item_hash", + sa.String, + sa.ForeignKey("messages.item_hash", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("node_id", sa.String, nullable=False), + sa.Column("measured_at", sa.Float, nullable=False), + sa.Column("base_latency", sa.Float), + sa.Column("base_latency_ipv4", sa.Float), + sa.Column("metrics_latency", sa.Float), + sa.Column("aggregate_latency", sa.Float), + sa.Column("file_download_latency", sa.Float), + sa.Column("pending_messages", sa.Integer), + sa.Column("eth_height_remaining", sa.Integer), + ) + op.create_index( + "ix_ccn_metrics_item_hash", "ccn_metrics", ["item_hash"] + ) + op.create_index( + "ix_ccn_metrics_node_id_measured_at", + "ccn_metrics", + ["node_id", sa.text("measured_at DESC")], + ) + + op.execute( + text( + """ + INSERT INTO crn_metrics ( + item_hash, node_id, measured_at, base_latency, base_latency_ipv4, + full_check_latency, diagnostic_vm_latency + ) + SELECT + item_hash, node_id, measured_at, base_latency, base_latency_ipv4, + full_check_latency, diagnostic_vm_latency + FROM crn_metric_view + WHERE node_id IS NOT NULL AND measured_at IS NOT NULL + """ + ) + ) + op.execute( + text( + """ + INSERT INTO ccn_metrics ( + item_hash, node_id, measured_at, base_latency, base_latency_ipv4, + metrics_latency, aggregate_latency, file_download_latency, + pending_messages, eth_height_remaining + ) + SELECT + item_hash, node_id, measured_at, base_latency, base_latency_ipv4, + metrics_latency, aggregate_latency, file_download_latency, + pending_messages, eth_height_remaining + FROM ccn_metric_view + WHERE node_id IS NOT NULL AND measured_at IS NOT NULL + """ + ) + ) + + op.execute(text("DROP VIEW IF EXISTS crn_metric_view")) + op.execute(text("DROP VIEW IF EXISTS ccn_metric_view")) + + +def downgrade() -> None: + op.execute( + text( + """ + CREATE OR REPLACE VIEW ccn_metric_view AS + WITH json_data AS ( + SELECT item_hash, + jsonb_array_elements(content -> 'content' -> 'metrics' -> 'ccn') as ccn_data + FROM messages + WHERE channel = 'aleph-scoring' + AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4' + ) + SELECT item_hash, + (ccn_data ->> 'measured_at')::float as measured_at, + ccn_data ->> 'node_id' as node_id, + (ccn_data ->> 'base_latency')::float as base_latency, + (ccn_data ->> 'metrics_latency')::float as metrics_latency, + (ccn_data ->> 'aggregate_latency')::float as aggregate_latency, + (ccn_data ->> 'base_latency_ipv4')::float as base_latency_ipv4, + (ccn_data ->> 'file_download_latency')::float as file_download_latency, + (ccn_data ->> 'pending_messages')::int as pending_messages, + (ccn_data ->> 'eth_height_remaining')::int as eth_height_remaining + FROM json_data + """ + ) + ) + op.execute( + text( + """ + CREATE OR REPLACE VIEW crn_metric_view AS + WITH json_data AS ( + SELECT item_hash, + jsonb_array_elements(content -> 'content' -> 'metrics' -> 'crn') as crn_data + FROM messages + WHERE channel = 'aleph-scoring' + AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4' + ) + SELECT item_hash as item_hash, + (crn_data ->> 'measured_at')::float as measured_at, + crn_data ->> 'node_id' as node_id, + (crn_data ->> 'base_latency')::float as base_latency, + (crn_data ->> 'base_latency_ipv4')::float as base_latency_ipv4, + (crn_data ->> 'full_check_latency')::float as full_check_latency, + (crn_data ->> 'diagnostic_vm_latency')::float as diagnostic_vm_latency + FROM json_data + """ + ) + ) + op.drop_index("ix_ccn_metrics_node_id_measured_at", table_name="ccn_metrics") + op.drop_index("ix_ccn_metrics_item_hash", table_name="ccn_metrics") + op.drop_table("ccn_metrics") + op.drop_index("ix_crn_metrics_node_id_measured_at", table_name="crn_metrics") + op.drop_index("ix_crn_metrics_item_hash", table_name="crn_metrics") + op.drop_table("crn_metrics") From cfc2dc5ab084c213db181285eb1a60b456e94f26 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 19 May 2026 00:38:08 +0200 Subject: [PATCH 04/12] db: add row builders for node metric ingestion Add _build_crn_rows and _build_ccn_rows helpers that convert a scoring message's JSON content into lists of dicts ready for bulk insert. Both helpers skip entries missing required fields (node_id, measured_at) and coerce non-numeric values to None instead of raising. --- src/aleph/db/accessors/metrics.py | 69 +++++++++++++++- tests/db/test_metrics_persistence.py | 115 +++++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 tests/db/test_metrics_persistence.py diff --git a/src/aleph/db/accessors/metrics.py b/src/aleph/db/accessors/metrics.py index fd54bb984..1e3bc531a 100644 --- a/src/aleph/db/accessors/metrics.py +++ b/src/aleph/db/accessors/metrics.py @@ -1,5 +1,5 @@ import time -from typing import Optional +from typing import Any, List, Mapping, Optional from sqlalchemy import select, text from sqlalchemy.orm.session import Session @@ -9,6 +9,73 @@ from aleph.types.sort_order import SortOrder, SortOrderForMetrics +def _coerce_float(value: Any) -> Optional[float]: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + +def _coerce_int(value: Any) -> Optional[int]: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _build_crn_rows( + item_hash: str, crn_array: List[Mapping[str, Any]] +) -> List[Mapping[str, Any]]: + rows: List[Mapping[str, Any]] = [] + for entry in crn_array: + if not isinstance(entry, Mapping): + continue + node_id = entry.get("node_id") + measured_at = _coerce_float(entry.get("measured_at")) + if node_id is None or measured_at is None: + continue + rows.append({ + "item_hash": item_hash, + "node_id": str(node_id), + "measured_at": measured_at, + "base_latency": _coerce_float(entry.get("base_latency")), + "base_latency_ipv4": _coerce_float(entry.get("base_latency_ipv4")), + "full_check_latency": _coerce_float(entry.get("full_check_latency")), + "diagnostic_vm_latency": _coerce_float(entry.get("diagnostic_vm_latency")), + }) + return rows + + +def _build_ccn_rows( + item_hash: str, ccn_array: List[Mapping[str, Any]] +) -> List[Mapping[str, Any]]: + rows: List[Mapping[str, Any]] = [] + for entry in ccn_array: + if not isinstance(entry, Mapping): + continue + node_id = entry.get("node_id") + measured_at = _coerce_float(entry.get("measured_at")) + if node_id is None or measured_at is None: + continue + rows.append({ + "item_hash": item_hash, + "node_id": str(node_id), + "measured_at": measured_at, + "base_latency": _coerce_float(entry.get("base_latency")), + "base_latency_ipv4": _coerce_float(entry.get("base_latency_ipv4")), + "metrics_latency": _coerce_float(entry.get("metrics_latency")), + "aggregate_latency": _coerce_float(entry.get("aggregate_latency")), + "file_download_latency": _coerce_float(entry.get("file_download_latency")), + "pending_messages": _coerce_int(entry.get("pending_messages")), + "eth_height_remaining": _coerce_int(entry.get("eth_height_remaining")), + }) + return rows + + def _parse_ccn_result(result): keys = [ "item_hash", diff --git a/tests/db/test_metrics_persistence.py b/tests/db/test_metrics_persistence.py new file mode 100644 index 000000000..efcd9e2b0 --- /dev/null +++ b/tests/db/test_metrics_persistence.py @@ -0,0 +1,115 @@ +from aleph.db.accessors.metrics import _build_ccn_rows, _build_crn_rows + + +def test_build_crn_rows_full_payload(): + item_hash = "msg-1" + crn_array = [ + { + "measured_at": 1700000000.0, + "node_id": "node-A", + "base_latency": 0.1, + "base_latency_ipv4": 0.11, + "full_check_latency": 0.2, + "diagnostic_vm_latency": 0.3, + }, + { + "measured_at": 1700000001.0, + "node_id": "node-B", + "base_latency": 0.4, + "base_latency_ipv4": 0.41, + "full_check_latency": 0.5, + "diagnostic_vm_latency": 0.6, + }, + ] + rows = _build_crn_rows(item_hash, crn_array) + assert len(rows) == 2 + assert rows[0] == { + "item_hash": "msg-1", + "node_id": "node-A", + "measured_at": 1700000000.0, + "base_latency": 0.1, + "base_latency_ipv4": 0.11, + "full_check_latency": 0.2, + "diagnostic_vm_latency": 0.3, + } + + +def test_build_crn_rows_missing_optional_fields(): + rows = _build_crn_rows("msg-2", [ + {"measured_at": 1700000000.0, "node_id": "node-A"}, + ]) + assert rows == [{ + "item_hash": "msg-2", + "node_id": "node-A", + "measured_at": 1700000000.0, + "base_latency": None, + "base_latency_ipv4": None, + "full_check_latency": None, + "diagnostic_vm_latency": None, + }] + + +def test_build_crn_rows_skips_entries_missing_required_fields(): + # Without node_id or measured_at, the existing view emits NULL and + # the migration's WHERE clause filters those out. The builder skips them. + rows = _build_crn_rows("msg-3", [ + {"measured_at": 1700000000.0}, # missing node_id + {"node_id": "node-A"}, # missing measured_at + {"measured_at": 1700000001.0, "node_id": "ok"}, # valid + ]) + assert len(rows) == 1 + assert rows[0]["node_id"] == "ok" + + +def test_build_crn_rows_non_numeric_field_becomes_none(): + rows = _build_crn_rows("msg-4", [ + { + "measured_at": 1700000000.0, + "node_id": "node-A", + "base_latency": "not-a-number", + }, + ]) + assert rows[0]["base_latency"] is None + + +def test_build_crn_rows_empty_array(): + assert _build_crn_rows("msg-5", []) == [] + + +def test_build_ccn_rows_full_payload(): + rows = _build_ccn_rows("msg-1", [ + { + "measured_at": 1700000000.0, + "node_id": "node-A", + "base_latency": 0.1, + "base_latency_ipv4": 0.11, + "metrics_latency": 0.2, + "aggregate_latency": 0.3, + "file_download_latency": 0.4, + "pending_messages": 42, + "eth_height_remaining": 7, + }, + ]) + assert rows == [{ + "item_hash": "msg-1", + "node_id": "node-A", + "measured_at": 1700000000.0, + "base_latency": 0.1, + "base_latency_ipv4": 0.11, + "metrics_latency": 0.2, + "aggregate_latency": 0.3, + "file_download_latency": 0.4, + "pending_messages": 42, + "eth_height_remaining": 7, + }] + + +def test_build_ccn_rows_non_numeric_pending_messages_becomes_none(): + rows = _build_ccn_rows("msg-2", [ + { + "measured_at": 1700000000.0, + "node_id": "node-A", + "pending_messages": "lots", + }, + ]) + assert rows[0]["pending_messages"] is None From 2d4fda88fb190763b7421d1492ab56080c85cb82 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 19 May 2026 00:43:48 +0200 Subject: [PATCH 05/12] db: add insert_node_metrics accessor for scoring posts --- src/aleph/db/accessors/metrics.py | 78 +++++++--- tests/db/test_metrics_persistence.py | 222 +++++++++++++++++++++------ 2 files changed, 230 insertions(+), 70 deletions(-) diff --git a/src/aleph/db/accessors/metrics.py b/src/aleph/db/accessors/metrics.py index 1e3bc531a..5e0738ebc 100644 --- a/src/aleph/db/accessors/metrics.py +++ b/src/aleph/db/accessors/metrics.py @@ -1,10 +1,11 @@ import time from typing import Any, List, Mapping, Optional -from sqlalchemy import select, text +from sqlalchemy import insert, select, text from sqlalchemy.orm.session import Session from sqlalchemy.sql import Select +from aleph.db.models import CcnMetricDb, CrnMetricDb from aleph.types.db_session import DbSession from aleph.types.sort_order import SortOrder, SortOrderForMetrics @@ -38,15 +39,19 @@ def _build_crn_rows( measured_at = _coerce_float(entry.get("measured_at")) if node_id is None or measured_at is None: continue - rows.append({ - "item_hash": item_hash, - "node_id": str(node_id), - "measured_at": measured_at, - "base_latency": _coerce_float(entry.get("base_latency")), - "base_latency_ipv4": _coerce_float(entry.get("base_latency_ipv4")), - "full_check_latency": _coerce_float(entry.get("full_check_latency")), - "diagnostic_vm_latency": _coerce_float(entry.get("diagnostic_vm_latency")), - }) + rows.append( + { + "item_hash": item_hash, + "node_id": str(node_id), + "measured_at": measured_at, + "base_latency": _coerce_float(entry.get("base_latency")), + "base_latency_ipv4": _coerce_float(entry.get("base_latency_ipv4")), + "full_check_latency": _coerce_float(entry.get("full_check_latency")), + "diagnostic_vm_latency": _coerce_float( + entry.get("diagnostic_vm_latency") + ), + } + ) return rows @@ -61,18 +66,22 @@ def _build_ccn_rows( measured_at = _coerce_float(entry.get("measured_at")) if node_id is None or measured_at is None: continue - rows.append({ - "item_hash": item_hash, - "node_id": str(node_id), - "measured_at": measured_at, - "base_latency": _coerce_float(entry.get("base_latency")), - "base_latency_ipv4": _coerce_float(entry.get("base_latency_ipv4")), - "metrics_latency": _coerce_float(entry.get("metrics_latency")), - "aggregate_latency": _coerce_float(entry.get("aggregate_latency")), - "file_download_latency": _coerce_float(entry.get("file_download_latency")), - "pending_messages": _coerce_int(entry.get("pending_messages")), - "eth_height_remaining": _coerce_int(entry.get("eth_height_remaining")), - }) + rows.append( + { + "item_hash": item_hash, + "node_id": str(node_id), + "measured_at": measured_at, + "base_latency": _coerce_float(entry.get("base_latency")), + "base_latency_ipv4": _coerce_float(entry.get("base_latency_ipv4")), + "metrics_latency": _coerce_float(entry.get("metrics_latency")), + "aggregate_latency": _coerce_float(entry.get("aggregate_latency")), + "file_download_latency": _coerce_float( + entry.get("file_download_latency") + ), + "pending_messages": _coerce_int(entry.get("pending_messages")), + "eth_height_remaining": _coerce_int(entry.get("eth_height_remaining")), + } + ) return rows @@ -206,3 +215,28 @@ def query_metric_crn( result = session.execute(select_stmt).fetchall() return _parse_crn_result(result=result) + + +def insert_node_metrics( + session: DbSession, + item_hash: str, + content: Mapping[str, Any], +) -> None: + metrics = content.get("metrics") or {} + if not isinstance(metrics, Mapping): + return + + crn_array = metrics.get("crn") or [] + ccn_array = metrics.get("ccn") or [] + if not isinstance(crn_array, list): + crn_array = [] + if not isinstance(ccn_array, list): + ccn_array = [] + + crn_rows = _build_crn_rows(item_hash, crn_array) + ccn_rows = _build_ccn_rows(item_hash, ccn_array) + + if crn_rows: + session.execute(insert(CrnMetricDb), crn_rows) + if ccn_rows: + session.execute(insert(CcnMetricDb), ccn_rows) diff --git a/tests/db/test_metrics_persistence.py b/tests/db/test_metrics_persistence.py index efcd9e2b0..915711f6f 100644 --- a/tests/db/test_metrics_persistence.py +++ b/tests/db/test_metrics_persistence.py @@ -1,4 +1,16 @@ -from aleph.db.accessors.metrics import _build_ccn_rows, _build_crn_rows +import datetime as dt + +from aleph_message.models import Chain, ItemType, MessageType +from sqlalchemy import select + +from aleph.db.accessors.metrics import ( + _build_ccn_rows, + _build_crn_rows, + insert_node_metrics, +) +from aleph.db.models import CcnMetricDb, CrnMetricDb, MessageDb +from aleph.types.channel import Channel +from aleph.types.db_session import DbSession, DbSessionFactory def test_build_crn_rows_full_payload(): @@ -35,40 +47,51 @@ def test_build_crn_rows_full_payload(): def test_build_crn_rows_missing_optional_fields(): - rows = _build_crn_rows("msg-2", [ - {"measured_at": 1700000000.0, "node_id": "node-A"}, - ]) - assert rows == [{ - "item_hash": "msg-2", - "node_id": "node-A", - "measured_at": 1700000000.0, - "base_latency": None, - "base_latency_ipv4": None, - "full_check_latency": None, - "diagnostic_vm_latency": None, - }] + rows = _build_crn_rows( + "msg-2", + [ + {"measured_at": 1700000000.0, "node_id": "node-A"}, + ], + ) + assert rows == [ + { + "item_hash": "msg-2", + "node_id": "node-A", + "measured_at": 1700000000.0, + "base_latency": None, + "base_latency_ipv4": None, + "full_check_latency": None, + "diagnostic_vm_latency": None, + } + ] def test_build_crn_rows_skips_entries_missing_required_fields(): # Without node_id or measured_at, the existing view emits NULL and # the migration's WHERE clause filters those out. The builder skips them. - rows = _build_crn_rows("msg-3", [ - {"measured_at": 1700000000.0}, # missing node_id - {"node_id": "node-A"}, # missing measured_at - {"measured_at": 1700000001.0, "node_id": "ok"}, # valid - ]) + rows = _build_crn_rows( + "msg-3", + [ + {"measured_at": 1700000000.0}, # missing node_id + {"node_id": "node-A"}, # missing measured_at + {"measured_at": 1700000001.0, "node_id": "ok"}, # valid + ], + ) assert len(rows) == 1 assert rows[0]["node_id"] == "ok" def test_build_crn_rows_non_numeric_field_becomes_none(): - rows = _build_crn_rows("msg-4", [ - { - "measured_at": 1700000000.0, - "node_id": "node-A", - "base_latency": "not-a-number", - }, - ]) + rows = _build_crn_rows( + "msg-4", + [ + { + "measured_at": 1700000000.0, + "node_id": "node-A", + "base_latency": "not-a-number", + }, + ], + ) assert rows[0]["base_latency"] is None @@ -77,10 +100,27 @@ def test_build_crn_rows_empty_array(): def test_build_ccn_rows_full_payload(): - rows = _build_ccn_rows("msg-1", [ + rows = _build_ccn_rows( + "msg-1", + [ + { + "measured_at": 1700000000.0, + "node_id": "node-A", + "base_latency": 0.1, + "base_latency_ipv4": 0.11, + "metrics_latency": 0.2, + "aggregate_latency": 0.3, + "file_download_latency": 0.4, + "pending_messages": 42, + "eth_height_remaining": 7, + }, + ], + ) + assert rows == [ { - "measured_at": 1700000000.0, + "item_hash": "msg-1", "node_id": "node-A", + "measured_at": 1700000000.0, "base_latency": 0.1, "base_latency_ipv4": 0.11, "metrics_latency": 0.2, @@ -88,28 +128,114 @@ def test_build_ccn_rows_full_payload(): "file_download_latency": 0.4, "pending_messages": 42, "eth_height_remaining": 7, - }, - ]) - assert rows == [{ - "item_hash": "msg-1", - "node_id": "node-A", - "measured_at": 1700000000.0, - "base_latency": 0.1, - "base_latency_ipv4": 0.11, - "metrics_latency": 0.2, - "aggregate_latency": 0.3, - "file_download_latency": 0.4, - "pending_messages": 42, - "eth_height_remaining": 7, - }] + } + ] def test_build_ccn_rows_non_numeric_pending_messages_becomes_none(): - rows = _build_ccn_rows("msg-2", [ - { - "measured_at": 1700000000.0, - "node_id": "node-A", - "pending_messages": "lots", - }, - ]) + rows = _build_ccn_rows( + "msg-2", + [ + { + "measured_at": 1700000000.0, + "node_id": "node-A", + "pending_messages": "lots", + }, + ], + ) assert rows[0]["pending_messages"] is None + + +def _seed_scoring_message(session: DbSession, item_hash: str) -> None: + """Insert a minimal MessageDb so the FK from metric rows resolves.""" + msg = MessageDb( + item_hash=item_hash, + type=MessageType.post, + chain=Chain.ETH, + sender="0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + channel=Channel("aleph-scoring"), + signature=None, + item_type=ItemType.inline, + item_content="{}", + content={ + "type": "aleph-network-metrics", + "address": "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + "content": {}, + "time": 1700000000.0, + }, + time=dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc), + size=2, + ) + session.add(msg) + session.flush() + + +def test_insert_node_metrics_writes_crn_and_ccn(session_factory: DbSessionFactory): + item_hash = "abc-test-1" + content = { + "metrics": { + "crn": [ + { + "measured_at": 1700000000.0, + "node_id": "crn-A", + "base_latency": 0.1, + }, + ], + "ccn": [ + { + "measured_at": 1700000001.0, + "node_id": "ccn-A", + "pending_messages": 5, + }, + ], + } + } + with session_factory() as session: + _seed_scoring_message(session, item_hash) + insert_node_metrics(session=session, item_hash=item_hash, content=content) + session.commit() + + crn_rows = list( + session.execute( + select(CrnMetricDb).where(CrnMetricDb.item_hash == item_hash) + ).scalars() + ) + ccn_rows = list( + session.execute( + select(CcnMetricDb).where(CcnMetricDb.item_hash == item_hash) + ).scalars() + ) + + assert len(crn_rows) == 1 + assert crn_rows[0].node_id == "crn-A" + assert crn_rows[0].base_latency == 0.1 + assert len(ccn_rows) == 1 + assert ccn_rows[0].node_id == "ccn-A" + assert ccn_rows[0].pending_messages == 5 + + +def test_insert_node_metrics_missing_metrics_key_is_noop( + session_factory: DbSessionFactory, +): + item_hash = "abc-test-2" + with session_factory() as session: + _seed_scoring_message(session, item_hash) + insert_node_metrics(session=session, item_hash=item_hash, content={}) + session.commit() + + assert ( + list( + session.execute( + select(CrnMetricDb).where(CrnMetricDb.item_hash == item_hash) + ).scalars() + ) + == [] + ) + assert ( + list( + session.execute( + select(CcnMetricDb).where(CcnMetricDb.item_hash == item_hash) + ).scalars() + ) + == [] + ) From 91956d49e098e35ae9fd12bec2f05d38720c5859 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 19 May 2026 00:50:48 +0200 Subject: [PATCH 06/12] db: read node metric queries from new tables instead of views Switch query_metric_crn and query_metric_ccn from select_from(text("*_metric_view")) to ORM column selects against CrnMetricDb / CcnMetricDb, and remove the now-dead _build_metric_filter helper. --- src/aleph/db/accessors/metrics.py | 99 +++++++++++----------------- tests/db/test_metrics_persistence.py | 79 ++++++++++++++++++++++ 2 files changed, 119 insertions(+), 59 deletions(-) diff --git a/src/aleph/db/accessors/metrics.py b/src/aleph/db/accessors/metrics.py index 5e0738ebc..a66970bc1 100644 --- a/src/aleph/db/accessors/metrics.py +++ b/src/aleph/db/accessors/metrics.py @@ -1,9 +1,8 @@ import time from typing import Any, List, Mapping, Optional -from sqlalchemy import insert, select, text +from sqlalchemy import insert, select from sqlalchemy.orm.session import Session -from sqlalchemy.sql import Select from aleph.db.models import CcnMetricDb, CrnMetricDb from aleph.types.db_session import DbSession @@ -120,30 +119,6 @@ def _parse_crn_result(result): return result_dict -def _build_metric_filter( - select_stmt: Select, - node_id: Optional[str], - start_date: Optional[float], - end_date: Optional[float], - sort_order: Optional[SortOrder], -): - if node_id: - select_stmt = select_stmt.where(text("node_id = :node_id")).params( - node_id=node_id - ) - if start_date: - select_stmt = select_stmt.where(text("measured_at >= :start_date")).params( - start_date=start_date - ) - if end_date: - select_stmt = select_stmt.where(text("measured_at <= :end_date")).params( - end_date=end_date - ) - if sort_order: - select_stmt = select_stmt.order_by(text(f"measured_at {sort_order.to_sql()}")) - return select_stmt - - def query_metric_ccn( session: Session, node_id: Optional[str] = None, @@ -158,27 +133,30 @@ def query_metric_ccn( start_date = end_date - 60 * 60 * 24 * 14 select_stmt = select( - text("item_hash"), - text("measured_at"), - text("base_latency"), - text("base_latency_ipv4"), - text("metrics_latency"), - text("aggregate_latency"), - text("file_download_latency"), - text("pending_messages"), - text("eth_height_remaining"), - ).select_from(text("ccn_metric_view")) - - select_stmt = _build_metric_filter( - select_stmt=select_stmt, - node_id=node_id, - start_date=start_date, - end_date=end_date, - sort_order=sort_order, + CcnMetricDb.item_hash, + CcnMetricDb.measured_at, + CcnMetricDb.base_latency, + CcnMetricDb.base_latency_ipv4, + CcnMetricDb.metrics_latency, + CcnMetricDb.aggregate_latency, + CcnMetricDb.file_download_latency, + CcnMetricDb.pending_messages, + CcnMetricDb.eth_height_remaining, ) - result = session.execute(select_stmt).fetchall() + if node_id: + select_stmt = select_stmt.where(CcnMetricDb.node_id == node_id) + if start_date: + select_stmt = select_stmt.where(CcnMetricDb.measured_at >= start_date) + if end_date: + select_stmt = select_stmt.where(CcnMetricDb.measured_at <= end_date) + if sort_order: + order_col = CcnMetricDb.measured_at + select_stmt = select_stmt.order_by( + order_col.asc() if sort_order == SortOrder.ASCENDING else order_col.desc() + ) + result = session.execute(select_stmt).fetchall() return _parse_ccn_result(result=result) @@ -196,24 +174,27 @@ def query_metric_crn( start_date = end_date - 60 * 60 * 24 * 14 select_stmt = select( - text("item_hash"), - text("measured_at"), - text("base_latency"), - text("base_latency_ipv4"), - text("full_check_latency"), - text("diagnostic_vm_latency"), - ).select_from(text("crn_metric_view")) - - select_stmt = _build_metric_filter( - select_stmt=select_stmt, - node_id=node_id, - start_date=start_date, - end_date=end_date, - sort_order=sort_order, + CrnMetricDb.item_hash, + CrnMetricDb.measured_at, + CrnMetricDb.base_latency, + CrnMetricDb.base_latency_ipv4, + CrnMetricDb.full_check_latency, + CrnMetricDb.diagnostic_vm_latency, ) - result = session.execute(select_stmt).fetchall() + if node_id: + select_stmt = select_stmt.where(CrnMetricDb.node_id == node_id) + if start_date: + select_stmt = select_stmt.where(CrnMetricDb.measured_at >= start_date) + if end_date: + select_stmt = select_stmt.where(CrnMetricDb.measured_at <= end_date) + if sort_order: + order_col = CrnMetricDb.measured_at + select_stmt = select_stmt.order_by( + order_col.asc() if sort_order == SortOrder.ASCENDING else order_col.desc() + ) + result = session.execute(select_stmt).fetchall() return _parse_crn_result(result=result) diff --git a/tests/db/test_metrics_persistence.py b/tests/db/test_metrics_persistence.py index 915711f6f..a9bcbac04 100644 --- a/tests/db/test_metrics_persistence.py +++ b/tests/db/test_metrics_persistence.py @@ -7,10 +7,13 @@ _build_ccn_rows, _build_crn_rows, insert_node_metrics, + query_metric_ccn, + query_metric_crn, ) from aleph.db.models import CcnMetricDb, CrnMetricDb, MessageDb from aleph.types.channel import Channel from aleph.types.db_session import DbSession, DbSessionFactory +from aleph.types.sort_order import SortOrder def test_build_crn_rows_full_payload(): @@ -239,3 +242,79 @@ def test_insert_node_metrics_missing_metrics_key_is_noop( ) == [] ) + + +def test_query_metric_crn_filters_by_node_and_date(session_factory: DbSessionFactory): + item_hash = "abc-test-q1" + with session_factory() as session: + _seed_scoring_message(session, item_hash) + session.add_all( + [ + CrnMetricDb( + item_hash=item_hash, + node_id="node-A", + measured_at=100.0, + base_latency=0.1, + ), + CrnMetricDb( + item_hash=item_hash, + node_id="node-A", + measured_at=200.0, + base_latency=0.2, + ), + CrnMetricDb( + item_hash=item_hash, + node_id="node-B", + measured_at=150.0, + base_latency=0.9, + ), + ] + ) + session.commit() + + result = query_metric_crn( + session=session, + node_id="node-A", + start_date=150.0, + end_date=None, + sort_order=SortOrder.ASCENDING, + ) + + # _parse_crn_result returns dict-of-lists keyed by column name. node_id is not exposed. + assert "node_id" not in result + assert result["measured_at"] == [200.0] + assert result["base_latency"] == [0.2] + + +def test_query_metric_ccn_returns_all_columns(session_factory: DbSessionFactory): + item_hash = "abc-test-q2" + with session_factory() as session: + _seed_scoring_message(session, item_hash) + session.add( + CcnMetricDb( + item_hash=item_hash, + node_id="ccn-A", + measured_at=500.0, + base_latency=0.1, + base_latency_ipv4=0.11, + metrics_latency=0.2, + aggregate_latency=0.3, + file_download_latency=0.4, + pending_messages=5, + eth_height_remaining=7, + ) + ) + session.commit() + + result = query_metric_ccn( + session=session, + node_id="ccn-A", + start_date=400.0, + end_date=None, + sort_order=SortOrder.ASCENDING, + ) + + assert result["measured_at"] == [500.0] + assert result["base_latency"] == [0.1] + assert result["pending_messages"] == [5] + assert result["eth_height_remaining"] == [7] From 0edafbedf4666cf41d1f79d5dc695a7182bc316c Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 19 May 2026 01:05:40 +0200 Subject: [PATCH 07/12] handlers: persist node metrics on scoring POSTs Extend PostMessageHandler with three new constructor params (scoring_addresses, scoring_channel, scoring_metrics_post_type) and a scoring branch in process_post that calls insert_node_metrics when a POST matches the allowlist and post type. Wire the config values through BaseMessageHandler. Update all test call sites. --- src/aleph/handlers/content/post.py | 20 +++ src/aleph/handlers/message_handler.py | 3 + .../test_process_forgets.py | 3 + .../message_processing/test_process_posts.py | 15 ++ tests/messages/__init__.py | 0 tests/messages/test_scoring_metrics.py | 157 ++++++++++++++++++ .../test_check_sender_authorization.py | 11 +- 7 files changed, 208 insertions(+), 1 deletion(-) create mode 100644 tests/messages/__init__.py create mode 100644 tests/messages/test_scoring_metrics.py diff --git a/src/aleph/handlers/content/post.py b/src/aleph/handlers/content/post.py index 548e110e4..5eb6a095b 100644 --- a/src/aleph/handlers/content/post.py +++ b/src/aleph/handlers/content/post.py @@ -18,6 +18,7 @@ update_credit_balances_transfer as update_credit_balances_transfer_db, ) from aleph.db.accessors.balances import validate_credit_transfer_balance +from aleph.db.accessors.metrics import insert_node_metrics from aleph.db.accessors.posts import ( delete_amends, delete_post, @@ -201,12 +202,18 @@ def __init__( credit_balances_addresses: List[str], credit_balances_post_types: List[str], credit_balances_channels: List[str], + scoring_addresses: List[str], + scoring_channel: str, + scoring_metrics_post_type: str, ): self.balances_addresses = balances_addresses self.balances_post_type = balances_post_type self.credit_balances_addresses = credit_balances_addresses self.credit_balances_post_types = credit_balances_post_types self.credit_balances_channels = credit_balances_channels + self.scoring_addresses = scoring_addresses + self.scoring_channel = scoring_channel + self.scoring_metrics_post_type = scoring_metrics_post_type async def check_dependencies(self, session: DbSession, message: MessageDb): content = get_post_content(message) @@ -330,6 +337,19 @@ async def process_post(self, session: DbSession, message: MessageDb): ) LOGGER.info("Done updating credit balances") + if ( + content.type == self.scoring_metrics_post_type + and content.address in self.scoring_addresses + and message.channel == self.scoring_channel + and isinstance(content.content, dict) + ): + LOGGER.info("Persisting scoring metrics from %s", message.item_hash) + insert_node_metrics( + session=session, + item_hash=message.item_hash, + content=content.content, + ) + async def process(self, session: DbSession, messages: List[MessageDb]) -> None: for message in messages: diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index c88353199..a51ddf007 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -73,6 +73,9 @@ def __init__( credit_balances_addresses=config.aleph.credit_balances.addresses.value, credit_balances_post_types=config.aleph.credit_balances.post_types.value, credit_balances_channels=config.aleph.credit_balances.channels.value, + scoring_addresses=config.aleph.scoring.addresses.value, + scoring_channel=config.aleph.scoring.channel.value, + scoring_metrics_post_type=config.aleph.scoring.metrics_post_type.value, ), MessageType.program: vm_handler, MessageType.store: StoreMessageHandler( diff --git a/tests/message_processing/test_process_forgets.py b/tests/message_processing/test_process_forgets.py index 4984f4491..8b79028f7 100644 --- a/tests/message_processing/test_process_forgets.py +++ b/tests/message_processing/test_process_forgets.py @@ -46,6 +46,9 @@ def forget_handler(mocker) -> ForgetMessageHandler: credit_balances_addresses=["nope"], credit_balances_post_types=["no-balances-in-tests"], credit_balances_channels=["nope"], + scoring_addresses=[], + scoring_channel="not-a-scoring-channel", + scoring_metrics_post_type="not-a-scoring-post-type", ), MessageType.program: vm_handler, MessageType.store: StoreMessageHandler( diff --git a/tests/message_processing/test_process_posts.py b/tests/message_processing/test_process_posts.py index 25d931214..5a1ed9e38 100644 --- a/tests/message_processing/test_process_posts.py +++ b/tests/message_processing/test_process_posts.py @@ -76,6 +76,9 @@ async def test_forget_original_post( credit_balances_addresses=[], credit_balances_post_types=["no-credit-balances-today"], credit_balances_channels=["nope"], + scoring_addresses=[], + scoring_channel="not-a-scoring-channel", + scoring_metrics_post_type="not-a-scoring-post-type", ) with session_factory() as session: original_message = get_message_by_item_hash( @@ -197,6 +200,9 @@ async def test_credit_transfer_non_whitelisted_sender( "aleph_credit_expense", ], credit_balances_channels=["ALEPH_CREDIT"], + scoring_addresses=[], + scoring_channel="not-a-scoring-channel", + scoring_metrics_post_type="not-a-scoring-post-type", ) with session_factory() as session: @@ -268,6 +274,9 @@ async def test_credit_transfer_insufficient_balance_rejected( "aleph_credit_expense", ], credit_balances_channels=["ALEPH_CREDIT"], + scoring_addresses=[], + scoring_channel="not-a-scoring-channel", + scoring_metrics_post_type="not-a-scoring-post-type", ) with session_factory() as session: @@ -302,6 +311,9 @@ async def test_credit_distribution_non_whitelisted_sender_ignored( "aleph_credit_expense", ], credit_balances_channels=["ALEPH_CREDIT"], + scoring_addresses=[], + scoring_channel="not-a-scoring-channel", + scoring_metrics_post_type="not-a-scoring-post-type", ) with session_factory() as session: @@ -334,6 +346,9 @@ def _make_handler() -> PostMessageHandler: "aleph_credit_expense", ], credit_balances_channels=["ALEPH_CREDIT"], + scoring_addresses=[], + scoring_channel="not-a-scoring-channel", + scoring_metrics_post_type="not-a-scoring-post-type", ) diff --git a/tests/messages/__init__.py b/tests/messages/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/messages/test_scoring_metrics.py b/tests/messages/test_scoring_metrics.py new file mode 100644 index 000000000..4af6b405b --- /dev/null +++ b/tests/messages/test_scoring_metrics.py @@ -0,0 +1,157 @@ +import datetime as dt + +import pytest +from aleph_message.models import Chain, ItemType, MessageType +from sqlalchemy import select + +from aleph.db.models import CcnMetricDb, CrnMetricDb, MessageDb +from aleph.handlers.content.post import PostMessageHandler +from aleph.types.channel import Channel +from aleph.types.db_session import DbSessionFactory + +SCORING_SENDER = "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4" +SCORING_CHANNEL = "aleph-scoring" +SCORING_POST_TYPE = "aleph-network-metrics" + + +def _make_scoring_message( + item_hash: str, content_type: str, address: str, channel: str +) -> MessageDb: + payload = { + "type": content_type, + "address": address, + "time": 1700000000.0, + "content": { + "metrics": { + "crn": [ + { + "measured_at": 1700000000.0, + "node_id": "crn-A", + "base_latency": 0.1, + }, + ], + "ccn": [ + { + "measured_at": 1700000000.0, + "node_id": "ccn-A", + "pending_messages": 3, + }, + ], + }, + }, + } + return MessageDb( + item_hash=item_hash, + type=MessageType.post, + chain=Chain.ETH, + sender=address, + channel=Channel(channel), + signature=None, + item_type=ItemType.inline, + item_content="{}", + content=payload, + time=dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc), + size=2, + ) + + +def _handler() -> PostMessageHandler: + return PostMessageHandler( + balances_addresses=[], + balances_post_type="no-balances-today", + credit_balances_addresses=[], + credit_balances_post_types=["no-credit-balances-today"], + credit_balances_channels=["nope"], + scoring_addresses=[SCORING_SENDER], + scoring_channel=SCORING_CHANNEL, + scoring_metrics_post_type=SCORING_POST_TYPE, + ) + + +@pytest.mark.asyncio +async def test_scoring_post_inserts_metric_rows(session_factory: DbSessionFactory): + handler = _handler() + msg = _make_scoring_message( + item_hash="hash-1", + content_type=SCORING_POST_TYPE, + address=SCORING_SENDER, + channel=SCORING_CHANNEL, + ) + with session_factory() as session: + session.add(msg) + session.flush() + await handler.process(session=session, messages=[msg]) + session.commit() + + crn = list(session.execute(select(CrnMetricDb)).scalars()) + ccn = list(session.execute(select(CcnMetricDb)).scalars()) + + assert [(r.item_hash, r.node_id, r.base_latency) for r in crn] == [ + ("hash-1", "crn-A", 0.1) + ] + assert [(r.item_hash, r.node_id, r.pending_messages) for r in ccn] == [ + ("hash-1", "ccn-A", 3) + ] + + +@pytest.mark.asyncio +async def test_non_scoring_post_type_does_not_insert_metrics( + session_factory: DbSessionFactory, +): + handler = _handler() + msg = _make_scoring_message( + item_hash="hash-2", + content_type="something-else", + address=SCORING_SENDER, + channel=SCORING_CHANNEL, + ) + with session_factory() as session: + session.add(msg) + session.flush() + await handler.process(session=session, messages=[msg]) + session.commit() + + assert list(session.execute(select(CrnMetricDb)).scalars()) == [] + assert list(session.execute(select(CcnMetricDb)).scalars()) == [] + + +@pytest.mark.asyncio +async def test_non_allowlisted_sender_does_not_insert_metrics( + session_factory: DbSessionFactory, +): + handler = _handler() + msg = _make_scoring_message( + item_hash="hash-3", + content_type=SCORING_POST_TYPE, + address="0xSomeoneElse00000000000000000000000000", + channel=SCORING_CHANNEL, + ) + with session_factory() as session: + session.add(msg) + session.flush() + await handler.process(session=session, messages=[msg]) + session.commit() + + assert list(session.execute(select(CrnMetricDb)).scalars()) == [] + assert list(session.execute(select(CcnMetricDb)).scalars()) == [] + + +@pytest.mark.asyncio +async def test_wrong_channel_does_not_insert_metrics( + session_factory: DbSessionFactory, +): + handler = _handler() + msg = _make_scoring_message( + item_hash="hash-4", + content_type=SCORING_POST_TYPE, + address=SCORING_SENDER, + channel="wrong-channel", + ) + with session_factory() as session: + session.add(msg) + session.flush() + await handler.process(session=session, messages=[msg]) + session.commit() + + assert list(session.execute(select(CrnMetricDb)).scalars()) == [] + assert list(session.execute(select(CcnMetricDb)).scalars()) == [] diff --git a/tests/permissions/test_check_sender_authorization.py b/tests/permissions/test_check_sender_authorization.py index be4cd2a85..58bfd6492 100644 --- a/tests/permissions/test_check_sender_authorization.py +++ b/tests/permissions/test_check_sender_authorization.py @@ -510,7 +510,16 @@ async def test_amend_different_owner_denied(mocker, session_factory: DbSessionFa ) # Create the handler - handler = PostMessageHandler([], "", [], [], []) + handler = PostMessageHandler( + balances_addresses=[], + balances_post_type="", + credit_balances_addresses=[], + credit_balances_post_types=[], + credit_balances_channels=[], + scoring_addresses=[], + scoring_channel="not-a-scoring-channel", + scoring_metrics_post_type="not-a-scoring-post-type", + ) # Test that the permission check fails due to owner mismatch with session_factory() as session: From 9940eab70ddc054718379c2e22ff319859c726d3 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 19 May 2026 09:47:07 +0200 Subject: [PATCH 08/12] test: verify FK cascade deletes metric rows on message delete --- tests/messages/test_scoring_metrics.py | 66 ++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/tests/messages/test_scoring_metrics.py b/tests/messages/test_scoring_metrics.py index 4af6b405b..8ee5d5959 100644 --- a/tests/messages/test_scoring_metrics.py +++ b/tests/messages/test_scoring_metrics.py @@ -155,3 +155,69 @@ async def test_wrong_channel_does_not_insert_metrics( assert list(session.execute(select(CrnMetricDb)).scalars()) == [] assert list(session.execute(select(CcnMetricDb)).scalars()) == [] + + +@pytest.mark.asyncio +async def test_forget_cascades_to_metric_rows(session_factory: DbSessionFactory): + handler = _handler() + msg = _make_scoring_message( + item_hash="hash-forget", + content_type=SCORING_POST_TYPE, + address=SCORING_SENDER, + channel=SCORING_CHANNEL, + ) + with session_factory() as session: + session.add(msg) + session.flush() + await handler.process(session=session, messages=[msg]) + session.commit() + + # Sanity: rows are there + assert ( + len( + list( + session.execute( + select(CrnMetricDb).where( + CrnMetricDb.item_hash == "hash-forget" + ) + ).scalars() + ) + ) + == 1 + ) + assert ( + len( + list( + session.execute( + select(CcnMetricDb).where( + CcnMetricDb.item_hash == "hash-forget" + ) + ).scalars() + ) + ) + == 1 + ) + + # Now delete the source message and verify cascade. + with session_factory() as session: + existing = session.get(MessageDb, "hash-forget") + assert existing is not None + session.delete(existing) + session.commit() + + assert ( + list( + session.execute( + select(CrnMetricDb).where(CrnMetricDb.item_hash == "hash-forget") + ).scalars() + ) + == [] + ) + assert ( + list( + session.execute( + select(CcnMetricDb).where(CcnMetricDb.item_hash == "hash-forget") + ).scalars() + ) + == [] + ) From d7de374a94b63098000632dc1324546430968595 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Tue, 19 May 2026 10:19:16 +0200 Subject: [PATCH 09/12] test: verify metrics API serves rows from persisted tables Add API contract tests for /api/v0/compute/{node_id}/metrics (CRN) and /api/v0/core/{node_id}/metrics (CCN) confirming that both endpoints serve rows from the new CrnMetricDb/CcnMetricDb tables and return 404 for unknown nodes. Fix two related regressions introduced when Task 6 switched the query accessors from the old JSONB views to the new tables: - Add default ORDER BY measured_at ASC to query_metric_ccn and query_metric_crn so that callers without an explicit sort parameter get deterministic, ascending results (matching the implicit ordering the old views provided). - Extend _load_fixtures in tests/api/conftest.py to call insert_node_metrics for aleph-network-metrics messages so that the existing test_new_metric.py tests can find data in the new tables. --- src/aleph/db/accessors/metrics.py | 22 ++-- tests/api/conftest.py | 13 +++ tests/web/controllers/test_metrics_api.py | 131 ++++++++++++++++++++++ 3 files changed, 155 insertions(+), 11 deletions(-) create mode 100644 tests/web/controllers/test_metrics_api.py diff --git a/src/aleph/db/accessors/metrics.py b/src/aleph/db/accessors/metrics.py index a66970bc1..1458d6164 100644 --- a/src/aleph/db/accessors/metrics.py +++ b/src/aleph/db/accessors/metrics.py @@ -124,7 +124,7 @@ def query_metric_ccn( node_id: Optional[str] = None, start_date: Optional[float] = None, end_date: Optional[float] = None, - sort_order: Optional[SortOrder] = None, + sort_order: Optional[SortOrderForMetrics] = None, ): # Default to the last 2 weeks from now, or 2 weeks before the `end_date`. if not start_date and not end_date: @@ -150,11 +150,11 @@ def query_metric_ccn( select_stmt = select_stmt.where(CcnMetricDb.measured_at >= start_date) if end_date: select_stmt = select_stmt.where(CcnMetricDb.measured_at <= end_date) - if sort_order: - order_col = CcnMetricDb.measured_at - select_stmt = select_stmt.order_by( - order_col.asc() if sort_order == SortOrder.ASCENDING else order_col.desc() - ) + order_col = CcnMetricDb.measured_at + if sort_order == SortOrder.DESCENDING: + select_stmt = select_stmt.order_by(order_col.desc()) + else: + select_stmt = select_stmt.order_by(order_col.asc()) result = session.execute(select_stmt).fetchall() return _parse_ccn_result(result=result) @@ -188,11 +188,11 @@ def query_metric_crn( select_stmt = select_stmt.where(CrnMetricDb.measured_at >= start_date) if end_date: select_stmt = select_stmt.where(CrnMetricDb.measured_at <= end_date) - if sort_order: - order_col = CrnMetricDb.measured_at - select_stmt = select_stmt.order_by( - order_col.asc() if sort_order == SortOrder.ASCENDING else order_col.desc() - ) + order_col = CrnMetricDb.measured_at + if sort_order == SortOrder.DESCENDING: + select_stmt = select_stmt.order_by(order_col.desc()) + else: + select_stmt = select_stmt.order_by(order_col.asc()) result = session.execute(select_stmt).fetchall() return _parse_crn_result(result=result) diff --git a/tests/api/conftest.py b/tests/api/conftest.py index cf57307dd..a8029bea3 100644 --- a/tests/api/conftest.py +++ b/tests/api/conftest.py @@ -18,6 +18,7 @@ from aleph.chains.signature_verifier import SignatureVerifier from aleph.db.accessors.aggregates import refresh_aggregate +from aleph.db.accessors.metrics import insert_node_metrics from aleph.db.models import ( AggregateElementDb, ChainTxDb, @@ -74,6 +75,18 @@ async def _load_fixtures( ) session.add(message_status) + # Populate metric tables for aleph-network-metrics messages so that + # the query_metric_ccn / query_metric_crn accessors (which now read + # the new persisted tables) can find the data during tests. + content = message_dict.get("content", {}) + if content.get("type") == "aleph-network-metrics": + session.flush() + insert_node_metrics( + session, + item_hash=message_dict["item_hash"], + content=content.get("content", {}), + ) + session.commit() return messages_json if raw else messages diff --git a/tests/web/controllers/test_metrics_api.py b/tests/web/controllers/test_metrics_api.py new file mode 100644 index 000000000..4b1cd22a3 --- /dev/null +++ b/tests/web/controllers/test_metrics_api.py @@ -0,0 +1,131 @@ +""" +API contract tests for the persisted-metrics endpoints. + +/api/v0/compute/{node_id}/metrics -- CRN +/api/v0/core/{node_id}/metrics -- CCN + +These tests verify that the endpoints serve rows from the CrnMetricDb / +CcnMetricDb tables and preserve the existing response shape. + +Note: both accessors default to a 14-day lookback window when no start_date +is given. To avoid that filter we pass ?start_date=0 on every request so all +rows in the table are visible regardless of their measured_at value. +""" + +import datetime as dt + +import pytest +from aleph_message.models import Chain, ItemType, MessageType + +from aleph.db.models import CcnMetricDb, CrnMetricDb, MessageDb +from aleph.types.channel import Channel +from aleph.types.db_session import DbSessionFactory + +SCORING_SENDER = "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4" +_MEASURED_AT = 1700000000.0 # Nov 2023 -- well outside the default 14-day window + +# The accessors use `if not start_date` to decide whether to apply the 14-day +# lookback window. Passing start_date=0 would be falsy and still trigger the +# lookback. Use 1 (1970-01-01 00:00:01 UTC) instead: truthy, before any real +# metric, so every row is included. +_START_DATE_BYPASS = "start_date=1" + + +def _seed_message(session, item_hash: str) -> None: + """Insert a minimal processed MessageDb that metric rows can FK into.""" + session.add( + MessageDb( + item_hash=item_hash, + type=MessageType.post, + chain=Chain.ETH, + sender=SCORING_SENDER, + channel=Channel("aleph-scoring"), + signature=None, + item_type=ItemType.inline, + item_content="{}", + content={ + "type": "aleph-network-metrics", + "address": SCORING_SENDER, + "content": {}, + "time": 0.0, + }, + time=dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc), + size=2, + ) + ) + session.flush() + + +@pytest.mark.asyncio +async def test_get_crn_metrics_returns_persisted_rows( + ccn_api_client, session_factory: DbSessionFactory +): + node_id = "crn-node-X" + with session_factory() as session: + _seed_message(session, "h-crn") + session.add( + CrnMetricDb( + item_hash="h-crn", + node_id=node_id, + measured_at=_MEASURED_AT, + base_latency=0.1, + base_latency_ipv4=0.11, + full_check_latency=0.2, + diagnostic_vm_latency=0.3, + ) + ) + session.commit() + + response = await ccn_api_client.get( + f"/api/v0/compute/{node_id}/metrics?{_START_DATE_BYPASS}" + ) + assert response.status == 200, await response.text() + body = await response.json() + assert "metrics" in body + metrics = body["metrics"] + assert metrics["measured_at"] == [_MEASURED_AT] + assert metrics["base_latency"] == [0.1] + assert metrics["full_check_latency"] == [0.2] + + +@pytest.mark.asyncio +async def test_get_ccn_metrics_returns_persisted_rows( + ccn_api_client, session_factory: DbSessionFactory +): + node_id = "ccn-node-Y" + with session_factory() as session: + _seed_message(session, "h-ccn") + session.add( + CcnMetricDb( + item_hash="h-ccn", + node_id=node_id, + measured_at=_MEASURED_AT, + base_latency=0.1, + base_latency_ipv4=0.11, + metrics_latency=0.2, + aggregate_latency=0.3, + file_download_latency=0.4, + pending_messages=5, + eth_height_remaining=7, + ) + ) + session.commit() + + response = await ccn_api_client.get( + f"/api/v0/core/{node_id}/metrics?{_START_DATE_BYPASS}" + ) + assert response.status == 200, await response.text() + body = await response.json() + assert "metrics" in body + metrics = body["metrics"] + assert metrics["measured_at"] == [_MEASURED_AT] + assert metrics["pending_messages"] == [5] + assert metrics["eth_height_remaining"] == [7] + + +@pytest.mark.asyncio +async def test_get_crn_metrics_404_when_node_unknown(ccn_api_client): + response = await ccn_api_client.get( + f"/api/v0/compute/no-such-node/metrics?{_START_DATE_BYPASS}" + ) + assert response.status == 404 From 3eb729d07c70aff2dc2a714d3f4a747d4ff484f3 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 20 May 2026 01:48:26 +0200 Subject: [PATCH 10/12] perf(migration): build crn/ccn_metrics indexes and FK after backfill The original ordering created both tables with their indexes and FK to messages first, then ran the INSERT. On a populated CCN, the backfill fans out via jsonb_array_elements to roughly scoring_messages * nodes_per_msg rows, and every insert paid for two B-tree updates plus an FK lookup against the 5M-row messages table. Single-transaction alembic ran the whole thing without checkpoint relief, so WAL growth and random IO dominated. Reorder upgrade() to: 1. create the two heap tables (no indexes, no FK) 2. run both INSERT...SELECT against bare tables 3. CREATE INDEX in one shot on populated tables (sorted bulk build) 4. ADD CONSTRAINT ... NOT VALID then VALIDATE CONSTRAINT The end state is identical: same indexes, same FK semantics (VALIDATE-d FK enforces existing rows and future inserts), same ON DELETE CASCADE. Tested via existing metrics-persistence and metrics-API suites; the FK cascade test still passes. --- .../0059_b9c4f1e6a2d7_persist_node_metrics.py | 79 ++++++++++++------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py index 1d4321b1e..ccb27b283 100644 --- a/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py +++ b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py @@ -6,6 +6,14 @@ Creates crn_metrics and ccn_metrics tables, backfills them from the existing JSON-unnesting views, then drops the views. + +Backfill ordering: tables are created without indexes or FK, the bulk +INSERT runs against bare heap tables, then indexes are built in one +shot on the populated tables (sorted bulk build) and the FK is added +NOT VALID + VALIDATE-d separately. This avoids per-row index +maintenance and per-row FK lookups against messages during the +backfill, which fans out to roughly scoring_messages * nodes_per_msg +rows. """ import sqlalchemy as sa @@ -22,12 +30,7 @@ def upgrade() -> None: op.create_table( "crn_metrics", sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True), - sa.Column( - "item_hash", - sa.String, - sa.ForeignKey("messages.item_hash", ondelete="CASCADE"), - nullable=False, - ), + sa.Column("item_hash", sa.String, nullable=False), sa.Column("node_id", sa.String, nullable=False), sa.Column("measured_at", sa.Float, nullable=False), sa.Column("base_latency", sa.Float), @@ -35,24 +38,10 @@ def upgrade() -> None: sa.Column("full_check_latency", sa.Float), sa.Column("diagnostic_vm_latency", sa.Float), ) - op.create_index( - "ix_crn_metrics_item_hash", "crn_metrics", ["item_hash"] - ) - op.create_index( - "ix_crn_metrics_node_id_measured_at", - "crn_metrics", - ["node_id", sa.text("measured_at DESC")], - ) - op.create_table( "ccn_metrics", sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True), - sa.Column( - "item_hash", - sa.String, - sa.ForeignKey("messages.item_hash", ondelete="CASCADE"), - nullable=False, - ), + sa.Column("item_hash", sa.String, nullable=False), sa.Column("node_id", sa.String, nullable=False), sa.Column("measured_at", sa.Float, nullable=False), sa.Column("base_latency", sa.Float), @@ -63,14 +52,6 @@ def upgrade() -> None: sa.Column("pending_messages", sa.Integer), sa.Column("eth_height_remaining", sa.Integer), ) - op.create_index( - "ix_ccn_metrics_item_hash", "ccn_metrics", ["item_hash"] - ) - op.create_index( - "ix_ccn_metrics_node_id_measured_at", - "ccn_metrics", - ["node_id", sa.text("measured_at DESC")], - ) op.execute( text( @@ -105,6 +86,46 @@ def upgrade() -> None: ) ) + op.create_index("ix_crn_metrics_item_hash", "crn_metrics", ["item_hash"]) + op.create_index( + "ix_crn_metrics_node_id_measured_at", + "crn_metrics", + ["node_id", sa.text("measured_at DESC")], + ) + op.create_index("ix_ccn_metrics_item_hash", "ccn_metrics", ["item_hash"]) + op.create_index( + "ix_ccn_metrics_node_id_measured_at", + "ccn_metrics", + ["node_id", sa.text("measured_at DESC")], + ) + + op.execute( + text( + """ + ALTER TABLE crn_metrics + ADD CONSTRAINT fk_crn_metrics_item_hash + FOREIGN KEY (item_hash) REFERENCES messages(item_hash) ON DELETE CASCADE + NOT VALID + """ + ) + ) + op.execute( + text( + """ + ALTER TABLE ccn_metrics + ADD CONSTRAINT fk_ccn_metrics_item_hash + FOREIGN KEY (item_hash) REFERENCES messages(item_hash) ON DELETE CASCADE + NOT VALID + """ + ) + ) + op.execute( + text("ALTER TABLE crn_metrics VALIDATE CONSTRAINT fk_crn_metrics_item_hash") + ) + op.execute( + text("ALTER TABLE ccn_metrics VALIDATE CONSTRAINT fk_ccn_metrics_item_hash") + ) + op.execute(text("DROP VIEW IF EXISTS crn_metric_view")) op.execute(text("DROP VIEW IF EXISTS ccn_metric_view")) From 22166efbeb42846b0859290938eafebf0d3e901b Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 20 May 2026 23:51:02 +0200 Subject: [PATCH 11/12] perf(metrics): range-partition crn/ccn_metrics on TIMESTAMPTZ measured_at MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The unpartitioned tables produced by the earlier 0059 grew to ~88M rows / 30GB+ on epyc, covering 3 years of scoring history with no way to expire old data short of a full DELETE + VACUUM. Reshape the tables to time-range partitioning so retention becomes a DROP PARTITION instead of a long DELETE, and bound the initial backfill to a 12-month horizon. Schema ------ * crn_metrics and ccn_metrics are now PARTITION BY RANGE (measured_at) * measured_at is TIMESTAMPTZ (was DOUBLE PRECISION epoch); PK is composite (id, measured_at) because PG requires the partition key in any PK on a partitioned table * Initial coverage: 13 monthly partitions [now-12mo, now+1mo) plus a DEFAULT catch-all * FK to messages(item_hash) ON DELETE CASCADE is added after backfill. PG does not yet support NOT VALID FKs on partitioned tables so the ADD CONSTRAINT validates in one pass — still cheaper than the per-INSERT FK check we'd pay with the constraint present during the bulk SELECT * The 0059 migration also seeds a 'metrics_partition' row in cron_jobs Cron ---- * New MetricsPartitionCronJob runs daily and (a) pre-creates the next N months of partitions, (b) DETACHes + DROPs partitions whose upper bound is past the retention cutoff, (c) warns if the DEFAULT partition ever gets rows (operational signal that lookahead is too short) * Both responsibilities are idempotent; steady-state runs are no-ops * aleph.toolkit.partitions holds the shared month-bounds / naming helpers used by both migration and cron so they agree on partition_name() formatting Application layer ----------------- * CrnMetricDb/CcnMetricDb.measured_at switched to dt.datetime * _build_crn_rows/_build_ccn_rows convert epoch float -> UTC datetime * query_metric_*() still accept and return epoch floats (unchanged API contract); conversion happens at the accessor boundary * config.aleph.scoring grows retention_months and partition_lookahead_months keys Tests ----- * 27 metrics-related tests pass: builder/accessor adapted for TIMESTAMPTZ; new cron tests cover steady-state no-op, create-missing- future, drop-past-cutoff, DEFAULT-fills warning, partition routing; FK CASCADE still verified through tests/messages/test_scoring_metrics --- .../0059_b9c4f1e6a2d7_persist_node_metrics.py | 195 +++++++++---- src/aleph/commands.py | 6 + src/aleph/config.py | 8 + src/aleph/db/accessors/metrics.py | 63 ++++- src/aleph/db/models/metrics.py | 17 +- src/aleph/jobs/cron/metrics_partition_job.py | 199 ++++++++++++++ src/aleph/toolkit/partitions.py | 52 ++++ tests/db/test_metrics_persistence.py | 17 +- tests/jobs/test_metrics_partition_job.py | 259 ++++++++++++++++++ tests/web/controllers/test_metrics_api.py | 5 +- 10 files changed, 744 insertions(+), 77 deletions(-) create mode 100644 src/aleph/jobs/cron/metrics_partition_job.py create mode 100644 src/aleph/toolkit/partitions.py create mode 100644 tests/jobs/test_metrics_partition_job.py diff --git a/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py index ccb27b283..8f52efa49 100644 --- a/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py +++ b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py @@ -1,91 +1,167 @@ -"""Persist node scoring metrics +"""Persist node scoring metrics in range-partitioned tables. Revision ID: b9c4f1e6a2d7 Revises: 4f1e8d2a6c3b Create Date: 2026-05-18 -Creates crn_metrics and ccn_metrics tables, backfills them from the -existing JSON-unnesting views, then drops the views. +Creates crn_metrics and ccn_metrics as RANGE-partitioned tables on +measured_at (TIMESTAMPTZ), with monthly child partitions, a DEFAULT +catch-all partition, and a 12-month retention horizon at backfill. +Backfills from the existing JSON-unnesting views (filtered to the +retention window), then drops the views. Seeds a cron job that +maintains partitions (creates next month, drops past-cutoff). -Backfill ordering: tables are created without indexes or FK, the bulk -INSERT runs against bare heap tables, then indexes are built in one -shot on the populated tables (sorted bulk build) and the FK is added -NOT VALID + VALIDATE-d separately. This avoids per-row index -maintenance and per-row FK lookups against messages during the -backfill, which fans out to roughly scoring_messages * nodes_per_msg -rows. +Partition design: +* Partition key: measured_at TIMESTAMPTZ +* Granularity: 1 month +* Initial coverage: [now - 12 months, now + 1 month) (13 child partitions) +* Plus a DEFAULT partition as a safety net for out-of-range timestamps +* PK is (id, measured_at) because Postgres requires the partition key in + any PK on a partitioned table; id stays a BIGSERIAL sequence column +* Indexes: ix_*_item_hash, ix_*_node_id_measured_at (both built on + populated tables after backfill, replicated to children via the + partitioned-index mechanism) +* FK to messages(item_hash) ON DELETE CASCADE, added NOT VALID then + VALIDATE after backfill to skip per-row check during the bulk load + +The retention cron (metrics_partition) runs daily and is responsible +for creating next month's partition and dropping past-cutoff +partitions. See aleph.jobs.cron.metrics_partition_job. """ +import datetime as dt + import sqlalchemy as sa from alembic import op from sqlalchemy import text +from aleph.toolkit.partitions import ( + add_months, + month_floor, + monthly_bounds, + partition_name, + ts_literal, +) + revision = "b9c4f1e6a2d7" down_revision = "4f1e8d2a6c3b" branch_labels = None depends_on = None +RETENTION_MONTHS = 12 +LOOKAHEAD_MONTHS = 1 + + def upgrade() -> None: - op.create_table( - "crn_metrics", - sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True), - sa.Column("item_hash", sa.String, nullable=False), - sa.Column("node_id", sa.String, nullable=False), - sa.Column("measured_at", sa.Float, nullable=False), - sa.Column("base_latency", sa.Float), - sa.Column("base_latency_ipv4", sa.Float), - sa.Column("full_check_latency", sa.Float), - sa.Column("diagnostic_vm_latency", sa.Float), + now_month = month_floor(dt.datetime.now(tz=dt.timezone.utc)) + cutoff = add_months(now_month, -RETENTION_MONTHS) + upper_bound = add_months(now_month, LOOKAHEAD_MONTHS + 1) + + # 1. Create both parent tables (PARTITION BY RANGE on measured_at). + op.execute( + text( + """ + CREATE TABLE crn_metrics ( + id BIGINT GENERATED BY DEFAULT AS IDENTITY, + measured_at TIMESTAMPTZ NOT NULL, + item_hash TEXT NOT NULL, + node_id TEXT NOT NULL, + base_latency DOUBLE PRECISION, + base_latency_ipv4 DOUBLE PRECISION, + full_check_latency DOUBLE PRECISION, + diagnostic_vm_latency DOUBLE PRECISION, + PRIMARY KEY (id, measured_at) + ) PARTITION BY RANGE (measured_at) + """ + ) ) - op.create_table( - "ccn_metrics", - sa.Column("id", sa.BigInteger, primary_key=True, autoincrement=True), - sa.Column("item_hash", sa.String, nullable=False), - sa.Column("node_id", sa.String, nullable=False), - sa.Column("measured_at", sa.Float, nullable=False), - sa.Column("base_latency", sa.Float), - sa.Column("base_latency_ipv4", sa.Float), - sa.Column("metrics_latency", sa.Float), - sa.Column("aggregate_latency", sa.Float), - sa.Column("file_download_latency", sa.Float), - sa.Column("pending_messages", sa.Integer), - sa.Column("eth_height_remaining", sa.Integer), + op.execute( + text( + """ + CREATE TABLE ccn_metrics ( + id BIGINT GENERATED BY DEFAULT AS IDENTITY, + measured_at TIMESTAMPTZ NOT NULL, + item_hash TEXT NOT NULL, + node_id TEXT NOT NULL, + base_latency DOUBLE PRECISION, + base_latency_ipv4 DOUBLE PRECISION, + metrics_latency DOUBLE PRECISION, + aggregate_latency DOUBLE PRECISION, + file_download_latency DOUBLE PRECISION, + pending_messages INTEGER, + eth_height_remaining INTEGER, + PRIMARY KEY (id, measured_at) + ) PARTITION BY RANGE (measured_at) + """ + ) ) + # 2. Create monthly child partitions for [cutoff, upper_bound). + for table in ("crn_metrics", "ccn_metrics"): + for lower, upper in monthly_bounds(cutoff, upper_bound): + op.execute( + text( + f"CREATE TABLE {partition_name(table, lower)} " + f"PARTITION OF {table} " + f"FOR VALUES FROM ('{ts_literal(lower)}') " + f"TO ('{ts_literal(upper)}')" + ) + ) + # DEFAULT partition: catch-all for timestamps outside the rolling + # range (e.g. far-future scoring posts). The cron job alerts if + # this ever holds rows; data here is hard to migrate to a proper + # partition without DETACH + INSERT, so it should stay empty in + # steady state. + op.execute(text(f"CREATE TABLE {table}_default PARTITION OF {table} DEFAULT")) + + # 3. Backfill from the existing views, filtered to the retention + # window. Cast the float epoch to TIMESTAMPTZ in the SELECT so + # partition routing picks the right child. + cutoff_epoch = cutoff.timestamp() op.execute( text( - """ + f""" INSERT INTO crn_metrics ( - item_hash, node_id, measured_at, base_latency, base_latency_ipv4, + item_hash, measured_at, node_id, base_latency, base_latency_ipv4, full_check_latency, diagnostic_vm_latency ) SELECT - item_hash, node_id, measured_at, base_latency, base_latency_ipv4, + item_hash, + to_timestamp(measured_at) AT TIME ZONE 'UTC' AS measured_at, + node_id, base_latency, base_latency_ipv4, full_check_latency, diagnostic_vm_latency FROM crn_metric_view - WHERE node_id IS NOT NULL AND measured_at IS NOT NULL + WHERE node_id IS NOT NULL + AND measured_at IS NOT NULL + AND measured_at >= {cutoff_epoch} """ ) ) op.execute( text( - """ + f""" INSERT INTO ccn_metrics ( - item_hash, node_id, measured_at, base_latency, base_latency_ipv4, + item_hash, measured_at, node_id, base_latency, base_latency_ipv4, metrics_latency, aggregate_latency, file_download_latency, pending_messages, eth_height_remaining ) SELECT - item_hash, node_id, measured_at, base_latency, base_latency_ipv4, + item_hash, + to_timestamp(measured_at) AT TIME ZONE 'UTC' AS measured_at, + node_id, base_latency, base_latency_ipv4, metrics_latency, aggregate_latency, file_download_latency, pending_messages, eth_height_remaining FROM ccn_metric_view - WHERE node_id IS NOT NULL AND measured_at IS NOT NULL + WHERE node_id IS NOT NULL + AND measured_at IS NOT NULL + AND measured_at >= {cutoff_epoch} """ ) ) + # 4. Build indexes on the parent (partitioned-index, cascades to + # every child). op.create_index("ix_crn_metrics_item_hash", "crn_metrics", ["item_hash"]) op.create_index( "ix_crn_metrics_node_id_measured_at", @@ -99,13 +175,16 @@ def upgrade() -> None: ["node_id", sa.text("measured_at DESC")], ) + # 5. FK to messages. PG does not support NOT VALID on partitioned + # tables (yet, as of PG 17). The ADD CONSTRAINT validates each + # child partition in one pass — still faster than the per-INSERT + # FK check we'd pay if the constraint were present during step 3. op.execute( text( """ ALTER TABLE crn_metrics ADD CONSTRAINT fk_crn_metrics_item_hash FOREIGN KEY (item_hash) REFERENCES messages(item_hash) ON DELETE CASCADE - NOT VALID """ ) ) @@ -115,22 +194,30 @@ def upgrade() -> None: ALTER TABLE ccn_metrics ADD CONSTRAINT fk_ccn_metrics_item_hash FOREIGN KEY (item_hash) REFERENCES messages(item_hash) ON DELETE CASCADE - NOT VALID """ ) ) - op.execute( - text("ALTER TABLE crn_metrics VALIDATE CONSTRAINT fk_crn_metrics_item_hash") - ) - op.execute( - text("ALTER TABLE ccn_metrics VALIDATE CONSTRAINT fk_ccn_metrics_item_hash") - ) op.execute(text("DROP VIEW IF EXISTS crn_metric_view")) op.execute(text("DROP VIEW IF EXISTS ccn_metric_view")) + # 6. Seed the cron-job row that drives partition maintenance. + # Daily interval. last_run set to epoch so the first scheduler tick + # runs the job (creates next month if missing, drops anything that + # slipped past retention). + op.execute( + text( + """ + INSERT INTO cron_jobs(id, interval, last_run) + VALUES ('metrics_partition', 86400, '1970-01-01 00:00:00+00') + """ + ) + ) + def downgrade() -> None: + op.execute(text("DELETE FROM cron_jobs WHERE id = 'metrics_partition'")) + op.execute( text( """ @@ -178,9 +265,7 @@ def downgrade() -> None: """ ) ) - op.drop_index("ix_ccn_metrics_node_id_measured_at", table_name="ccn_metrics") - op.drop_index("ix_ccn_metrics_item_hash", table_name="ccn_metrics") - op.drop_table("ccn_metrics") - op.drop_index("ix_crn_metrics_node_id_measured_at", table_name="crn_metrics") - op.drop_index("ix_crn_metrics_item_hash", table_name="crn_metrics") - op.drop_table("crn_metrics") + + # Dropping the parent cascades to all child partitions and the FK. + op.execute(text("DROP TABLE IF EXISTS ccn_metrics CASCADE")) + op.execute(text("DROP TABLE IF EXISTS crn_metrics CASCADE")) diff --git a/src/aleph/commands.py b/src/aleph/commands.py index 79f247d0b..e1173ab1f 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -33,6 +33,7 @@ from aleph.jobs.cron.balance_job import BalanceCronJob from aleph.jobs.cron.credit_balance_job import CreditBalanceCronJob from aleph.jobs.cron.cron_job import CronJob, cron_job_task +from aleph.jobs.cron.metrics_partition_job import MetricsPartitionCronJob from aleph.network import listener_tasks from aleph.repair import repair_node from aleph.services import p2p @@ -171,6 +172,11 @@ async def main(args: List[str]) -> None: session_factory=session_factory, max_unauthenticated_upload_file_size=config.storage.max_unauthenticated_upload_file_size.value, ), + "metrics_partition": MetricsPartitionCronJob( + session_factory=session_factory, + retention_months=config.aleph.scoring.retention_months.value, + lookahead_months=config.aleph.scoring.partition_lookahead_months.value, + ), }, ) chain_data_service = ChainDataService( diff --git a/src/aleph/config.py b/src/aleph/config.py index dd3f08588..a5ffb74a0 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -68,6 +68,14 @@ def get_defaults(): "channel": "aleph-scoring", # POST message type that carries the node metrics payload. "metrics_post_type": "aleph-network-metrics", + # Retention horizon for crn_metrics / ccn_metrics. Partitions + # whose upper bound is older than this are detached and dropped + # by the metrics_partition cron job. + "retention_months": 12, + # How many months ahead of "now" to keep partitions + # pre-created. Guards against incoming scoring posts + # falling into the DEFAULT catch-all partition. + "partition_lookahead_months": 1, }, "jobs": { "pending_messages": { diff --git a/src/aleph/db/accessors/metrics.py b/src/aleph/db/accessors/metrics.py index 1458d6164..70453188e 100644 --- a/src/aleph/db/accessors/metrics.py +++ b/src/aleph/db/accessors/metrics.py @@ -1,3 +1,4 @@ +import datetime as dt import time from typing import Any, List, Mapping, Optional @@ -27,6 +28,31 @@ def _coerce_int(value: Any) -> Optional[int]: return None +def _coerce_measured_at(value: Any) -> Optional[dt.datetime]: + """Scoring payloads carry measured_at as a Unix epoch number. The DB + column is TIMESTAMPTZ so the partition key can be a real time. Return + None on anything that isn't a usable timestamp.""" + epoch = _coerce_float(value) + if epoch is None: + return None + try: + return dt.datetime.fromtimestamp(epoch, tz=dt.timezone.utc) + except (OverflowError, OSError, ValueError): + return None + + +def _epoch_to_datetime(value: Optional[float]) -> Optional[dt.datetime]: + if value is None: + return None + return dt.datetime.fromtimestamp(value, tz=dt.timezone.utc) + + +def _datetime_to_epoch(value: Optional[dt.datetime]) -> Optional[float]: + if value is None: + return None + return value.timestamp() + + def _build_crn_rows( item_hash: str, crn_array: List[Mapping[str, Any]] ) -> List[Mapping[str, Any]]: @@ -35,7 +61,7 @@ def _build_crn_rows( if not isinstance(entry, Mapping): continue node_id = entry.get("node_id") - measured_at = _coerce_float(entry.get("measured_at")) + measured_at = _coerce_measured_at(entry.get("measured_at")) if node_id is None or measured_at is None: continue rows.append( @@ -62,7 +88,7 @@ def _build_ccn_rows( if not isinstance(entry, Mapping): continue node_id = entry.get("node_id") - measured_at = _coerce_float(entry.get("measured_at")) + measured_at = _coerce_measured_at(entry.get("measured_at")) if node_id is None or measured_at is None: continue rows.append( @@ -100,6 +126,12 @@ def _parse_ccn_result(result): # Transpose the result and create a dictionary result_dict = {key: list(values) for key, values in zip(keys, zip(*result))} + # API contract serializes measured_at as epoch seconds. + if "measured_at" in result_dict: + result_dict["measured_at"] = [ + _datetime_to_epoch(v) for v in result_dict["measured_at"] + ] + return result_dict @@ -116,6 +148,11 @@ def _parse_crn_result(result): # Transpose the result and create a dictionary result_dict = {key: list(values) for key, values in zip(keys, zip(*result))} + if "measured_at" in result_dict: + result_dict["measured_at"] = [ + _datetime_to_epoch(v) for v in result_dict["measured_at"] + ] + return result_dict @@ -132,6 +169,9 @@ def query_metric_ccn( elif end_date and not start_date: start_date = end_date - 60 * 60 * 24 * 14 + start_dt = _epoch_to_datetime(start_date) + end_dt = _epoch_to_datetime(end_date) + select_stmt = select( CcnMetricDb.item_hash, CcnMetricDb.measured_at, @@ -146,10 +186,10 @@ def query_metric_ccn( if node_id: select_stmt = select_stmt.where(CcnMetricDb.node_id == node_id) - if start_date: - select_stmt = select_stmt.where(CcnMetricDb.measured_at >= start_date) - if end_date: - select_stmt = select_stmt.where(CcnMetricDb.measured_at <= end_date) + if start_dt: + select_stmt = select_stmt.where(CcnMetricDb.measured_at >= start_dt) + if end_dt: + select_stmt = select_stmt.where(CcnMetricDb.measured_at <= end_dt) order_col = CcnMetricDb.measured_at if sort_order == SortOrder.DESCENDING: select_stmt = select_stmt.order_by(order_col.desc()) @@ -173,6 +213,9 @@ def query_metric_crn( elif end_date and not start_date: start_date = end_date - 60 * 60 * 24 * 14 + start_dt = _epoch_to_datetime(start_date) + end_dt = _epoch_to_datetime(end_date) + select_stmt = select( CrnMetricDb.item_hash, CrnMetricDb.measured_at, @@ -184,10 +227,10 @@ def query_metric_crn( if node_id: select_stmt = select_stmt.where(CrnMetricDb.node_id == node_id) - if start_date: - select_stmt = select_stmt.where(CrnMetricDb.measured_at >= start_date) - if end_date: - select_stmt = select_stmt.where(CrnMetricDb.measured_at <= end_date) + if start_dt: + select_stmt = select_stmt.where(CrnMetricDb.measured_at >= start_dt) + if end_dt: + select_stmt = select_stmt.where(CrnMetricDb.measured_at <= end_dt) order_col = CrnMetricDb.measured_at if sort_order == SortOrder.DESCENDING: select_stmt = select_stmt.order_by(order_col.desc()) diff --git a/src/aleph/db/models/metrics.py b/src/aleph/db/models/metrics.py index 722e2b035..c3b1c6ecf 100644 --- a/src/aleph/db/models/metrics.py +++ b/src/aleph/db/models/metrics.py @@ -1,22 +1,31 @@ +import datetime as dt from typing import Optional -from sqlalchemy import BigInteger, Float, ForeignKey, Integer, String +from sqlalchemy import TIMESTAMP, BigInteger, Float, ForeignKey, Integer, String from sqlalchemy.orm import Mapped, mapped_column from .base import Base +# crn_metrics and ccn_metrics are RANGE-partitioned on measured_at at the +# Postgres level. Partition DDL lives in the alembic migration; SQLAlchemy +# only sees the logical parent table. The PK is composite (id, measured_at) +# because Postgres requires the partition key to be part of any PK on a +# partitioned table; id is still a BIGSERIAL sequence column. + class CrnMetricDb(Base): __tablename__ = "crn_metrics" id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + measured_at: Mapped[dt.datetime] = mapped_column( + TIMESTAMP(timezone=True), primary_key=True, nullable=False + ) item_hash: Mapped[str] = mapped_column( ForeignKey("messages.item_hash", ondelete="CASCADE"), nullable=False, index=True, ) node_id: Mapped[str] = mapped_column(String, nullable=False) - measured_at: Mapped[float] = mapped_column(Float, nullable=False) base_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) base_latency_ipv4: Mapped[Optional[float]] = mapped_column(Float, nullable=True) full_check_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) @@ -27,13 +36,15 @@ class CcnMetricDb(Base): __tablename__ = "ccn_metrics" id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + measured_at: Mapped[dt.datetime] = mapped_column( + TIMESTAMP(timezone=True), primary_key=True, nullable=False + ) item_hash: Mapped[str] = mapped_column( ForeignKey("messages.item_hash", ondelete="CASCADE"), nullable=False, index=True, ) node_id: Mapped[str] = mapped_column(String, nullable=False) - measured_at: Mapped[float] = mapped_column(Float, nullable=False) base_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) base_latency_ipv4: Mapped[Optional[float]] = mapped_column(Float, nullable=True) metrics_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) diff --git a/src/aleph/jobs/cron/metrics_partition_job.py b/src/aleph/jobs/cron/metrics_partition_job.py new file mode 100644 index 000000000..f302b8938 --- /dev/null +++ b/src/aleph/jobs/cron/metrics_partition_job.py @@ -0,0 +1,199 @@ +"""Cron job that maintains monthly partitions of crn_metrics and +ccn_metrics. + +Two responsibilities per run: + +1. Pre-create the next ``LOOKAHEAD_MONTHS`` worth of monthly partitions + if they don't already exist. This guarantees there's always a real + partition ready for incoming scoring posts, so writes never have to + fall back to the DEFAULT catch-all partition. + +2. Detach + drop partitions whose upper bound is older than the + retention cutoff (``RETENTION_MONTHS`` ago). DETACH first so the + parent table only briefly holds an ACCESS EXCLUSIVE lock; the + subsequent DROP only touches the (now-standalone) child table. + +Both operations are idempotent. A run that finds the next partition +already present and nothing past the cutoff is a no-op. + +The DEFAULT partition is left untouched. If it ever contains rows the +cron logs a warning (operational signal that the lookahead is too +short or that out-of-range data is arriving).""" + +import datetime as dt +import logging +from typing import Iterable, List, Tuple + +from sqlalchemy import text + +from aleph.db.models.cron_jobs import CronJobDb +from aleph.jobs.cron.cron_job import BaseCronJob +from aleph.toolkit.partitions import ( + add_months, + month_floor, + monthly_bounds, + partition_name, + ts_literal, +) +from aleph.types.db_session import DbSession, DbSessionFactory + +LOGGER = logging.getLogger(__name__) + +PARTITIONED_TABLES = ("crn_metrics", "ccn_metrics") + + +class MetricsPartitionCronJob(BaseCronJob): + """Roll monthly partitions forward for the metrics tables. + + :param session_factory: DB session factory. + :param retention_months: Drop partitions whose upper bound is older + than ``now - retention_months``. + :param lookahead_months: Ensure partitions exist up to and including + ``now + lookahead_months``. + """ + + def __init__( + self, + session_factory: DbSessionFactory, + retention_months: int, + lookahead_months: int, + ): + self.session_factory = session_factory + self.retention_months = retention_months + self.lookahead_months = lookahead_months + + async def run(self, now: dt.datetime, job: CronJobDb) -> None: + now_month = month_floor(now) + cutoff = add_months(now_month, -self.retention_months) + # Lookahead is inclusive: ensure partition for now_month + N + # exists, so range becomes [..., now_month + N + 1). + lookahead_upper = add_months(now_month, self.lookahead_months + 1) + + with self.session_factory() as session: + for table in PARTITIONED_TABLES: + self._ensure_partitions(session, table, now_month, lookahead_upper) + self._drop_past_cutoff(session, table, cutoff) + self._warn_if_default_has_rows(session, table) + session.commit() + + @staticmethod + def _ensure_partitions( + session: DbSession, + table: str, + start: dt.datetime, + end_exclusive: dt.datetime, + ) -> None: + """Create any missing monthly partitions in [start, end_exclusive).""" + existing = _list_partitions(session, table) + existing_names = {name for name, _, _ in existing} + for lower, upper in monthly_bounds(start, end_exclusive): + name = partition_name(table, lower) + if name in existing_names: + continue + LOGGER.info( + "Creating partition %s on %s for [%s, %s)", + name, + table, + lower.isoformat(), + upper.isoformat(), + ) + session.execute( + text( + f"CREATE TABLE {name} PARTITION OF {table} " + f"FOR VALUES FROM ('{ts_literal(lower)}') " + f"TO ('{ts_literal(upper)}')" + ) + ) + + @staticmethod + def _drop_past_cutoff(session: DbSession, table: str, cutoff: dt.datetime) -> None: + """DETACH + DROP partitions whose upper bound is <= cutoff. + + DETACH briefly takes ACCESS EXCLUSIVE on the parent, then the + DROP only touches the now-standalone child. Metrics tables are + not on a latency-sensitive read path so plain DETACH is fine; + CONCURRENTLY would require autocommit, which the cron's + transactional session doesn't offer.""" + for name, lower, upper in _list_partitions(session, table): + if upper is None or lower is None: + # The DEFAULT partition has no bounds. Skip. + continue + if upper <= cutoff: + LOGGER.info( + "Dropping partition %s on %s (upper=%s <= cutoff=%s)", + name, + table, + upper.isoformat(), + cutoff.isoformat(), + ) + session.execute(text(f"ALTER TABLE {table} DETACH PARTITION {name}")) + session.execute(text(f"DROP TABLE {name}")) + + @staticmethod + def _warn_if_default_has_rows(session: DbSession, table: str) -> None: + default_name = f"{table}_default" + result = session.execute(text(f"SELECT count(*) FROM {default_name}")).scalar() + if result and result > 0: + LOGGER.warning( + "DEFAULT partition %s holds %s rows. Lookahead may be too " + "short, or out-of-range timestamps are arriving.", + default_name, + result, + ) + + +def _list_partitions( + session: DbSession, parent: str +) -> List[Tuple[str, dt.datetime, dt.datetime]]: + """Return (child_name, lower_bound, upper_bound) for every existing + partition of `parent`. The DEFAULT partition appears with + (name, None, None).""" + rows: Iterable = session.execute( + text( + """ + SELECT c.relname AS child_name, + pg_get_expr(c.relpartbound, c.oid) AS bound_expr + FROM pg_inherits i + JOIN pg_class p ON p.oid = i.inhparent + JOIN pg_class c ON c.oid = i.inhrelid + WHERE p.relname = :parent + """ + ), + {"parent": parent}, + ).fetchall() + + out: List[Tuple[str, dt.datetime, dt.datetime]] = [] + for name, expr in rows: + bounds = _parse_bound_expr(expr) + if bounds is None: + out.append((name, None, None)) # type: ignore[arg-type] + else: + lower, upper = bounds + out.append((name, lower, upper)) + return out + + +def _parse_bound_expr(expr: str): + """Parse pg_get_expr output for a RANGE partition. + + Examples: + FOR VALUES FROM ('2026-05-01 00:00:00+00') TO ('2026-06-01 00:00:00+00') + DEFAULT + """ + if expr is None or "DEFAULT" in expr: + return None + # The expression is well-formed Postgres output; parse the two + # quoted timestamps in order. + parts = expr.split("'") + if len(parts) < 5: + return None + try: + lower = dt.datetime.fromisoformat(parts[1]) + upper = dt.datetime.fromisoformat(parts[3]) + except ValueError: + return None + if lower.tzinfo is None: + lower = lower.replace(tzinfo=dt.timezone.utc) + if upper.tzinfo is None: + upper = upper.replace(tzinfo=dt.timezone.utc) + return lower, upper diff --git a/src/aleph/toolkit/partitions.py b/src/aleph/toolkit/partitions.py new file mode 100644 index 000000000..59601557f --- /dev/null +++ b/src/aleph/toolkit/partitions.py @@ -0,0 +1,52 @@ +"""Helpers for monthly RANGE-partitioned tables on a TIMESTAMPTZ column. + +Used by: +* the alembic migration that creates crn_metrics / ccn_metrics +* the metrics_partition cron job that maintains them (create next month, + drop past-cutoff) + +Keeping the naming and bounds logic in one place means migration-time +partitions and cron-created partitions follow identical conventions. +""" + +import datetime as dt +from typing import List, Tuple + + +def month_floor(d: dt.datetime) -> dt.datetime: + """First instant of d's month, UTC.""" + return dt.datetime(d.year, d.month, 1, tzinfo=dt.timezone.utc) + + +def add_months(d: dt.datetime, months: int) -> dt.datetime: + """Shift d by N calendar months (positive or negative). Snaps to the + first day of the resulting month.""" + total = d.month - 1 + months + year = d.year + total // 12 + month = total % 12 + 1 + return dt.datetime(year, month, 1, tzinfo=dt.timezone.utc) + + +def monthly_bounds( + start: dt.datetime, end_exclusive: dt.datetime +) -> List[Tuple[dt.datetime, dt.datetime]]: + """List of [lower, upper) month-aligned ranges from start to + end_exclusive. Both arguments should already be at month + boundaries.""" + bounds: List[Tuple[dt.datetime, dt.datetime]] = [] + cursor = start + while cursor < end_exclusive: + upper = add_months(cursor, 1) + bounds.append((cursor, upper)) + cursor = upper + return bounds + + +def partition_name(table: str, lower: dt.datetime) -> str: + """Naming convention: _YYYY_MM, e.g. crn_metrics_2026_05.""" + return f"{table}_{lower.strftime('%Y_%m')}" + + +def ts_literal(d: dt.datetime) -> str: + """TIMESTAMPTZ literal suitable for inlining into DDL.""" + return d.strftime("%Y-%m-%d %H:%M:%S%z") diff --git a/tests/db/test_metrics_persistence.py b/tests/db/test_metrics_persistence.py index a9bcbac04..e1d89cc90 100644 --- a/tests/db/test_metrics_persistence.py +++ b/tests/db/test_metrics_persistence.py @@ -15,6 +15,9 @@ from aleph.types.db_session import DbSession, DbSessionFactory from aleph.types.sort_order import SortOrder +_TS_1 = dt.datetime.fromtimestamp(1700000000.0, tz=dt.timezone.utc) +_TS_2 = dt.datetime.fromtimestamp(1700000001.0, tz=dt.timezone.utc) + def test_build_crn_rows_full_payload(): item_hash = "msg-1" @@ -41,7 +44,7 @@ def test_build_crn_rows_full_payload(): assert rows[0] == { "item_hash": "msg-1", "node_id": "node-A", - "measured_at": 1700000000.0, + "measured_at": _TS_1, "base_latency": 0.1, "base_latency_ipv4": 0.11, "full_check_latency": 0.2, @@ -60,7 +63,7 @@ def test_build_crn_rows_missing_optional_fields(): { "item_hash": "msg-2", "node_id": "node-A", - "measured_at": 1700000000.0, + "measured_at": _TS_1, "base_latency": None, "base_latency_ipv4": None, "full_check_latency": None, @@ -123,7 +126,7 @@ def test_build_ccn_rows_full_payload(): { "item_hash": "msg-1", "node_id": "node-A", - "measured_at": 1700000000.0, + "measured_at": _TS_1, "base_latency": 0.1, "base_latency_ipv4": 0.11, "metrics_latency": 0.2, @@ -253,19 +256,19 @@ def test_query_metric_crn_filters_by_node_and_date(session_factory: DbSessionFac CrnMetricDb( item_hash=item_hash, node_id="node-A", - measured_at=100.0, + measured_at=dt.datetime.fromtimestamp(100.0, tz=dt.timezone.utc), base_latency=0.1, ), CrnMetricDb( item_hash=item_hash, node_id="node-A", - measured_at=200.0, + measured_at=dt.datetime.fromtimestamp(200.0, tz=dt.timezone.utc), base_latency=0.2, ), CrnMetricDb( item_hash=item_hash, node_id="node-B", - measured_at=150.0, + measured_at=dt.datetime.fromtimestamp(150.0, tz=dt.timezone.utc), base_latency=0.9, ), ] @@ -294,7 +297,7 @@ def test_query_metric_ccn_returns_all_columns(session_factory: DbSessionFactory) CcnMetricDb( item_hash=item_hash, node_id="ccn-A", - measured_at=500.0, + measured_at=dt.datetime.fromtimestamp(500.0, tz=dt.timezone.utc), base_latency=0.1, base_latency_ipv4=0.11, metrics_latency=0.2, diff --git a/tests/jobs/test_metrics_partition_job.py b/tests/jobs/test_metrics_partition_job.py new file mode 100644 index 000000000..c8167f5c9 --- /dev/null +++ b/tests/jobs/test_metrics_partition_job.py @@ -0,0 +1,259 @@ +"""Tests for the metrics_partition cron job. + +The migration creates partitions [now-12mo, now+1mo) at test time, so a +cron run with the same retention/lookahead is a no-op in steady state. +We test the moving parts by passing custom retention/lookahead values +that force the job to either create or drop partitions. +""" + +import datetime as dt + +import pytest +from aleph_message.models import Chain, ItemType, MessageType +from sqlalchemy import text + +from aleph.db.models import CrnMetricDb, MessageDb +from aleph.db.models.cron_jobs import CronJobDb +from aleph.jobs.cron.metrics_partition_job import ( + MetricsPartitionCronJob, + _list_partitions, + _parse_bound_expr, +) +from aleph.toolkit.partitions import add_months, month_floor, partition_name +from aleph.types.channel import Channel +from aleph.types.db_session import DbSessionFactory + + +def _now_month() -> dt.datetime: + return month_floor(dt.datetime.now(tz=dt.timezone.utc)) + + +def _list(session, table): + return _list_partitions(session, table) + + +def test_parse_bound_expr_range(): + expr = "FOR VALUES FROM ('2026-05-01 00:00:00+00') " "TO ('2026-06-01 00:00:00+00')" + lower, upper = _parse_bound_expr(expr) + assert lower == dt.datetime(2026, 5, 1, tzinfo=dt.timezone.utc) + assert upper == dt.datetime(2026, 6, 1, tzinfo=dt.timezone.utc) + + +def test_parse_bound_expr_default_returns_none(): + assert _parse_bound_expr("DEFAULT") is None + + +def test_parse_bound_expr_with_type_cast(): + # pg_get_expr may include the type annotation, e.g. when the column + # type isn't unambiguous from context. + expr = ( + "FOR VALUES FROM ('2026-05-01 00:00:00+00'::timestamp with time zone) " + "TO ('2026-06-01 00:00:00+00'::timestamp with time zone)" + ) + lower, upper = _parse_bound_expr(expr) + assert lower == dt.datetime(2026, 5, 1, tzinfo=dt.timezone.utc) + assert upper == dt.datetime(2026, 6, 1, tzinfo=dt.timezone.utc) + + +def _make_cron_row() -> CronJobDb: + return CronJobDb( + id="metrics_partition", + interval=86400, + last_run=dt.datetime(2000, 1, 1, tzinfo=dt.timezone.utc), + ) + + +@pytest.mark.asyncio +async def test_cron_is_noop_in_steady_state(session_factory: DbSessionFactory): + """With the same retention/lookahead the migration used, a run + against a fresh DB should not change the partition list.""" + job = MetricsPartitionCronJob( + session_factory=session_factory, + retention_months=12, + lookahead_months=1, + ) + with session_factory() as session: + before_crn = sorted(n for n, _, _ in _list(session, "crn_metrics")) + before_ccn = sorted(n for n, _, _ in _list(session, "ccn_metrics")) + + await job.run(now=dt.datetime.now(tz=dt.timezone.utc), job=_make_cron_row()) + + with session_factory() as session: + after_crn = sorted(n for n, _, _ in _list(session, "crn_metrics")) + after_ccn = sorted(n for n, _, _ in _list(session, "ccn_metrics")) + + assert before_crn == after_crn + assert before_ccn == after_ccn + + +@pytest.mark.asyncio +async def test_cron_creates_missing_future_partition( + session_factory: DbSessionFactory, +): + """Bumping the lookahead forces the cron to add a new partition for + the month beyond what the migration already created.""" + now = dt.datetime.now(tz=dt.timezone.utc) + expected_extra_month = add_months(month_floor(now), 2) + expected_name = partition_name("crn_metrics", expected_extra_month) + + with session_factory() as session: + existing = {n for n, _, _ in _list(session, "crn_metrics")} + assert expected_name not in existing + + job = MetricsPartitionCronJob( + session_factory=session_factory, + retention_months=12, + lookahead_months=2, + ) + await job.run(now=now, job=_make_cron_row()) + + with session_factory() as session: + after = {n for n, _, _ in _list(session, "crn_metrics")} + assert expected_name in after + + +@pytest.mark.asyncio +async def test_cron_drops_past_cutoff_partitions( + session_factory: DbSessionFactory, +): + """Tightening the retention horizon drops the now-out-of-range + monthly partitions via DETACH + DROP.""" + now = dt.datetime.now(tz=dt.timezone.utc) + now_month = month_floor(now) + # With retention=1mo, only the partition starting now_month should remain. + # Partitions older than (now_month - 1mo) should be dropped. + new_cutoff = add_months(now_month, -1) + + with session_factory() as session: + all_partitions = _list(session, "crn_metrics") + to_drop = [ + name + for name, lower, upper in all_partitions + if upper is not None and upper <= new_cutoff + ] + assert to_drop, "Expected at least one partition past the new cutoff" + + job = MetricsPartitionCronJob( + session_factory=session_factory, + retention_months=1, + lookahead_months=1, + ) + await job.run(now=now, job=_make_cron_row()) + + with session_factory() as session: + remaining = {n for n, _, _ in _list(session, "crn_metrics")} + + for name in to_drop: + assert name not in remaining + + +@pytest.mark.asyncio +async def test_cron_warns_when_default_has_rows( + session_factory: DbSessionFactory, caplog +): + """If a row lands in the DEFAULT partition, the next cron run logs + a warning so operators notice.""" + item_hash = "msg-default-warn" + far_future = dt.datetime(2099, 1, 1, tzinfo=dt.timezone.utc) + with session_factory() as session: + session.add( + MessageDb( + item_hash=item_hash, + type=MessageType.post, + chain=Chain.ETH, + sender="0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + channel=Channel("aleph-scoring"), + signature=None, + item_type=ItemType.inline, + item_content="{}", + content={ + "type": "aleph-network-metrics", + "address": "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + "content": {}, + "time": 0.0, + }, + time=dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc), + size=2, + ) + ) + session.add( + CrnMetricDb( + item_hash=item_hash, + node_id="node-far-future", + measured_at=far_future, + base_latency=0.1, + ) + ) + session.commit() + + # Sanity: the row landed in the DEFAULT partition. + default_count = session.execute( + text("SELECT count(*) FROM crn_metrics_default") + ).scalar() + assert default_count == 1 + + job = MetricsPartitionCronJob( + session_factory=session_factory, + retention_months=12, + lookahead_months=1, + ) + with caplog.at_level("WARNING"): + await job.run(now=dt.datetime.now(tz=dt.timezone.utc), job=_make_cron_row()) + + warnings = [r for r in caplog.records if "DEFAULT partition" in r.getMessage()] + assert warnings, "Expected a warning about DEFAULT partition having rows" + + +@pytest.mark.asyncio +async def test_partition_routing_places_rows_in_correct_child( + session_factory: DbSessionFactory, +): + """Inserting a row with measured_at inside a child partition's + range should land in that child, not the DEFAULT.""" + now = dt.datetime.now(tz=dt.timezone.utc) + in_range_ts = month_floor(now) + dt.timedelta(days=2) + expected_child = partition_name("crn_metrics", month_floor(now)) + + item_hash = "msg-routing" + with session_factory() as session: + session.add( + MessageDb( + item_hash=item_hash, + type=MessageType.post, + chain=Chain.ETH, + sender="0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + channel=Channel("aleph-scoring"), + signature=None, + item_type=ItemType.inline, + item_content="{}", + content={ + "type": "aleph-network-metrics", + "address": "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + "content": {}, + "time": 0.0, + }, + time=dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc), + size=2, + ) + ) + session.add( + CrnMetricDb( + item_hash=item_hash, + node_id="node-routed", + measured_at=in_range_ts, + base_latency=0.5, + ) + ) + session.commit() + + child_count = session.execute( + text(f"SELECT count(*) FROM {expected_child} WHERE item_hash = :h"), + {"h": item_hash}, + ).scalar() + default_count = session.execute( + text("SELECT count(*) FROM crn_metrics_default WHERE item_hash = :h"), + {"h": item_hash}, + ).scalar() + + assert child_count == 1 + assert default_count == 0 diff --git a/tests/web/controllers/test_metrics_api.py b/tests/web/controllers/test_metrics_api.py index 4b1cd22a3..1b47f3bc4 100644 --- a/tests/web/controllers/test_metrics_api.py +++ b/tests/web/controllers/test_metrics_api.py @@ -23,6 +23,7 @@ SCORING_SENDER = "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4" _MEASURED_AT = 1700000000.0 # Nov 2023 -- well outside the default 14-day window +_MEASURED_AT_DT = dt.datetime.fromtimestamp(_MEASURED_AT, tz=dt.timezone.utc) # The accessors use `if not start_date` to decide whether to apply the 14-day # lookback window. Passing start_date=0 would be falsy and still trigger the @@ -67,7 +68,7 @@ async def test_get_crn_metrics_returns_persisted_rows( CrnMetricDb( item_hash="h-crn", node_id=node_id, - measured_at=_MEASURED_AT, + measured_at=_MEASURED_AT_DT, base_latency=0.1, base_latency_ipv4=0.11, full_check_latency=0.2, @@ -99,7 +100,7 @@ async def test_get_ccn_metrics_returns_persisted_rows( CcnMetricDb( item_hash="h-ccn", node_id=node_id, - measured_at=_MEASURED_AT, + measured_at=_MEASURED_AT_DT, base_latency=0.1, base_latency_ipv4=0.11, metrics_latency=0.2, From b3645be6440b6a2dfee676e0f3e5890c5f30ab75 Mon Sep 17 00:00:00 2001 From: Olivier Desenfans Date: Wed, 20 May 2026 23:58:50 +0200 Subject: [PATCH 12/12] review: fix migration docstring and reject empty node_id in builders Two fixes from review of the partitioning rework: * Migration docstring claimed FK was added NOT VALID + VALIDATE, but PG (as of 17) does not support NOT VALID on partitioned tables and the code does plain ADD CONSTRAINT after backfill. Update the docstring to match the code. * _build_crn_rows / _build_ccn_rows skipped entries with node_id IS NULL but accepted empty-string node_ids, which the API can never look up. Tighten to 'if not node_id' so missing and empty are treated the same way. Add a regression test. Pre-existing typing asymmetry between query_metric_crn (node_id: str) and query_metric_ccn (node_id: Optional[str]) is left alone here; not introduced by this PR and out of scope for the partitioning work. --- .../0059_b9c4f1e6a2d7_persist_node_metrics.py | 7 +++++-- src/aleph/db/accessors/metrics.py | 8 ++++++-- tests/db/test_metrics_persistence.py | 13 +++++++++++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py index 8f52efa49..b1eb78d60 100644 --- a/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py +++ b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py @@ -21,8 +21,11 @@ * Indexes: ix_*_item_hash, ix_*_node_id_measured_at (both built on populated tables after backfill, replicated to children via the partitioned-index mechanism) -* FK to messages(item_hash) ON DELETE CASCADE, added NOT VALID then - VALIDATE after backfill to skip per-row check during the bulk load +* FK to messages(item_hash) ON DELETE CASCADE, added after backfill. + PG (as of 17) does not support NOT VALID on partitioned tables, so + the ADD CONSTRAINT validates each child in one pass; this is still + cheaper than the per-row FK check we'd pay if the constraint were + present during the bulk INSERT. The retention cron (metrics_partition) runs daily and is responsible for creating next month's partition and dropping past-cutoff diff --git a/src/aleph/db/accessors/metrics.py b/src/aleph/db/accessors/metrics.py index 70453188e..798cd2e67 100644 --- a/src/aleph/db/accessors/metrics.py +++ b/src/aleph/db/accessors/metrics.py @@ -62,7 +62,9 @@ def _build_crn_rows( continue node_id = entry.get("node_id") measured_at = _coerce_measured_at(entry.get("measured_at")) - if node_id is None or measured_at is None: + # Reject missing-or-empty node_id: an empty string is unusable + # for the (node_id, measured_at) lookups the API serves. + if not node_id or measured_at is None: continue rows.append( { @@ -89,7 +91,9 @@ def _build_ccn_rows( continue node_id = entry.get("node_id") measured_at = _coerce_measured_at(entry.get("measured_at")) - if node_id is None or measured_at is None: + # Reject missing-or-empty node_id: an empty string is unusable + # for the (node_id, measured_at) lookups the API serves. + if not node_id or measured_at is None: continue rows.append( { diff --git a/tests/db/test_metrics_persistence.py b/tests/db/test_metrics_persistence.py index e1d89cc90..cb736647f 100644 --- a/tests/db/test_metrics_persistence.py +++ b/tests/db/test_metrics_persistence.py @@ -87,6 +87,19 @@ def test_build_crn_rows_skips_entries_missing_required_fields(): assert rows[0]["node_id"] == "ok" +def test_build_crn_rows_skips_empty_node_id(): + # An empty-string node_id is unusable for the API's per-node + # lookups, so the builder treats it the same as a missing one. + rows = _build_crn_rows( + "msg-empty", + [ + {"measured_at": 1700000000.0, "node_id": ""}, + {"measured_at": 1700000001.0, "node_id": "ok"}, + ], + ) + assert [r["node_id"] for r in rows] == ["ok"] + + def test_build_crn_rows_non_numeric_field_becomes_none(): rows = _build_crn_rows( "msg-4",