diff --git a/misc/python/materialize/scalability/endpoints.py b/misc/python/materialize/scalability/endpoints.py index 3cdd71704630e..d4369032ea544 100644 --- a/misc/python/materialize/scalability/endpoints.py +++ b/misc/python/materialize/scalability/endpoints.py @@ -105,12 +105,6 @@ def __str__(self) -> str: return f"MaterializeLocal ({self.host()})" -# TODO(def-,aljoscha) Switch this to "postgres" before #22029 is enabled in -# production Also verify that there is no regression when the "other" side uses -# catalog ts oracle and "this" uses postgres ts oracle -ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS = {"timestamp_oracle": "catalog"} - - class MaterializeContainer(MaterializeNonRemote): def __init__( self, @@ -142,10 +136,7 @@ def up(self) -> None: if self.image is not None and self.alternative_image is not None: if not self.composition.try_pull_service_image( - Materialized( - image=self.image, - additional_system_parameter_defaults=ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS, - ) + Materialized(image=self.image) ): # explicitly specified image cannot be found and alternative exists print( @@ -160,11 +151,7 @@ def up(self) -> None: def up_internal(self) -> None: with self.composition.override( - Materialized( - image=self.image, - sanity_restart=False, - additional_system_parameter_defaults=ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS, - ) + Materialized(image=self.image, sanity_restart=False) ): self.composition.up("materialized") self._port = self.composition.default_port("materialized") diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 4d3d0dc8e0ad4..132940262c717 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -324,6 +324,7 @@ pub enum RealTimeRecencyContext { when: QueryWhen, target_replica: Option, timeline_context: TimelineContext, + oracle_read_ts: Option, source_ids: BTreeSet, in_immediate_multi_stmt_txn: bool, optimizer: optimize::peek::Optimizer, @@ -343,8 +344,9 @@ impl RealTimeRecencyContext { #[derive(Debug)] pub enum 
PeekStage { Validate(PeekStageValidate), - Optimize(PeekStageOptimize), Timestamp(PeekStageTimestamp), + Optimize(PeekStageOptimize), + RealTimeRecency(PeekStageRealTimeRecency), Finish(PeekStageFinish), } @@ -352,8 +354,9 @@ impl PeekStage { fn validity(&mut self) -> Option<&mut PlanValidity> { match self { PeekStage::Validate(_) => None, - PeekStage::Optimize(PeekStageOptimize { validity, .. }) - | PeekStage::Timestamp(PeekStageTimestamp { validity, .. }) + PeekStage::Timestamp(PeekStageTimestamp { validity, .. }) + | PeekStage::Optimize(PeekStageOptimize { validity, .. }) + | PeekStage::RealTimeRecency(PeekStageRealTimeRecency { validity, .. }) | PeekStage::Finish(PeekStageFinish { validity, .. }) => Some(validity), } } @@ -365,6 +368,19 @@ pub struct PeekStageValidate { target_cluster: TargetCluster, } +#[derive(Debug)] +pub struct PeekStageTimestamp { + validity: PlanValidity, + source: MirRelationExpr, + copy_to: Option, + source_ids: BTreeSet, + when: QueryWhen, + target_replica: Option, + timeline_context: TimelineContext, + in_immediate_multi_stmt_txn: bool, + optimizer: optimize::peek::Optimizer, +} + #[derive(Debug)] pub struct PeekStageOptimize { validity: PlanValidity, @@ -374,12 +390,13 @@ pub struct PeekStageOptimize { when: QueryWhen, target_replica: Option, timeline_context: TimelineContext, + oracle_read_ts: Option, in_immediate_multi_stmt_txn: bool, optimizer: optimize::peek::Optimizer, } #[derive(Debug)] -pub struct PeekStageTimestamp { +pub struct PeekStageRealTimeRecency { validity: PlanValidity, copy_to: Option, source_ids: BTreeSet, @@ -387,6 +404,7 @@ pub struct PeekStageTimestamp { when: QueryWhen, target_replica: Option, timeline_context: TimelineContext, + oracle_read_ts: Option, in_immediate_multi_stmt_txn: bool, optimizer: optimize::peek::Optimizer, global_mir_plan: optimize::peek::GlobalMirPlan, @@ -400,6 +418,7 @@ pub struct PeekStageFinish { when: QueryWhen, target_replica: Option, timeline_context: TimelineContext, + 
oracle_read_ts: Option, source_ids: BTreeSet, real_time_recency_ts: Option, optimizer: optimize::peek::Optimizer, @@ -682,7 +701,12 @@ impl PendingRead { PendingRead::ReadThenWrite { timestamp: (timestamp, timeline), .. - } => TimestampContext::TimelineTimestamp(timeline.clone(), timestamp.clone()), + } => TimestampContext::TimelineTimestamp { + timeline: timeline.clone(), + chosen_ts: timestamp.clone(), + oracle_ts: None, // For writes, we always pick the oracle + // timestamp! + }, } } diff --git a/src/adapter/src/coord/message_handler.rs b/src/adapter/src/coord/message_handler.rs index 302dcb68bd0c4..70060004945dd 100644 --- a/src/adapter/src/coord/message_handler.rs +++ b/src/adapter/src/coord/message_handler.rs @@ -10,7 +10,7 @@ //! Logic for processing [`Coordinator`] messages. The [`Coordinator`] receives //! messages from various sources (ex: controller, clients, background tasks, etc). -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{btree_map, BTreeMap, BTreeSet}; use std::time::{Duration, Instant}; use futures::future::LocalBoxFuture; @@ -617,16 +617,52 @@ impl Coordinator { let mut ready_txns = Vec::new(); let mut deferred_txns = Vec::new(); + // Cache for `TimestampOracle::read_ts` calls. These are somewhat + // expensive so we cache the value. This is correct since all we're + // risking is being too conservative. We will not accidentally "release" + // a result too early. 
+ let mut cached_oracle_ts = BTreeMap::new(); + for mut read_txn in pending_read_txns { - if let TimestampContext::TimelineTimestamp(timeline, timestamp) = - read_txn.txn.timestamp_context() + if let TimestampContext::TimelineTimestamp { + timeline, + chosen_ts, + oracle_ts, + } = read_txn.txn.timestamp_context() { - let timestamp_oracle = self.get_timestamp_oracle(&timeline); - let read_ts = timestamp_oracle.read_ts().await; - if timestamp <= read_ts { + let oracle_ts = match oracle_ts { + Some(oracle_ts) => oracle_ts, + None => { + // There was no oracle timestamp, so no need to delay. + ready_txns.push(read_txn); + continue; + } + }; + + if chosen_ts <= oracle_ts { + // Chosen ts was already <= the oracle ts, so we're good + // to go! + ready_txns.push(read_txn); + continue; + } + + // See what the oracle timestamp is now and delay when needed. + let current_oracle_ts = cached_oracle_ts.entry(timeline.clone()); + let current_oracle_ts = match current_oracle_ts { + btree_map::Entry::Vacant(entry) => { + let timestamp_oracle = self.get_timestamp_oracle(&timeline); + let read_ts = timestamp_oracle.read_ts().await; + entry.insert(read_ts.clone()); + read_ts + } + btree_map::Entry::Occupied(entry) => entry.get().clone(), + }; + + if chosen_ts <= current_oracle_ts { ready_txns.push(read_txn); } else { - let wait = Duration::from_millis(timestamp.saturating_sub(read_ts).into()); + let wait = + Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into()); if wait < shortest_wait { shortest_wait = wait; } @@ -724,6 +760,7 @@ impl Coordinator { when, target_replica, timeline_context, + oracle_read_ts, source_ids, in_immediate_multi_stmt_txn: _, optimizer, @@ -738,6 +775,7 @@ impl Coordinator { when, target_replica, timeline_context, + oracle_read_ts, source_ids, real_time_recency_ts: Some(real_time_recency_ts), optimizer, diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 32817c40d891c..6e6b0043f7a7b 100644 
--- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -100,9 +100,9 @@ use crate::coord::timestamp_selection::{ }; use crate::coord::{ peek, AlterConnectionValidationReady, Coordinator, CreateConnectionValidationReady, - ExecuteContext, Message, PeekStage, PeekStageFinish, PeekStageOptimize, PeekStageTimestamp, - PeekStageValidate, PendingRead, PendingReadTxn, PendingTxn, PendingTxnResponse, PlanValidity, - RealTimeRecencyContext, TargetCluster, + ExecuteContext, Message, PeekStage, PeekStageFinish, PeekStageOptimize, + PeekStageRealTimeRecency, PeekStageTimestamp, PeekStageValidate, PendingRead, PendingReadTxn, + PendingTxn, PendingTxnResponse, PlanValidity, RealTimeRecencyContext, TargetCluster, }; use crate::error::AdapterError; use crate::explain::explain_dataflow; @@ -2154,16 +2154,23 @@ impl Coordinator { PeekStage::Validate(stage) => { let next = return_if_err!(self.peek_stage_validate(ctx.session_mut(), stage), ctx); - (ctx, PeekStage::Optimize(next)) + + (ctx, PeekStage::Timestamp(next)) + } + PeekStage::Timestamp(stage) => { + self.peek_stage_timestamp(ctx, stage).await; + return; } PeekStage::Optimize(stage) => { self.peek_stage_optimize(ctx, stage).await; return; } - PeekStage::Timestamp(stage) => match self.peek_stage_timestamp(ctx, stage) { - Some((ctx, next)) => (ctx, PeekStage::Finish(next)), - None => return, - }, + PeekStage::RealTimeRecency(stage) => { + match self.peek_stage_real_time_recency(ctx, stage) { + Some((ctx, next)) => (ctx, PeekStage::Finish(next)), + None => return, + } + } PeekStage::Finish(stage) => { let res = self.peek_stage_finish(&mut ctx, stage).await; ctx.retire(res); @@ -2181,7 +2188,7 @@ impl Coordinator { plan, target_cluster, }: PeekStageValidate, - ) -> Result { + ) -> Result { let plan::SelectPlan { source, when, @@ -2257,7 +2264,7 @@ impl Coordinator { role_metadata: session.role_metadata().clone(), }; - Ok(PeekStageOptimize { + Ok(PeekStageTimestamp { validity, source, 
copy_to, @@ -2270,6 +2277,80 @@ impl Coordinator { }) } + /// Determine a linearized read timestamp (from a `TimestampOracle`), if + /// needed. + async fn peek_stage_timestamp( + &mut self, + ctx: ExecuteContext, + PeekStageTimestamp { + validity, + source, + copy_to, + source_ids, + when, + target_replica, + timeline_context, + in_immediate_multi_stmt_txn, + optimizer, + }: PeekStageTimestamp, + ) { + let isolation_level = ctx.session.vars().transaction_isolation().clone(); + let linearized_timeline = + Coordinator::get_linearized_timeline(&isolation_level, &when, &timeline_context); + + let internal_cmd_tx = self.internal_cmd_tx.clone(); + + let build_optimize_stage = move |oracle_read_ts: Option| -> PeekStageOptimize { + PeekStageOptimize { + validity, + source, + copy_to, + source_ids, + when, + target_replica, + timeline_context, + oracle_read_ts, + in_immediate_multi_stmt_txn, + optimizer, + } + }; + + match linearized_timeline { + Some(timeline) => { + let shared_oracle = self.get_shared_timestamp_oracle(&timeline); + + if let Some(shared_oracle) = shared_oracle { + // We can do it in an async task, because we can ship off + // the timestamp oracle. + mz_ore::task::spawn(|| "linearized timestamp task", async move { + let oracle_read_ts = shared_oracle.read_ts().await; + let stage = build_optimize_stage(Some(oracle_read_ts)); + + let stage = PeekStage::Optimize(stage); + // Ignore errors if the coordinator has shut down. + let _ = internal_cmd_tx.send(Message::PeekStageReady { ctx, stage }); + }); + } else { + // Timestamp oracle can't be shipped to an async task, we + // have to do it here. + let oracle = self.get_timestamp_oracle(&timeline); + let oracle_read_ts = oracle.read_ts().await; + let stage = build_optimize_stage(Some(oracle_read_ts)); + + let stage = PeekStage::Optimize(stage); + // Ignore errors if the coordinator has shut down. 
+ let _ = internal_cmd_tx.send(Message::PeekStageReady { ctx, stage }); + } + } + None => { + let stage = build_optimize_stage(None); + let stage = PeekStage::Optimize(stage); + // Ignore errors if the coordinator has shut down. + let _ = internal_cmd_tx.send(Message::PeekStageReady { ctx, stage }); + } + } + } + async fn peek_stage_optimize(&mut self, ctx: ExecuteContext, mut stage: PeekStageOptimize) { // Generate data structures that can be moved to another task where we will perform possibly // expensive optimizations. @@ -2292,6 +2373,7 @@ impl Coordinator { &stage.when, stage.optimizer.cluster_id(), &stage.timeline_context, + stage.oracle_read_ts.clone(), None, ) .await @@ -2313,7 +2395,7 @@ impl Coordinator { || "optimize peek", move || match Self::optimize_peek(ctx.session(), stats, id_bundle, stage) { Ok(stage) => { - let stage = PeekStage::Timestamp(stage); + let stage = PeekStage::RealTimeRecency(stage); // Ignore errors if the coordinator has shut down. let _ = internal_cmd_tx.send(Message::PeekStageReady { ctx, stage }); } @@ -2334,15 +2416,16 @@ impl Coordinator { when, target_replica, timeline_context, + oracle_read_ts, in_immediate_multi_stmt_txn, mut optimizer, }: PeekStageOptimize, - ) -> Result { + ) -> Result { let local_mir_plan = optimizer.optimize(source)?; let local_mir_plan = local_mir_plan.resolve(session, stats); let global_mir_plan = optimizer.optimize(local_mir_plan)?; - Ok(PeekStageTimestamp { + Ok(PeekStageRealTimeRecency { validity, copy_to, source_ids, @@ -2350,6 +2433,7 @@ impl Coordinator { when, target_replica, timeline_context, + oracle_read_ts, in_immediate_multi_stmt_txn, optimizer, global_mir_plan, @@ -2357,10 +2441,10 @@ impl Coordinator { } #[tracing::instrument(level = "debug", skip_all)] - fn peek_stage_timestamp( + fn peek_stage_real_time_recency( &mut self, ctx: ExecuteContext, - PeekStageTimestamp { + PeekStageRealTimeRecency { validity, copy_to, source_ids, @@ -2368,10 +2452,11 @@ impl Coordinator { when, 
target_replica, timeline_context, + oracle_read_ts, in_immediate_multi_stmt_txn, optimizer, global_mir_plan, - }: PeekStageTimestamp, + }: PeekStageRealTimeRecency, ) -> Option<(ExecuteContext, PeekStageFinish)> { match self.recent_timestamp(ctx.session(), source_ids.iter().cloned()) { Some(fut) => { @@ -2385,6 +2470,7 @@ impl Coordinator { when, target_replica, timeline_context, + oracle_read_ts: oracle_read_ts.clone(), source_ids, in_immediate_multi_stmt_txn, optimizer, @@ -2414,6 +2500,7 @@ impl Coordinator { when, target_replica, timeline_context, + oracle_read_ts, source_ids, real_time_recency_ts: None, optimizer, @@ -2434,6 +2521,7 @@ impl Coordinator { when, target_replica, timeline_context, + oracle_read_ts, source_ids, real_time_recency_ts, mut optimizer, @@ -2449,6 +2537,7 @@ impl Coordinator { ctx.session_mut(), &when, timeline_context, + oracle_read_ts, source_ids, &id_bundle, real_time_recency_ts, @@ -2511,6 +2600,7 @@ impl Coordinator { when: &QueryWhen, cluster_id: ClusterId, timeline_context: TimelineContext, + oracle_read_ts: Option, source_bundle: &CollectionIdBundle, source_ids: &BTreeSet, real_time_recency_ts: Option, @@ -2525,7 +2615,7 @@ impl Coordinator { // Use the transaction's timestamp if it exists and this isn't an AS OF query. Some( determination @ TimestampDetermination { - timestamp_context: TimestampContext::TimelineTimestamp(_, _), + timestamp_context: TimestampContext::TimelineTimestamp { .. }, .. 
}, ) if in_immediate_multi_stmt_txn => (determination, None), @@ -2551,6 +2641,7 @@ impl Coordinator { when, cluster_id, &timeline_context, + oracle_read_ts, real_time_recency_ts, ) .await?; @@ -2630,6 +2721,7 @@ impl Coordinator { session: &mut Session, when: &QueryWhen, timeline_context: TimelineContext, + oracle_read_ts: Option, source_ids: BTreeSet, id_bundle: &CollectionIdBundle, real_time_recency_ts: Option, @@ -2643,6 +2735,7 @@ impl Coordinator { when, optimizer.cluster_id(), timeline_context, + oracle_read_ts, id_bundle, &source_ids, real_time_recency_ts, @@ -2801,6 +2894,7 @@ impl Coordinator { // MIR ⇒ MIR optimization (global) let global_mir_plan = optimizer.optimize(from)?; // Timestamp selection + let oracle_read_ts = self.oracle_read_ts(&ctx.session, &timeline, &when).await; let as_of = self .determine_timestamp( ctx.session(), @@ -2808,6 +2902,7 @@ impl Coordinator { &when, cluster_id, &timeline, + oracle_read_ts, None, ) .await? @@ -3340,12 +3435,18 @@ impl Coordinator { .sufficient_collections(&source_ids); // Acquire a timestamp (necessary for loading statistics). 
+ let when = QueryWhen::Immediately; + let oracle_read_ts = self + .oracle_read_ts(session, &timeline_context, &when) + .with_subscriber(root_dispatch.clone()) + .await; let timestamp_ctx = self .sequence_peek_timestamp( session, - &QueryWhen::Immediately, + &when, target_cluster_id, timeline_context, + oracle_read_ts, &id_bundle, &source_ids, None, // no real-time recency @@ -3835,12 +3936,16 @@ impl Coordinator { let source_ids = source.depends_on(); let timeline_context = self.validate_timeline_context(source_ids.clone())?; + let when = QueryWhen::Immediately; + let oracle_read_ts = self.oracle_read_ts(session, &timeline_context, &when).await; + let determination = self .sequence_peek_timestamp( session, - &QueryWhen::Immediately, + &when, cluster_id, timeline_context, + oracle_read_ts, &id_bundle, &source_ids, real_time_recency_ts, @@ -4221,13 +4326,17 @@ impl Coordinator { // Note: It's only OK for the write to have a greater timestamp than the read // because the write lock prevents any other writes from happening in between // the read and write. 
- if let Some(TimestampContext::TimelineTimestamp(timeline, read_ts)) = timestamp_context + if let Some(TimestampContext::TimelineTimestamp { + timeline, + chosen_ts: chosen_read_ts, + oracle_ts: _, + }) = timestamp_context { let (tx, rx) = tokio::sync::oneshot::channel(); let result = strict_serializable_reads_tx.send(PendingReadTxn { txn: PendingRead::ReadThenWrite { tx, - timestamp: (read_ts, timeline), + timestamp: (chosen_read_ts, timeline), }, created: Instant::now(), num_requeues: 0, diff --git a/src/adapter/src/coord/timeline.rs b/src/adapter/src/coord/timeline.rs index 56737b103d17b..3f84926205000 100644 --- a/src/adapter/src/coord/timeline.rs +++ b/src/adapter/src/coord/timeline.rs @@ -31,6 +31,7 @@ use tracing::{debug, error, info}; use crate::catalog::CatalogItem; use crate::coord::id_bundle::CollectionIdBundle; use crate::coord::read_policy::ReadHolds; +use crate::coord::timestamp_oracle::batching_oracle::BatchingTimestampOracle; use crate::coord::timestamp_oracle::catalog_oracle::{ CatalogTimestampOracle, CatalogTimestampPersistence, TimestampPersistence, TIMESTAMP_INTERVAL_UPPER_BOUND, TIMESTAMP_PERSIST_INTERVAL, @@ -38,7 +39,7 @@ use crate::coord::timestamp_oracle::catalog_oracle::{ use crate::coord::timestamp_oracle::postgres_oracle::{ PostgresTimestampOracle, PostgresTimestampOracleConfig, }; -use crate::coord::timestamp_oracle::{self, TimestampOracle}; +use crate::coord::timestamp_oracle::{self, ShareableTimestampOracle, TimestampOracle}; use crate::coord::timestamp_selection::TimestampProvider; use crate::coord::Coordinator; use crate::AdapterError; @@ -118,6 +119,18 @@ impl Coordinator { .as_ref() } + #[allow(unused)] + pub(crate) fn get_shared_timestamp_oracle( + &self, + timeline: &Timeline, + ) -> Option + Send + Sync>> { + self.global_timelines + .get(timeline) + .expect("all timelines have a timestamp oracle") + .oracle + .get_shared() + } + /// Returns a reference to the timestamp oracle used for reads and writes /// from/to a local 
input. fn get_local_timestamp_oracle(&self) -> &dyn TimestampOracle { @@ -245,6 +258,16 @@ impl Coordinator { .await, ); + let shared_oracle = oracle + .get_shared() + .expect("postgres timestamp oracle is shareable"); + + let batching_oracle = + BatchingTimestampOracle::new(Arc::clone(metrics), shared_oracle); + + let oracle: Box> = + Box::new(batching_oracle); + oracle } TimestampOracleImpl::Catalog => { diff --git a/src/adapter/src/coord/timestamp_oracle.rs b/src/adapter/src/coord/timestamp_oracle.rs index d0794eda14e61..b9f064a9a8e20 100644 --- a/src/adapter/src/coord/timestamp_oracle.rs +++ b/src/adapter/src/coord/timestamp_oracle.rs @@ -14,11 +14,14 @@ //! reported completed write timestamps, and strictly less than all subsequently //! emitted write timestamps. +use std::sync::Arc; + use async_trait::async_trait; use mz_ore::now::NowFn; use crate::coord::timeline::WriteTimestamp; +pub mod batching_oracle; pub mod catalog_oracle; pub mod metrics; pub mod postgres_oracle; @@ -65,6 +68,40 @@ pub trait TimestampOracle { /// - r_0 <= input <= w_0 -> r_1 = input and w_1 = w_0 /// - r_0 <= w_0 <= input -> r_1 = input and w_1 = input async fn apply_write(&mut self, write_ts: T); + + /// Get a shared, shallow clone of the oracle. Returns `None` if this oracle + /// is not shareable. + fn get_shared(&self) -> Option + Send + Sync>>; +} + +/// A shareable version of [`TimestampOracle`] that is `Send` and `Sync`. +/// +/// We have this as a stop-gap solution while we still keep the legacy +/// in-memory/backed-by-Stash TimestampOracle around. Once we remove that we can +/// make [`TimestampOracle`] shareable. +#[async_trait] +pub trait ShareableTimestampOracle { + /// Acquire a new timestamp for writing. + /// + /// This timestamp will be strictly greater than all prior values of + /// `self.read_ts()` and `self.write_ts()`. + async fn write_ts(&self) -> WriteTimestamp; + + /// Peek the current write timestamp. 
+ async fn peek_write_ts(&self) -> T; + + /// Acquire a new timestamp for reading. + /// + /// This timestamp will be greater or equal to all prior values of + /// `self.apply_write(write_ts)`, and strictly less than all subsequent + /// values of `self.write_ts()`. + async fn read_ts(&self) -> T; + + /// Mark a write at `write_ts` completed. + /// + /// All subsequent values of `self.read_ts()` will be greater or equal to + /// `write_ts`. + async fn apply_write(&self, lower_bound: T); } /// A [`NowFn`] that is generic over the timestamp. @@ -186,4 +223,99 @@ mod tests { Ok(()) } + + pub async fn shareable_timestamp_oracle_impl_test( + mut new_fn: NewFn, + ) -> Result<(), anyhow::Error> + where + F: Future + Send + Sync>>, + NewFn: FnMut(String, NowFn, Timestamp) -> F, + { + // Normally, these could all be separate test methods but we bundle them + // all together so that it's easier to call this one test method from + // the implementation tests. + + // Timestamp::MIN as initial timestamp + let timeline = uuid::Uuid::new_v4().to_string(); + let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await; + assert_eq!(oracle.read_ts().await, Timestamp::MIN); + assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN); + + // Timestamp::MAX as initial timestamp + let timeline = uuid::Uuid::new_v4().to_string(); + let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MAX).await; + assert_eq!(oracle.read_ts().await, Timestamp::MAX); + assert_eq!(oracle.peek_write_ts().await, Timestamp::MAX); + + // Timestamp::MAX-1 from NowFn. We have to step back by one, otherwise + // `write_ts` can't determine the "advance_to" timestamp. + let timeline = uuid::Uuid::new_v4().to_string(); + let oracle = new_fn( + timeline, + NowFn::from(|| Timestamp::MAX.step_back().expect("known to work").into()), + Timestamp::MIN, + ) + .await; + // At first, read_ts and peek_write_ts stay where they are. 
+ assert_eq!(oracle.read_ts().await, Timestamp::MIN); + assert_eq!(oracle.peek_write_ts().await, Timestamp::MIN); + assert_eq!( + oracle.write_ts().await.timestamp, + Timestamp::MAX.step_back().expect("known to work") + ); + // Now peek_write_ts jumps to MAX-1 but read_ts stays. + assert_eq!(oracle.read_ts().await, Timestamp::MIN); + assert_eq!( + oracle.peek_write_ts().await, + Timestamp::MAX.step_back().expect("known to work") + ); + + // Repeated write_ts calls advance the timestamp. + let timeline = uuid::Uuid::new_v4().to_string(); + let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await; + assert_eq!(oracle.write_ts().await.timestamp, 1u64.into()); + assert_eq!(oracle.write_ts().await.timestamp, 2u64.into()); + + // Repeated peek_write_ts calls _DON'T_ advance the timestamp. + let timeline = uuid::Uuid::new_v4().to_string(); + let oracle = new_fn(timeline, NowFn::from(|| 0u64), Timestamp::MIN).await; + assert_eq!(oracle.peek_write_ts().await, 0u64.into()); + assert_eq!(oracle.peek_write_ts().await, 0u64.into()); + + // Interesting scenarios around apply_write, from its rustdoc. 
+ // + // Scenario #1: + // input <= r_0 <= w_0 -> r_1 = r_0 and w_1 = w_0 + let timeline = uuid::Uuid::new_v4().to_string(); + let oracle = new_fn(timeline, NowFn::from(|| 0u64), 10u64.into()).await; + oracle.apply_write(5u64.into()).await; + assert_eq!(oracle.peek_write_ts().await, 10u64.into()); + assert_eq!(oracle.read_ts().await, 10u64.into()); + + // Scenario #2: + // r_0 <= input <= w_0 -> r_1 = input and w_1 = w_0 + let timeline = uuid::Uuid::new_v4().to_string(); + let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await; + // Have to bump the write_ts up manually: + assert_eq!(oracle.write_ts().await.timestamp, 1u64.into()); + assert_eq!(oracle.write_ts().await.timestamp, 2u64.into()); + assert_eq!(oracle.write_ts().await.timestamp, 3u64.into()); + assert_eq!(oracle.write_ts().await.timestamp, 4u64.into()); + oracle.apply_write(2u64.into()).await; + assert_eq!(oracle.peek_write_ts().await, 4u64.into()); + assert_eq!(oracle.read_ts().await, 2u64.into()); + + // Scenario #3: + // r_0 <= w_0 <= input -> r_1 = input and w_1 = input + let timeline = uuid::Uuid::new_v4().to_string(); + let oracle = new_fn(timeline, NowFn::from(|| 0u64), 0u64.into()).await; + oracle.apply_write(2u64.into()).await; + assert_eq!(oracle.peek_write_ts().await, 2u64.into()); + assert_eq!(oracle.read_ts().await, 2u64.into()); + oracle.apply_write(4u64.into()).await; + assert_eq!(oracle.peek_write_ts().await, 4u64.into()); + assert_eq!(oracle.read_ts().await, 4u64.into()); + + Ok(()) + } } diff --git a/src/adapter/src/coord/timestamp_oracle/batching_oracle.rs b/src/adapter/src/coord/timestamp_oracle/batching_oracle.rs new file mode 100644 index 0000000000000..9eef4ca508a8a --- /dev/null +++ b/src/adapter/src/coord/timestamp_oracle/batching_oracle.rs @@ -0,0 +1,278 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! A timestamp oracle that wraps a `ShareableTimestampOracle` and batches calls +//! to it. + +use std::sync::Arc; + +use async_trait::async_trait; +use mz_ore::cast::CastFrom; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::oneshot; + +use crate::coord::timeline::WriteTimestamp; +use crate::coord::timestamp_oracle::metrics::Metrics; +use crate::coord::timestamp_oracle::{ShareableTimestampOracle, TimestampOracle}; + +/// A batching [`TimestampOracle`] backed by a [`ShareableTimestampOracle`] +/// +/// This will only batch calls to `read_ts` because the rest of the system +/// already naturally does batching of write-related calls via the group commit +/// mechanism. Write-related calls are passed straight through to the backing +/// oracle. +/// +/// For `read_ts` calls, we have to be careful to never cache results from the +/// backing oracle: for the timestamp to be linearized we can never return a +/// result as of an earlier moment, but batching them up is correct because this +/// can only make it so that we return later timestamps. Those later timestamps +/// still fall within the duration of the `read_ts` call and so are linearized. +pub struct BatchingTimestampOracle { + inner: Arc + Send + Sync>, + command_tx: UnboundedSender>, +} + +/// A command on the internal batching command stream. +enum Command { + ReadTs(oneshot::Sender), +} + +impl std::fmt::Debug for BatchingTimestampOracle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BatchingTimestampOracle").finish() + } +} + +impl BatchingTimestampOracle +where + T: Clone + Send + Sync + 'static, +{ + /// Creates a [`BatchingTimestampOracle`] that uses the given inner oracle. 
+ pub(crate) fn new( + metrics: Arc, + oracle: Arc + Send + Sync>, + ) -> Self { + let (command_tx, mut command_rx) = tokio::sync::mpsc::unbounded_channel(); + + let task_oracle = Arc::clone(&oracle); + + mz_ore::task::spawn(|| "BatchingTimestampOracle Worker Task", async move { + let read_ts_metrics = &metrics.batching.read_ts; + + // See comment on `BatchingTimestampOracle` for why this batching is + // correct. + while let Some(cmd) = command_rx.recv().await { + let mut pending_cmds = vec![cmd]; + while let Ok(cmd) = command_rx.try_recv() { + pending_cmds.push(cmd); + } + + read_ts_metrics + .ops_count + .inc_by(u64::cast_from(pending_cmds.len())); + read_ts_metrics.batches_count.inc(); + + let ts = task_oracle.read_ts().await; + for cmd in pending_cmds { + match cmd { + Command::ReadTs(response_tx) => { + // It's okay if the receiver drops, just means + // they're not interested anymore. + let _ = response_tx.send(ts.clone()); + } + } + } + } + + tracing::debug!("shutting down BatchingTimestampOracle task"); + }); + + Self { + inner: oracle, + command_tx, + } + } +} + +#[async_trait] +impl ShareableTimestampOracle for BatchingTimestampOracle +where + T: Send + Sync, +{ + async fn write_ts(&self) -> WriteTimestamp { + self.inner.write_ts().await + } + + async fn peek_write_ts(&self) -> T { + self.inner.peek_write_ts().await + } + + async fn read_ts(&self) -> T { + let (tx, rx) = oneshot::channel(); + + self.command_tx.send(Command::ReadTs(tx)).expect( + "worker task cannot stop while we still have senders for the command/request channel", + ); + + rx.await + .expect("worker task cannot stop while there are outstanding commands/requests") + } + + async fn apply_write(&self, write_ts: T) { + self.inner.apply_write(write_ts).await + } +} + +#[async_trait(?Send)] +impl TimestampOracle for BatchingTimestampOracle +where + T: Send + Sync + 'static, +{ + #[tracing::instrument(name = "oracle::write_ts", level = "debug", skip_all)] + async fn write_ts(&mut self) -> 
WriteTimestamp { + ShareableTimestampOracle::write_ts(self).await + } + + #[tracing::instrument(name = "oracle::peek_write_ts", level = "debug", skip_all)] + async fn peek_write_ts(&self) -> T { + ShareableTimestampOracle::peek_write_ts(self).await + } + + #[tracing::instrument(name = "oracle::read_ts", level = "debug", skip_all)] + async fn read_ts(&self) -> T { + ShareableTimestampOracle::read_ts(self).await + } + + #[tracing::instrument(name = "oracle::apply_write", level = "debug", skip_all)] + async fn apply_write(&mut self, write_ts: T) { + ShareableTimestampOracle::apply_write(self, write_ts).await + } + + fn get_shared(&self) -> Option + Send + Sync>> { + let inner: Arc + Send + Sync> = Arc::clone(&self.inner); + let shallow_clone = Self { + inner, + command_tx: self.command_tx.clone(), + }; + + Some(Arc::new(shallow_clone)) + } +} + +#[cfg(test)] +mod tests { + + use futures::FutureExt; + use mz_ore::metrics::MetricsRegistry; + use mz_postgres_client::PostgresClient; + use tracing::info; + + use crate::coord::timestamp_oracle; + use crate::coord::timestamp_oracle::postgres_oracle::{ + PostgresTimestampOracle, PostgresTimestampOracleConfig, + }; + + use super::*; + + #[mz_ore::test(tokio::test)] + #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` + async fn test_batching_timestamp_oracle() -> Result<(), anyhow::Error> { + let config = match PostgresTimestampOracleConfig::new_for_test() { + Some(config) => config, + None => { + info!( + "{} env not set: skipping test that uses external service", + PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL + ); + return Ok(()); + } + }; + let metrics = Arc::new(Metrics::new(&MetricsRegistry::new())); + + cleanup(config.clone()).await?; + + timestamp_oracle::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| { + // We use the postgres oracle as the backing oracle because it's + // the only shareable oracle we have. 
+ let oracle = + PostgresTimestampOracle::open(config.clone(), timeline, initial_ts, now_fn).map( + |oracle| { + let shared_oracle = oracle.get_shared().expect("known to be shareable"); + let batching_oracle = + BatchingTimestampOracle::new(Arc::clone(&metrics), shared_oracle); + + batching_oracle + }, + ); + + oracle + }) + .await?; + + cleanup(config).await?; + + Ok(()) + } + + #[mz_ore::test(tokio::test)] + #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` + async fn test_shareable_batching_timestamp_oracle() -> Result<(), anyhow::Error> { + let config = match PostgresTimestampOracleConfig::new_for_test() { + Some(config) => config, + None => { + info!( + "{} env not set: skipping test that uses external service", + PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL + ); + return Ok(()); + } + }; + let metrics = Arc::new(Metrics::new(&MetricsRegistry::new())); + + cleanup(config.clone()).await?; + + timestamp_oracle::tests::shareable_timestamp_oracle_impl_test( + |timeline, now_fn, initial_ts| { + // We use the postgres oracle as the backing oracle because it's + // the only shareable oracle we have. + let oracle = + PostgresTimestampOracle::open(config.clone(), timeline, initial_ts, now_fn) + .map(|oracle| { + let shared_oracle = oracle.get_shared().expect("known to be shareable"); + let batching_oracle = + BatchingTimestampOracle::new(Arc::clone(&metrics), shared_oracle); + + batching_oracle + }) + .map(|batching_oracle| { + batching_oracle.get_shared().expect("known to be shareable") + }); + + oracle + }, + ) + .await?; + + cleanup(config).await?; + + Ok(()) + } + + // Best-effort cleanup! 
+ async fn cleanup(config: PostgresTimestampOracleConfig) -> Result<(), anyhow::Error> { + let postgres_client = PostgresClient::open(config.into())?; + let client = postgres_client.get_connection().await?; + + client + .execute("DROP TABLE IF EXISTS timestamp_oracle", &[]) + .await?; + + Ok(()) + } +} diff --git a/src/adapter/src/coord/timestamp_oracle/catalog_oracle.rs b/src/adapter/src/coord/timestamp_oracle/catalog_oracle.rs index bcd89d5166cb9..ab0adf1ee7527 100644 --- a/src/adapter/src/coord/timestamp_oracle/catalog_oracle.rs +++ b/src/adapter/src/coord/timestamp_oracle/catalog_oracle.rs @@ -24,7 +24,9 @@ use tracing::error; use crate::catalog::Catalog; use crate::coord::timeline::WriteTimestamp; -use crate::coord::timestamp_oracle::{catalog_oracle, GenericNowFn, TimestampOracle}; +use crate::coord::timestamp_oracle::{ + catalog_oracle, GenericNowFn, ShareableTimestampOracle, TimestampOracle, +}; use crate::util::ResultExt; /// A type that provides write and read timestamps, reads observe exactly their @@ -231,6 +233,14 @@ where self.timestamp_oracle.apply_write(write_ts.clone()); self.maybe_allocate_new_timestamps(&write_ts).await; } + + fn get_shared(&self) -> Option + Send + Sync>> { + // The in-memory TimestampOracle is not shareable: + // + // - we have in-memory state that we would have to share via an Arc/Mutex + // - we use TimestampPersistence, which is backed by Stash, which is also problematic for sharing + None + } } /// Provides persistence of timestamps for [`CatalogTimestampOracle`]. 
diff --git a/src/adapter/src/coord/timestamp_oracle/metrics.rs b/src/adapter/src/coord/timestamp_oracle/metrics.rs index ab7e808be5fda..abbf0b37ef264 100644 --- a/src/adapter/src/coord/timestamp_oracle/metrics.rs +++ b/src/adapter/src/coord/timestamp_oracle/metrics.rs @@ -12,9 +12,9 @@ use std::time::{Duration, Instant}; use mz_ore::metric; +use mz_ore::metrics::raw::{CounterVec, IntCounterVec}; use mz_ore::metrics::{Counter, IntCounter, MetricsRegistry}; use mz_postgres_client::metrics::PostgresClientMetrics; -use prometheus::{CounterVec, IntCounterVec}; use crate::coord::timestamp_oracle::retry::RetryStream; @@ -29,6 +29,11 @@ pub struct Metrics { /// [`TimestampOracle`](crate::coord::timestamp_oracle::TimestampOracle). pub oracle: OracleMetrics, + /// Metrics recording how many operations we batch into one oracle call, for + /// those operations that _do_ support batching, and only when using the + /// `BatchingTimestampOracle` wrapper. + pub batching: BatchingMetrics, + /// Metrics for each retry loop. 
pub retries: RetriesMetrics, @@ -49,6 +54,7 @@ impl Metrics { Metrics { oracle: vecs.oracle_metrics(), + batching: vecs.batching_metrics(), retries: vecs.retries_metrics(), postgres_client: PostgresClientMetrics::new(registry, "mz_ts_oracle"), _vecs: vecs, @@ -67,6 +73,9 @@ struct MetricsVecs { retry_finished: IntCounterVec, retry_retries: IntCounterVec, retry_sleep_seconds: CounterVec, + + batched_op_count: IntCounterVec, + batches_count: IntCounterVec, } impl MetricsVecs { @@ -113,6 +122,18 @@ impl MetricsVecs { help: "time spent in retry loop backoff", var_labels: ["op"], )), + + batched_op_count: registry.register(metric!( + name: "mz_ts_oracle_batched_op_count", + help: "count of batched operations", + var_labels: ["op"], + )), + + batches_count: registry.register(metric!( + name: "mz_ts_oracle_batches_count", + help: "count of batches of operations", + var_labels: ["op"], + )), } } @@ -134,6 +155,19 @@ impl MetricsVecs { } } + fn batching_metrics(&self) -> BatchingMetrics { + BatchingMetrics { + read_ts: self.batched_op_metrics("read_ts"), + } + } + + fn batched_op_metrics(&self, op: &str) -> BatchedOpMetrics { + BatchedOpMetrics { + ops_count: self.batched_op_count.with_label_values(&[op]), + batches_count: self.batches_count.with_label_values(&[op]), + } + } + fn retries_metrics(&self) -> RetriesMetrics { RetriesMetrics { open: self.retry_metrics("open"), @@ -193,6 +227,17 @@ pub struct OracleMetrics { pub apply_write: ExternalOpMetrics, } +#[derive(Debug)] +pub struct BatchedOpMetrics { + pub ops_count: IntCounter, + pub batches_count: IntCounter, +} + +#[derive(Debug)] +pub struct BatchingMetrics { + pub read_ts: BatchedOpMetrics, +} + #[derive(Debug)] pub struct RetryMetrics { pub(crate) name: String, diff --git a/src/adapter/src/coord/timestamp_oracle/postgres_oracle.rs b/src/adapter/src/coord/timestamp_oracle/postgres_oracle.rs index 2b66e0b7c7846..f6ecee45a71b1 100644 --- a/src/adapter/src/coord/timestamp_oracle/postgres_oracle.rs +++ 
b/src/adapter/src/coord/timestamp_oracle/postgres_oracle.rs @@ -27,7 +27,7 @@ use tracing::{debug, info}; use crate::coord::timeline::WriteTimestamp; use crate::coord::timestamp_oracle::metrics::{Metrics, RetryMetrics}; use crate::coord::timestamp_oracle::retry::Retry; -use crate::coord::timestamp_oracle::{GenericNowFn, TimestampOracle}; +use crate::coord::timestamp_oracle::{GenericNowFn, ShareableTimestampOracle, TimestampOracle}; // The timestamp columns are a `DECIMAL` that is big enough to hold // `18446744073709551615`, the maximum value of `u64` which is our underlying @@ -49,6 +49,7 @@ CREATE TABLE IF NOT EXISTS timestamp_oracle ( "; /// A [`TimestampOracle`] backed by "Postgres". +#[derive(Debug)] pub struct PostgresTimestampOracle where N: GenericNowFn, @@ -104,7 +105,7 @@ impl From for PostgresClientConfig { } impl PostgresTimestampOracleConfig { - const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "COCKROACH_URL"; + pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "COCKROACH_URL"; /// Returns a new instance of [`PostgresTimestampOracleConfig`] with default tuning. pub fn new(url: &str, metrics: Arc) -> Self { @@ -187,7 +188,7 @@ impl PostgresClientKnobs for PostgresTimestampOracleConfig { impl PostgresTimestampOracle where - N: GenericNowFn + 'static, + N: GenericNowFn + std::fmt::Debug + 'static, { /// Open a Postgres [`TimestampOracle`] instance with `config`, for the /// timeline named `timeline`. `next` generates new timestamps when invoked. @@ -223,7 +224,7 @@ where )) .await?; - let mut oracle = PostgresTimestampOracle { + let oracle = PostgresTimestampOracle { timeline: timeline.clone(), next: next.clone(), postgres_client: Arc::new(postgres_client), @@ -253,7 +254,7 @@ where // Forward timestamps to what we're given from outside. Remember, // the above query will only create the row at the initial timestamp // if it didn't exist before. 
- oracle.apply_write(initially).await; + ShareableTimestampOracle::apply_write(&oracle, initially).await; Result::<_, anyhow::Error>::Ok(oracle) }; @@ -448,12 +449,12 @@ where // that, and also make the types we store in the backing "Postgres" table // generic. But in practice we only use oracles for [`mz_repr::Timestamp`] so // don't do that extra work for now. -#[async_trait(?Send)] -impl TimestampOracle for PostgresTimestampOracle +#[async_trait] +impl ShareableTimestampOracle for PostgresTimestampOracle where - N: GenericNowFn + 'static, + N: GenericNowFn + std::fmt::Debug + 'static, { - async fn write_ts(&mut self) -> WriteTimestamp { + async fn write_ts(&self) -> WriteTimestamp { let metrics = &self.metrics.retries.write_ts; let res = retry_fallible(metrics, || { @@ -495,7 +496,7 @@ where res } - async fn apply_write(&mut self, write_ts: Timestamp) { + async fn apply_write(&self, write_ts: Timestamp) { let metrics = &self.metrics.retries.apply_write; let res = retry_fallible(metrics, || { @@ -570,6 +571,43 @@ where } } +#[async_trait(?Send)] +impl TimestampOracle for PostgresTimestampOracle +where + N: GenericNowFn + std::fmt::Debug + 'static, +{ + #[tracing::instrument(name = "oracle::write_ts", level = "debug", skip_all)] + async fn write_ts(&mut self) -> WriteTimestamp { + ShareableTimestampOracle::write_ts(self).await + } + + #[tracing::instrument(name = "oracle::peek_write_ts", level = "debug", skip_all)] + async fn peek_write_ts(&self) -> Timestamp { + ShareableTimestampOracle::peek_write_ts(self).await + } + + #[tracing::instrument(name = "oracle::read_ts", level = "debug", skip_all)] + async fn read_ts(&self) -> Timestamp { + ShareableTimestampOracle::read_ts(self).await + } + + #[tracing::instrument(name = "oracle::apply_write", level = "debug", skip_all)] + async fn apply_write(&mut self, write_ts: Timestamp) { + ShareableTimestampOracle::apply_write(self, write_ts).await + } + + fn get_shared(&self) -> Option + Send + Sync>> { + let shallow_clone = 
Self { + timeline: self.timeline.clone(), + next: self.next.clone(), + postgres_client: Arc::clone(&self.postgres_client), + metrics: Arc::clone(&self.metrics), + }; + + Some(Arc::new(shallow_clone)) + } +} + #[cfg(test)] mod tests { @@ -606,6 +644,42 @@ mod tests { Ok(()) } + #[mz_ore::test(tokio::test)] + #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux` + async fn test_shareable_postgres_timestamp_oracle() -> Result<(), anyhow::Error> { + let config = match PostgresTimestampOracleConfig::new_for_test() { + Some(config) => config, + None => { + info!( + "{} env not set: skipping test that uses external service", + PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL + ); + return Ok(()); + } + }; + + cleanup(config.clone()).await?; + + timestamp_oracle::tests::shareable_timestamp_oracle_impl_test( + |timeline, now_fn, initial_ts| { + let oracle = + PostgresTimestampOracle::open(config.clone(), timeline, initial_ts, now_fn); + + async { + oracle + .await + .get_shared() + .expect("postgres oracle is shareable") + } + }, + ) + .await?; + + cleanup(config).await?; + + Ok(()) + } + // Best-effort cleanup! async fn cleanup(config: PostgresTimestampOracleConfig) -> Result<(), anyhow::Error> { let postgres_client = PostgresClient::open(config.into())?; diff --git a/src/adapter/src/coord/timestamp_selection.rs b/src/adapter/src/coord/timestamp_selection.rs index 6493ba2efe0ce..81e0e03eb7afc 100644 --- a/src/adapter/src/coord/timestamp_selection.rs +++ b/src/adapter/src/coord/timestamp_selection.rs @@ -39,7 +39,20 @@ use crate::AdapterError; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum TimestampContext { /// Read is executed in a specific timeline with a specific timestamp. - TimelineTimestamp(Timeline, T), + TimelineTimestamp { + timeline: Timeline, + /// The timestamp that was chosen for a read. 
This can differ from the + /// `oracle_ts` when collections are not readable at the (linearized) + /// timestamp for the oracle. In those cases (when the chosen timestamp + /// is further ahead than the oracle timestamp) we have to delay + /// returning peek results until the timestamp oracle is also + /// sufficiently advanced. + chosen_ts: T, + /// The timestamp that would have been chosen for the read by the + /// (linearized) timestamp oracle. In most cases this will be picked as + /// the `chosen_ts`. + oracle_ts: Option, + }, /// Read is execute without a timeline or timestamp. NoTimestamp, } @@ -47,7 +60,8 @@ pub enum TimestampContext { impl TimestampContext { /// Creates a `TimestampContext` from a timestamp and `TimelineContext`. pub fn from_timeline_context( - ts: T, + chosen_ts: T, + oracle_ts: Option, transaction_timeline: Option, timeline_context: &TimelineContext, ) -> TimestampContext { @@ -56,14 +70,19 @@ impl TimestampContext { if let Some(transaction_timeline) = transaction_timeline { assert_eq!(timeline, &transaction_timeline); } - Self::TimelineTimestamp(timeline.clone(), ts) + Self::TimelineTimestamp { + timeline: timeline.clone(), + chosen_ts, + oracle_ts, + } } TimelineContext::TimestampDependent => { // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist. - Self::TimelineTimestamp( - transaction_timeline.unwrap_or(Timeline::EpochMilliseconds), - ts, - ) + Self::TimelineTimestamp { + timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds), + chosen_ts, + oracle_ts, + } } TimelineContext::TimestampIndependent => Self::NoTimestamp, } @@ -72,7 +91,7 @@ impl TimestampContext { /// The timeline belonging to this context, if one exists. pub fn timeline(&self) -> Option<&Timeline> { match self { - Self::TimelineTimestamp(timeline, _) => Some(timeline), + Self::TimelineTimestamp { timeline, .. 
} => Some(timeline), Self::NoTimestamp => None, } } @@ -80,7 +99,7 @@ impl TimestampContext { /// The timestamp belonging to this context, if one exists. pub fn timestamp(&self) -> Option<&T> { match self { - Self::TimelineTimestamp(_, ts) => Some(ts), + Self::TimelineTimestamp { chosen_ts, .. } => Some(chosen_ts), Self::NoTimestamp => None, } } @@ -88,7 +107,7 @@ impl TimestampContext { /// The timestamp belonging to this context, or a sensible default if one does not exists. pub fn timestamp_or_default(&self) -> T { match self { - Self::TimelineTimestamp(_, ts) => ts.clone(), + Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(), // Anything without a timestamp is given the maximum possible timestamp to indicate // that they have been closed up until the end of time. This allows us to SUBSCRIBE to // static views. @@ -109,11 +128,6 @@ impl TimestampContext { #[async_trait(?Send)] impl TimestampProvider for Coordinator { - async fn oracle_read_ts(&self, timeline: &Timeline) -> Option { - let timestamp_oracle = self.get_timestamp_oracle(timeline); - Some(timestamp_oracle.read_ts().await) - } - /// Reports a collection's current read frontier. fn compute_read_frontier<'a>( &'a self, @@ -206,7 +220,44 @@ pub trait TimestampProvider { fn storage_implied_capability<'a>(&'a self, id: GlobalId) -> &'a Antichain; fn storage_write_frontier<'a>(&'a self, id: GlobalId) -> &'a Antichain; - async fn oracle_read_ts(&self, timeline: &Timeline) -> Option; + fn get_timeline(timeline_context: &TimelineContext) -> Option { + let timeline = match timeline_context { + TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()), + // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist. 
+ TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds), + TimelineContext::TimestampIndependent => None, + }; + + timeline + } + + /// Returns a `Timeline` whose timestamp oracle we have to use to get a + /// linearized read timestamp, _iff_ linearization is needed. + fn get_linearized_timeline( + isolation_level: &IsolationLevel, + when: &QueryWhen, + timeline_context: &TimelineContext, + ) -> Option { + let timeline = Self::get_timeline(timeline_context); + + // In order to use a timestamp oracle, we must be in the context of some timeline. In that + // context we would use the timestamp oracle in the following scenarios: + // - The isolation level is Strict Serializable and the `when` allows us to use the + // timestamp oracle (ex: queries with no AS OF). + // - The `when` requires us to use the timestamp oracle (ex: read-then-write queries). + let linearized_timeline = match &timeline { + Some(timeline) + if when.must_advance_to_timeline_ts() + || (when.can_advance_to_timeline_ts() + && isolation_level == &IsolationLevel::StrictSerializable) => + { + Some(timeline.clone()) + } + _ => None, + }; + + linearized_timeline + } /// Determines the timestamp for a query. /// @@ -223,6 +274,7 @@ pub trait TimestampProvider { when: &QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &TimelineContext, + oracle_read_ts: Option, real_time_recency_ts: Option, isolation_level: &IsolationLevel, ) -> Result, AdapterError> { @@ -242,28 +294,25 @@ pub trait TimestampProvider { let upper = self.least_valid_write(id_bundle); let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper); - let timeline = match timeline_context { - TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()), - // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist. 
- TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds), - TimelineContext::TimestampIndependent => None, - }; - - // In order to use a timestamp oracle, we must be in the context of some timeline. In that - // context we would use the timestamp oracle in the following scenarios: - // - The isolation level is Strict Serializable and the `when` allows us to use the - // the timestamp oracle (ex: queries with no AS OF). - // - The `when` requires us to use the timestamp oracle (ex: read-then-write queries). - let oracle_read_ts = match &timeline { - Some(timeline) - if when.must_advance_to_timeline_ts() - || (when.can_advance_to_timeline_ts() - && isolation_level == &IsolationLevel::StrictSerializable) => - { - self.oracle_read_ts(timeline).await - } - _ => None, - }; + let timeline = Self::get_timeline(timeline_context); + let linearized_timeline = + Self::get_linearized_timeline(isolation_level, when, timeline_context); + // TODO: We currently split out getting the oracle timestamp because + // it's a potentially expensive call, but a call that can be done in an + // async task. TimestampProvider is not Send (nor Sync), so we cannot do + // the call to `determine_timestamp_for` (including the oracle call) on + // an async task. If/when TimestampProvider can become Send, we can fold + // the call to the TimestampOracle back into this function. + // + // We assert here that the logic that determines the oracle timestamp + // matches our expectations. + if linearized_timeline.is_some() { + assert!( + oracle_read_ts.is_some(), + "should get a timestamp from the oracle for linearized timeline {:?} but didn't", + timeline + ); + } // Initialize candidate to the minimum correct time. 
let mut candidate = Timestamp::minimum(); @@ -331,8 +380,12 @@ pub trait TimestampProvider { )); }; - let timestamp_context = - TimestampContext::from_timeline_context(timestamp, timeline, timeline_context); + let timestamp_context = TimestampContext::from_timeline_context( + timestamp, + oracle_read_ts, + timeline, + timeline_context, + ); Ok(TimestampDetermination { timestamp_context, @@ -424,6 +477,26 @@ pub trait TimestampProvider { } impl Coordinator { + pub(crate) async fn oracle_read_ts( + &self, + session: &Session, + timeline_ctx: &TimelineContext, + when: &QueryWhen, + ) -> Option { + let isolation_level = session.vars().transaction_isolation().clone(); + let linearized_timeline = + Coordinator::get_linearized_timeline(&isolation_level, when, timeline_ctx); + let oracle_read_ts = match linearized_timeline { + Some(timeline) => { + let timestamp_oracle = self.get_timestamp_oracle(&timeline); + Some(timestamp_oracle.read_ts().await) + } + None => None, + }; + + oracle_read_ts + } + /// Determines the timestamp for a query. pub(crate) async fn determine_timestamp( &self, @@ -432,6 +505,7 @@ impl Coordinator { when: &QueryWhen, compute_instance: ComputeInstanceId, timeline_context: &TimelineContext, + oracle_read_ts: Option, real_time_recency_ts: Option, ) -> Result, AdapterError> { let isolation_level = session.vars().transaction_isolation(); @@ -443,6 +517,7 @@ impl Coordinator { when, compute_instance, timeline_context, + oracle_read_ts, real_time_recency_ts, isolation_level, ) @@ -471,6 +546,7 @@ impl Coordinator { when, compute_instance, timeline_context, + oracle_read_ts, real_time_recency_ts, &IsolationLevel::Serializable, ) @@ -566,7 +642,9 @@ pub struct TimestampDetermination { impl TimestampDetermination { pub fn respond_immediately(&self) -> bool { match &self.timestamp_context { - TimestampContext::TimelineTimestamp(_, timestamp) => !self.upper.less_equal(timestamp), + TimestampContext::TimelineTimestamp { chosen_ts, .. 
} => { + !self.upper.less_equal(chosen_ts) + } TimestampContext::NoTimestamp => true, } } diff --git a/src/adapter/src/session.rs b/src/adapter/src/session.rs index 50e163041c0f8..fc7cb44d0d8b2 100644 --- a/src/adapter/src/session.rs +++ b/src/adapter/src/session.rs @@ -473,7 +473,7 @@ impl Session { pcx: _, ops: TransactionOps::Peeks { determination: TimestampDetermination { - timestamp_context: TimestampContext::TimelineTimestamp(_, _), + timestamp_context: TimestampContext::TimelineTimestamp { .. }, .. }, .. @@ -1051,8 +1051,16 @@ impl TransactionStatus { &add_timestamp_determination.timestamp_context, ) { ( - TimestampContext::TimelineTimestamp(txn_timeline, txn_ts), - TimestampContext::TimelineTimestamp(add_timeline, add_ts), + TimestampContext::TimelineTimestamp { + timeline: txn_timeline, + chosen_ts: txn_ts, + oracle_ts: _, + }, + TimestampContext::TimelineTimestamp { + timeline: add_timeline, + chosen_ts: add_ts, + oracle_ts: _, + }, ) => { assert_eq!(txn_timeline, add_timeline); assert_eq!(txn_ts, add_ts); @@ -1173,7 +1181,7 @@ impl Transaction { TransactionOps::Peeks { determination: TimestampDetermination { - timestamp_context: TimestampContext::TimelineTimestamp(timeline, _), + timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. }, .. }, .. diff --git a/src/adapter/tests/timestamp_selection.rs b/src/adapter/tests/timestamp_selection.rs index 992b1d88cc78e..58decf65010c0 100644 --- a/src/adapter/tests/timestamp_selection.rs +++ b/src/adapter/tests/timestamp_selection.rs @@ -193,10 +193,6 @@ impl TimestampProvider for Frontiers { ) -> &'a timely::progress::Antichain { &self.storage.get(&id).unwrap().write } - - async fn oracle_read_ts(&self, timeline: &Timeline) -> Option { - matches!(timeline, Timeline::EpochMilliseconds).then(|| self.oracle) - } } #[derive(Deserialize, Debug, Clone)] @@ -309,6 +305,27 @@ fn test_timestamp_selection() { Some(isolation), ); + // TODO: Factor out into method, or somesuch! 
+ let timeline_ctx = TimelineContext::TimestampDependent; + let isolation_level = IsolationLevel::from(isolation); + let when = parse_query_when(&det.when); + let linearized_timeline = + Frontiers::get_linearized_timeline(&isolation_level, &when, &timeline_ctx); + + let oracle_read_ts = if let Some(timeline) = linearized_timeline { + match timeline { + Timeline::EpochMilliseconds => Some(f.oracle), + timeline => { + unreachable!( + "only EpochMillis is used in tests but we got {:?}", + timeline + ) + } + } + } else { + None + }; + let ts = block_on(f.determine_timestamp_for( &catalog, &session, @@ -316,7 +333,8 @@ fn test_timestamp_selection() { &parse_query_when(&det.when), det.instance.parse().unwrap(), &TimelineContext::TimestampDependent, - None, + oracle_read_ts, + None, /* real_time_recency_ts */ &IsolationLevel::from(isolation), )) .unwrap(); diff --git a/src/environmentd/tests/sql.rs b/src/environmentd/tests/sql.rs index 17f0310c3ce34..60bffaaaf2ea8 100644 --- a/src/environmentd/tests/sql.rs +++ b/src/environmentd/tests/sql.rs @@ -1469,7 +1469,10 @@ async fn test_utilization_hold() { // If we're not in EpochMilliseconds, the timestamp math below is invalid, so assert that here. assert!(matches!( explain.determination.timestamp_context, - TimestampContext::TimelineTimestamp(Timeline::EpochMilliseconds, _) + TimestampContext::TimelineTimestamp { + timeline: Timeline::EpochMilliseconds, + .. 
+ } )); let since = explain .determination diff --git a/test/feature-benchmark/mzcompose.py b/test/feature-benchmark/mzcompose.py index 54be4cd6b3508..cff134c7393ed 100644 --- a/test/feature-benchmark/mzcompose.py +++ b/test/feature-benchmark/mzcompose.py @@ -140,10 +140,6 @@ def run_one_scenario( additional_system_parameter_defaults = {} - # TODO(def-,aljoscha) Switch this to "postgres" before #22029 is enabled in production - # Also verify that there is no regression when the "other" side uses catalog ts oracle and "this" uses postgres ts oracle - additional_system_parameter_defaults["timestamp_oracle"] = "catalog" - if params is not None: for param in params.split(";"): param_name, param_value = param.split("=") diff --git a/test/scalability/mzcompose.py b/test/scalability/mzcompose.py index a901170cc009e..3689c7dba9439 100644 --- a/test/scalability/mzcompose.py +++ b/test/scalability/mzcompose.py @@ -51,9 +51,6 @@ image="materialize/materialized:latest", sanity_restart=False, catalog_store="stash", - # TODO(def-,aljoscha) Switch this to "postgres" before #22029 is enabled in production - # Also verify that there is no regression when the "other" side uses catalog ts oracle and "this" uses postgres ts oracle - additional_system_parameter_defaults={"timestamp_oracle": "catalog"}, ), Postgres(), ]