diff --git a/Cargo.lock b/Cargo.lock index 7267b10d80a26..213905aa20a1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7231,6 +7231,7 @@ dependencies = [ "mz-build-tools", "mz-ccsr", "mz-cluster-client", + "mz-controller-types", "mz-dyncfg", "mz-dyncfgs", "mz-kafka-util", diff --git a/doc/user/content/sql/system-catalog/mz_internal.md b/doc/user/content/sql/system-catalog/mz_internal.md index ae5325333216a..6f9f013ed4281 100644 --- a/doc/user/content/sql/system-catalog/mz_internal.md +++ b/doc/user/content/sql/system-catalog/mz_internal.md @@ -1251,6 +1251,8 @@ for each table, source, index, materialized view, and sink in the system. + + ## `mz_webhook_sources` diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index 4fac049f25859..ead438060d2d5 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -50,6 +50,7 @@ use mz_sql::session::user::{ MZ_SUPPORT_ROLE_ID, MZ_SYSTEM_ROLE_ID, SUPPORT_USER_NAME, SYSTEM_USER_NAME, }; use mz_storage_client::controller::IntrospectionType; +use mz_storage_client::healthcheck::WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC; use mz_storage_client::healthcheck::{ MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_PREPARED_STATEMENT_HISTORY_DESC, MZ_SESSION_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, @@ -3691,6 +3692,30 @@ ORDER BY object_id, occurred_at DESC", access: vec![PUBLIC_SELECT], }); +pub static MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW: LazyLock = + LazyLock::new(|| BuiltinSource { + name: "mz_wallclock_global_lag_histogram_raw", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::SOURCE_MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_OID, + desc: WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC.clone(), + data_source: IntrospectionType::WallclockLagHistogram, + is_retained_metrics_object: false, + access: vec![PUBLIC_SELECT], + }); + +pub static MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM: LazyLock = + LazyLock::new(|| BuiltinView { + name: "mz_wallclock_global_lag_histogram", + schema: MZ_INTERNAL_SCHEMA, + oid: oid::VIEW_MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_OID, + column_defs: None, + sql: " +SELECT *, count(*) AS count +FROM mz_internal.mz_wallclock_global_lag_histogram_raw +GROUP BY period_start, period_end, object_id, lag_seconds, labels", + access: vec![PUBLIC_SELECT], + }); + pub static MZ_MATERIALIZED_VIEW_REFRESHES: LazyLock = LazyLock::new(|| BuiltinSource { name: "mz_materialized_view_refreshes", @@ -9700,6 +9725,8 @@ pub static BUILTINS_STATIC: LazyLock>> = LazyLock::ne Builtin::View(&MZ_WALLCLOCK_GLOBAL_LAG_HISTORY), Builtin::View(&MZ_WALLCLOCK_GLOBAL_LAG_RECENT_HISTORY), Builtin::View(&MZ_WALLCLOCK_GLOBAL_LAG), + Builtin::Source(&MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW), + Builtin::View(&MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM), Builtin::Source(&MZ_MATERIALIZED_VIEW_REFRESHES), Builtin::Source(&MZ_COMPUTE_DEPENDENCIES), Builtin::Source(&MZ_COMPUTE_OPERATOR_HYDRATION_STATUSES_PER_WORKER), diff --git a/src/compute-client/src/controller.rs b/src/compute-client/src/controller.rs index 7a16f37b3b26f..85584db80fe5d 100644 --- a/src/compute-client/src/controller.rs +++ b/src/compute-client/src/controller.rs @@ -1081,7 +1081,7 @@ where let op = StorageWriteOp::Append { updates }; storage.update_introspection_collection(type_, op); } - WallclockLagHistory => { + WallclockLagHistory | WallclockLagHistogram => { storage.append_introspection_updates(type_, updates); } _ => panic!("unexpected introspection type: {type_:?}"), diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index 
9f4230de27f18..55d1080d8eb61 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -26,7 +26,10 @@ use mz_compute_types::sinks::{ }; use mz_compute_types::sources::SourceInstanceDesc; use mz_compute_types::ComputeInstanceId; -use mz_controller_types::dyncfgs::WALLCLOCK_LAG_REFRESH_INTERVAL; +use mz_controller_types::dyncfgs::{ + ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION, WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL, + WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL, +}; use mz_dyncfg::ConfigSet; use mz_expr::RowSetFinishing; use mz_ore::cast::CastFrom; @@ -35,9 +38,10 @@ use mz_ore::now::NowFn; use mz_ore::tracing::OpenTelemetryContext; use mz_ore::{soft_assert_or_log, soft_panic_or_log}; use mz_repr::adt::interval::Interval; +use mz_repr::adt::timestamp::CheckedTimestamp; use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::{Datum, Diff, GlobalId, Row}; -use mz_storage_client::controller::IntrospectionType; +use mz_storage_client::controller::{IntrospectionType, WallclockLagHistogramPeriod}; use mz_storage_types::read_holds::{self, ReadHold}; use mz_storage_types::read_policy::ReadPolicy; use serde::Serialize; @@ -225,6 +229,10 @@ pub(super) struct Instance { /// controlled by it are allowed to affect changes to external systems /// (largely persist). read_only: bool, + /// The workload class of this instance. + /// + /// This is currently only used to annotate metrics. + workload_class: Option, /// The replicas of this compute instance. replicas: BTreeMap>, /// Currently installed compute collections. @@ -289,8 +297,10 @@ pub(super) struct Instance { now: NowFn, /// A function that computes the lag between the given time and wallclock time. wallclock_lag: WallclockLagFn, - /// The last time wallclock lag introspection was refreshed. - wallclock_lag_last_refresh: Instant, + /// The last time `WallclockLagHistory` introspection was refreshed. + wallclock_lag_history_last_refresh: Instant, + /// The last time `WallclockLagHistogram` introspection was refreshed. + wallclock_lag_histogram_last_refresh: Instant, /// Sender for updates to collection read holds. /// @@ -547,40 +557,45 @@ impl Instance { .set(u64::cast_from(connected_replica_count)); } - /// Refresh the `WallclockLagHistory` introspection and the `wallclock_lag_*_seconds` metrics - /// with the current lag values. + /// Refresh the wallclock lag introspection and metrics with the current lag values. /// /// This method is invoked by `ComputeController::maintain`, which we expect to be called once /// per second during normal operation. 
fn refresh_wallclock_lag(&mut self) { - let refresh_introspection = !self.read_only - && self.wallclock_lag_last_refresh.elapsed() - >= WALLCLOCK_LAG_REFRESH_INTERVAL.get(&self.dyncfg); - let mut introspection_updates = refresh_introspection.then(Vec::new); - - let now = mz_ore::now::to_datetime((self.now)()); - let now_tz = now.try_into().expect("must fit"); + let refresh_history = !self.read_only + && self.wallclock_lag_history_last_refresh.elapsed() + >= WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL.get(&self.dyncfg); + let refresh_histogram = !self.read_only + && self.wallclock_lag_histogram_last_refresh.elapsed() + >= WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL.get(&self.dyncfg); + + let now_ms = (self.now)(); + let now_dt = mz_ore::now::to_datetime(now_ms); + let now_ts: CheckedTimestamp<_> = now_dt.try_into().expect("must fit"); + + let frontier_lag = |frontier: &Antichain<_>| match frontier.as_option() { + Some(ts) => (self.wallclock_lag)(ts), + None => Duration::ZERO, + }; + let mut history_updates = Vec::new(); for (replica_id, replica) in &mut self.replicas { for (collection_id, collection) in &mut replica.collections { - let lag = match collection.write_frontier.as_option() { - Some(ts) => (self.wallclock_lag)(ts), - None => Duration::ZERO, - }; + let lag = frontier_lag(&collection.write_frontier); if let Some(wallclock_lag_max) = &mut collection.wallclock_lag_max { *wallclock_lag_max = std::cmp::max(*wallclock_lag_max, lag); - if let Some(updates) = &mut introspection_updates { + if refresh_history { let max_lag = std::mem::take(wallclock_lag_max); let max_lag_us = i64::try_from(max_lag.as_micros()).expect("must fit"); let row = Row::pack_slice(&[ Datum::String(&collection_id.to_string()), Datum::String(&replica_id.to_string()), Datum::Interval(Interval::new(0, 0, max_lag_us)), - Datum::TimestampTz(now_tz), + Datum::TimestampTz(now_ts), ]); - updates.push((row, 1)); + history_updates.push((row, 1)); } } @@ -589,10 +604,57 @@ impl Instance { }; } } + if !history_updates.is_empty() { + self.deliver_introspection_updates( + IntrospectionType::WallclockLagHistory, + history_updates, + ); + self.wallclock_lag_history_last_refresh = Instant::now(); + } + + let histogram_period = WallclockLagHistogramPeriod::from_epoch_millis(now_ms, &self.dyncfg); + let histogram_labels = match &self.workload_class { + Some(wc) => [("workload_class", wc.clone())].into(), + None => BTreeMap::new(), + }; + + let mut histogram_updates = Vec::new(); + let mut row_buf = Row::default(); + for (collection_id, collection) in &mut self.collections { + if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(&self.dyncfg) { + continue; + } + + if let Some(stash) = &mut collection.wallclock_lag_histogram_stash { + let lag = collection.shared.lock_write_frontier(|f| frontier_lag(f)); + let bucket = lag.as_secs().next_power_of_two(); + + let key = (histogram_period, bucket, histogram_labels.clone()); + *stash.entry(key).or_default() += 1; + + if refresh_histogram { + for ((period, lag, labels), count) in std::mem::take(stash) { + let mut packer = row_buf.packer(); + packer.extend([ + Datum::TimestampTz(period.start), + Datum::TimestampTz(period.end), + Datum::String(&collection_id.to_string()), + Datum::UInt64(lag), + ]); + let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v))); + packer.push_dict(labels); - if let Some(updates) = introspection_updates { - self.deliver_introspection_updates(IntrospectionType::WallclockLagHistory, updates); - self.wallclock_lag_last_refresh = Instant::now(); + 
histogram_updates.push((row_buf.clone(), count)); + } + } + } + } + if !histogram_updates.is_empty() { + self.deliver_introspection_updates( + IntrospectionType::WallclockLagHistogram, + histogram_updates, + ); + self.wallclock_lag_histogram_last_refresh = Instant::now(); } } @@ -797,6 +859,7 @@ impl Instance { storage_collections: _, initialized, read_only, + workload_class, replicas, collections, log_sources: _, @@ -813,7 +876,8 @@ impl Instance { dyncfg: _, now: _, wallclock_lag: _, - wallclock_lag_last_refresh, + wallclock_lag_history_last_refresh, + wallclock_lag_histogram_last_refresh, read_hold_tx: _, replica_tx: _, replica_rx: _, @@ -848,11 +912,14 @@ impl Instance { .iter() .map(|(id, epoch)| (id.to_string(), epoch)) .collect(); - let wallclock_lag_last_refresh = format!("{wallclock_lag_last_refresh:?}"); + let wallclock_lag_history_last_refresh = format!("{wallclock_lag_history_last_refresh:?}"); + let wallclock_lag_histogram_last_refresh = + format!("{wallclock_lag_histogram_last_refresh:?}"); let map = serde_json::Map::from_iter([ field("initialized", initialized)?, field("read_only", read_only)?, + field("workload_class", workload_class)?, field("replicas", replicas)?, field("collections", collections)?, field("peeks", peeks)?, @@ -860,7 +927,14 @@ impl Instance { field("copy_tos", copy_tos)?, field("envd_epoch", envd_epoch)?, field("replica_epochs", replica_epochs)?, - field("wallclock_lag_last_refresh", wallclock_lag_last_refresh)?, + field( + "wallclock_lag_history_last_refresh", + wallclock_lag_history_last_refresh, + )?, + field( + "wallclock_lag_histogram_last_refresh", + wallclock_lag_histogram_last_refresh, + )?, ]); Ok(serde_json::Value::Object(map)) } @@ -909,6 +983,7 @@ where storage_collections: storage, initialized: false, read_only: true, + workload_class: None, replicas: Default::default(), collections, log_sources, @@ -925,7 +1000,8 @@ where dyncfg, now, wallclock_lag, - wallclock_lag_last_refresh: Instant::now(), + wallclock_lag_history_last_refresh: Instant::now(), + wallclock_lag_histogram_last_refresh: Instant::now(), read_hold_tx, replica_tx, replica_rx, @@ -962,6 +1038,10 @@ where /// Update instance configuration. #[mz_ore::instrument(level = "debug")] pub fn update_configuration(&mut self, config_params: ComputeParameters) { + if let Some(workload_class) = &config_params.workload_class { + self.workload_class = workload_class.clone(); + } + self.send(ComputeCommand::UpdateConfiguration(config_params)); } @@ -2147,6 +2227,23 @@ struct CollectionState { /// Introspection state associated with this collection. introspection: CollectionIntrospection, + + /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram` + /// introspection update. + /// + /// Keys are `(period, lag, labels)` triples, values are counts. + /// + /// If this is `None`, wallclock lag is not tracked for this collection. + wallclock_lag_histogram_stash: Option< + BTreeMap< + ( + WallclockLagHistogramPeriod, + u64, + BTreeMap<&'static str, String>, + ), + i64, + >, + >, } impl CollectionState { @@ -2181,6 +2278,14 @@ impl CollectionState { c.update_iter(updates); }); + // In an effort to keep the produced wallclock lag introspection data small and + // predictable, we disable wallclock lag tracking for transient collections, i.e. slow-path + // select indexes and subscribes. 
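// Illustrative sketch (hypothetical helper, std only): the power-of-two
// bucketing applied to `lag_seconds` above rounds each observed lag up to the
// nearest power of two via `u64::next_power_of_two`, so lags of 0s and 1s land
// in bucket 1, 3s in bucket 4, and 90s in bucket 128.
fn lag_bucket(lag: std::time::Duration) -> u64 {
    lag.as_secs().next_power_of_two()
}

#[test]
fn lag_bucket_rounds_up_to_powers_of_two() {
    use std::time::Duration;
    assert_eq!(lag_bucket(Duration::from_secs(0)), 1);
    assert_eq!(lag_bucket(Duration::from_secs(1)), 1);
    assert_eq!(lag_bucket(Duration::from_secs(3)), 4);
    assert_eq!(lag_bucket(Duration::from_secs(90)), 128);
}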
+ let wallclock_lag_histogram_stash = match collection_id.is_transient() { + true => None, + false => Some(Default::default()), + }; + Self { log_collection: false, dropped: false, @@ -2192,6 +2297,7 @@ impl CollectionState { storage_dependencies, compute_dependencies, introspection, + wallclock_lag_histogram_stash, } } @@ -2785,7 +2891,7 @@ struct ReplicaCollectionState { /// compaction of compute inputs is implicitly held back by Timely/DD. input_read_holds: Vec>, - /// Maximum frontier wallclock lag since the last introspection update. + /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update. /// /// If this is `None`, wallclock lag is not tracked for this collection. wallclock_lag_max: Option, diff --git a/src/controller-types/src/dyncfgs.rs b/src/controller-types/src/dyncfgs.rs index 65ba19f6d5799..1897347119f8e 100644 --- a/src/controller-types/src/dyncfgs.rs +++ b/src/controller-types/src/dyncfgs.rs @@ -26,11 +26,28 @@ pub const ENABLE_0DT_DEPLOYMENT_SOURCES: Config = Config::new( "Whether to enable zero-downtime deployments for sources that support it (experimental).", ); -/// The interval at which to refresh wallclock lag introspection. -pub const WALLCLOCK_LAG_REFRESH_INTERVAL: Config = Config::new( - "wallclock_lag_refresh_interval", +pub const WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL: Config = Config::new( + "wallclock_lag_history_refresh_interval", Duration::from_secs(60), - "The interval at which to refresh wallclock lag introspection.", + "The interval at which to refresh `WallclockLagHistory` introspection.", +); + +pub const ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION: Config = Config::new( + "enable_wallclock_lag_histogram_collection", + true, + "Whether to record `WallclockLagHistogram` introspection.", +); + +pub const WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL: Config = Config::new( + "wallclock_lag_histogram_refresh_interval", + Duration::from_secs(60), + "The interval at which to refresh `WallclockLagHistogram` introspection.", +); + +pub const WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL: Config = Config::new( + "wallclock_lag_histogram_period_interval", + Duration::from_secs(24 * 60 * 60), + "The period interval of histograms in `WallclockLagHistogram` introspection.", ); pub const ENABLE_TIMELY_ZERO_COPY: Config = Config::new( @@ -56,7 +73,10 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs .add(&CONTROLLER_PAST_GENERATION_REPLICA_CLEANUP_RETRY_INTERVAL) .add(&ENABLE_0DT_DEPLOYMENT_SOURCES) - .add(&WALLCLOCK_LAG_REFRESH_INTERVAL) + .add(&WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL) + .add(&ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION) + .add(&WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL) + .add(&WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL) .add(&ENABLE_TIMELY_ZERO_COPY) .add(&ENABLE_TIMELY_ZERO_COPY_LGALLOC) .add(&TIMELY_ZERO_COPY_LIMIT) diff --git a/src/controller/src/clusters.rs b/src/controller/src/clusters.rs index d36c1b277bbf7..28dcf67da260b 100644 --- a/src/controller/src/clusters.rs +++ b/src/controller/src/clusters.rs @@ -364,6 +364,8 @@ where id: ClusterId, workload_class: Option, ) -> Result<(), anyhow::Error> { + self.storage + .update_instance_workload_class(id, workload_class.clone()); self.compute .update_instance_workload_class(id, workload_class)?; Ok(()) diff --git a/src/environmentd/tests/testdata/http/ws b/src/environmentd/tests/testdata/http/ws index b1da95d5310c4..c11d1107ee924 100644 --- a/src/environmentd/tests/testdata/http/ws +++ b/src/environmentd/tests/testdata/http/ws @@ -402,7 +402,7 @@ ws-text ws-text 
{"query": "SELECT 1 FROM mz_sources LIMIT 1"} ---- -{"type":"Notice","payload":{"message":"{\n \"plans\": {\n \"raw\": {\n \"text\": \"Finish limit=1 output=[#0]\\n Project (#15)\\n Map (1)\\n Get mz_catalog.mz_sources\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 465\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"data\": [\n 45,\n 1\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n },\n \"optimized\": {\n \"global\": {\n \"text\": \"t72:\\n Finish limit=1 output=[#0]\\n ArrangeBy keys=[[#0]]\\n ReadGlobalFromSameDataflow t71\\n\\nt71:\\n Project (#15)\\n Map (1)\\n ReadIndex on=mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"t72\",\n \"plan\": {\n \"ArrangeBy\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"Transient\": 71\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ],\n \"keys\": []\n },\n \"access_strategy\": \"SameDataflow\"\n }\n },\n \"keys\": [\n [\n {\n \"Column\": 0\n }\n ]\n ]\n }\n }\n },\n {\n \"id\": \"t71\",\n \"plan\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 465\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n },\n \"access_strategy\": {\n 
\"Index\": [\n [\n {\n \"System\": 743\n },\n \"FullScan\"\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"Ok\": {\n \"data\": [\n 45,\n 1\n ]\n }\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n }\n ],\n \"sources\": []\n }\n },\n \"fast_path\": {\n \"text\": \"Explained Query (fast path):\\n Finish limit=1 output=[#0]\\n Project (#15)\\n Map (1)\\n ReadIndex on=mz_catalog.mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"Explained Query (fast path)\",\n \"plan\": {\n \"PeekExisting\": [\n {\n \"System\": 465\n },\n {\n \"System\": 743\n },\n null,\n {\n \"mfp\": {\n \"expressions\": [\n {\n \"Literal\": [\n {\n \"Ok\": {\n \"data\": [\n 45,\n 1\n ]\n }\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ],\n \"predicates\": [],\n \"projection\": [\n 15\n ],\n \"input_arity\": 15\n }\n }\n ]\n }\n }\n ],\n \"sources\": []\n }\n }\n }\n },\n \"insights\": {\n \"imports\": {\n \"s743\": {\n \"name\": {\n \"schema\": \"mz_catalog\",\n \"item\": \"mz_sources_ind\"\n },\n \"type\": \"compute\"\n }\n },\n \"fast_path_clusters\": {},\n \"fast_path_limit\": null,\n \"persist_count\": []\n },\n \"cluster\": {\n \"name\": \"mz_catalog_server\",\n \"id\": {\n \"System\": 2\n }\n },\n \"redacted_sql\": \"SELECT '' FROM [s465 AS mz_catalog.mz_sources] LIMIT ''\"\n}","code":"MZ001","severity":"notice"}} +{"type":"Notice","payload":{"message":"{\n \"plans\": {\n \"raw\": {\n \"text\": \"Finish limit=1 output=[#0]\\n Project (#15)\\n Map (1)\\n Get mz_catalog.mz_sources\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 465\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"data\": [\n 45,\n 1\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n },\n \"optimized\": {\n \"global\": {\n \"text\": \"t72:\\n Finish limit=1 output=[#0]\\n ArrangeBy keys=[[#0]]\\n ReadGlobalFromSameDataflow t71\\n\\nt71:\\n Project (#15)\\n Map (1)\\n ReadIndex on=mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"t72\",\n \"plan\": {\n \"ArrangeBy\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"Transient\": 71\n }\n 
},\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ],\n \"keys\": []\n },\n \"access_strategy\": \"SameDataflow\"\n }\n },\n \"keys\": [\n [\n {\n \"Column\": 0\n }\n ]\n ]\n }\n }\n },\n {\n \"id\": \"t71\",\n \"plan\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 465\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n },\n \"access_strategy\": {\n \"Index\": [\n [\n {\n \"System\": 745\n },\n \"FullScan\"\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"Ok\": {\n \"data\": [\n 45,\n 1\n ]\n }\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n }\n ],\n \"sources\": []\n }\n },\n \"fast_path\": {\n \"text\": \"Explained Query (fast path):\\n Finish limit=1 output=[#0]\\n Project (#15)\\n Map (1)\\n ReadIndex on=mz_catalog.mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"Explained Query (fast path)\",\n \"plan\": {\n \"PeekExisting\": [\n {\n \"System\": 465\n },\n {\n \"System\": 745\n },\n null,\n {\n \"mfp\": {\n \"expressions\": [\n {\n \"Literal\": [\n {\n \"Ok\": {\n \"data\": [\n 45,\n 1\n ]\n }\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ],\n \"predicates\": [],\n \"projection\": [\n 15\n ],\n \"input_arity\": 15\n }\n }\n ]\n }\n }\n ],\n \"sources\": []\n }\n }\n }\n },\n \"insights\": {\n \"imports\": {\n \"s745\": {\n \"name\": {\n \"schema\": \"mz_catalog\",\n \"item\": \"mz_sources_ind\"\n },\n \"type\": \"compute\"\n }\n },\n \"fast_path_clusters\": {},\n \"fast_path_limit\": null,\n \"persist_count\": []\n },\n \"cluster\": {\n \"name\": \"mz_catalog_server\",\n \"id\": {\n \"System\": 2\n }\n },\n \"redacted_sql\": \"SELECT '' FROM [s465 AS mz_catalog.mz_sources] LIMIT ''\"\n}","code":"MZ001","severity":"notice"}} {"type":"CommandStarting","payload":{"has_rows":true,"is_streaming":false}} {"type":"Rows","payload":{"columns":[{"name":"?column?","type_oid":23,"type_len":4,"type_mod":-1}]}} {"type":"Row","payload":["1"]} @@ -412,7 +412,7 @@ ws-text ws-text {"query": "SELECT 1 / 0 FROM mz_sources LIMIT 1"} ---- -{"type":"Notice","payload":{"message":"{\n \"plans\": {\n \"raw\": {\n \"text\": \"Finish limit=1 output=[#0]\\n Project (#15)\\n Map ((1 / 0))\\n Get mz_catalog.mz_sources\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n 
\"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 465\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"CallBinary\": {\n \"func\": \"DivInt32\",\n \"expr1\": {\n \"Literal\": [\n {\n \"data\": [\n 45,\n 1\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n },\n \"expr2\": {\n \"Literal\": [\n {\n \"data\": [\n 44\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n }\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n },\n \"optimized\": {\n \"global\": {\n \"text\": \"t75:\\n Finish limit=1 output=[#0]\\n ArrangeBy keys=[[#0]]\\n ReadGlobalFromSameDataflow t74\\n\\nt74:\\n Project (#15)\\n Map (error(\\\"division by zero\\\"))\\n ReadIndex on=mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"t75\",\n \"plan\": {\n \"ArrangeBy\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"Transient\": 74\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ],\n \"keys\": []\n },\n \"access_strategy\": \"SameDataflow\"\n }\n },\n \"keys\": [\n [\n {\n \"Column\": 0\n }\n ]\n ]\n }\n }\n },\n {\n \"id\": \"t74\",\n \"plan\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 465\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n },\n \"access_strategy\": {\n \"Index\": [\n [\n {\n \"System\": 743\n },\n \"FullScan\"\n ]\n ]\n }\n }\n },\n \"scalars\": [\n 
{\n \"Literal\": [\n {\n \"Err\": \"DivisionByZero\"\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n }\n ],\n \"sources\": []\n }\n },\n \"fast_path\": {\n \"text\": \"Explained Query (fast path):\\n Finish limit=1 output=[#0]\\n Project (#15)\\n Map (error(\\\"division by zero\\\"))\\n ReadIndex on=mz_catalog.mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"Explained Query (fast path)\",\n \"plan\": {\n \"PeekExisting\": [\n {\n \"System\": 465\n },\n {\n \"System\": 743\n },\n null,\n {\n \"mfp\": {\n \"expressions\": [\n {\n \"Literal\": [\n {\n \"Err\": \"DivisionByZero\"\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ],\n \"predicates\": [],\n \"projection\": [\n 15\n ],\n \"input_arity\": 15\n }\n }\n ]\n }\n }\n ],\n \"sources\": []\n }\n }\n }\n },\n \"insights\": {\n \"imports\": {\n \"s743\": {\n \"name\": {\n \"schema\": \"mz_catalog\",\n \"item\": \"mz_sources_ind\"\n },\n \"type\": \"compute\"\n }\n },\n \"fast_path_clusters\": {},\n \"fast_path_limit\": null,\n \"persist_count\": []\n },\n \"cluster\": {\n \"name\": \"mz_catalog_server\",\n \"id\": {\n \"System\": 2\n }\n },\n \"redacted_sql\": \"SELECT '' / '' FROM [s465 AS mz_catalog.mz_sources] LIMIT ''\"\n}","code":"MZ001","severity":"notice"}} +{"type":"Notice","payload":{"message":"{\n \"plans\": {\n \"raw\": {\n \"text\": \"Finish limit=1 output=[#0]\\n Project (#15)\\n Map ((1 / 0))\\n Get mz_catalog.mz_sources\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 465\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"CallBinary\": {\n \"func\": \"DivInt32\",\n \"expr1\": {\n \"Literal\": [\n {\n \"data\": [\n 45,\n 1\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n },\n \"expr2\": {\n \"Literal\": [\n {\n \"data\": [\n 44\n ]\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n }\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n },\n \"optimized\": {\n \"global\": {\n \"text\": \"t75:\\n Finish limit=1 output=[#0]\\n ArrangeBy keys=[[#0]]\\n ReadGlobalFromSameDataflow t74\\n\\nt74:\\n Project (#15)\\n Map (error(\\\"division by zero\\\"))\\n ReadIndex on=mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n 
\"plans\": [\n {\n \"id\": \"t75\",\n \"plan\": {\n \"ArrangeBy\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"Transient\": 74\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ],\n \"keys\": []\n },\n \"access_strategy\": \"SameDataflow\"\n }\n },\n \"keys\": [\n [\n {\n \"Column\": 0\n }\n ]\n ]\n }\n }\n },\n {\n \"id\": \"t74\",\n \"plan\": {\n \"Project\": {\n \"input\": {\n \"Map\": {\n \"input\": {\n \"Get\": {\n \"id\": {\n \"Global\": {\n \"System\": 465\n }\n },\n \"typ\": {\n \"column_types\": [\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"Oid\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": false\n },\n {\n \"scalar_type\": {\n \"Array\": \"MzAclItem\"\n },\n \"nullable\": false\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n },\n {\n \"scalar_type\": \"String\",\n \"nullable\": true\n }\n ],\n \"keys\": [\n [\n 0\n ],\n [\n 1\n ]\n ]\n },\n \"access_strategy\": {\n \"Index\": [\n [\n {\n \"System\": 745\n },\n \"FullScan\"\n ]\n ]\n }\n }\n },\n \"scalars\": [\n {\n \"Literal\": [\n {\n \"Err\": \"DivisionByZero\"\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ]\n }\n },\n \"outputs\": [\n 15\n ]\n }\n }\n }\n ],\n \"sources\": []\n }\n },\n \"fast_path\": {\n \"text\": \"Explained Query (fast path):\\n Finish limit=1 output=[#0]\\n Project (#15)\\n Map (error(\\\"division by zero\\\"))\\n ReadIndex on=mz_catalog.mz_sources mz_sources_ind=[*** full scan ***]\\n\\nTarget cluster: mz_catalog_server\\n\",\n \"json\": {\n \"plans\": [\n {\n \"id\": \"Explained Query (fast path)\",\n \"plan\": {\n \"PeekExisting\": [\n {\n \"System\": 465\n },\n {\n \"System\": 745\n },\n null,\n {\n \"mfp\": {\n \"expressions\": [\n {\n \"Literal\": [\n {\n \"Err\": \"DivisionByZero\"\n },\n {\n \"scalar_type\": \"Int32\",\n \"nullable\": false\n }\n ]\n }\n ],\n \"predicates\": [],\n \"projection\": [\n 15\n ],\n \"input_arity\": 15\n }\n }\n ]\n }\n }\n ],\n \"sources\": []\n }\n }\n }\n },\n \"insights\": {\n \"imports\": {\n \"s745\": {\n \"name\": {\n \"schema\": \"mz_catalog\",\n \"item\": \"mz_sources_ind\"\n },\n \"type\": \"compute\"\n }\n },\n \"fast_path_clusters\": {},\n \"fast_path_limit\": null,\n \"persist_count\": []\n },\n \"cluster\": {\n \"name\": \"mz_catalog_server\",\n \"id\": {\n \"System\": 2\n }\n },\n \"redacted_sql\": \"SELECT '' / '' FROM [s465 AS mz_catalog.mz_sources] LIMIT ''\"\n}","code":"MZ001","severity":"notice"}} {"type":"CommandStarting","payload":{"has_rows":false,"is_streaming":false}} {"type":"Error","payload":{"message":"division by zero","code":"XX000"}} {"type":"ReadyForQuery","payload":"I"} diff --git a/src/pgrepr-consts/src/oid.rs b/src/pgrepr-consts/src/oid.rs index b33f9c08c617e..d3b66c38b27c5 100644 --- a/src/pgrepr-consts/src/oid.rs +++ b/src/pgrepr-consts/src/oid.rs @@ -774,3 +774,5 @@ pub const 
INDEX_MZ_CLUSTER_REPLICA_FRONTIERS_IND_OID: u32 = 17051; pub const INDEX_MZ_COMPUTE_HYDRATION_TIMES_IND_OID: u32 = 17052; pub const VIEW_MZ_MAPPABLE_OBJECTS_OID: u32 = 17053; pub const VIEW_MZ_WALLCLOCK_GLOBAL_LAG_OID: u32 = 17054; +pub const SOURCE_MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_OID: u32 = 17055; +pub const VIEW_MZ_WALLCLOCK_GLOBAL_LAG_HISTOGRAM_OID: u32 = 17056; diff --git a/src/storage-client/BUILD.bazel b/src/storage-client/BUILD.bazel index f58e59e8b9b9f..d253ad0e921ff 100644 --- a/src/storage-client/BUILD.bazel +++ b/src/storage-client/BUILD.bazel @@ -34,6 +34,8 @@ rust_library( ":mz_storage_client_build_script", "//src/ccsr:mz_ccsr", "//src/cluster-client:mz_cluster_client", + "//src/controller-types:mz_controller_types", + "//src/dyncfg:mz_dyncfg", "//src/dyncfgs:mz_dyncfgs", "//src/kafka-util:mz_kafka_util", "//src/ore:mz_ore", @@ -80,6 +82,7 @@ rust_test( "//src/build-info:mz_build_info", "//src/ccsr:mz_ccsr", "//src/cluster-client:mz_cluster_client", + "//src/controller-types:mz_controller_types", "//src/dyncfg:mz_dyncfg", "//src/dyncfgs:mz_dyncfgs", "//src/kafka-util:mz_kafka_util", @@ -107,6 +110,7 @@ rust_doc_test( "//src/build-info:mz_build_info", "//src/ccsr:mz_ccsr", "//src/cluster-client:mz_cluster_client", + "//src/controller-types:mz_controller_types", "//src/dyncfg:mz_dyncfg", "//src/dyncfgs:mz_dyncfgs", "//src/kafka-util:mz_kafka_util", diff --git a/src/storage-client/Cargo.toml b/src/storage-client/Cargo.toml index b78c79db645a6..4bb85fdbb18fd 100644 --- a/src/storage-client/Cargo.toml +++ b/src/storage-client/Cargo.toml @@ -20,6 +20,8 @@ itertools = { version = "0.12.1" } maplit = "1.0.2" mz-ccsr = { path = "../ccsr" } mz-cluster-client = { path = "../cluster-client" } +mz-controller-types = { path = "../controller-types" } +mz-dyncfg = { path = "../dyncfg" } mz-dyncfgs = { path = "../dyncfgs" } mz-kafka-util = { path = "../kafka-util" } mz-ore = { path = "../ore", features = ["async", "process", "tracing"] } diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index b7f6f335c8816..7c86f882082ff 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -27,11 +27,16 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use chrono::{DateTime, Utc}; use differential_dataflow::lattice::Lattice; use mz_cluster_client::client::ClusterReplicaLocation; use mz_cluster_client::ReplicaId; +use mz_controller_types::dyncfgs::WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL; +use mz_dyncfg::ConfigSet; +use mz_ore::soft_panic_or_log; use mz_persist_client::batch::ProtoBatch; use mz_persist_types::{Codec64, Opaque, ShardId}; +use mz_repr::adt::timestamp::CheckedTimestamp; use mz_repr::{Diff, GlobalId, RelationDesc, RelationVersion, Row}; use mz_storage_types::configuration::StorageConfiguration; use mz_storage_types::connections::inline::InlinedConnection; @@ -69,6 +74,7 @@ pub enum IntrospectionType { ReplicaStatusHistory, ReplicaMetricsHistory, WallclockLagHistory, + WallclockLagHistogram, // Note that this single-shard introspection source will be changed to per-replica, // once we allow multiplexing multiple sources/sinks on a single cluster. @@ -377,6 +383,13 @@ pub trait StorageController: Debug { /// Panics if a storage instance with the given ID does not exist. fn drop_instance(&mut self, id: StorageInstanceId); + /// Updates a storage instance's workload class. 
+ fn update_instance_workload_class( + &mut self, + id: StorageInstanceId, + workload_class: Option, + ); + /// Connects the storage instance to the specified replica. /// /// If the storage instance is already attached to a replica, communication @@ -836,6 +849,36 @@ impl MonotonicAppender { } } +/// The period covered by a wallclock lag histogram, represented as a `[start, end)` range. +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct WallclockLagHistogramPeriod { + pub start: CheckedTimestamp>, + pub end: CheckedTimestamp>, +} + +impl WallclockLagHistogramPeriod { + /// Construct a `WallclockLagHistogramPeriod` from the given epoch timestamp and dyncfg. + pub fn from_epoch_millis(epoch_ms: u64, dyncfg: &ConfigSet) -> Self { + let interval = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.get(dyncfg); + let interval_ms = u64::try_from(interval.as_millis()).unwrap_or_else(|_| { + soft_panic_or_log!("excessive wallclock lag histogram period interval: {interval:?}"); + let default = WALLCLOCK_LAG_HISTOGRAM_PERIOD_INTERVAL.default(); + u64::try_from(default.as_millis()).unwrap() + }); + let interval_ms = std::cmp::max(interval_ms, 1); + + let start_ms = epoch_ms - (epoch_ms % interval_ms); + let start_dt = mz_ore::now::to_datetime(start_ms); + let start = start_dt.try_into().expect("must fit"); + + let end_ms = start_ms + interval_ms; + let end_dt = mz_ore::now::to_datetime(end_ms); + let end = end_dt.try_into().expect("must fit"); + + Self { start, end } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/storage-client/src/healthcheck.rs b/src/storage-client/src/healthcheck.rs index 8627e66313a75..cc33a00912412 100644 --- a/src/storage-client/src/healthcheck.rs +++ b/src/storage-client/src/healthcheck.rs @@ -180,3 +180,19 @@ pub static WALLCLOCK_LAG_HISTORY_DESC: LazyLock = LazyLock::new(|| ) .finish() }); + +pub static WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC: LazyLock = LazyLock::new(|| { + RelationDesc::builder() + .with_column( + "period_start", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .with_column( + "period_end", + ScalarType::TimestampTz { precision: None }.nullable(false), + ) + .with_column("object_id", ScalarType::String.nullable(false)) + .with_column("lag_seconds", ScalarType::UInt64.nullable(false)) + .with_column("labels", ScalarType::Jsonb.nullable(false)) + .finish() +}); diff --git a/src/storage-controller/src/collection_mgmt.rs b/src/storage-controller/src/collection_mgmt.rs index 8020ccfaa3b0a..4cdd0cb9687d0 100644 --- a/src/storage-controller/src/collection_mgmt.rs +++ b/src/storage-controller/src/collection_mgmt.rs @@ -90,14 +90,15 @@ use mz_storage_client::client::{AppendOnlyUpdate, Status, TimestamplessUpdate}; use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp}; use mz_storage_client::healthcheck::{ MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC, - WALLCLOCK_LAG_HISTORY_DESC, + WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC, WALLCLOCK_LAG_HISTORY_DESC, }; use mz_storage_client::metrics::StorageControllerMetrics; use mz_storage_client::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate}; use mz_storage_client::storage_collections::StorageCollections; use mz_storage_types::controller::InvalidUpper; use mz_storage_types::dyncfgs::{ - REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL, + REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL, + 
WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL, }; use mz_storage_types::parameters::{ StorageParameters, STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, @@ -677,6 +678,7 @@ where introspection_type @ IntrospectionType::ReplicaMetricsHistory | introspection_type @ IntrospectionType::WallclockLagHistory + | introspection_type @ IntrospectionType::WallclockLagHistogram | introspection_type @ IntrospectionType::PreparedStatementHistory | introspection_type @ IntrospectionType::StatementExecutionHistory | introspection_type @ IntrospectionType::SessionHistory @@ -1081,6 +1083,7 @@ where Some(IntrospectionType::ReplicaMetricsHistory) | Some(IntrospectionType::WallclockLagHistory) + | Some(IntrospectionType::WallclockLagHistogram) | Some(IntrospectionType::PrivatelinkConnectionStatusHistory) | Some(IntrospectionType::ReplicaStatusHistory) | Some(IntrospectionType::PreparedStatementHistory) @@ -1143,7 +1146,9 @@ where return; }; let initial_statuses = match introspection_type { - IntrospectionType::ReplicaMetricsHistory | IntrospectionType::WallclockLagHistory => { + IntrospectionType::ReplicaMetricsHistory + | IntrospectionType::WallclockLagHistory + | IntrospectionType::WallclockLagHistogram => { let result = partially_truncate_metrics_history( self.id, introspection_type, @@ -1481,6 +1486,13 @@ where .expect("schema has not changed") .0, ), + IntrospectionType::WallclockLagHistogram => ( + WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL.get(&config_set), + WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC + .get_by_name(&ColumnName::from("period_start")) + .expect("schema has not changed") + .0, + ), _ => panic!("not a metrics history: {introspection_type:?}"), }; diff --git a/src/storage-controller/src/instance.rs b/src/storage-controller/src/instance.rs index c6ac18f7ae682..ce8c96dda3f9f 100644 --- a/src/storage-controller/src/instance.rs +++ b/src/storage-controller/src/instance.rs @@ -55,6 +55,10 @@ use crate::history::CommandHistory; /// lead to panics. #[derive(Debug)] pub(crate) struct Instance { + /// The workload class of this instance. + /// + /// This is currently only used to annotate metrics. + pub workload_class: Option, /// The replicas connected to this storage instance. replicas: BTreeMap>, /// The ingestions currently running on this instance. 
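// Illustrative sketch (hypothetical helper; plain `u64` millisecond timestamps
// and the default 24h period interval) of the alignment performed by
// `WallclockLagHistogramPeriod::from_epoch_millis` earlier in this diff:
// timestamps are rounded down to the start of their enclosing period, yielding
// a half-open `[start, end)` range, so all measurements taken within the same
// period share one histogram key.
const PERIOD_MS: u64 = 24 * 60 * 60 * 1000;

fn histogram_period(epoch_ms: u64) -> (u64, u64) {
    let start = epoch_ms - (epoch_ms % PERIOD_MS);
    (start, start + PERIOD_MS)
}

#[test]
fn timestamps_in_the_same_day_share_a_period() {
    let t = 1_700_000_000_000; // 2023-11-14T22:13:20Z
    assert_eq!(histogram_period(t), histogram_period(t + 3_600_000)); // +1h
}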
@@ -114,6 +118,7 @@ where let epoch = ClusterStartupEpoch::new(envd_epoch, 0); let mut instance = Self { + workload_class: None, replicas: Default::default(), active_ingestions: Default::default(), ingestion_exports: Default::default(), diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 1cc03aa207e40..f6c494c335c68 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -28,7 +28,10 @@ use mz_build_info::BuildInfo; use mz_cluster_client::client::ClusterReplicaLocation; use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics}; use mz_cluster_client::{ReplicaId, WallclockLagFn}; -use mz_controller_types::dyncfgs::{ENABLE_0DT_DEPLOYMENT_SOURCES, WALLCLOCK_LAG_REFRESH_INTERVAL}; +use mz_controller_types::dyncfgs::{ + ENABLE_0DT_DEPLOYMENT_SOURCES, ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION, + WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL, WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL, +}; use mz_ore::collections::CollectionExt; use mz_ore::metrics::MetricsRegistry; use mz_ore::now::{EpochMillis, NowFn}; @@ -54,7 +57,7 @@ use mz_storage_client::client::{ use mz_storage_client::controller::{ BoxFuture, CollectionDescription, DataSource, ExportDescription, ExportState, IntrospectionType, MonotonicAppender, PersistEpoch, Response, StorageController, - StorageMetadata, StorageTxn, StorageWriteOp, + StorageMetadata, StorageTxn, StorageWriteOp, WallclockLagHistogramPeriod, }; use mz_storage_client::healthcheck::{ MZ_AWS_PRIVATELINK_CONNECTION_STATUS_HISTORY_DESC, MZ_SINK_STATUS_HISTORY_DESC, @@ -220,8 +223,10 @@ pub struct Controller + Tim /// A function that computes the lag between the given time and wallclock time. #[derivative(Debug = "ignore")] wallclock_lag: WallclockLagFn, - /// The last time wallclock lag introspection was refreshed. - wallclock_lag_last_refresh: Instant, + /// The last time `WallclockLagHistory` introspection was refreshed. + wallclock_lag_history_last_refresh: Instant, + /// The last time `WallclockLagHistogram` introspection was refreshed. + wallclock_lag_histogram_last_refresh: Instant, /// Handle to a [StorageCollections]. storage_collections: Arc + Send + Sync>, @@ -530,6 +535,19 @@ where assert!(instance.is_some(), "storage instance {id} does not exist"); } + fn update_instance_workload_class( + &mut self, + id: StorageInstanceId, + workload_class: Option, + ) { + let instance = self + .instances + .get_mut(&id) + .unwrap_or_else(|| panic!("instance {id} does not exist")); + + instance.workload_class = workload_class; + } + fn connect_replica( &mut self, instance_id: StorageInstanceId, @@ -1123,13 +1141,8 @@ where } let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(id, maybe_instance_id); - let collection_state = CollectionState { - data_source, - collection_metadata: metadata, - extra_state, - wallclock_lag_max: Default::default(), - wallclock_lag_metrics, - }; + let collection_state = + CollectionState::new(data_source, metadata, extra_state, wallclock_lag_metrics); self.collections.insert(id, collection_state); } @@ -1455,13 +1468,12 @@ where }; // TODO(alter_table): Support schema evolution on sources. 
let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None); - let collection_state = CollectionState:: { - data_source: collection_desc.data_source.clone(), - collection_metadata: collection_meta, - extra_state: CollectionStateExtra::None, - wallclock_lag_max: Default::default(), + let collection_state = CollectionState::new( + collection_desc.data_source.clone(), + collection_meta, + CollectionStateExtra::None, wallclock_lag_metrics, - }; + ); // Great! We have successfully evolved the schema of our Table, now we need to update our // in-memory data structures. @@ -2675,7 +2687,8 @@ where recorded_frontiers: BTreeMap::new(), recorded_replica_frontiers: BTreeMap::new(), wallclock_lag, - wallclock_lag_last_refresh: Instant::now(), + wallclock_lag_history_last_refresh: Instant::now(), + wallclock_lag_histogram_last_refresh: Instant::now(), storage_collections, migrated_storage_collections: BTreeSet::new(), maintenance_ticker, @@ -3447,8 +3460,7 @@ where .differential_append(id, replica_updates); } - /// Refresh the `WallclockLagHistory` introspection and the `wallclock_lag_*_seconds` metrics - /// with the current lag values. + /// Refresh the wallclock lag introspection and metrics with the current lag values. /// /// We measure the lag of write frontiers behind the wallclock time every second and track the /// maximum over 60 measurements (i.e., one minute). Every minute, we emit a new lag event to @@ -3457,48 +3469,105 @@ where /// This method is invoked by `ComputeController::maintain`, which we expect to be called once /// per second during normal operation. fn refresh_wallclock_lag(&mut self) { - let refresh_introspection = !self.read_only - && self.wallclock_lag_last_refresh.elapsed() - >= WALLCLOCK_LAG_REFRESH_INTERVAL.get(self.config.config_set()); - let mut introspection_updates = refresh_introspection.then(Vec::new); + let refresh_history = !self.read_only + && self.wallclock_lag_history_last_refresh.elapsed() + >= WALLCLOCK_LAG_HISTORY_REFRESH_INTERVAL.get(self.config.config_set()); + let refresh_histogram = !self.read_only + && self.wallclock_lag_histogram_last_refresh.elapsed() + >= WALLCLOCK_LAG_HISTOGRAM_REFRESH_INTERVAL.get(self.config.config_set()); + + let now_ms = (self.now)(); + let now_dt = mz_ore::now::to_datetime(now_ms); + let now_ts: CheckedTimestamp<_> = now_dt.try_into().expect("must fit"); - let now = mz_ore::now::to_datetime((self.now)()); - let now_tz = now.try_into().expect("must fit"); + let histogram_period = + WallclockLagHistogramPeriod::from_epoch_millis(now_ms, self.config.config_set()); let frontier_lag = |frontier: &Antichain<_>| match frontier.as_option() { Some(ts) => (self.wallclock_lag)(ts), None => Duration::ZERO, }; - let pack_row = |id: GlobalId, lag: Duration| { - let lag_us = i64::try_from(lag.as_micros()).expect("must fit"); - Row::pack_slice(&[ - Datum::String(&id.to_string()), - Datum::Null, - Datum::Interval(Interval::new(0, 0, lag_us)), - Datum::TimestampTz(now_tz), - ]) - }; + let mut history_updates = Vec::new(); + let mut histogram_updates = Vec::new(); + let mut row_buf = Row::default(); for frontiers in self.storage_collections.active_collection_frontiers() { let id = frontiers.id; let Some(collection) = self.collections.get_mut(&id) else { continue; }; + let lag = frontier_lag(&frontiers.write_frontier); collection.wallclock_lag_max = std::cmp::max(collection.wallclock_lag_max, lag); - if let Some(updates) = &mut introspection_updates { - let lag = std::mem::take(&mut collection.wallclock_lag_max); - let 
row = pack_row(id, lag); - updates.push((row, 1)); + if refresh_history { + let max_lag = std::mem::take(&mut collection.wallclock_lag_max); + let max_lag_us = i64::try_from(max_lag.as_micros()).expect("must fit"); + let row = Row::pack_slice(&[ + Datum::String(&id.to_string()), + Datum::Null, + Datum::Interval(Interval::new(0, 0, max_lag_us)), + Datum::TimestampTz(now_ts), + ]); + history_updates.push((row, 1)); } collection.wallclock_lag_metrics.observe(lag); + + if !ENABLE_WALLCLOCK_LAG_HISTOGRAM_COLLECTION.get(self.config.config_set()) { + continue; + } + + if let Some(stash) = &mut collection.wallclock_lag_histogram_stash { + let bucket = lag.as_secs().next_power_of_two(); + + let instance_id = match &collection.extra_state { + CollectionStateExtra::Ingestion(i) => Some(i.instance_id), + CollectionStateExtra::Export(e) => Some(e.cluster_id()), + CollectionStateExtra::None => None, + }; + let workload_class = instance_id + .and_then(|id| self.instances.get(&id)) + .and_then(|i| i.workload_class.clone()); + let labels = match workload_class { + Some(wc) => [("workload_class", wc.clone())].into(), + None => BTreeMap::new(), + }; + + let key = (histogram_period, bucket, labels); + *stash.entry(key).or_default() += 1; + + if refresh_histogram { + for ((period, lag, labels), count) in std::mem::take(stash) { + let mut packer = row_buf.packer(); + packer.extend([ + Datum::TimestampTz(period.start), + Datum::TimestampTz(period.end), + Datum::String(&id.to_string()), + Datum::UInt64(lag), + ]); + let labels = labels.iter().map(|(k, v)| (*k, Datum::String(v))); + packer.push_dict(labels); + + histogram_updates.push((row_buf.clone(), count)); + } + } + } } - if let Some(updates) = introspection_updates { - self.append_introspection_updates(IntrospectionType::WallclockLagHistory, updates); - self.wallclock_lag_last_refresh = Instant::now(); + if !history_updates.is_empty() { + self.append_introspection_updates( + IntrospectionType::WallclockLagHistory, + history_updates, + ); + self.wallclock_lag_history_last_refresh = Instant::now(); + } + if !histogram_updates.is_empty() { + self.append_introspection_updates( + IntrospectionType::WallclockLagHistogram, + histogram_updates, + ); + self.wallclock_lag_histogram_last_refresh = Instant::now(); } } @@ -3537,6 +3606,7 @@ impl From<&IntrospectionType> for CollectionManagerKind { | IntrospectionType::ReplicaStatusHistory | IntrospectionType::ReplicaMetricsHistory | IntrospectionType::WallclockLagHistory + | IntrospectionType::WallclockLagHistogram | IntrospectionType::PreparedStatementHistory | IntrospectionType::StatementExecutionHistory | IntrospectionType::SessionHistory @@ -3621,12 +3691,54 @@ struct CollectionState { pub extra_state: CollectionStateExtra, - /// Maximum frontier wallclock lag since the last introspection update. + /// Maximum frontier wallclock lag since the last `WallclockLagHistory` introspection update. wallclock_lag_max: Duration, + /// Frontier wallclock lag measurements stashed until the next `WallclockLagHistogram` + /// introspection update. + /// + /// Keys are `(period, lag, labels)` triples, values are counts. + /// + /// If this is `None`, wallclock lag is not tracked for this collection. + wallclock_lag_histogram_stash: Option< + BTreeMap< + ( + WallclockLagHistogramPeriod, + u64, + BTreeMap<&'static str, String>, + ), + i64, + >, + >, /// Frontier wallclock lag metrics tracked for this collection. 
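// Self-contained sketch (std types only; the hypothetical `Period` and
// `Labels` aliases stand in for `WallclockLagHistogramPeriod` and the label
// map) of the stash-and-flush pattern documented above: measurements are
// counted in memory keyed by `(period, lag bucket, labels)` and, at the next
// refresh, each key is emitted as a single introspection update whose diff
// carries the accumulated count.
use std::collections::BTreeMap;

type Period = (u64, u64); // [start_ms, end_ms)
type Labels = BTreeMap<&'static str, String>;
type Stash = BTreeMap<(Period, u64, Labels), i64>;

/// Record one lag observation in the stash.
fn observe(stash: &mut Stash, period: Period, lag_bucket: u64, labels: Labels) {
    *stash.entry((period, lag_bucket, labels)).or_default() += 1;
}

/// Drain the stash into `(key, count)` updates, leaving it empty.
fn flush(stash: &mut Stash) -> Vec<((Period, u64, Labels), i64)> {
    std::mem::take(stash).into_iter().collect()
}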
wallclock_lag_metrics: WallclockLagMetrics, } +impl CollectionState { + fn new( + data_source: DataSource, + collection_metadata: CollectionMetadata, + extra_state: CollectionStateExtra, + wallclock_lag_metrics: WallclockLagMetrics, + ) -> Self { + // Only collect wallclock lag histogram data for collections written by storage, to avoid + // duplicate measurements. Collections written by other components (e.g. compute) have + // their wallclock lags recorded by these components. + let wallclock_lag_histogram_stash = match &data_source { + DataSource::Other => None, + _ => Some(Default::default()), + }; + + Self { + data_source, + collection_metadata, + extra_state, + wallclock_lag_max: Default::default(), + wallclock_lag_histogram_stash, + wallclock_lag_metrics, + } + } +} + /// Additional state that the controller maintains for select collection types. #[derive(Debug)] enum CollectionStateExtra { diff --git a/src/storage-types/src/dyncfgs.rs b/src/storage-types/src/dyncfgs.rs index a525b5d06641c..f10c079d1a6d1 100644 --- a/src/storage-types/src/dyncfgs.rs +++ b/src/storage-types/src/dyncfgs.rs @@ -71,6 +71,13 @@ pub const WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL: Config = Config::n "The interval of time to keep when truncating the wallclock lag history.", ); +/// The interval of time to keep when truncating the wallclock lag histogram. +pub const WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL: Config = Config::new( + "wallclock_global_lag_histogram_retention_interval", + Duration::from_secs(60 * 60 * 24 * 30), // 30 days + "The interval of time to keep when truncating the wallclock lag histogram.", +); + // Kafka /// Rules for enriching the `client.id` property of Kafka clients with @@ -281,6 +288,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&STORAGE_DOWNGRADE_SINCE_DURING_FINALIZATION) .add(&REPLICA_METRICS_HISTORY_RETENTION_INTERVAL) .add(&WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL) + .add(&WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL) .add(&KAFKA_CLIENT_ID_ENRICHMENT_RULES) .add(&KAFKA_POLL_MAX_WAIT) .add(&KAFKA_METADATA_FETCH_INTERVAL) diff --git a/test/sqllogictest/autogenerated/mz_internal.slt b/test/sqllogictest/autogenerated/mz_internal.slt index df92a423678b2..e772692978287 100644 --- a/test/sqllogictest/autogenerated/mz_internal.slt +++ b/test/sqllogictest/autogenerated/mz_internal.slt @@ -777,6 +777,8 @@ mz_storage_usage_by_shard mz_subscriptions mz_type_pg_metadata mz_wallclock_global_lag +mz_wallclock_global_lag_histogram +mz_wallclock_global_lag_histogram_raw mz_wallclock_global_lag_history mz_wallclock_global_lag_recent_history mz_wallclock_lag_history diff --git a/test/sqllogictest/information_schema_tables.slt b/test/sqllogictest/information_schema_tables.slt index e087f55c8a107..ac2fbee2b8a2c 100644 --- a/test/sqllogictest/information_schema_tables.slt +++ b/test/sqllogictest/information_schema_tables.slt @@ -725,6 +725,14 @@ mz_wallclock_global_lag VIEW materialize mz_internal +mz_wallclock_global_lag_histogram +VIEW +materialize +mz_internal +mz_wallclock_global_lag_histogram_raw +SOURCE +materialize +mz_internal mz_wallclock_global_lag_history VIEW materialize diff --git a/test/sqllogictest/mz_catalog_server_index_accounting.slt b/test/sqllogictest/mz_catalog_server_index_accounting.slt index fd219feb61fe2..b3ba2eaaca6e3 100644 --- a/test/sqllogictest/mz_catalog_server_index_accounting.slt +++ b/test/sqllogictest/mz_catalog_server_index_accounting.slt @@ -37,8 +37,8 @@ mz_arrangement_heap_capacity_raw_s2_primary_idx CREATE␠INDEX␠"mz_arrangemen 
mz_arrangement_heap_size_raw_s2_primary_idx CREATE␠INDEX␠"mz_arrangement_heap_size_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_arrangement_heap_size_raw"␠("operator_id",␠"worker_id") mz_arrangement_records_raw_s2_primary_idx CREATE␠INDEX␠"mz_arrangement_records_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_arrangement_records_raw"␠("operator_id",␠"worker_id") mz_arrangement_sharing_raw_s2_primary_idx CREATE␠INDEX␠"mz_arrangement_sharing_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_arrangement_sharing_raw"␠("operator_id",␠"worker_id") -mz_cluster_deployment_lineage_ind CREATE␠INDEX␠"mz_cluster_deployment_lineage_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s723␠AS␠"mz_internal"."mz_cluster_deployment_lineage"]␠("cluster_id") -mz_cluster_replica_frontiers_ind CREATE␠INDEX␠"mz_cluster_replica_frontiers_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s718␠AS␠"mz_catalog"."mz_cluster_replica_frontiers"]␠("object_id") +mz_cluster_deployment_lineage_ind CREATE␠INDEX␠"mz_cluster_deployment_lineage_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s725␠AS␠"mz_internal"."mz_cluster_deployment_lineage"]␠("cluster_id") +mz_cluster_replica_frontiers_ind CREATE␠INDEX␠"mz_cluster_replica_frontiers_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s720␠AS␠"mz_catalog"."mz_cluster_replica_frontiers"]␠("object_id") mz_cluster_replica_history_ind CREATE␠INDEX␠"mz_cluster_replica_history_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s584␠AS␠"mz_internal"."mz_cluster_replica_history"]␠("dropped_at") mz_cluster_replica_metrics_history_ind CREATE␠INDEX␠"mz_cluster_replica_metrics_history_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s496␠AS␠"mz_internal"."mz_cluster_replica_metrics_history"]␠("replica_id") mz_cluster_replica_metrics_ind CREATE␠INDEX␠"mz_cluster_replica_metrics_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s495␠AS␠"mz_internal"."mz_cluster_replica_metrics"]␠("replica_id") @@ -51,17 +51,17 @@ mz_clusters_ind CREATE␠INDEX␠"mz_clusters_ind"␠IN␠CLUSTER␠[s2]␠ON mz_columns_ind CREATE␠INDEX␠"mz_columns_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s461␠AS␠"mz_catalog"."mz_columns"]␠("name") mz_comments_ind CREATE␠INDEX␠"mz_comments_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s511␠AS␠"mz_internal"."mz_comments"]␠("id") mz_compute_dataflow_global_ids_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_compute_dataflow_global_ids_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_compute_dataflow_global_ids_per_worker"␠("id",␠"worker_id") -mz_compute_dependencies_ind CREATE␠INDEX␠"mz_compute_dependencies_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s706␠AS␠"mz_internal"."mz_compute_dependencies"]␠("dependency_id") +mz_compute_dependencies_ind CREATE␠INDEX␠"mz_compute_dependencies_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s708␠AS␠"mz_internal"."mz_compute_dependencies"]␠("dependency_id") mz_compute_error_counts_raw_s2_primary_idx CREATE␠INDEX␠"mz_compute_error_counts_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_compute_error_counts_raw"␠("export_id",␠"worker_id") mz_compute_exports_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_compute_exports_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_compute_exports_per_worker"␠("export_id",␠"worker_id") mz_compute_frontiers_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_compute_frontiers_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_compute_frontiers_per_worker"␠("export_id",␠"worker_id") -mz_compute_hydration_times_ind CREATE␠INDEX␠"mz_compute_hydration_times_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s714␠AS␠"mz_internal"."mz_compute_hydration_times"]␠("replica_id") +mz_compute_hydration_times_ind 
CREATE␠INDEX␠"mz_compute_hydration_times_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s716␠AS␠"mz_internal"."mz_compute_hydration_times"]␠("replica_id") mz_compute_hydration_times_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_compute_hydration_times_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_compute_hydration_times_per_worker"␠("export_id",␠"worker_id") mz_compute_import_frontiers_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_compute_import_frontiers_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_compute_import_frontiers_per_worker"␠("export_id",␠"import_id",␠"worker_id") mz_compute_lir_mapping_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_compute_lir_mapping_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_compute_lir_mapping_per_worker"␠("global_id",␠"lir_id",␠"worker_id") mz_compute_operator_durations_histogram_raw_s2_primary_idx CREATE␠INDEX␠"mz_compute_operator_durations_histogram_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_compute_operator_durations_histogram_raw"␠("id",␠"worker_id",␠"duration_ns") mz_connections_ind CREATE␠INDEX␠"mz_connections_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s492␠AS␠"mz_catalog"."mz_connections"]␠("schema_id") -mz_console_cluster_utilization_overview_ind CREATE␠INDEX␠"mz_console_cluster_utilization_overview_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s710␠AS␠"mz_internal"."mz_console_cluster_utilization_overview"]␠("cluster_id") +mz_console_cluster_utilization_overview_ind CREATE␠INDEX␠"mz_console_cluster_utilization_overview_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s712␠AS␠"mz_internal"."mz_console_cluster_utilization_overview"]␠("cluster_id") mz_continual_tasks_ind CREATE␠INDEX␠"mz_continual_tasks_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s514␠AS␠"mz_internal"."mz_continual_tasks"]␠("id") mz_databases_ind CREATE␠INDEX␠"mz_databases_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s459␠AS␠"mz_catalog"."mz_databases"]␠("name") mz_dataflow_addresses_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_addresses_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_addresses_per_worker"␠("id",␠"worker_id") @@ -77,7 +77,7 @@ mz_message_batch_counts_received_raw_s2_primary_idx CREATE␠INDEX␠"mz_messag mz_message_batch_counts_sent_raw_s2_primary_idx CREATE␠INDEX␠"mz_message_batch_counts_sent_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_message_batch_counts_sent_raw"␠("channel_id",␠"from_worker_id",␠"to_worker_id") mz_message_counts_received_raw_s2_primary_idx CREATE␠INDEX␠"mz_message_counts_received_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_message_counts_received_raw"␠("channel_id",␠"from_worker_id",␠"to_worker_id") mz_message_counts_sent_raw_s2_primary_idx CREATE␠INDEX␠"mz_message_counts_sent_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_message_counts_sent_raw"␠("channel_id",␠"from_worker_id",␠"to_worker_id") -mz_notices_ind CREATE␠INDEX␠"mz_notices_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s790␠AS␠"mz_internal"."mz_notices"]␠("id") +mz_notices_ind CREATE␠INDEX␠"mz_notices_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s792␠AS␠"mz_internal"."mz_notices"]␠("id") mz_object_dependencies_ind CREATE␠INDEX␠"mz_object_dependencies_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s458␠AS␠"mz_internal"."mz_object_dependencies"]␠("object_id") mz_object_history_ind CREATE␠INDEX␠"mz_object_history_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s522␠AS␠"mz_internal"."mz_object_history"]␠("id") mz_object_lifetimes_ind CREATE␠INDEX␠"mz_object_lifetimes_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s523␠AS␠"mz_internal"."mz_object_lifetimes"]␠("id") @@ -86,14 +86,14 @@ mz_objects_ind CREATE␠INDEX␠"mz_objects_ind"␠IN␠CLUSTER␠[s2]␠ON␠[ 
mz_peek_durations_histogram_raw_s2_primary_idx CREATE␠INDEX␠"mz_peek_durations_histogram_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_peek_durations_histogram_raw"␠("worker_id",␠"type",␠"duration_ns") mz_recent_activity_log_thinned_ind CREATE␠INDEX␠"mz_recent_activity_log_thinned_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s683␠AS␠"mz_internal"."mz_recent_activity_log_thinned"]␠("sql_hash") mz_recent_sql_text_ind CREATE␠INDEX␠"mz_recent_sql_text_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s679␠AS␠"mz_internal"."mz_recent_sql_text"]␠("sql_hash") -mz_recent_storage_usage_ind CREATE␠INDEX␠"mz_recent_storage_usage_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s782␠AS␠"mz_catalog"."mz_recent_storage_usage"]␠("object_id") +mz_recent_storage_usage_ind CREATE␠INDEX␠"mz_recent_storage_usage_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s784␠AS␠"mz_catalog"."mz_recent_storage_usage"]␠("object_id") mz_roles_ind CREATE␠INDEX␠"mz_roles_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s481␠AS␠"mz_catalog"."mz_roles"]␠("id") mz_scheduling_elapsed_raw_s2_primary_idx CREATE␠INDEX␠"mz_scheduling_elapsed_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_scheduling_elapsed_raw"␠("id",␠"worker_id") mz_scheduling_parks_histogram_raw_s2_primary_idx CREATE␠INDEX␠"mz_scheduling_parks_histogram_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_scheduling_parks_histogram_raw"␠("worker_id",␠"slept_for_ns",␠"requested_ns") mz_schemas_ind CREATE␠INDEX␠"mz_schemas_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s460␠AS␠"mz_catalog"."mz_schemas"]␠("database_id") mz_secrets_ind CREATE␠INDEX␠"mz_secrets_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s491␠AS␠"mz_catalog"."mz_secrets"]␠("name") mz_show_all_objects_ind CREATE␠INDEX␠"mz_show_all_objects_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s568␠AS␠"mz_internal"."mz_show_all_objects"]␠("schema_id") -mz_show_cluster_replicas_ind CREATE␠INDEX␠"mz_show_cluster_replicas_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s721␠AS␠"mz_internal"."mz_show_cluster_replicas"]␠("cluster") +mz_show_cluster_replicas_ind CREATE␠INDEX␠"mz_show_cluster_replicas_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s723␠AS␠"mz_internal"."mz_show_cluster_replicas"]␠("cluster") mz_show_clusters_ind CREATE␠INDEX␠"mz_show_clusters_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s570␠AS␠"mz_internal"."mz_show_clusters"]␠("name") mz_show_columns_ind CREATE␠INDEX␠"mz_show_columns_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s569␠AS␠"mz_internal"."mz_show_columns"]␠("id") mz_show_connections_ind CREATE␠INDEX␠"mz_show_connections_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s578␠AS␠"mz_internal"."mz_show_connections"]␠("schema_id") diff --git a/test/sqllogictest/oid.slt b/test/sqllogictest/oid.slt index 049d6d8168212..771f79b229ee2 100644 --- a/test/sqllogictest/oid.slt +++ b/test/sqllogictest/oid.slt @@ -1164,3 +1164,5 @@ SELECT oid, name FROM mz_objects WHERE id LIKE 's%' AND oid < 20000 ORDER BY oid 17052 mz_compute_hydration_times_ind 17053 mz_mappable_objects 17054 mz_wallclock_global_lag +17055 mz_wallclock_global_lag_histogram_raw +17056 mz_wallclock_global_lag_histogram diff --git a/test/testdrive-old-kafka-src-syntax/wallclock-lag.td b/test/testdrive-old-kafka-src-syntax/wallclock-lag.td deleted file mode 100644 index f49e50994826c..0000000000000 --- a/test/testdrive-old-kafka-src-syntax/wallclock-lag.td +++ /dev/null @@ -1,98 +0,0 @@ -# Copyright Materialize, Inc. and contributors. All rights reserved. -# -# Use of this software is governed by the Business Source License -# included in the LICENSE file at the root of this repository. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0. 
- -# Test the contents of `mz_wallclock_lag_history`. -# -# These tests rely on testdrive's retry feature, as `mz_wallclock_lag_history` -# is only refreshed periodically, so data is likely not immediately available. - -$ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} - -$ postgres-execute connection=mz_system -ALTER SYSTEM SET wallclock_lag_refresh_interval = '1s' - -> CREATE CLUSTER storage SIZE '1' -> CREATE CLUSTER compute SIZE '1', REPLICATION FACTOR 2 - -# Set up a bunch of frontiered objects and test that their wallclock lags get -# reported and are reasonably small. - -> CREATE SOURCE src IN CLUSTER storage FROM LOAD GENERATOR counter (UP TO 100) - -> CREATE TABLE tbl (a int) - -> CREATE VIEW src_plus_tbl AS SELECT counter + a AS a FROM src, tbl -> CREATE INDEX idx IN CLUSTER compute ON src_plus_tbl (a) -> CREATE MATERIALIZED VIEW mv IN CLUSTER compute AS SELECT * FROM src_plus_tbl - -> CREATE MATERIALIZED VIEW mv_const IN CLUSTER compute AS SELECT 1 -> CREATE DEFAULT INDEX idx_const IN CLUSTER compute ON mv_const - -> CREATE CONNECTION kafka_conn - TO KAFKA (BROKER '${testdrive.kafka-addr}', SECURITY PROTOCOL PLAINTEXT) -> CREATE CONNECTION csr_conn - TO CONFLUENT SCHEMA REGISTRY (URL '${testdrive.schema-registry-url}') -> CREATE SINK snk - IN CLUSTER storage - FROM mv - INTO KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-sink1-${testdrive.seed}') - FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn - ENVELOPE DEBEZIUM - -> SELECT DISTINCT ON(o.name, r.name) - o.name, r.name, l.lag >= '0s', l.lag < '10s' - FROM mz_internal.mz_wallclock_lag_history l - JOIN mz_objects o ON o.id = l.object_id - LEFT JOIN mz_cluster_replicas r ON r.id = l.replica_id - WHERE l.object_id LIKE 'u%' - ORDER BY o.name, r.name, l.occurred_at DESC -idx r1 true true -idx r2 true true -idx_const r1 true true -idx_const r2 true true -mv r1 true true -mv r2 true true -mv true true -mv_const r1 true true -mv_const r2 true true -mv_const true true -snk true true -src true true -src_progress true true -tbl true true - -> SELECT DISTINCT ON(o.name) - o.name, l.lag >= '0s', l.lag < '10s' - FROM mz_internal.mz_wallclock_global_lag_history l - JOIN mz_objects o ON o.id = l.object_id - WHERE l.object_id LIKE 'u%' - ORDER BY o.name, l.occurred_at DESC -idx true true -idx_const true true -mv true true -mv_const true true -snk true true -src true true -src_progress true true -tbl true true - -> SELECT DISTINCT ON(o.name) - o.name, l.lag >= '0s', l.lag < '10s' - FROM mz_internal.mz_wallclock_global_lag_recent_history l - JOIN mz_objects o ON o.id = l.object_id - WHERE l.object_id LIKE 'u%' - ORDER BY o.name, l.occurred_at DESC -idx true true -idx_const true true -mv true true -mv_const true true -snk true true -src true true -src_progress true true -tbl true true diff --git a/test/testdrive/catalog.td b/test/testdrive/catalog.td index 87699a89ee032..684d564489ab7 100644 --- a/test/testdrive/catalog.td +++ b/test/testdrive/catalog.td @@ -589,6 +589,7 @@ mz_sql_text source "" mz_statement_execution_history source "" mz_statement_lifecycle_history source "" mz_storage_shards source "" +mz_wallclock_global_lag_histogram_raw source "" mz_wallclock_lag_history source "" > SHOW TABLES FROM mz_internal @@ -694,6 +695,7 @@ mz_sql_text_redacted "" mz_aws_privatelink_connection_statuses "" mz_statement_execution_history_redacted "" mz_wallclock_global_lag "" +mz_wallclock_global_lag_histogram "" mz_wallclock_global_lag_history "" 
mz_wallclock_global_lag_recent_history ""
pg_class_all_databases ""
diff --git a/test/testdrive/wallclock-lag.td b/test/testdrive/wallclock-lag.td
index b351009a8a38f..7478a2332046a 100644
--- a/test/testdrive/wallclock-lag.td
+++ b/test/testdrive/wallclock-lag.td
@@ -15,7 +15,8 @@ $ postgres-connect name=mz_system url=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
$ postgres-execute connection=mz_system
-ALTER SYSTEM SET wallclock_lag_refresh_interval = '1s'
+ALTER SYSTEM SET wallclock_lag_history_refresh_interval = '1s'
+ALTER SYSTEM SET wallclock_lag_histogram_refresh_interval = '1s'
> CREATE CLUSTER storage SIZE '1'
> CREATE CLUSTER compute SIZE '1', REPLICATION FACTOR 2
@@ -109,3 +110,75 @@ snk true true
src true true
src_progress true true
tbl true true
+
+> SELECT DISTINCT o.name, l.count > 0, l.labels
+ FROM mz_internal.mz_wallclock_global_lag_histogram l
+ JOIN mz_objects o ON o.id = l.object_id
+ WHERE l.object_id LIKE 'u%' AND l.lag_seconds < 10
+idx true {}
+idx_const true {}
+mv true {}
+mv_const true {}
+snk true {}
+src true {}
+src_progress true {}
+tbl true {}
+
+# Test annotation of histogram measurements with labels.
+
+$ postgres-execute connection=mz_system
+ALTER CLUSTER compute SET (WORKLOAD CLASS 'compute')
+ALTER CLUSTER storage SET (WORKLOAD CLASS 'storage')
+
+> SELECT DISTINCT o.name, l.count > 0, l.labels
+ FROM mz_internal.mz_wallclock_global_lag_histogram l
+ JOIN mz_objects o ON o.id = l.object_id
+ WHERE l.object_id LIKE 'u%' AND l.lag_seconds < 10
+idx true "{}"
+idx true "{\"workload_class\":\"compute\"}"
+idx_const true "{}"
+idx_const true "{\"workload_class\":\"compute\"}"
+mv true "{}"
+mv true "{\"workload_class\":\"compute\"}"
+mv_const true "{}"
+mv_const true "{\"workload_class\":\"compute\"}"
+snk true "{}"
+snk true "{\"workload_class\":\"storage\"}"
+src true "{}"
+src true "{\"workload_class\":\"storage\"}"
+src_progress true "{}"
+tbl true "{}"
+
+# Test changing the histogram period interval.
+
+$ postgres-execute connection=mz_system
+ALTER SYSTEM SET wallclock_lag_histogram_period_interval = '1d'
+
+> CREATE TABLE tbl_1day (x int)
+> CREATE INDEX idx_1day ON tbl_1day (x)
+> SELECT DISTINCT
+ o.name,
+ l.period_end - l.period_start,
+ date_trunc('day', l.period_start) = l.period_start,
+ date_trunc('day', l.period_end) = l.period_end
+ FROM mz_internal.mz_wallclock_global_lag_histogram l
+ JOIN mz_objects o ON o.id = l.object_id
+ WHERE o.name LIKE '%_1day'
+idx_1day 24:00:00 true true
+tbl_1day 24:00:00 true true
+
+$ postgres-execute connection=mz_system
+ALTER SYSTEM SET wallclock_lag_histogram_period_interval = '1h'
+
+> CREATE TABLE tbl_1hour (x int)
+> CREATE INDEX idx_1hour ON tbl_1hour (x)
+> SELECT DISTINCT
+ o.name,
+ l.period_end - l.period_start,
+ date_trunc('hour', l.period_start) = l.period_start,
+ date_trunc('hour', l.period_end) = l.period_end
+ FROM mz_internal.mz_wallclock_global_lag_histogram l
+ JOIN mz_objects o ON o.id = l.object_id
+ WHERE o.name LIKE '%_1hour'
+idx_1hour 01:00:00 true true
+tbl_1hour 01:00:00 true true
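
A note on reading the new histogram introspection: the controller buckets each per-second lag measurement by rounding `lag.as_secs()` up to the next power of two, so `lag_seconds` is a power-of-two bucket bound, `count` is the number of measurements that landed in that bucket between `period_start` and `period_end`, and `labels` optionally carries a `workload_class` annotation. A minimal sketch of an ad-hoc query over the view, assuming only the columns exercised by the tests above and ordinary `now()`/interval arithmetic:

-- Illustrative sketch: summarize the recent lag distribution per object by
-- summing the per-period bucket counts; adjust the time window as needed.
SELECT h.object_id, h.lag_seconds, sum(h.count) AS measurements
FROM mz_internal.mz_wallclock_global_lag_histogram h
WHERE h.period_start >= now() - INTERVAL '1 day'
GROUP BY h.object_id, h.lag_seconds
ORDER BY h.object_id, h.lag_seconds;

How far back such a query can usefully look is bounded by the new wallclock_global_lag_histogram_retention_interval dyncfg, which defaults to 30 days.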