diff --git a/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py new file mode 100644 index 000000000..b1eb78d60 --- /dev/null +++ b/deployment/migrations/versions/0059_b9c4f1e6a2d7_persist_node_metrics.py @@ -0,0 +1,274 @@ +"""Persist node scoring metrics in range-partitioned tables. + +Revision ID: b9c4f1e6a2d7 +Revises: 4f1e8d2a6c3b +Create Date: 2026-05-18 + +Creates crn_metrics and ccn_metrics as RANGE-partitioned tables on +measured_at (TIMESTAMPTZ), with monthly child partitions, a DEFAULT +catch-all partition, and a 12-month retention horizon at backfill. +Backfills from the existing JSON-unnesting views (filtered to the +retention window), then drops the views. Seeds a cron job that +maintains partitions (creates next month, drops past-cutoff). + +Partition design: +* Partition key: measured_at TIMESTAMPTZ +* Granularity: 1 month +* Initial coverage: [now - 12 months, now + 2 months) (14 child partitions) +* Plus a DEFAULT partition as a safety net for out-of-range timestamps +* PK is (id, measured_at) because Postgres requires the partition key in + any PK on a partitioned table; id stays an auto-generated identity column +* Indexes: ix_*_item_hash, ix_*_node_id_measured_at (both built on + populated tables after backfill, replicated to children via the + partitioned-index mechanism) +* FK to messages(item_hash) ON DELETE CASCADE, added after backfill. + PG (as of 17) does not support NOT VALID on partitioned tables, so + the ADD CONSTRAINT validates each child in one pass; this is still + cheaper than the per-row FK check we'd pay if the constraint were + present during the bulk INSERT. + +The retention cron (metrics_partition) runs daily and is responsible +for creating next month's partition and dropping past-cutoff +partitions. See aleph.jobs.cron.metrics_partition_job. 
+""" + +import datetime as dt + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import text + +from aleph.toolkit.partitions import ( + add_months, + month_floor, + monthly_bounds, + partition_name, + ts_literal, +) + +revision = "b9c4f1e6a2d7" +down_revision = "4f1e8d2a6c3b" +branch_labels = None +depends_on = None + + +RETENTION_MONTHS = 12 +LOOKAHEAD_MONTHS = 1 + + +def upgrade() -> None: + now_month = month_floor(dt.datetime.now(tz=dt.timezone.utc)) + cutoff = add_months(now_month, -RETENTION_MONTHS) + upper_bound = add_months(now_month, LOOKAHEAD_MONTHS + 1) + + # 1. Create both parent tables (PARTITION BY RANGE on measured_at). + op.execute( + text( + """ + CREATE TABLE crn_metrics ( + id BIGINT GENERATED BY DEFAULT AS IDENTITY, + measured_at TIMESTAMPTZ NOT NULL, + item_hash TEXT NOT NULL, + node_id TEXT NOT NULL, + base_latency DOUBLE PRECISION, + base_latency_ipv4 DOUBLE PRECISION, + full_check_latency DOUBLE PRECISION, + diagnostic_vm_latency DOUBLE PRECISION, + PRIMARY KEY (id, measured_at) + ) PARTITION BY RANGE (measured_at) + """ + ) + ) + op.execute( + text( + """ + CREATE TABLE ccn_metrics ( + id BIGINT GENERATED BY DEFAULT AS IDENTITY, + measured_at TIMESTAMPTZ NOT NULL, + item_hash TEXT NOT NULL, + node_id TEXT NOT NULL, + base_latency DOUBLE PRECISION, + base_latency_ipv4 DOUBLE PRECISION, + metrics_latency DOUBLE PRECISION, + aggregate_latency DOUBLE PRECISION, + file_download_latency DOUBLE PRECISION, + pending_messages INTEGER, + eth_height_remaining INTEGER, + PRIMARY KEY (id, measured_at) + ) PARTITION BY RANGE (measured_at) + """ + ) + ) + + # 2. Create monthly child partitions for [cutoff, upper_bound). 
+ for table in ("crn_metrics", "ccn_metrics"): + for lower, upper in monthly_bounds(cutoff, upper_bound): + op.execute( + text( + f"CREATE TABLE {partition_name(table, lower)} " + f"PARTITION OF {table} " + f"FOR VALUES FROM ('{ts_literal(lower)}') " + f"TO ('{ts_literal(upper)}')" + ) + ) + # DEFAULT partition: catch-all for timestamps outside the rolling + # range (e.g. far-future scoring posts). The cron job alerts if + # this ever holds rows; data here is hard to migrate to a proper + # partition without DETACH + INSERT, so it should stay empty in + # steady state. + op.execute(text(f"CREATE TABLE {table}_default PARTITION OF {table} DEFAULT")) + + # 3. Backfill from the existing views, filtered to the retention + # window. to_timestamp() already yields TIMESTAMPTZ (no AT TIME ZONE), so + # partition routing picks the right child. + cutoff_epoch = cutoff.timestamp() + op.execute( + text( + f""" + INSERT INTO crn_metrics ( + item_hash, measured_at, node_id, base_latency, base_latency_ipv4, + full_check_latency, diagnostic_vm_latency + ) + SELECT + item_hash, + to_timestamp(measured_at) AS measured_at, + node_id, base_latency, base_latency_ipv4, + full_check_latency, diagnostic_vm_latency + FROM crn_metric_view + WHERE node_id IS NOT NULL + AND measured_at IS NOT NULL + AND measured_at >= {cutoff_epoch} + """ + ) + ) + op.execute( + text( + f""" + INSERT INTO ccn_metrics ( + item_hash, measured_at, node_id, base_latency, base_latency_ipv4, + metrics_latency, aggregate_latency, file_download_latency, + pending_messages, eth_height_remaining + ) + SELECT + item_hash, + to_timestamp(measured_at) AS measured_at, + node_id, base_latency, base_latency_ipv4, + metrics_latency, aggregate_latency, file_download_latency, + pending_messages, eth_height_remaining + FROM ccn_metric_view + WHERE node_id IS NOT NULL + AND measured_at IS NOT NULL + AND measured_at >= {cutoff_epoch} + """ + ) + ) + + # 4. 
Build indexes on the parent (partitioned-index, cascades to + # every child). + op.create_index("ix_crn_metrics_item_hash", "crn_metrics", ["item_hash"]) + op.create_index( + "ix_crn_metrics_node_id_measured_at", + "crn_metrics", + ["node_id", sa.text("measured_at DESC")], + ) + op.create_index("ix_ccn_metrics_item_hash", "ccn_metrics", ["item_hash"]) + op.create_index( + "ix_ccn_metrics_node_id_measured_at", + "ccn_metrics", + ["node_id", sa.text("measured_at DESC")], + ) + + # 5. FK to messages. PG does not support NOT VALID on partitioned + # tables (yet, as of PG 17). The ADD CONSTRAINT validates each + # child partition in one pass — still faster than the per-INSERT + # FK check we'd pay if the constraint were present during step 3. + op.execute( + text( + """ + ALTER TABLE crn_metrics + ADD CONSTRAINT fk_crn_metrics_item_hash + FOREIGN KEY (item_hash) REFERENCES messages(item_hash) ON DELETE CASCADE + """ + ) + ) + op.execute( + text( + """ + ALTER TABLE ccn_metrics + ADD CONSTRAINT fk_ccn_metrics_item_hash + FOREIGN KEY (item_hash) REFERENCES messages(item_hash) ON DELETE CASCADE + """ + ) + ) + + op.execute(text("DROP VIEW IF EXISTS crn_metric_view")) + op.execute(text("DROP VIEW IF EXISTS ccn_metric_view")) + + # 6. Seed the cron-job row that drives partition maintenance. + # Daily interval. last_run set to epoch so the first scheduler tick + # runs the job (creates next month if missing, drops anything that + # slipped past retention). 
+ op.execute( + text( + """ + INSERT INTO cron_jobs(id, interval, last_run) + VALUES ('metrics_partition', 86400, '1970-01-01 00:00:00+00') + """ + ) + ) + + +def downgrade() -> None: + op.execute(text("DELETE FROM cron_jobs WHERE id = 'metrics_partition'")) + + op.execute( + text( + """ + CREATE OR REPLACE VIEW ccn_metric_view AS + WITH json_data AS ( + SELECT item_hash, + jsonb_array_elements(content -> 'content' -> 'metrics' -> 'ccn') as ccn_data + FROM messages + WHERE channel = 'aleph-scoring' + AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4' + ) + SELECT item_hash, + (ccn_data ->> 'measured_at')::float as measured_at, + ccn_data ->> 'node_id' as node_id, + (ccn_data ->> 'base_latency')::float as base_latency, + (ccn_data ->> 'metrics_latency')::float as metrics_latency, + (ccn_data ->> 'aggregate_latency')::float as aggregate_latency, + (ccn_data ->> 'base_latency_ipv4')::float as base_latency_ipv4, + (ccn_data ->> 'file_download_latency')::float as file_download_latency, + (ccn_data ->> 'pending_messages')::int as pending_messages, + (ccn_data ->> 'eth_height_remaining')::int as eth_height_remaining + FROM json_data + """ + ) + ) + op.execute( + text( + """ + CREATE OR REPLACE VIEW crn_metric_view AS + WITH json_data AS ( + SELECT item_hash, + jsonb_array_elements(content -> 'content' -> 'metrics' -> 'crn') as crn_data + FROM messages + WHERE channel = 'aleph-scoring' + AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4' + ) + SELECT item_hash as item_hash, + (crn_data ->> 'measured_at')::float as measured_at, + crn_data ->> 'node_id' as node_id, + (crn_data ->> 'base_latency')::float as base_latency, + (crn_data ->> 'base_latency_ipv4')::float as base_latency_ipv4, + (crn_data ->> 'full_check_latency')::float as full_check_latency, + (crn_data ->> 'diagnostic_vm_latency')::float as diagnostic_vm_latency + FROM json_data + """ + ) + ) + + # Dropping the parent cascades to all child partitions and the FK. 
+ op.execute(text("DROP TABLE IF EXISTS ccn_metrics CASCADE")) + op.execute(text("DROP TABLE IF EXISTS crn_metrics CASCADE")) diff --git a/src/aleph/commands.py b/src/aleph/commands.py index 79f247d0b..e1173ab1f 100644 --- a/src/aleph/commands.py +++ b/src/aleph/commands.py @@ -33,6 +33,7 @@ from aleph.jobs.cron.balance_job import BalanceCronJob from aleph.jobs.cron.credit_balance_job import CreditBalanceCronJob from aleph.jobs.cron.cron_job import CronJob, cron_job_task +from aleph.jobs.cron.metrics_partition_job import MetricsPartitionCronJob from aleph.network import listener_tasks from aleph.repair import repair_node from aleph.services import p2p @@ -171,6 +172,11 @@ async def main(args: List[str]) -> None: session_factory=session_factory, max_unauthenticated_upload_file_size=config.storage.max_unauthenticated_upload_file_size.value, ), + "metrics_partition": MetricsPartitionCronJob( + session_factory=session_factory, + retention_months=config.aleph.scoring.retention_months.value, + lookahead_months=config.aleph.scoring.partition_lookahead_months.value, + ), }, ) chain_data_service = ChainDataService( diff --git a/src/aleph/config.py b/src/aleph/config.py index 9dd818eaa..a5ffb74a0 100644 --- a/src/aleph/config.py +++ b/src/aleph/config.py @@ -59,6 +59,24 @@ def get_defaults(): # Allowed channels for credit balance messages. "channels": ["ALEPH_CREDIT"], }, + "scoring": { + # Addresses allowed to publish node scoring metrics. + "addresses": [ + "0x4D52380D3191274a04846c89c069E6C3F2Ed94e4", + ], + # Channel scoring messages are published on. + "channel": "aleph-scoring", + # POST message type that carries the node metrics payload. + "metrics_post_type": "aleph-network-metrics", + # Retention horizon for crn_metrics / ccn_metrics. Partitions + # whose upper bound is older than this are detached and dropped + # by the metrics_partition cron job. + "retention_months": 12, + # How many months ahead of "now" to keep partitions + # pre-created. 
Guards against incoming scoring posts + # falling into the DEFAULT catch-all partition. + "partition_lookahead_months": 1, + }, "jobs": { "pending_messages": { # Maximum number of retries for a message. diff --git a/src/aleph/db/accessors/metrics.py b/src/aleph/db/accessors/metrics.py index fd54bb984..798cd2e67 100644 --- a/src/aleph/db/accessors/metrics.py +++ b/src/aleph/db/accessors/metrics.py @@ -1,14 +1,119 @@ +import datetime as dt import time -from typing import Optional +from typing import Any, List, Mapping, Optional -from sqlalchemy import select, text +from sqlalchemy import insert, select from sqlalchemy.orm.session import Session -from sqlalchemy.sql import Select +from aleph.db.models import CcnMetricDb, CrnMetricDb from aleph.types.db_session import DbSession from aleph.types.sort_order import SortOrder, SortOrderForMetrics +def _coerce_float(value: Any) -> Optional[float]: + if value is None: + return None + try: + return float(value) + except (TypeError, ValueError): + return None + + +def _coerce_int(value: Any) -> Optional[int]: + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + +def _coerce_measured_at(value: Any) -> Optional[dt.datetime]: + """Scoring payloads carry measured_at as a Unix epoch number. The DB + column is TIMESTAMPTZ so the partition key can be a real time. 
Return + None on anything that isn't a usable timestamp.""" + epoch = _coerce_float(value) + if epoch is None: + return None + try: + return dt.datetime.fromtimestamp(epoch, tz=dt.timezone.utc) + except (OverflowError, OSError, ValueError): + return None + + +def _epoch_to_datetime(value: Optional[float]) -> Optional[dt.datetime]: + if value is None: + return None + return dt.datetime.fromtimestamp(value, tz=dt.timezone.utc) + + +def _datetime_to_epoch(value: Optional[dt.datetime]) -> Optional[float]: + if value is None: + return None + return value.timestamp() + + +def _build_crn_rows( + item_hash: str, crn_array: List[Mapping[str, Any]] +) -> List[Mapping[str, Any]]: + rows: List[Mapping[str, Any]] = [] + for entry in crn_array: + if not isinstance(entry, Mapping): + continue + node_id = entry.get("node_id") + measured_at = _coerce_measured_at(entry.get("measured_at")) + # Reject missing-or-empty node_id: an empty string is unusable + # for the (node_id, measured_at) lookups the API serves. + if not node_id or measured_at is None: + continue + rows.append( + { + "item_hash": item_hash, + "node_id": str(node_id), + "measured_at": measured_at, + "base_latency": _coerce_float(entry.get("base_latency")), + "base_latency_ipv4": _coerce_float(entry.get("base_latency_ipv4")), + "full_check_latency": _coerce_float(entry.get("full_check_latency")), + "diagnostic_vm_latency": _coerce_float( + entry.get("diagnostic_vm_latency") + ), + } + ) + return rows + + +def _build_ccn_rows( + item_hash: str, ccn_array: List[Mapping[str, Any]] +) -> List[Mapping[str, Any]]: + rows: List[Mapping[str, Any]] = [] + for entry in ccn_array: + if not isinstance(entry, Mapping): + continue + node_id = entry.get("node_id") + measured_at = _coerce_measured_at(entry.get("measured_at")) + # Reject missing-or-empty node_id: an empty string is unusable + # for the (node_id, measured_at) lookups the API serves. 
+ if not node_id or measured_at is None: + continue + rows.append( + { + "item_hash": item_hash, + "node_id": str(node_id), + "measured_at": measured_at, + "base_latency": _coerce_float(entry.get("base_latency")), + "base_latency_ipv4": _coerce_float(entry.get("base_latency_ipv4")), + "metrics_latency": _coerce_float(entry.get("metrics_latency")), + "aggregate_latency": _coerce_float(entry.get("aggregate_latency")), + "file_download_latency": _coerce_float( + entry.get("file_download_latency") + ), + "pending_messages": _coerce_int(entry.get("pending_messages")), + "eth_height_remaining": _coerce_int(entry.get("eth_height_remaining")), + } + ) + return rows + + def _parse_ccn_result(result): keys = [ "item_hash", @@ -25,6 +130,12 @@ def _parse_ccn_result(result): # Transpose the result and create a dictionary result_dict = {key: list(values) for key, values in zip(keys, zip(*result))} + # API contract serializes measured_at as epoch seconds. + if "measured_at" in result_dict: + result_dict["measured_at"] = [ + _datetime_to_epoch(v) for v in result_dict["measured_at"] + ] + return result_dict @@ -41,31 +152,12 @@ def _parse_crn_result(result): # Transpose the result and create a dictionary result_dict = {key: list(values) for key, values in zip(keys, zip(*result))} - return result_dict + if "measured_at" in result_dict: + result_dict["measured_at"] = [ + _datetime_to_epoch(v) for v in result_dict["measured_at"] + ] - -def _build_metric_filter( - select_stmt: Select, - node_id: Optional[str], - start_date: Optional[float], - end_date: Optional[float], - sort_order: Optional[SortOrder], -): - if node_id: - select_stmt = select_stmt.where(text("node_id = :node_id")).params( - node_id=node_id - ) - if start_date: - select_stmt = select_stmt.where(text("measured_at >= :start_date")).params( - start_date=start_date - ) - if end_date: - select_stmt = select_stmt.where(text("measured_at <= :end_date")).params( - end_date=end_date - ) - if sort_order: - select_stmt = 
select_stmt.order_by(text(f"measured_at {sort_order.to_sql()}")) - return select_stmt + return result_dict def query_metric_ccn( @@ -73,7 +165,7 @@ def query_metric_ccn( node_id: Optional[str] = None, start_date: Optional[float] = None, end_date: Optional[float] = None, - sort_order: Optional[SortOrder] = None, + sort_order: Optional[SortOrderForMetrics] = None, ): # Default to the last 2 weeks from now, or 2 weeks before the `end_date`. if not start_date and not end_date: @@ -81,28 +173,34 @@ def query_metric_ccn( elif end_date and not start_date: start_date = end_date - 60 * 60 * 24 * 14 + start_dt = _epoch_to_datetime(start_date) + end_dt = _epoch_to_datetime(end_date) + select_stmt = select( - text("item_hash"), - text("measured_at"), - text("base_latency"), - text("base_latency_ipv4"), - text("metrics_latency"), - text("aggregate_latency"), - text("file_download_latency"), - text("pending_messages"), - text("eth_height_remaining"), - ).select_from(text("ccn_metric_view")) - - select_stmt = _build_metric_filter( - select_stmt=select_stmt, - node_id=node_id, - start_date=start_date, - end_date=end_date, - sort_order=sort_order, + CcnMetricDb.item_hash, + CcnMetricDb.measured_at, + CcnMetricDb.base_latency, + CcnMetricDb.base_latency_ipv4, + CcnMetricDb.metrics_latency, + CcnMetricDb.aggregate_latency, + CcnMetricDb.file_download_latency, + CcnMetricDb.pending_messages, + CcnMetricDb.eth_height_remaining, ) - result = session.execute(select_stmt).fetchall() + if node_id: + select_stmt = select_stmt.where(CcnMetricDb.node_id == node_id) + if start_dt: + select_stmt = select_stmt.where(CcnMetricDb.measured_at >= start_dt) + if end_dt: + select_stmt = select_stmt.where(CcnMetricDb.measured_at <= end_dt) + order_col = CcnMetricDb.measured_at + if sort_order in (SortOrder.DESCENDING, SortOrderForMetrics.DESCENDING): + select_stmt = select_stmt.order_by(order_col.desc()) + else: + select_stmt = select_stmt.order_by(order_col.asc()) + result = session.execute(select_stmt).fetchall() return 
_parse_ccn_result(result=result) @@ -119,23 +217,54 @@ def query_metric_crn( elif end_date and not start_date: start_date = end_date - 60 * 60 * 24 * 14 + start_dt = _epoch_to_datetime(start_date) + end_dt = _epoch_to_datetime(end_date) + select_stmt = select( - text("item_hash"), - text("measured_at"), - text("base_latency"), - text("base_latency_ipv4"), - text("full_check_latency"), - text("diagnostic_vm_latency"), - ).select_from(text("crn_metric_view")) - - select_stmt = _build_metric_filter( - select_stmt=select_stmt, - node_id=node_id, - start_date=start_date, - end_date=end_date, - sort_order=sort_order, + CrnMetricDb.item_hash, + CrnMetricDb.measured_at, + CrnMetricDb.base_latency, + CrnMetricDb.base_latency_ipv4, + CrnMetricDb.full_check_latency, + CrnMetricDb.diagnostic_vm_latency, ) - result = session.execute(select_stmt).fetchall() + if node_id: + select_stmt = select_stmt.where(CrnMetricDb.node_id == node_id) + if start_dt: + select_stmt = select_stmt.where(CrnMetricDb.measured_at >= start_dt) + if end_dt: + select_stmt = select_stmt.where(CrnMetricDb.measured_at <= end_dt) + order_col = CrnMetricDb.measured_at + if sort_order in (SortOrder.DESCENDING, SortOrderForMetrics.DESCENDING): + select_stmt = select_stmt.order_by(order_col.desc()) + else: + select_stmt = select_stmt.order_by(order_col.asc()) + result = session.execute(select_stmt).fetchall() return _parse_crn_result(result=result) + + +def insert_node_metrics( + session: DbSession, + item_hash: str, + content: Mapping[str, Any], +) -> None: + metrics = content.get("metrics") or {} + if not isinstance(metrics, Mapping): + return + + crn_array = metrics.get("crn") or [] + ccn_array = metrics.get("ccn") or [] + if not isinstance(crn_array, list): + crn_array = [] + if not isinstance(ccn_array, list): + ccn_array = [] + + crn_rows = _build_crn_rows(item_hash, crn_array) + ccn_rows = _build_ccn_rows(item_hash, ccn_array) + + if crn_rows: + session.execute(insert(CrnMetricDb), crn_rows) + if ccn_rows: + 
session.execute(insert(CcnMetricDb), ccn_rows) diff --git a/src/aleph/db/models/__init__.py b/src/aleph/db/models/__init__.py index 4b099ca1b..220e6e02f 100644 --- a/src/aleph/db/models/__init__.py +++ b/src/aleph/db/models/__init__.py @@ -5,6 +5,7 @@ from .files import * # noqa from .message_counts import * # noqa from .messages import * # noqa +from .metrics import * # noqa from .peers import * # noqa from .pending_messages import * # noqa from .pending_txs import * # noqa diff --git a/src/aleph/db/models/metrics.py b/src/aleph/db/models/metrics.py new file mode 100644 index 000000000..c3b1c6ecf --- /dev/null +++ b/src/aleph/db/models/metrics.py @@ -0,0 +1,54 @@ +import datetime as dt +from typing import Optional + +from sqlalchemy import TIMESTAMP, BigInteger, Float, ForeignKey, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from .base import Base + +# crn_metrics and ccn_metrics are RANGE-partitioned on measured_at at the +# Postgres level. Partition DDL lives in the alembic migration; SQLAlchemy +# only sees the logical parent table. The PK is composite (id, measured_at) +# because Postgres requires the partition key to be part of any PK on a +# partitioned table; id is still a BIGSERIAL sequence column. 
+ + +class CrnMetricDb(Base): + __tablename__ = "crn_metrics" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + measured_at: Mapped[dt.datetime] = mapped_column( + TIMESTAMP(timezone=True), primary_key=True, nullable=False + ) + item_hash: Mapped[str] = mapped_column( + ForeignKey("messages.item_hash", ondelete="CASCADE"), + nullable=False, + index=True, + ) + node_id: Mapped[str] = mapped_column(String, nullable=False) + base_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + base_latency_ipv4: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + full_check_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + diagnostic_vm_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + + +class CcnMetricDb(Base): + __tablename__ = "ccn_metrics" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True) + measured_at: Mapped[dt.datetime] = mapped_column( + TIMESTAMP(timezone=True), primary_key=True, nullable=False + ) + item_hash: Mapped[str] = mapped_column( + ForeignKey("messages.item_hash", ondelete="CASCADE"), + nullable=False, + index=True, + ) + node_id: Mapped[str] = mapped_column(String, nullable=False) + base_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + base_latency_ipv4: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + metrics_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + aggregate_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + file_download_latency: Mapped[Optional[float]] = mapped_column(Float, nullable=True) + pending_messages: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + eth_height_remaining: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) diff --git a/src/aleph/handlers/content/post.py b/src/aleph/handlers/content/post.py index 548e110e4..5eb6a095b 100644 --- a/src/aleph/handlers/content/post.py +++ 
b/src/aleph/handlers/content/post.py @@ -18,6 +18,7 @@ update_credit_balances_transfer as update_credit_balances_transfer_db, ) from aleph.db.accessors.balances import validate_credit_transfer_balance +from aleph.db.accessors.metrics import insert_node_metrics from aleph.db.accessors.posts import ( delete_amends, delete_post, @@ -201,12 +202,18 @@ def __init__( credit_balances_addresses: List[str], credit_balances_post_types: List[str], credit_balances_channels: List[str], + scoring_addresses: List[str], + scoring_channel: str, + scoring_metrics_post_type: str, ): self.balances_addresses = balances_addresses self.balances_post_type = balances_post_type self.credit_balances_addresses = credit_balances_addresses self.credit_balances_post_types = credit_balances_post_types self.credit_balances_channels = credit_balances_channels + self.scoring_addresses = scoring_addresses + self.scoring_channel = scoring_channel + self.scoring_metrics_post_type = scoring_metrics_post_type async def check_dependencies(self, session: DbSession, message: MessageDb): content = get_post_content(message) @@ -330,6 +337,19 @@ async def process_post(self, session: DbSession, message: MessageDb): ) LOGGER.info("Done updating credit balances") + if ( + content.type == self.scoring_metrics_post_type + and content.address in self.scoring_addresses + and message.channel == self.scoring_channel + and isinstance(content.content, dict) + ): + LOGGER.info("Persisting scoring metrics from %s", message.item_hash) + insert_node_metrics( + session=session, + item_hash=message.item_hash, + content=content.content, + ) + async def process(self, session: DbSession, messages: List[MessageDb]) -> None: for message in messages: diff --git a/src/aleph/handlers/message_handler.py b/src/aleph/handlers/message_handler.py index c88353199..a51ddf007 100644 --- a/src/aleph/handlers/message_handler.py +++ b/src/aleph/handlers/message_handler.py @@ -73,6 +73,9 @@ def __init__( 
credit_balances_addresses=config.aleph.credit_balances.addresses.value, credit_balances_post_types=config.aleph.credit_balances.post_types.value, credit_balances_channels=config.aleph.credit_balances.channels.value, + scoring_addresses=config.aleph.scoring.addresses.value, + scoring_channel=config.aleph.scoring.channel.value, + scoring_metrics_post_type=config.aleph.scoring.metrics_post_type.value, ), MessageType.program: vm_handler, MessageType.store: StoreMessageHandler( diff --git a/src/aleph/jobs/cron/metrics_partition_job.py b/src/aleph/jobs/cron/metrics_partition_job.py new file mode 100644 index 000000000..f302b8938 --- /dev/null +++ b/src/aleph/jobs/cron/metrics_partition_job.py @@ -0,0 +1,199 @@ +"""Cron job that maintains monthly partitions of crn_metrics and +ccn_metrics. + +Two responsibilities per run: + +1. Pre-create the next ``LOOKAHEAD_MONTHS`` worth of monthly partitions + if they don't already exist. This guarantees there's always a real + partition ready for incoming scoring posts, so writes never have to + fall back to the DEFAULT catch-all partition. + +2. Detach + drop partitions whose upper bound is older than the + retention cutoff (``RETENTION_MONTHS`` ago). DETACH first so the + parent table only briefly holds an ACCESS EXCLUSIVE lock; the + subsequent DROP only touches the (now-standalone) child table. + +Both operations are idempotent. A run that finds the next partition +already present and nothing past the cutoff is a no-op. + +The DEFAULT partition is left untouched. 
If it ever contains rows the +cron logs a warning (operational signal that the lookahead is too +short or that out-of-range data is arriving).""" + +import datetime as dt +import logging +from typing import Iterable, List, Tuple + +from sqlalchemy import text + +from aleph.db.models.cron_jobs import CronJobDb +from aleph.jobs.cron.cron_job import BaseCronJob +from aleph.toolkit.partitions import ( + add_months, + month_floor, + monthly_bounds, + partition_name, + ts_literal, +) +from aleph.types.db_session import DbSession, DbSessionFactory + +LOGGER = logging.getLogger(__name__) + +PARTITIONED_TABLES = ("crn_metrics", "ccn_metrics") + + +class MetricsPartitionCronJob(BaseCronJob): + """Roll monthly partitions forward for the metrics tables. + + :param session_factory: DB session factory. + :param retention_months: Drop partitions whose upper bound is older + than ``now - retention_months``. + :param lookahead_months: Ensure partitions exist up to and including + ``now + lookahead_months``. + """ + + def __init__( + self, + session_factory: DbSessionFactory, + retention_months: int, + lookahead_months: int, + ): + self.session_factory = session_factory + self.retention_months = retention_months + self.lookahead_months = lookahead_months + + async def run(self, now: dt.datetime, job: CronJobDb) -> None: + now_month = month_floor(now) + cutoff = add_months(now_month, -self.retention_months) + # Lookahead is inclusive: ensure partition for now_month + N + # exists, so range becomes [..., now_month + N + 1). 
+ lookahead_upper = add_months(now_month, self.lookahead_months + 1) + + with self.session_factory() as session: + for table in PARTITIONED_TABLES: + self._ensure_partitions(session, table, now_month, lookahead_upper) + self._drop_past_cutoff(session, table, cutoff) + self._warn_if_default_has_rows(session, table) + session.commit() + + @staticmethod + def _ensure_partitions( + session: DbSession, + table: str, + start: dt.datetime, + end_exclusive: dt.datetime, + ) -> None: + """Create any missing monthly partitions in [start, end_exclusive).""" + existing = _list_partitions(session, table) + existing_names = {name for name, _, _ in existing} + for lower, upper in monthly_bounds(start, end_exclusive): + name = partition_name(table, lower) + if name in existing_names: + continue + LOGGER.info( + "Creating partition %s on %s for [%s, %s)", + name, + table, + lower.isoformat(), + upper.isoformat(), + ) + session.execute( + text( + f"CREATE TABLE {name} PARTITION OF {table} " + f"FOR VALUES FROM ('{ts_literal(lower)}') " + f"TO ('{ts_literal(upper)}')" + ) + ) + + @staticmethod + def _drop_past_cutoff(session: DbSession, table: str, cutoff: dt.datetime) -> None: + """DETACH + DROP partitions whose upper bound is <= cutoff. + + DETACH briefly takes ACCESS EXCLUSIVE on the parent, then the + DROP only touches the now-standalone child. Metrics tables are + not on a latency-sensitive read path so plain DETACH is fine; + CONCURRENTLY would require autocommit, which the cron's + transactional session doesn't offer.""" + for name, lower, upper in _list_partitions(session, table): + if upper is None or lower is None: + # The DEFAULT partition has no bounds. Skip. 
+ continue + if upper <= cutoff: + LOGGER.info( + "Dropping partition %s on %s (upper=%s <= cutoff=%s)", + name, + table, + upper.isoformat(), + cutoff.isoformat(), + ) + session.execute(text(f"ALTER TABLE {table} DETACH PARTITION {name}")) + session.execute(text(f"DROP TABLE {name}")) + + @staticmethod + def _warn_if_default_has_rows(session: DbSession, table: str) -> None: + default_name = f"{table}_default" + result = session.execute(text(f"SELECT count(*) FROM {default_name}")).scalar() + if result and result > 0: + LOGGER.warning( + "DEFAULT partition %s holds %s rows. Lookahead may be too " + "short, or out-of-range timestamps are arriving.", + default_name, + result, + ) + + +def _list_partitions( + session: DbSession, parent: str +) -> List[Tuple[str, dt.datetime, dt.datetime]]: + """Return (child_name, lower_bound, upper_bound) for every existing + partition of `parent`. The DEFAULT partition appears with + (name, None, None).""" + rows: Iterable = session.execute( + text( + """ + SELECT c.relname AS child_name, + pg_get_expr(c.relpartbound, c.oid) AS bound_expr + FROM pg_inherits i + JOIN pg_class p ON p.oid = i.inhparent + JOIN pg_class c ON c.oid = i.inhrelid + WHERE p.relname = :parent + """ + ), + {"parent": parent}, + ).fetchall() + + out: List[Tuple[str, dt.datetime, dt.datetime]] = [] + for name, expr in rows: + bounds = _parse_bound_expr(expr) + if bounds is None: + out.append((name, None, None)) # type: ignore[arg-type] + else: + lower, upper = bounds + out.append((name, lower, upper)) + return out + + +def _parse_bound_expr(expr: str): + """Parse pg_get_expr output for a RANGE partition. + + Examples: + FOR VALUES FROM ('2026-05-01 00:00:00+00') TO ('2026-06-01 00:00:00+00') + DEFAULT + """ + if expr is None or "DEFAULT" in expr: + return None + # The expression is well-formed Postgres output; parse the two + # quoted timestamps in order. 
+ parts = expr.split("'") + if len(parts) < 5: + return None + try: + lower = dt.datetime.fromisoformat(parts[1]) + upper = dt.datetime.fromisoformat(parts[3]) + except ValueError: + return None + if lower.tzinfo is None: + lower = lower.replace(tzinfo=dt.timezone.utc) + if upper.tzinfo is None: + upper = upper.replace(tzinfo=dt.timezone.utc) + return lower, upper diff --git a/src/aleph/toolkit/partitions.py b/src/aleph/toolkit/partitions.py new file mode 100644 index 000000000..59601557f --- /dev/null +++ b/src/aleph/toolkit/partitions.py @@ -0,0 +1,52 @@ +"""Helpers for monthly RANGE-partitioned tables on a TIMESTAMPTZ column. + +Used by: +* the alembic migration that creates crn_metrics / ccn_metrics +* the metrics_partition cron job that maintains them (create next month, + drop past-cutoff) + +Keeping the naming and bounds logic in one place means migration-time +partitions and cron-created partitions follow identical conventions. +""" + +import datetime as dt +from typing import List, Tuple + + +def month_floor(d: dt.datetime) -> dt.datetime: + """First instant of d's month, UTC.""" + return dt.datetime(d.year, d.month, 1, tzinfo=dt.timezone.utc) + + +def add_months(d: dt.datetime, months: int) -> dt.datetime: + """Shift d by N calendar months (positive or negative). Snaps to the + first day of the resulting month.""" + total = d.month - 1 + months + year = d.year + total // 12 + month = total % 12 + 1 + return dt.datetime(year, month, 1, tzinfo=dt.timezone.utc) + + +def monthly_bounds( + start: dt.datetime, end_exclusive: dt.datetime +) -> List[Tuple[dt.datetime, dt.datetime]]: + """List of [lower, upper) month-aligned ranges from start to + end_exclusive. 
Both arguments should already be at month + boundaries.""" + bounds: List[Tuple[dt.datetime, dt.datetime]] = [] + cursor = start + while cursor < end_exclusive: + upper = add_months(cursor, 1) + bounds.append((cursor, upper)) + cursor = upper + return bounds + + +def partition_name(table: str, lower: dt.datetime) -> str: + """Naming convention: