Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
"""Persist node scoring metrics in range-partitioned tables.

Revision ID: b9c4f1e6a2d7
Revises: 4f1e8d2a6c3b
Create Date: 2026-05-18

Creates crn_metrics and ccn_metrics as RANGE-partitioned tables on
measured_at (TIMESTAMPTZ), with monthly child partitions, a DEFAULT
catch-all partition, and a 12-month retention horizon at backfill.
Backfills from the existing JSON-unnesting views (filtered to the
retention window), then drops the views. Seeds a cron job that
maintains partitions (creates next month, drops past-cutoff).

Partition design:
* Partition key: measured_at TIMESTAMPTZ
* Granularity: 1 month
* Initial coverage: [now - 12 months, now + 1 month) (13 child partitions)
* Plus a DEFAULT partition as a safety net for out-of-range timestamps
* PK is (id, measured_at) because Postgres requires the partition key in
any PK on a partitioned table; id stays a BIGSERIAL sequence column
* Indexes: ix_*_item_hash, ix_*_node_id_measured_at (both built on
populated tables after backfill, replicated to children via the
partitioned-index mechanism)
* FK to messages(item_hash) ON DELETE CASCADE, added after backfill.
PG (as of 17) does not support NOT VALID on partitioned tables, so
the ADD CONSTRAINT validates each child in one pass; this is still
cheaper than the per-row FK check we'd pay if the constraint were
present during the bulk INSERT.

The retention cron (metrics_partition) runs daily and is responsible
for creating next month's partition and dropping past-cutoff
partitions. See aleph.jobs.cron.metrics_partition_job.
"""

import datetime as dt

import sqlalchemy as sa
from alembic import op
from sqlalchemy import text

from aleph.toolkit.partitions import (
add_months,
month_floor,
monthly_bounds,
partition_name,
ts_literal,
)

revision = "b9c4f1e6a2d7"
down_revision = "4f1e8d2a6c3b"
branch_labels = None
depends_on = None


RETENTION_MONTHS = 12
LOOKAHEAD_MONTHS = 1


def upgrade() -> None:
now_month = month_floor(dt.datetime.now(tz=dt.timezone.utc))
cutoff = add_months(now_month, -RETENTION_MONTHS)
upper_bound = add_months(now_month, LOOKAHEAD_MONTHS + 1)

# 1. Create both parent tables (PARTITION BY RANGE on measured_at).
op.execute(
text(
"""
CREATE TABLE crn_metrics (
id BIGINT GENERATED BY DEFAULT AS IDENTITY,
measured_at TIMESTAMPTZ NOT NULL,
item_hash TEXT NOT NULL,
node_id TEXT NOT NULL,
base_latency DOUBLE PRECISION,
base_latency_ipv4 DOUBLE PRECISION,
full_check_latency DOUBLE PRECISION,
diagnostic_vm_latency DOUBLE PRECISION,
PRIMARY KEY (id, measured_at)
) PARTITION BY RANGE (measured_at)
"""
)
)
op.execute(
text(
"""
CREATE TABLE ccn_metrics (
id BIGINT GENERATED BY DEFAULT AS IDENTITY,
measured_at TIMESTAMPTZ NOT NULL,
item_hash TEXT NOT NULL,
node_id TEXT NOT NULL,
base_latency DOUBLE PRECISION,
base_latency_ipv4 DOUBLE PRECISION,
metrics_latency DOUBLE PRECISION,
aggregate_latency DOUBLE PRECISION,
file_download_latency DOUBLE PRECISION,
pending_messages INTEGER,
eth_height_remaining INTEGER,
PRIMARY KEY (id, measured_at)
) PARTITION BY RANGE (measured_at)
"""
)
)

# 2. Create monthly child partitions for [cutoff, upper_bound).
for table in ("crn_metrics", "ccn_metrics"):
for lower, upper in monthly_bounds(cutoff, upper_bound):
op.execute(
text(
f"CREATE TABLE {partition_name(table, lower)} "
f"PARTITION OF {table} "
f"FOR VALUES FROM ('{ts_literal(lower)}') "
f"TO ('{ts_literal(upper)}')"
)
)
# DEFAULT partition: catch-all for timestamps outside the rolling
# range (e.g. far-future scoring posts). The cron job alerts if
# this ever holds rows; data here is hard to migrate to a proper
# partition without DETACH + INSERT, so it should stay empty in
# steady state.
op.execute(text(f"CREATE TABLE {table}_default PARTITION OF {table} DEFAULT"))

# 3. Backfill from the existing views, filtered to the retention
# window. Cast the float epoch to TIMESTAMPTZ in the SELECT so
# partition routing picks the right child.
cutoff_epoch = cutoff.timestamp()
op.execute(
text(
f"""
INSERT INTO crn_metrics (
item_hash, measured_at, node_id, base_latency, base_latency_ipv4,
full_check_latency, diagnostic_vm_latency
)
SELECT
item_hash,
to_timestamp(measured_at) AT TIME ZONE 'UTC' AS measured_at,
node_id, base_latency, base_latency_ipv4,
full_check_latency, diagnostic_vm_latency
FROM crn_metric_view
WHERE node_id IS NOT NULL
AND measured_at IS NOT NULL
AND measured_at >= {cutoff_epoch}
"""
)
)
op.execute(
text(
f"""
INSERT INTO ccn_metrics (
item_hash, measured_at, node_id, base_latency, base_latency_ipv4,
metrics_latency, aggregate_latency, file_download_latency,
pending_messages, eth_height_remaining
)
SELECT
item_hash,
to_timestamp(measured_at) AT TIME ZONE 'UTC' AS measured_at,
node_id, base_latency, base_latency_ipv4,
metrics_latency, aggregate_latency, file_download_latency,
pending_messages, eth_height_remaining
FROM ccn_metric_view
WHERE node_id IS NOT NULL
AND measured_at IS NOT NULL
AND measured_at >= {cutoff_epoch}
"""
)
)

# 4. Build indexes on the parent (partitioned-index, cascades to
# every child).
op.create_index("ix_crn_metrics_item_hash", "crn_metrics", ["item_hash"])
op.create_index(
"ix_crn_metrics_node_id_measured_at",
"crn_metrics",
["node_id", sa.text("measured_at DESC")],
)
op.create_index("ix_ccn_metrics_item_hash", "ccn_metrics", ["item_hash"])
op.create_index(
"ix_ccn_metrics_node_id_measured_at",
"ccn_metrics",
["node_id", sa.text("measured_at DESC")],
)

# 5. FK to messages. PG does not support NOT VALID on partitioned
# tables (yet, as of PG 17). The ADD CONSTRAINT validates each
# child partition in one pass — still faster than the per-INSERT
# FK check we'd pay if the constraint were present during step 3.
op.execute(
text(
"""
ALTER TABLE crn_metrics
ADD CONSTRAINT fk_crn_metrics_item_hash
FOREIGN KEY (item_hash) REFERENCES messages(item_hash) ON DELETE CASCADE
"""
)
)
op.execute(
text(
"""
ALTER TABLE ccn_metrics
ADD CONSTRAINT fk_ccn_metrics_item_hash
FOREIGN KEY (item_hash) REFERENCES messages(item_hash) ON DELETE CASCADE
"""
)
)

op.execute(text("DROP VIEW IF EXISTS crn_metric_view"))
op.execute(text("DROP VIEW IF EXISTS ccn_metric_view"))

# 6. Seed the cron-job row that drives partition maintenance.
# Daily interval. last_run set to epoch so the first scheduler tick
# runs the job (creates next month if missing, drops anything that
# slipped past retention).
op.execute(
text(
"""
INSERT INTO cron_jobs(id, interval, last_run)
VALUES ('metrics_partition', 86400, '1970-01-01 00:00:00+00')
"""
)
)


def downgrade() -> None:
op.execute(text("DELETE FROM cron_jobs WHERE id = 'metrics_partition'"))

