persist: disable compaction in read-only mode (#30725)
This PR disables Persist's compaction when `environmentd` or `clusterd` is in read-only mode. As part of
#30205 we discovered
that during a 0dt deployment the read-only instance of Materialize was
scheduling compaction requests, which caused it to write data.

We disable compaction by adding a `compaction_process_requests:
Arc<AtomicBool>` flag to `PersistConfig` and setting it to `true`
when `clusterd` receives the already existing `AllowWrites` command.
Also included is a CYA dyncfg that gates whether we check the
flag; it is set to `true` by default.

I also added a Prometheus metric that counts the number of requests dropped
because compaction is disabled, added a unit test that exercises the basic
enable/disable behavior, and manually verified in a 0dt test run that
compaction was successfully disabled and then re-enabled across
`environmentd` and all `clusterd`s when a deployment was promoted.
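
For readers skimming the diff below, here is a minimal, self-contained sketch of the mechanism, not the actual Materialize code. The names `Config`, `check_process_flag`, and `handle_request` are hypothetical stand-ins; `compaction_process_requests`, `disable_compaction`, `enable_compaction`, and the CYA dyncfg come from the diff.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Stand-in for `PersistConfig`: holds the shared flag the compactor checks.
struct Config {
    /// Mirrors `compaction_process_requests` in the PR.
    compaction_process_requests: Arc<AtomicBool>,
    /// Mirrors the CYA dyncfg `persist_compaction_check_process_flag` (true by default).
    check_process_flag: bool,
}

impl Config {
    fn new() -> Self {
        Config {
            compaction_process_requests: Arc::new(AtomicBool::new(true)),
            check_process_flag: true,
        }
    }
    fn disable_compaction(&self) {
        self.compaction_process_requests.store(false, Ordering::Relaxed);
    }
    fn enable_compaction(&self) {
        self.compaction_process_requests.store(true, Ordering::Relaxed);
    }
}

/// Sketch of the compactor's request loop body: drop requests while the
/// flag is off, unless the CYA check itself is disabled.
fn handle_request(cfg: &Config, req: &str) {
    if cfg.check_process_flag && !cfg.compaction_process_requests.load(Ordering::Relaxed) {
        println!("dropping compaction request {req}: compaction disabled");
        return;
    }
    println!("compacting {req}");
}

fn main() {
    let cfg = Config::new();
    // Read-only startup: environmentd/clusterd begin with compaction disabled.
    cfg.disable_compaction();
    handle_request(&cfg, "shard-a"); // dropped
    // Promotion (e.g. on `AllowWrites`): compaction is turned back on.
    cfg.enable_compaction();
    handle_request(&cfg, "shard-a"); // processed
}
```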

### Motivation

Fix issue found in
#30205

### Checklist

- [x] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [x] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [x] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [x] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [x] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

---------

Co-authored-by: Nikhil Benesch <[email protected]>
Co-authored-by: Ben Kirwin <[email protected]>
3 people authored Dec 4, 2024
1 parent 8a9287d commit d101d9e
Showing 8 changed files with 152 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/clusterd/src/lib.rs
@@ -272,6 +272,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
persist_cfg.is_cc_active = args.is_cc;
persist_cfg.announce_memory_limit = args.announce_memory_limit;
// Start with compaction disabled, will get enabled once a cluster receives AllowWrites.
persist_cfg.disable_compaction();

let persist_clients = Arc::new(PersistClientCache::new(
persist_cfg,
&metrics_registry,
1 change: 1 addition & 0 deletions src/compute/src/compute_state.rs
@@ -410,6 +410,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
.read_only_tx
.send(false)
.expect("we're holding one other end");
self.compute_state.persist_clients.cfg().enable_compaction();
}
}

3 changes: 3 additions & 0 deletions src/environmentd/src/environmentd/main.rs
@@ -859,6 +859,9 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {

let mut persist_config =
PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
// Start with compaction disabled, later enable it if we're not in read-only mode.
persist_config.disable_compaction();

let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();

6 changes: 5 additions & 1 deletion src/environmentd/src/lib.rs
@@ -550,7 +550,6 @@ impl Listeners {
let (adapter_storage, audit_logs_handle) = openable_adapter_storage
.open_savepoint(boot_ts, &bootstrap_args)
.await?;

// In read-only mode, we intentionally do not call `set_is_leader`,
// because we are by definition not the leader if we are in
// read-only mode.
@@ -569,6 +568,11 @@
(adapter_storage, audit_logs_handle)
};

// Enable Persist compaction if we're not in read only.
if !read_only {
config.controller.persist_clients.cfg().enable_compaction();
}

info!(
"startup: envd serve: durable catalog open complete in {:?}",
catalog_open_start.elapsed()
17 changes: 17 additions & 0 deletions src/persist-client/src/cfg.rs
@@ -11,6 +11,7 @@

//! The tunable knobs for persist.
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

@@ -108,6 +109,8 @@ pub struct PersistConfig {
configs_synced_once: Arc<watch::Sender<bool>>,
/// Whether to physically and logically compact batches in blob storage.
pub compaction_enabled: bool,
/// Whether the `Compactor` will process compaction requests, or drop them on the floor.
pub compaction_process_requests: Arc<AtomicBool>,
/// In Compactor::compact_and_apply_background, the maximum number of concurrent
/// compaction requests that can execute for a given shard.
pub compaction_concurrency_limit: usize,
@@ -180,6 +183,7 @@ impl PersistConfig {
configs: Arc::new(configs),
configs_synced_once: Arc::new(configs_synced_once),
compaction_enabled: !compaction_disabled,
compaction_process_requests: Arc::new(AtomicBool::new(true)),
compaction_concurrency_limit: 5,
compaction_queue_size: 20,
compaction_yield_after_n_updates: 100_000,
@@ -270,6 +274,18 @@ impl PersistConfig {
self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
}

pub fn disable_compaction(&self) {
tracing::info!("Disabling Persist Compaction");
self.compaction_process_requests
.store(false, std::sync::atomic::Ordering::Relaxed);
}

pub fn enable_compaction(&self) {
tracing::info!("Enabling Persist Compaction");
self.compaction_process_requests
.store(true, std::sync::atomic::Ordering::Relaxed);
}

/// Returns a new instance of [PersistConfig] for tests.
pub fn new_for_tests() -> Self {
use mz_build_info::DUMMY_BUILD_INFO;
@@ -330,6 +346,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
.add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT)
.add(&crate::internal::compact::COMPACTION_USE_MOST_RECENT_SCHEMA)
.add(&crate::internal::compact::COMPACTION_CHECK_PROCESS_FLAG)
.add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS)
.add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT)
.add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION)
117 changes: 117 additions & 0 deletions src/persist-client/src/internal/compact.rs
@@ -144,6 +144,13 @@ pub(crate) const COMPACTION_USE_MOST_RECENT_SCHEMA: Config<bool> = Config::new(
",
);

pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
"persist_compaction_check_process_flag",
true,
"Whether Compactor will obey the process_requests flag in PersistConfig, \
which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

impl<K, V, T, D> Compactor<K, V, T, D>
where
K: Debug + Codec,
@@ -166,6 +173,8 @@
let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
cfg.compaction_concurrency_limit,
));
let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
let process_requests = Arc::clone(&cfg.compaction_process_requests);

// spin off a single task responsible for executing compaction requests.
// work is enqueued into the task through a channel
@@ -175,6 +184,19 @@
assert_eq!(req.shard_id, machine.shard_id());
let metrics = Arc::clone(&machine.applier.metrics);

// Only allow skipping compaction requests if the dyncfg is enabled.
if check_process_requests.get()
&& !process_requests.load(std::sync::atomic::Ordering::Relaxed)
{
// Respond to the requester, track in our metrics, and log
// that compaction is disabled.
let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
metrics.compaction.disabled.inc();
tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

continue;
}

let permit = {
let inner = Arc::clone(&concurrency_limit);
// perform a non-blocking attempt to acquire a permit so we can
@@ -980,6 +1002,7 @@ impl Timings {
#[cfg(test)]
mod tests {
use mz_dyncfg::ConfigUpdates;
use mz_ore::{assert_contains, assert_err};
use mz_persist_types::codec_impls::StringSchema;
use timely::order::Product;
use timely::progress::Antichain;
@@ -1136,4 +1159,98 @@ mod tests {
assert_eq!(part.desc, res.output.desc);
assert_eq!(updates, all_ok(&data, Product::new(10, 0)));
}

#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn disable_compaction(dyncfgs: ConfigUpdates) {
let data = [
(("0".to_owned(), "zero".to_owned()), 0, 1),
(("0".to_owned(), "zero".to_owned()), 1, -1),
(("1".to_owned(), "one".to_owned()), 1, 1),
];

let cache = new_test_client_cache(&dyncfgs);
cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
let (mut write, _) = cache
.open(PersistLocation::new_in_mem())
.await
.expect("client construction failed")
.expect_open::<String, String, u64, i64>(ShardId::new())
.await;
let b0 = write
.expect_batch(&data[..1], 0, 1)
.await
.into_hollow_batch();
let b1 = write
.expect_batch(&data[1..], 1, 2)
.await
.into_hollow_batch();

let req = CompactReq {
shard_id: write.machine.shard_id(),
desc: Description::new(
b0.desc.lower().clone(),
b1.desc.upper().clone(),
Antichain::from_elem(10u64),
),
inputs: vec![b0, b1],
};
write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
let compactor = write.compact.as_ref().expect("compaction hard disabled");

write.cfg.disable_compaction();
let result = compactor
.compact_and_apply_background(req.clone(), &write.machine)
.expect("listener")
.await
.expect("channel closed");
assert_err!(result);
assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

write.cfg.enable_compaction();
compactor
.compact_and_apply_background(req, &write.machine)
.expect("listener")
.await
.expect("channel closed")
.expect("compaction success");

// Make sure our CYA dyncfg works.
let data2 = [
(("2".to_owned(), "two".to_owned()), 2, 1),
(("2".to_owned(), "two".to_owned()), 3, -1),
(("3".to_owned(), "three".to_owned()), 3, 1),
];

let b2 = write
.expect_batch(&data2[..1], 2, 3)
.await
.into_hollow_batch();
let b3 = write
.expect_batch(&data2[1..], 3, 4)
.await
.into_hollow_batch();

let req = CompactReq {
shard_id: write.machine.shard_id(),
desc: Description::new(
b2.desc.lower().clone(),
b3.desc.upper().clone(),
Antichain::from_elem(20u64),
),
inputs: vec![b2, b3],
};
let compactor = write.compact.as_ref().expect("compaction hard disabled");

// When the dyncfg is set to false we should ignore the process flag.
write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
write.cfg.disable_compaction();
// Compaction still succeeded!
compactor
.compact_and_apply_background(req, &write.machine)
.expect("listener")
.await
.expect("channel closed")
.expect("compaction success");
}
}
5 changes: 5 additions & 0 deletions src/persist-client/src/internal/metrics.rs
@@ -792,6 +792,7 @@ impl BatchWriteMetrics {
pub struct CompactionMetrics {
pub(crate) requested: IntCounter,
pub(crate) dropped: IntCounter,
pub(crate) disabled: IntCounter,
pub(crate) skipped: IntCounter,
pub(crate) started: IntCounter,
pub(crate) applied: IntCounter,
@@ -843,6 +844,10 @@ impl CompactionMetrics {
name: "mz_persist_compaction_dropped",
help: "count of total compaction requests dropped due to a full queue",
)),
disabled: registry.register(metric!(
name: "mz_persist_compaction_disabled",
help: "count of total compaction requests dropped because compaction was disabled",
)),
skipped: registry.register(metric!(
name: "mz_persist_compaction_skipped",
help: "count of compactions skipped due to heuristics",
1 change: 1 addition & 0 deletions src/storage/src/storage_state.rs
@@ -1099,6 +1099,7 @@ impl StorageState {
self.read_only_tx
.send(false)
.expect("we're holding one other end");
self.persist_clients.cfg().enable_compaction();
}
StorageCommand::UpdateConfiguration(params) => {
// These can be done from all workers safely.
