Record wallclock lag histogram #32010

Merged · 5 commits · Mar 27, 2025

Changes from all commits
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 2 additions & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
@@ -1251,6 +1251,8 @@ for each table, source, index, materialized view, and sink in the system.

<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_wallclock_global_lag_history -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_wallclock_global_lag_recent_history -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_wallclock_global_lag_histogram -->
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_wallclock_global_lag_histogram_raw -->

## `mz_webhook_sources`

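For readers landing here from the catalog docs: a query along these lines is one way to consume the new relation (a usage sketch, not part of the PR; column names follow the `mz_wallclock_global_lag_histogram` view definition added in `src/catalog/src/builtin.rs` below, and `lag_seconds` is a power-of-two bucket per the controller change):

```sql
-- Sketch: per-object lag distribution over the last day.
SELECT object_id, lag_seconds, sum(count) AS samples
FROM mz_internal.mz_wallclock_global_lag_histogram
WHERE period_start >= now() - INTERVAL '1 day'
GROUP BY object_id, lag_seconds
ORDER BY object_id, lag_seconds;
```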
27 changes: 27 additions & 0 deletions src/catalog/src/builtin.rs
@@ -50,6 +50,7 @@ use mz_sql::session::user::{
MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER_NAME, SYSTEM_USER_NAME,
};
use mz_storage_client::controller::IntrospectionType;
use mz_storage_client::healthcheck::WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC;
use mz_storage_client::healthcheck::{
MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_PREPARED_STATEMENT_HISTORY_DESC,
MZ_SESSION_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
@@ -3691,6 +3692,30 @@ ORDER BY object_id, occurred_at DESC",
access: vec![PUBLIC_SELECT],
});

pub static MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW: LazyLock<BuiltinSource> =
LazyLock::new(|| BuiltinSource {
name: "mz_wallclock_global_lag_histogram_raw",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::SOURCE_MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_OID,
desc: WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC.clone(),
data_source: IntrospectionType::WallclockLagHistogram,
is_retained_metrics_object: false,
access: vec![PUBLIC_SELECT],
});

pub static MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM: LazyLock<BuiltinView> =
LazyLock::new(|| BuiltinView {
name: "mz_wallclock_global_lag_histogram",
schema: MZ_INTERNAL_SCHEMA,
oid: oid::VIEW_MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_OID,
column_defs: None,
sql: "
SELECT *, count(*) AS count
FROM mz_internal.mz_wallclock_global_lag_histogram_raw
GROUP BY period_start, period_end, object_id, lag_seconds, labels",
access: vec![PUBLIC_SELECT],
});

pub static MZ_MATERIALIZED_VIEW_REFRESHES: LazyLock<BuiltinSource> =
LazyLock::new(|| BuiltinSource {
name: "mz_materialized_view_refreshes",
Expand Down Expand Up @@ -9700,6 +9725,8 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::View(&MZ_WALLCLOCK_GLOBAL_LAG_HISTORY),
Builtin::View(&MZ_WALLCLOCK_GLOBAL_LAG_RECENT_HISTORY),
Builtin::View(&MZ_WALLCLOCK_GLOBAL_LAG),
Builtin::Source(&MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW),
Builtin::View(&MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM),
Builtin::Source(&MZ_MATERIALIZED_VIEW_REFRESHES),
Builtin::Source(&MZ_COMPUTE_DEPENDENCIES),
Builtin::Source(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER),
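A note on the raw/view split for readers unfamiliar with the pattern: as with other `*_raw` introspection relations, the raw source presumably encodes per-sample counts in update multiplicities (the controller change below pushes `(row, count)` updates), so the view's `count(*)` grouped by every column materializes those multiplicities as an explicit `count` column.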
2 changes: 1 addition & 1 deletion src/compute-client/src/controller.rs
@@ -1081,7 +1081,7 @@ where
let op = StorageWriteOp::Append { updates };
storage.update_introspection_collection(type_, op);
}
WallclockLagHistory => {
WallclockLagHistory | WallclockLagHistogram => {
storage.append_introspection_updates(type_, updates);
}
_ => panic!("unexpected introspection type: {type_:?}"),
162 changes: 134 additions & 28 deletions src/compute-client/src/controller/instance.rs
@@ -26,7 +26,10 @@ use mz_compute_types::sinks::{
};
use mz_compute_types::sources::SourceInstanceDesc;
use mz_compute_types::ComputeInstanceId;
use mz_controller_types::dyncfgs::WALLCLOCK_LAG_REFRESH_INTERVAL;
use mz_controller_types::dyncfgs::{
ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION, WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL,
WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL,
};
use mz_dyncfg::ConfigSet;
use mz_expr::RowSetFinishing;
use mz_ore::cast::CastFrom;
@@ -35,9 +38,10 @@ use mz_ore::now::NowFn;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{soft_assert_or_log, soft_panic_or_log};
use mz_repr::adt::interval::Interval;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Datum, Diff, GlobalId, Row};
use mz_storage_client::controller::IntrospectionType;
use mz_storage_client::controller::{IntrospectionType, WallclockLagHistogramPeriod};
use mz_storage_types::read_holds::{self, ReadHold};
use mz_storage_types::read_policy::ReadPolicy;
use serde::Serialize;
@@ -225,6 +229,10 @@ pub(super) struct Instance<T: ComputeControllerTimestamp> {
/// controlled by it are allowed to affect changes to external systems
/// (largely persist).
read_only: bool,
/// The workload class of this instance.
///
/// This is currently only used to annotate metrics.
workload_class: Option<String>,
/// The replicas of this compute instance.
replicas: BTreeMap<ReplicaId, ReplicaState<T>>,
/// Currently installed compute collections.
@@ -289,8 +297,10 @@ pub(super) struct Instance<T: ComputeControllerTimestamp> {
now: NowFn,
/// A function that computes the lag between the given time and wallclock time.
wallclock_lag: WallclockLagFn<T>,
/// The last time wallclock lag introspection was refreshed.
wallclock_lag_last_refresh: Instant,
/// The last time `WallclockLagHistory` introspection was refreshed.
wallclock_lag_history_last_refresh: Instant,
/// The last time `WallclockLagHistogram` introspection was refreshed.
wallclock_lag_histogram_last_refresh: Instant,

/// Sender for updates to collection read holds.
///
@@ -547,40 +557,45 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
.set(u64::cast_from(connected_replica_count));
}

/// Refresh the `WallclockLagHistory` introspection and the `wallclock_lag_*_seconds` metrics
/// with the current lag values.
/// Refresh the wallclock lag introspection and metrics with the current lag values.
///
/// This method is invoked by `ComputeController::maintain`, which we expect to be called once
/// per second during normal operation.
fn refresh_wallclock_lag(&mut self) {
let refresh_introspection = !self.read_only
&& self.wallclock_lag_last_refresh.elapsed()
>= WALLCLOCK_LAG_REFRESH_INTERVAL.get(&self.dyncfg);
let mut introspection_updates = refresh_introspection.then(Vec::new);

let now = mz_ore::now::to_datetime((self.now)());
let now_tz = now.try_into().expect("must fit");
let refresh_history = !self.read_only
&& self.wallclock_lag_history_last_refresh.elapsed()
>= WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL.get(&self.dyncfg);
let refresh_histogram = !self.read_only
&& self.wallclock_lag_histogram_last_refresh.elapsed()
>= WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL.get(&self.dyncfg);

let now_ms = (self.now)();
let now_dt = mz_ore::now::to_datetime(now_ms);
let now_ts: CheckedTimestamp<_> = now_dt.try_into().expect("must fit");

let frontier_lag = |frontier: &Antichain<_>| match frontier.as_option() {
Some(ts) => (self.wallclock_lag)(ts),
None => Duration::ZERO,
};

let mut history_updates = Vec::new();
for (replica_id, replica) in &mut self.replicas {
for (collection_id, collection) in &mut replica.collections {
let lag = match collection.write_frontier.as_option() {
Some(ts) => (self.wallclock_lag)(ts),
None => Duration::ZERO,
};
let lag = frontier_lag(&collection.write_frontier);

if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max {
*wallclock_lag_max = std::cmp::max(*wallclock_lag_max, lag);

if let Some(updates) = &mut introspection_updates {
if refresh_history {
let max_lag = std::mem::take(wallclock_lag_max);
let max_lag_us = i64::try_from(max_lag.as_micros()).expect("must fit");
let row = Row::pack_slice(&[
Datum::String(&collection_id.to_string()),
Datum::String(&replica_id.to_string()),
Datum::Interval(Interval::new(0, 0, max_lag_us)),
Datum::TimestampTz(now_tz),
Datum::TimestampTz(now_ts),
]);
updates.push((row, 1));
history_updates.push((row, 1));
}
}

@@ -589,10 +604,57 @@
};
}
}
if !history_updates.is_empty() {
self.deliver_introspection_updates(
IntrospectionType::WallclockLagHistory,
history_updates,
);
self.wallclock_lag_history_last_refresh = Instant::now();
}

let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg);
let histogram_labels = match &self.workload_class {
Some(wc) => [("workload_class", wc.clone())].into(),
None => BTreeMap::new(),
};

let mut histogram_updates = Vec::new();
let mut row_buf = Row::default();
for (collection_id, collection) in &mut self.collections {
if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(&self.dyncfg) {
continue;
}

if let Some(stash) = &mut collection.wallclock_lag_histogram_stash {
let lag = collection.shared.lock_write_frontier(|f| frontier_lag(f));
let bucket = lag.as_secs().next_power_of_two();

let key = (histogram_period, bucket, histogram_labels.clone());
[Review thread on the `histogram_labels.clone()` above]

Member: This clone isn't nice, but I can't think of an easy way to avoid it.

Contributor (author): The whole handling of labels in `wallclock_lag_histogram_stash` is not great. Label values are fixed to strings right now, which will stop working once we add other types (like bools). Ideally we'd store a `Datum`, but it's impossible to store an owned `String` as a `Datum`. I've been considering storing a `DatumMap` instead of the `BTreeMap`, but I don't know how to construct one from its contents. Should we store a `Row`?

Member: Even a row would need to be cloned, so I think it's fine to leave as-is.

*stash.entry(key).or_default() += 1;

if refresh_histogram {
for ((period, lag, labels), count) in std::mem::take(stash) {
let mut packer = row_buf.packer();
packer.extend([
Datum::TimestampTz(period.start),
Datum::TimestampTz(period.end),
Datum::String(&collection_id.to_string()),
Datum::UInt64(lag),
]);
let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v)));
packer.push_dict(labels);

        if let Some(updates) = introspection_updates {
            self.deliver_introspection_updates(IntrospectionType::WallclockLagHistory, updates);
            self.wallclock_lag_last_refresh = Instant::now();
        }
                        histogram_updates.push((row_buf.clone(), count));
                    }
}
}
}
if !histogram_updates.is_empty() {
self.deliver_introspection_updates(
IntrospectionType::WallclockLagHistogram,
histogram_updates,
);
self.wallclock_lag_histogram_last_refresh = Instant::now();
}
}

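To make the bucketing above concrete: each lag sample is reduced to whole seconds and rounded up to the next power of two, so bucket boundaries grow exponentially and resolution degrades gracefully for large lags. A self-contained sketch of the scheme (hypothetical names; only the `next_power_of_two` call mirrors the code above, and the real stash is additionally keyed by period and labels):

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Bucket a lag measurement: whole seconds, rounded up to a power of two.
fn lag_bucket(lag: Duration) -> u64 {
    lag.as_secs().next_power_of_two()
}

fn main() {
    // Accumulate counts per bucket, as wallclock_lag_histogram_stash does
    // (keyed here by bucket only, for brevity).
    let mut stash: BTreeMap<u64, i64> = BTreeMap::new();
    for secs in [0, 1, 3, 5, 9, 70] {
        *stash.entry(lag_bucket(Duration::from_secs(secs))).or_default() += 1;
    }
    // 0s and 1s share bucket 1; 3s -> 4; 5s -> 8; 9s -> 16; 70s -> 128.
    assert_eq!(
        stash,
        BTreeMap::from([(1, 2), (4, 1), (8, 1), (16, 1), (128, 1)])
    );
}
```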
@@ -797,6 +859,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
storage_collections: _,
initialized,
read_only,
workload_class,
replicas,
collections,
log_sources: _,
@@ -813,7 +876,8 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
dyncfg: _,
now: _,
wallclock_lag: _,
wallclock_lag_last_refresh,
wallclock_lag_history_last_refresh,
wallclock_lag_histogram_last_refresh,
read_hold_tx: _,
replica_tx: _,
replica_rx: _,
@@ -848,19 +912,29 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
.iter()
.map(|(id, epoch)| (id.to_string(), epoch))
.collect();
let wallclock_lag_last_refresh = format!("{wallclock_lag_last_refresh:?}");
let wallclock_lag_history_last_refresh = format!("{wallclock_lag_history_last_refresh:?}");
let wallclock_lag_histogram_last_refresh =
format!("{wallclock_lag_histogram_last_refresh:?}");

let map = serde_json::Map::from_iter([
field("initialized", initialized)?,
field("read_only", read_only)?,
field("workload_class", workload_class)?,
field("replicas", replicas)?,
field("collections", collections)?,
field("peeks", peeks)?,
field("subscribes", subscribes)?,
field("copy_tos", copy_tos)?,
field("envd_epoch", envd_epoch)?,
field("replica_epochs", replica_epochs)?,
field("wallclock_lag_last_refresh", wallclock_lag_last_refresh)?,
field(
"wallclock_lag_history_last_refresh",
wallclock_lag_history_last_refresh,
)?,
field(
"wallclock_lag_histogram_last_refresh",
wallclock_lag_histogram_last_refresh,
)?,
]);
Ok(serde_json::Value::Object(map))
}
@@ -909,6 +983,7 @@ where
storage_collections: storage,
initialized: false,
read_only: true,
workload_class: None,
replicas: Default::default(),
collections,
log_sources,
@@ -925,7 +1000,8 @@
dyncfg,
now,
wallclock_lag,
wallclock_lag_last_refresh: Instant::now(),
wallclock_lag_history_last_refresh: Instant::now(),
wallclock_lag_histogram_last_refresh: Instant::now(),
read_hold_tx,
replica_tx,
replica_rx,
@@ -962,6 +1038,10 @@ where
/// Update instance configuration.
#[mz_ore::instrument(level = "debug")]
pub fn update_configuration(&mut self, config_params: ComputeParameters) {
if let Some(workload_class) = &config_params.workload_class {
self.workload_class = workload_class.clone();
}

self.send(ComputeCommand::UpdateConfiguration(config_params));
}

@@ -2147,6 +2227,23 @@ struct CollectionState<T: ComputeControllerTimestamp> {

/// Introspection state associated with this collection.
introspection: CollectionIntrospection<T>,

/// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram`
/// introspection update.
///
/// Keys are `(period, lag, labels)` triples, values are counts.
///
/// If this is `None`, wallclock lag is not tracked for this collection.
wallclock_lag_histogram_stash: Option<
BTreeMap<
(
WallclockLagHistogramPeriod,
u64,
BTreeMap<&'static str, String>,
),
i64,
>,
>,
}

impl<T: ComputeControllerTimestamp> CollectionState<T> {
@@ -2181,6 +2278,14 @@ impl<T: ComputeControllerTimestamp> CollectionState<T> {
c.update_iter(updates);
});

// In an effort to keep the produced wallclock lag introspection data small and
// predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path
// select indexes and subscribes.
let wallclock_lag_histogram_stash = match collection_id.is_transient() {
true => None,
false => Some(Default::default()),
};

Self {
log_collection: false,
dropped: false,
@@ -2192,6 +2297,7 @@
storage_dependencies,
compute_dependencies,
introspection,
wallclock_lag_histogram_stash,
}
}

@@ -2785,7 +2891,7 @@ struct ReplicaCollectionState<T: ComputeControllerTimestamp> {
/// compaction of compute inputs is implicitly held back by Timely/DD.
input_read_holds: Vec<ReadHold<T>>,

/// Maximum frontier wallclock lag since the last introspection update.
/// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update.
///
/// If this is `None`, wallclock lag is not tracked for this collection.
wallclock_lag_max: Option<Duration>,
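Finally, a note on the refresh lifecycle shared by `wallclock_lag_max` and the histogram stash: both accumulate between refreshes and are drained with `std::mem::take` when their interval elapses, resetting the state to its default. A minimal sketch of that drain-on-refresh pattern (hypothetical names, simplified from the controller code above):

```rust
use std::time::Duration;

/// Drain an accumulated maximum, resetting it to Duration::ZERO
/// (Duration's Default), as the refresh path does with std::mem::take.
fn drain_max(max: &mut Duration) -> Duration {
    std::mem::take(max)
}

fn main() {
    let mut max = Duration::from_secs(7);
    assert_eq!(drain_max(&mut max), Duration::from_secs(7));
    assert_eq!(max, Duration::ZERO); // ready for the next interval
}
```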