Skip to content
17 changes: 2 additions & 15 deletions misc/python/materialize/scalability/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,6 @@ def __str__(self) -> str:
return f"MaterializeLocal ({self.host()})"


# TODO(def-,aljoscha) Switch this to "postgres" before #22029 is enabled in
# production. Also verify that there is no regression when the "other" side uses
# catalog ts oracle and "this" uses postgres ts oracle.
ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS = {"timestamp_oracle": "catalog"}


class MaterializeContainer(MaterializeNonRemote):
def __init__(
self,
Expand Down Expand Up @@ -142,10 +136,7 @@ def up(self) -> None:

if self.image is not None and self.alternative_image is not None:
if not self.composition.try_pull_service_image(
Materialized(
image=self.image,
additional_system_parameter_defaults=ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS,
)
Materialized(image=self.image)
):
# explicitly specified image cannot be found and alternative exists
print(
Expand All @@ -160,11 +151,7 @@ def up(self) -> None:

def up_internal(self) -> None:
with self.composition.override(
Materialized(
image=self.image,
sanity_restart=False,
additional_system_parameter_defaults=ADDITIONAL_SYSTEM_PARAMETER_DEFAULTS,
)
Materialized(image=self.image, sanity_restart=False)
):
self.composition.up("materialized")
self._port = self.composition.default_port("materialized")
Expand Down
34 changes: 29 additions & 5 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ pub enum RealTimeRecencyContext {
when: QueryWhen,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
source_ids: BTreeSet<GlobalId>,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::peek::Optimizer,
Expand All @@ -343,17 +344,19 @@ impl RealTimeRecencyContext {
/// A stage of peek processing.
///
/// Each variant wraps the struct holding the state for that stage
/// (e.g. `Timestamp` wraps [`PeekStageTimestamp`]).
#[derive(Debug)]
pub enum PeekStage {
Validate(PeekStageValidate),
Optimize(PeekStageOptimize),
Timestamp(PeekStageTimestamp),
Optimize(PeekStageOptimize),
RealTimeRecency(PeekStageRealTimeRecency),
Finish(PeekStageFinish),
}

impl PeekStage {
fn validity(&mut self) -> Option<&mut PlanValidity> {
match self {
PeekStage::Validate(_) => None,
PeekStage::Optimize(PeekStageOptimize { validity, .. })
| PeekStage::Timestamp(PeekStageTimestamp { validity, .. })
PeekStage::Timestamp(PeekStageTimestamp { validity, .. })
| PeekStage::Optimize(PeekStageOptimize { validity, .. })
| PeekStage::RealTimeRecency(PeekStageRealTimeRecency { validity, .. })
| PeekStage::Finish(PeekStageFinish { validity, .. }) => Some(validity),
}
}
Expand All @@ -365,6 +368,19 @@ pub struct PeekStageValidate {
target_cluster: TargetCluster,
}

/// State carried by the [`PeekStage::Timestamp`] stage of a peek.
///
/// NOTE(review): this struct has no `oracle_read_ts` field, while
/// `PeekStageOptimize` does. Presumably the oracle read timestamp is
/// determined during this stage and handed on from here; confirm against
/// the stage implementation.
#[derive(Debug)]
pub struct PeekStageTimestamp {
/// Plan validity for this stage.
validity: PlanValidity,
/// The MIR relation expression for the peek.
/// NOTE(review): presumably not yet optimized at this point; confirm.
source: MirRelationExpr,
/// Optional `CopyFormat`; `None` when no copy format applies.
copy_to: Option<CopyFormat>,
/// Set of `GlobalId`s associated with `source`.
/// NOTE(review): presumably the collections the query depends on; confirm.
source_ids: BTreeSet<GlobalId>,
/// The `QueryWhen` value for this peek.
when: QueryWhen,
/// Optional replica to target; `None` when no specific replica is set.
target_replica: Option<ReplicaId>,
/// The `TimelineContext` for this peek.
timeline_context: TimelineContext,
/// NOTE(review): going by the name, indicates the peek runs inside an
/// immediate multi-statement transaction; confirm.
in_immediate_multi_stmt_txn: bool,
/// The peek optimizer instance carried along between stages.
optimizer: optimize::peek::Optimizer,
}

#[derive(Debug)]
pub struct PeekStageOptimize {
validity: PlanValidity,
Expand All @@ -374,19 +390,21 @@ pub struct PeekStageOptimize {
when: QueryWhen,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::peek::Optimizer,
}

#[derive(Debug)]
pub struct PeekStageTimestamp {
pub struct PeekStageRealTimeRecency {
validity: PlanValidity,
copy_to: Option<CopyFormat>,
source_ids: BTreeSet<GlobalId>,
id_bundle: CollectionIdBundle,
when: QueryWhen,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
in_immediate_multi_stmt_txn: bool,
optimizer: optimize::peek::Optimizer,
global_mir_plan: optimize::peek::GlobalMirPlan,
Expand All @@ -400,6 +418,7 @@ pub struct PeekStageFinish {
when: QueryWhen,
target_replica: Option<ReplicaId>,
timeline_context: TimelineContext,
oracle_read_ts: Option<Timestamp>,
source_ids: BTreeSet<GlobalId>,
real_time_recency_ts: Option<mz_repr::Timestamp>,
optimizer: optimize::peek::Optimizer,
Expand Down Expand Up @@ -682,7 +701,12 @@ impl PendingRead {
PendingRead::ReadThenWrite {
timestamp: (timestamp, timeline),
..
} => TimestampContext::TimelineTimestamp(timeline.clone(), timestamp.clone()),
} => TimestampContext::TimelineTimestamp {
timeline: timeline.clone(),
chosen_ts: timestamp.clone(),
oracle_ts: None, // For writes, we always pick the oracle
// timestamp!
},
}
}

Expand Down
52 changes: 45 additions & 7 deletions src/adapter/src/coord/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
//! Logic for processing [`Coordinator`] messages. The [`Coordinator`] receives
//! messages from various sources (ex: controller, clients, background tasks, etc).

use std::collections::{BTreeMap, BTreeSet};
use std::collections::{btree_map, BTreeMap, BTreeSet};
use std::time::{Duration, Instant};

use futures::future::LocalBoxFuture;
Expand Down Expand Up @@ -617,16 +617,52 @@ impl Coordinator {
let mut ready_txns = Vec::new();
let mut deferred_txns = Vec::new();

// Cache for `TimestampOracle::read_ts` calls. These are somewhat
// expensive so we cache the value. This is correct since all we're
// risking is being too conservative. We will not accidentally "release"
// a result too early.
let mut cached_oracle_ts = BTreeMap::new();

for mut read_txn in pending_read_txns {
if let TimestampContext::TimelineTimestamp(timeline, timestamp) =
read_txn.txn.timestamp_context()
if let TimestampContext::TimelineTimestamp {
timeline,
chosen_ts,
oracle_ts,
} = read_txn.txn.timestamp_context()
{
let timestamp_oracle = self.get_timestamp_oracle(&timeline);
let read_ts = timestamp_oracle.read_ts().await;
if timestamp <= read_ts {
let oracle_ts = match oracle_ts {
Some(oracle_ts) => oracle_ts,
None => {
// There was no oracle timestamp, so no need to delay.
ready_txns.push(read_txn);
continue;
}
};

if chosen_ts <= oracle_ts {
// Chosen ts was already <= the oracle ts, so we're good
// to go!
ready_txns.push(read_txn);
continue;
}

// See what the oracle timestamp is now and delay when needed.
let current_oracle_ts = cached_oracle_ts.entry(timeline.clone());
let current_oracle_ts = match current_oracle_ts {
btree_map::Entry::Vacant(entry) => {
let timestamp_oracle = self.get_timestamp_oracle(&timeline);
let read_ts = timestamp_oracle.read_ts().await;
entry.insert(read_ts.clone());
read_ts
}
btree_map::Entry::Occupied(entry) => entry.get().clone(),
};

if chosen_ts <= current_oracle_ts {
ready_txns.push(read_txn);
} else {
let wait = Duration::from_millis(timestamp.saturating_sub(read_ts).into());
let wait =
Duration::from_millis(chosen_ts.saturating_sub(current_oracle_ts).into());
if wait < shortest_wait {
shortest_wait = wait;
}
Expand Down Expand Up @@ -724,6 +760,7 @@ impl Coordinator {
when,
target_replica,
timeline_context,
oracle_read_ts,
source_ids,
in_immediate_multi_stmt_txn: _,
optimizer,
Expand All @@ -738,6 +775,7 @@ impl Coordinator {
when,
target_replica,
timeline_context,
oracle_read_ts,
source_ids,
real_time_recency_ts: Some(real_time_recency_ts),
optimizer,
Expand Down
Loading