More scaffolding
Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Jan 30, 2025
1 parent d11f2d3 commit 14b0ba3
Showing 17 changed files with 281 additions and 43 deletions.
5 changes: 5 additions & 0 deletions src/adapter/src/coord.rs
@@ -465,6 +465,7 @@ pub struct PeekStageLinearizeTimestamp {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     target_replica: Option<ReplicaId>,
     timeline_context: TimelineContext,
@@ -479,6 +480,7 @@ pub struct PeekStageRealTimeRecency {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     target_replica: Option<ReplicaId>,
     timeline_context: TimelineContext,
@@ -494,6 +496,7 @@ pub struct PeekStageTimestampReadHold {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     target_replica: Option<ReplicaId>,
     timeline_context: TimelineContext,
@@ -510,6 +513,7 @@ pub struct PeekStageOptimize {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     source_ids: BTreeSet<GlobalId>,
     id_bundle: CollectionIdBundle,
     target_replica: Option<ReplicaId>,
@@ -525,6 +529,7 @@ pub struct PeekStageFinish {
     validity: PlanValidity,
     plan: mz_sql::plan::SelectPlan,
     max_query_result_size: Option<u64>,
+    max_query_heap_size: Option<u64>,
     id_bundle: CollectionIdBundle,
     target_replica: Option<ReplicaId>,
     source_ids: BTreeSet<GlobalId>,
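The five structs above are the successive stages of the coordinator's peek pipeline; each stage is consumed to build the next, so a value needed only at the very end, like this heap limit, has to be threaded through every intermediate stage. A minimal sketch of the pattern, with invented stage names and all other state elided:

// Sketch only: each stage owns the limit and hands it to its successor,
// so the value survives until the dataflow is finally constructed.
struct StageLinearize {
    max_query_heap_size: Option<u64>,
    // ...other per-stage state...
}

struct StageOptimize {
    max_query_heap_size: Option<u64>,
    // ...other per-stage state...
}

fn advance(stage: StageLinearize) -> StageOptimize {
    StageOptimize {
        max_query_heap_size: stage.max_query_heap_size,
    }
}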
19 changes: 14 additions & 5 deletions src/adapter/src/coord/sequencer.rs
@@ -343,8 +343,10 @@ impl Coordinator {
                 self.sequence_end_transaction(ctx, action).await;
             }
             Plan::Select(plan) => {
-                let max = Some(ctx.session().vars().max_query_result_size());
-                self.sequence_peek(ctx, plan, target_cluster, max).await;
+                let max_query = Some(ctx.session().vars().max_query_result_size());
+                let max_heap = ctx.session().vars().max_query_heap_size();
+                self.sequence_peek(ctx, plan, target_cluster, max_query, max_heap)
+                    .await;
             }
             Plan::Subscribe(plan) => {
                 self.sequence_subscribe(ctx, plan, target_cluster).await;
@@ -356,9 +358,16 @@ impl Coordinator {
                 ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
             }
             Plan::ShowColumns(show_columns_plan) => {
-                let max = Some(ctx.session().vars().max_query_result_size());
-                self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
-                    .await;
+                let max_query = Some(ctx.session().vars().max_query_result_size());
+                let max_heap = ctx.session().vars().max_query_heap_size();
+                self.sequence_peek(
+                    ctx,
+                    show_columns_plan.select_plan,
+                    target_cluster,
+                    max_query,
+                    max_heap,
+                )
+                .await;
             }
             Plan::CopyFrom(plan) => match plan.source {
                 CopyFromSource::Stdin => {
2 changes: 2 additions & 0 deletions src/adapter/src/coord/sequencer/inner.rs
@@ -2541,6 +2541,7 @@ impl Coordinator {
                 None,
                 ExplainContext::Pushdown,
                 Some(ctx.session().vars().max_query_result_size()),
+                ctx.session().vars().max_query_heap_size(),
             ),
             ctx
         );
@@ -2965,6 +2966,7 @@ impl Coordinator {
                 },
                 TargetCluster::Active,
                 None,
+                None,
             )
             .await;
 
25 changes: 23 additions & 2 deletions src/adapter/src/coord/sequencer/inner/peek.rs
@@ -128,6 +128,7 @@ impl Coordinator {
         plan: plan::SelectPlan,
         target_cluster: TargetCluster,
         max_query_result_size: Option<u64>,
+        max_query_heap_size: Option<u64>,
     ) {
         let explain_ctx = if ctx.session().vars().emit_plan_insights_notice() {
             let optimizer_trace = OptimizerTrace::new(ExplainStage::PlanInsights.paths());
@@ -143,7 +144,8 @@ impl Coordinator {
                 target_cluster,
                 None,
                 explain_ctx,
-                max_query_result_size
+                max_query_result_size,
+                max_query_heap_size,
             ),
             ctx
         );
@@ -209,6 +211,7 @@ impl Coordinator {
                 }),
                 ExplainContext::None,
                 Some(ctx.session().vars().max_query_result_size()),
+                ctx.session().vars().max_query_heap_size(),
             ),
             ctx
         );
@@ -258,6 +261,7 @@ impl Coordinator {
                     optimizer_trace,
                 }),
                 Some(ctx.session().vars().max_query_result_size()),
+                ctx.session().vars().max_query_heap_size(),
             ),
             ctx
         );
@@ -274,6 +278,7 @@ impl Coordinator {
         copy_to_ctx: Option<CopyToContext>,
         explain_ctx: ExplainContext,
         max_query_result_size: Option<u64>,
+        max_query_heap_size: Option<u64>,
     ) -> Result<PeekStage, AdapterError> {
         // Collect optimizer parameters.
         let catalog = self.owned_catalog();
@@ -391,6 +396,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             target_replica,
             timeline_context,
@@ -409,6 +415,7 @@ impl Coordinator {
             source_ids,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             target_replica,
             timeline_context,
             optimizer,
@@ -424,6 +431,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             target_replica,
             timeline_context,
@@ -468,6 +476,7 @@ impl Coordinator {
             mut validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             target_replica,
             timeline_context,
@@ -508,6 +517,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             id_bundle,
             target_replica,
@@ -526,6 +536,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             id_bundle,
             target_replica,
@@ -640,6 +651,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             id_bundle,
             target_replica,
             source_ids,
@@ -656,6 +668,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             id_bundle,
             target_replica,
             source_ids,
@@ -745,6 +758,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             source_ids,
             target_replica,
             timeline_context,
@@ -772,6 +786,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             target_replica,
             timeline_context,
             source_ids,
@@ -790,6 +805,7 @@ impl Coordinator {
             validity,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             target_replica,
             timeline_context,
             source_ids,
@@ -810,6 +826,7 @@ impl Coordinator {
             validity: _,
             plan,
             max_query_result_size,
+            max_query_heap_size,
             id_bundle,
             target_replica,
             source_ids,
@@ -833,9 +850,13 @@ impl Coordinator {
         let session = ctx.session_mut();
         let conn_id = session.conn_id().clone();
 
-        let (peek_plan, df_meta, typ) = global_lir_plan.unapply();
+        let (mut peek_plan, df_meta, typ) = global_lir_plan.unapply();
         let source_arity = typ.arity();
 
+        if let peek::PeekPlan::SlowPath(PeekDataflowPlan { desc, .. }) = &mut peek_plan {
+            desc.memory_limit = max_query_heap_size;
+        }
+
         self.emit_optimizer_notices(&*session, &df_meta.optimizer_notices);
 
         let target_cluster = self.catalog().get_cluster(cluster_id);
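The last hunk above is where the limit takes effect: only a slow-path peek ships a new dataflow to the cluster and therefore has a dataflow description to stamp the limit onto; a fast-path peek answers from an existing arrangement and installs nothing. A self-contained sketch of that split, with the types pared down to the one relevant field (the real definitions live in the compute crates):

// Sketch: a heap limit only applies where a new dataflow is created.
struct DataflowDescription {
    memory_limit: Option<u64>,
    // ...plans, imports, exports, etc....
}

struct PeekDataflowPlan {
    desc: DataflowDescription,
}

enum PeekPlan {
    /// Served directly from an existing arrangement; nothing to limit.
    FastPath,
    /// Installs a transient dataflow, described by `desc`.
    SlowPath(PeekDataflowPlan),
}

fn apply_heap_limit(plan: &mut PeekPlan, max_query_heap_size: Option<u64>) {
    if let PeekPlan::SlowPath(PeekDataflowPlan { desc }) = plan {
        desc.memory_limit = max_query_heap_size;
    }
}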
17 changes: 15 additions & 2 deletions src/compute-client/src/controller/instance.rs
@@ -65,8 +65,8 @@ use crate::protocol::command::{
 };
 use crate::protocol::history::ComputeCommandHistory;
 use crate::protocol::response::{
-    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
-    StatusResponse, SubscribeBatch, SubscribeResponse,
+    ComputeResponse, CopyToResponse, DataflowLimitStatus, FrontiersResponse,
+    OperatorHydrationStatus, PeekResponse, StatusResponse, SubscribeBatch, SubscribeResponse,
 };
 use crate::service::{ComputeClient, ComputeGrpcClient};
 
@@ -739,6 +739,15 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
             .expect("Cannot error if target_replica_ids is None")
     }
 
+    /// Handle a report that a dataflow on the given replica has exceeded a resource limit.
+    fn update_dataflow_limit_status(&mut self, replica_id: ReplicaId, status: DataflowLimitStatus) {
+        tracing::warn!(
+            "Dataflow limit exceeded on replica {}: {:?}",
+            replica_id,
+            status
+        );
+    }
+
     /// Clean up collection state that is not needed anymore.
     ///
     /// Three conditions need to be true before we can remove state for a collection:
@@ -1352,6 +1361,7 @@
             refresh_schedule: dataflow.refresh_schedule,
             debug_name: dataflow.debug_name,
             time_dependence: dataflow.time_dependence,
+            memory_limit: dataflow.memory_limit,
         };
 
         if augmented_dataflow.is_transient() {
@@ -2007,6 +2017,9 @@
             StatusResponse::OperatorHydration(status) => {
                 self.update_operator_hydration_status(replica_id, status)
             }
+            StatusResponse::DataflowLimitExceeded(status) => {
+                self.update_dataflow_limit_status(replica_id, status)
+            }
         }
     }
 
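Note that the controller side above only logs a warning for now, consistent with the commit title: the replica-side check that would actually emit DataflowLimitExceeded is not part of this diff. A hypothetical sketch of that producer side; everything except StatusResponse and DataflowLimitStatus is invented for illustration:

use mz_compute_client::protocol::response::{DataflowLimitStatus, StatusResponse};
use mz_repr::GlobalId;

// Hypothetical: compare a dataflow's heap accounting against its configured
// limit and report a violation to the controller. `send` stands in for
// whatever channel the worker uses to return status responses.
fn maybe_report_limit(
    collection_id: GlobalId,
    heap_bytes_in_use: u64,
    memory_limit: Option<u64>,
    send: &mut impl FnMut(StatusResponse),
) {
    if memory_limit.is_some_and(|limit| heap_bytes_in_use > limit) {
        send(StatusResponse::DataflowLimitExceeded(DataflowLimitStatus {
            collection_id,
        }));
    }
}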
5 changes: 5 additions & 0 deletions src/compute-client/src/protocol/response.proto
@@ -107,6 +107,7 @@ message ProtoSubscribeBatch {
 message ProtoStatusResponse {
   oneof kind {
     ProtoOperatorHydrationStatus operator_hydration = 1;
+    ProtoDataflowLimitStatus dataflow_limit_status = 2;
   }
 }
 
@@ -116,3 +117,7 @@ message ProtoOperatorHydrationStatus {
   uint64 worker_id = 3;
   bool hydrated = 4;
 }
+
+message ProtoDataflowLimitStatus {
+  mz_repr.global_id.ProtoGlobalId collection_id = 1;
+}
29 changes: 29 additions & 0 deletions src/compute-client/src/protocol/response.rs
@@ -589,6 +589,8 @@ impl Arbitrary for SubscribeBatch<mz_repr::Timestamp> {
 pub enum StatusResponse {
     /// Reports the hydration status of dataflow operators.
     OperatorHydration(OperatorHydrationStatus),
+    /// Reports limit violations for dataflows.
+    DataflowLimitExceeded(DataflowLimitStatus),
 }
 
 impl RustType<ProtoStatusResponse> for StatusResponse {
@@ -597,6 +599,7 @@ impl RustType<ProtoStatusResponse> for StatusResponse {
 
         let kind = match self {
             Self::OperatorHydration(status) => Kind::OperatorHydration(status.into_proto()),
+            Self::DataflowLimitExceeded(status) => Kind::DataflowLimitStatus(status.into_proto()),
         };
         ProtoStatusResponse { kind: Some(kind) }
     }
@@ -608,6 +611,9 @@ impl RustType<ProtoStatusResponse> for StatusResponse {
             Some(Kind::OperatorHydration(status)) => {
                 Ok(Self::OperatorHydration(status.into_rust()?))
             }
+            Some(Kind::DataflowLimitStatus(status)) => {
+                Ok(Self::DataflowLimitExceeded(status.into_rust()?))
+            }
             None => Err(TryFromProtoError::missing_field(
                 "ProtoStatusResponse::kind",
             )),
@@ -650,6 +656,29 @@ impl RustType<ProtoOperatorHydrationStatus> for OperatorHydrationStatus {
     }
 }
 
+/// A dataflow exceeded some limit.
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
+pub struct DataflowLimitStatus {
+    /// The ID of the compute collection exported by the dataflow.
+    pub collection_id: GlobalId,
+}
+
+impl RustType<ProtoDataflowLimitStatus> for DataflowLimitStatus {
+    fn into_proto(&self) -> ProtoDataflowLimitStatus {
+        ProtoDataflowLimitStatus {
+            collection_id: Some(self.collection_id.into_proto()),
+        }
+    }
+
+    fn from_proto(proto: ProtoDataflowLimitStatus) -> Result<Self, TryFromProtoError> {
+        Ok(Self {
+            collection_id: proto
+                .collection_id
+                .into_rust_if_some("ProtoDataflowLimitStatus::collection_id")?,
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use mz_ore::assert_ok;
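A round-trip test in the style of the module's existing proto tests would pin down the new encoding; a sketch, not part of this commit (the GlobalId value is arbitrary):

#[test]
fn dataflow_limit_status_proto_roundtrip() {
    use mz_proto::RustType;
    use mz_repr::GlobalId;

    let status = DataflowLimitStatus {
        collection_id: GlobalId::User(7),
    };
    // Encode to the proto representation and decode back; both must agree.
    let proto = status.into_proto();
    let decoded = DataflowLimitStatus::from_proto(proto).expect("valid proto");
    assert_eq!(status, decoded);
}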
2 changes: 2 additions & 0 deletions src/compute-types/src/dataflows.proto
@@ -61,6 +61,8 @@ message ProtoDataflowDescription {
   optional mz_storage_types.time_dependence.ProtoTimeDependence time_dependence = 11;
 
   string debug_name = 8;
+
+  optional uint64 memory_limit = 12;
 }
 
 message ProtoIndexDesc {