[wip] Correction buffer metrics #31262

Draft · wants to merge 3 commits into main
51 changes: 42 additions & 9 deletions src/compute/src/sink/materialized_view_v2.rs
@@ -653,6 +653,7 @@ mod mint {
/// Implementation of the `write` operator.
mod write {
use super::*;
use differential_dataflow::lattice::Lattice;

/// Render the `write` operator.
///
@@ -776,7 +777,15 @@ mod write {
}
state.maybe_write_batch().await
}
Event::Progress(_frontier) => None,
Event::Progress(frontier) => {
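// Track the progress of the batch description input; this lower-bounds any
// descriptions we may still be assigned.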
state.description_frontier.join_assign(&frontier);
if let Some(ts) = state.description_frontier().as_option() {
let stepped = Antichain::from_elem(ts.step_back().unwrap_or(Timestamp::MIN));
state.corrections.ok.advance_since(stepped.clone());
state.corrections.err.advance_since(stepped);
}
None
},
}
}
// All inputs are exhausted, so we can shut down.
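The since advancement in the `Progress` arm above (and repeated further down when a new batch description is installed) boils down to two steps: track a frontier that only moves forward, and compact the correction buffers one tick behind it. As a rough illustration only, here is a minimal sketch with plain `u64` timestamps standing in for `mz_repr::Timestamp`; the function names are invented for the example and are not part of this change:

```rust
/// Sketch of the frontier join: for totally ordered, single-element frontiers
/// the join is simply the larger element, so the tracked frontier only ever
/// moves forward. `None` models an empty (fully advanced) frontier.
fn join_frontier(current: Option<u64>, incoming: Option<u64>) -> Option<u64> {
    match (current, incoming) {
        (Some(a), Some(b)) => Some(a.max(b)),
        // Joining with an empty frontier yields an empty frontier.
        _ => None,
    }
}

/// Sketch of the compaction frontier: one tick behind the tracked frontier,
/// saturating at the minimum timestamp (the `step_back().unwrap_or(MIN)` above).
fn compaction_frontier(description_frontier: Option<u64>) -> Option<u64> {
    description_frontier.map(|ts| ts.saturating_sub(1))
}

fn main() {
    let tracked = join_frontier(Some(3), Some(5));
    assert_eq!(tracked, Some(5));
    // Stashed corrections can then be advanced to time 4.
    assert_eq!(compaction_frontier(tracked), Some(4));
    // At the minimum timestamp there is nothing to step back to.
    assert_eq!(compaction_frontier(Some(0)), Some(0));
}
```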
@@ -814,6 +823,9 @@ mod write {
persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
/// The current valid batch description and associated output capability, if any.
batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
/// The latest frontier on the batch description input. This is a lower bound on any
/// descriptions we may be assigned in the future.
description_frontier: Antichain<Timestamp>,
/// A request to force a consolidation of `corrections` once both `desired_frontiers` and
/// `persist_frontiers` become greater than the given frontier.
///
@@ -850,6 +862,7 @@ mod write {
desired_frontiers: OkErr::new_frontiers(),
persist_frontiers: OkErr::new_frontiers(),
batch_description: None,
description_frontier: Antichain::from_elem(Timestamp::MIN),
force_consolidation_after,
}
}
@@ -901,16 +914,17 @@ mod write {

/// Apply the effects of a previous `persist` frontier advancement.
fn apply_persist_frontier_advancement(&mut self) {
let frontier = self.persist_frontiers.frontier();

// We will only emit times at or after the `persist` frontier, so now is a good time to
// advance the times of stashed updates.
self.corrections.ok.advance_since(frontier.clone());
self.corrections.err.advance_since(frontier.clone());

self.maybe_force_consolidation();
}

/// A lower bound on the times of any data that may be emitted in the future.
fn description_frontier(&self) -> &Antichain<Timestamp> {
self.batch_description
.as_ref()
.map(|(d, _)| &d.lower)
.unwrap_or(&self.description_frontier)
}

/// If the current consolidation request has become applicable, apply it.
fn maybe_force_consolidation(&mut self) {
let Some(request) = &self.force_consolidation_after else {
@@ -946,6 +960,12 @@ mod write {
}
}

if let Some(ts) = self.description_frontier().as_option() {
let stepped = Antichain::from_elem(ts.step_back().unwrap_or(Timestamp::MIN));
self.corrections.ok.advance_since(stepped.clone());
self.corrections.err.advance_since(stepped);
}

self.batch_description = Some((desc, cap));
self.trace("set batch description");
}
@@ -970,7 +990,18 @@

let oks = ok_updates.map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r));
let errs = err_updates.map(|(d, t, r)| ((SourceData(Err(d)), ()), t, r));
let mut updates = oks.chain(errs).peekable();
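// Clamp updates to the batch's `lower`, counting any that had to be advanced;
// the assertion below expects none.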
let mut lagging = 0;
let lower = desc.lower.clone();
let mut updates = oks
.chain(errs)
.map(|(kv, mut t, d)| {
if !lower.less_equal(&t) {
lagging += 1;
}
t.advance_by(lower.borrow());
(kv, t, d)
})
.peekable();

// Don't write empty batches.
if updates.peek().is_none() {
@@ -986,6 +1017,8 @@
.expect("valid usage")
.into_transmittable_batch();

assert_eq!(lagging, 0);

self.trace("wrote a batch");
Some((batch, cap))
}
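The clamping performed by the iterator chain above can be shown in isolation. This is a sketch under simplified assumptions (plain `u64` timestamps, a single-element `lower` frontier); `clamp_to_lower` is an illustrative name, not a function in this change:

```rust
/// Sketch: any update whose time lies below the batch description's lower
/// bound is counted as lagging and advanced to that bound (the
/// single-element-frontier case of `advance_by`).
fn clamp_to_lower(
    updates: Vec<(&'static str, u64, i64)>,
    lower: u64,
) -> (Vec<(&'static str, u64, i64)>, usize) {
    let mut lagging = 0;
    let clamped = updates
        .into_iter()
        .map(|(d, t, r)| {
            if t < lower {
                // Count updates that had to be advanced; the operator above
                // asserts that this count stays zero.
                lagging += 1;
            }
            (d, t.max(lower), r)
        })
        .collect();
    (clamped, lagging)
}

fn main() {
    let (clamped, lagging) = clamp_to_lower(vec![("a", 3, 1), ("b", 7, 1)], 5);
    assert_eq!(clamped, vec![("a", 5, 1), ("b", 7, 1)]);
    assert_eq!(lagging, 1);
}
```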
22 changes: 22 additions & 0 deletions src/persist-client/src/internal/metrics.rs
@@ -1932,6 +1932,10 @@ pub struct SinkMetrics {
correction_max_per_sink_worker_len_updates: raw::UIntGaugeVec,
/// Maximum capacity observed for any one correction buffer per worker
correction_max_per_sink_worker_capacity_updates: raw::UIntGaugeVec,
/// Total number of times nonzero diffs remained after advancing the since
correction_nonzero_diff_count: IntCounter,
/// Total number of updates with nonzero diffs remaining after advancing the since
correction_nonzero_diff_update_count: IntCounter,
}

impl SinkMetrics {
@@ -1963,6 +1967,14 @@ impl SinkMetrics {
help: "The maximum capacity observed for the correction buffer of any single persist sink per worker.",
var_labels: ["worker_id"],
)),
correction_nonzero_diff_count: registry.register(metric!(
name: "mz_persist_sink_correction_nonzero_diff_count",
help: "The total number of nonzero diffs encountered when advancing the since.",
)),
correction_nonzero_diff_update_count: registry.register(metric!(
name: "mz_persist_sink_correction_nonzero_diff_update_count",
help: "The total number of updates in nonzero diffs encountered when advancing the since.",
)),
}
}

@@ -2013,6 +2025,16 @@ impl SinkMetrics {
UpdateDelta::Negative(delta) => self.correction_capacity_decreases_total.inc_by(delta),
}
}

/// Reports the number of updates that remain after consolidating data once the since has been advanced.
pub fn report_correction_since_advanced(&self, update_count: usize) {
if update_count > 0 {
self.correction_nonzero_diff_count.inc();
self.correction_nonzero_diff_update_count
.inc_by(u64::cast_from(update_count));
}
}
}

/// Metrics for the persist sink that are labeled per-worker.
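For intuition about what the two new counters measure, here is a minimal sketch of consolidating a correction buffer after its since has been advanced. Types and names (`consolidate`) are simplified stand-ins for illustration and are not part of this change:

```rust
/// Sketch: after times are advanced to the new since, updates for the same
/// data at the same time are summed, and anything left with a nonzero diff is
/// what `report_correction_since_advanced` would be told about.
fn consolidate(mut updates: Vec<(&'static str, u64, i64)>) -> Vec<(&'static str, u64, i64)> {
    updates.sort();
    let mut out: Vec<(&'static str, u64, i64)> = Vec::new();
    for (d, t, r) in updates {
        match out.last_mut() {
            Some(last) if last.0 == d && last.1 == t => last.2 += r,
            _ => out.push((d, t, r)),
        }
    }
    out.retain(|(_, _, r)| *r != 0);
    out
}

fn main() {
    // Both updates land on time 4 after advancing the since and cancel out,
    // so nothing would be reported.
    let since: u64 = 4;
    let stashed: Vec<(&'static str, u64, i64)> = vec![("k", 2, 1), ("k", 3, -1)];
    let advanced: Vec<_> = stashed
        .into_iter()
        .map(|(d, t, r)| (d, t.max(since), r))
        .collect();
    assert!(consolidate(advanced).is_empty());

    // An update that does not cancel remains; the occurrence counter would be
    // bumped by 1 and the update counter by the number of remaining updates.
    let remaining = consolidate(vec![("k", 5, 1)]);
    assert_eq!(remaining.len(), 1);
}
```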