op.execute(
text(
"""
CREATE OR REPLACE VIEW ccn_metric_view AS
WITH json_data AS (
SELECT item_hash,
jsonb_array_elements(content -> 'content' -> 'metrics' -> 'ccn') as ccn_data
FROM messages
WHERE channel = 'aleph-scoring'
AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4'
)
SELECT item_hash,
(ccn_data ->> 'measured_at')::float as measured_at,
ccn_data ->> 'node_id' as node_id,
(ccn_data ->> 'base_latency')::float as base_latency,
(ccn_data ->> 'metrics_latency')::float as metrics_latency,
(ccn_data ->> 'aggregate_latency')::float as aggregate_latency,
(ccn_data ->> 'base_latency_ipv4')::float as base_latency_ipv4,
(ccn_data ->> 'file_download_latency')::float as file_download_latency,
(ccn_data ->> 'pending_messages')::int as pending_messages,
(ccn_data ->> 'eth_height_remaining')::int as eth_height_remaining
FROM json_data
"""
)
)
op.execute(
text(
"""
CREATE OR REPLACE VIEW crn_metric_view AS
WITH json_data AS (
SELECT item_hash,
jsonb_array_elements(content -> 'content' -> 'metrics' -> 'crn') as crn_data
FROM messages
WHERE channel = 'aleph-scoring'
AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4'
)
SELECT item_hash as item_hash,
(crn_data ->> 'measured_at')::float as measured_at,
crn_data ->> 'node_id' as node_id,
(crn_data ->> 'base_latency')::float as base_latency,
(crn_data ->> 'base_latency_ipv4')::float as base_latency_ipv4,
(crn_data ->> 'full_check_latency')::float as full_check_latency,
(crn_data ->> 'diagnostic_vm_latency')::float as diagnostic_vm_latency
FROM json_data
"""
)
)

# Dropping the parent cascades to all child partitions and the FK.
op.execute(text("DROP TABLE IF EXISTS ccn_metrics CASCADE"))
op.execute(text("DROP TABLE IF EXISTS crn_metrics CASCADE"))
6 changes: 6 additions & 0 deletions src/aleph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from aleph.jobs.cron.balance_job import BalanceCronJob
from aleph.jobs.cron.credit_balance_job import CreditBalanceCronJob
from aleph.jobs.cron.cron_job import CronJob, cron_job_task
from aleph.jobs.cron.metrics_partition_job import MetricsPartitionCronJob
from aleph.network import listener_tasks
from aleph.repair import repair_node
from aleph.services import p2p
Expand Down Expand Up @@ -171,6 +172,11 @@ async def main(args: List[str]) -> None:
session_factory=session_factory,
max_unauthenticated_upload_file_size=config.storage.max_unauthenticated_upload_file_size.value,
),
"metrics_partition": MetricsPartitionCronJob(
session_factory=session_factory,
retention_months=config.aleph.scoring.retention_months.value,
lookahead_months=config.aleph.scoring.partition_lookahead_months.value,
),
},
)
chain_data_service = ChainDataService(
Expand Down
18 changes: 18 additions & 0 deletions src/aleph/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ def get_defaults():
# Allowed channels for credit balance messages.
"channels": ["ALEPH_CREDIT"],
},
"scoring": {
# Addresses allowed to publish node scoring metrics.
"addresses": [
"0x4D52380D3191274a04846c89c069E6C3F2Ed94e4",
],
# Channel scoring messages are published on.
"channel": "aleph-scoring",
# POST message type that carries the node metrics payload.
"metrics_post_type": "aleph-network-metrics",
# Retention horizon for crn_metrics / ccn_metrics. Partitions
# whose upper bound is older than this are detached and dropped
# by the metrics_partition cron job.
"retention_months": 12,
# How many months ahead of "now" to keep partitions
# pre-created. Guards against incoming scoring posts
# falling into the DEFAULT catch-all partition.
"partition_lookahead_months": 1,
},
"jobs": {
"pending_messages": {
# Maximum number of retries for a message.
Expand Down
Loading
Loading