diff --git a/src/clusterd/src/lib.rs b/src/clusterd/src/lib.rs index c3982e2681f14..d93ae95a3ed6c 100644 --- a/src/clusterd/src/lib.rs +++ b/src/clusterd/src/lib.rs @@ -272,6 +272,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> { PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs()); persist_cfg.is_cc_active = args.is_cc; persist_cfg.announce_memory_limit = args.announce_memory_limit; + // Start with compaction disabled, will get enabled once a cluster receives AllowWrites. + persist_cfg.disable_compaction(); + let persist_clients = Arc::new(PersistClientCache::new( persist_cfg, &metrics_registry, diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index c1bc061a754ec..0c184e0325d27 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -410,6 +410,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { .read_only_tx .send(false) .expect("we're holding one other end"); + self.compute_state.persist_clients.cfg().enable_compaction(); } } diff --git a/src/environmentd/src/environmentd/main.rs b/src/environmentd/src/environmentd/main.rs index e7fd62925913a..59587e5ef25e3 100644 --- a/src/environmentd/src/environmentd/main.rs +++ b/src/environmentd/src/environmentd/main.rs @@ -875,6 +875,9 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { let mut persist_config = PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs()); + // Start with compaction disabled, later enable it if we're not in read-only mode. 
+ persist_config.disable_compaction(); + let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry); let persist_pubsub_client = persist_pubsub_server.new_same_process_connection(); diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs index 8360e3ce208b6..c15327a48a370 100644 --- a/src/environmentd/src/lib.rs +++ b/src/environmentd/src/lib.rs @@ -567,7 +567,6 @@ impl Listeners { let (adapter_storage, audit_logs_handle) = openable_adapter_storage .open_savepoint(boot_ts, &bootstrap_args) .await?; - // In read-only mode, we intentionally do not call `set_is_leader`, // because we are by definition not the leader if we are in // read-only mode. @@ -586,6 +585,11 @@ impl Listeners { (adapter_storage, audit_logs_handle) }; + // Enable Persist compaction if we're not in read-only mode. + if !read_only { + config.controller.persist_clients.cfg().enable_compaction(); + } + info!( "startup: envd serve: durable catalog open complete in {:?}", catalog_open_start.elapsed() diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs index b50cc7146bf43..f57eb126311e8 100644 --- a/src/persist-client/src/cfg.rs +++ b/src/persist-client/src/cfg.rs @@ -11,6 +11,7 @@ //! The tunable knobs for persist. +use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; @@ -108,6 +109,8 @@ pub struct PersistConfig { configs_synced_once: Arc>, /// Whether to physically and logically compact batches in blob storage. pub compaction_enabled: bool, + /// Whether the `Compactor` will process compaction requests, or drop them on the floor. + pub compaction_process_requests: Arc<AtomicBool>, /// In Compactor::compact_and_apply_background, the maximum number of concurrent /// compaction requests that can execute for a given shard. 
pub compaction_concurrency_limit: usize, @@ -180,6 +183,7 @@ impl PersistConfig { configs: Arc::new(configs), configs_synced_once: Arc::new(configs_synced_once), compaction_enabled: !compaction_disabled, + compaction_process_requests: Arc::new(AtomicBool::new(true)), compaction_concurrency_limit: 5, compaction_queue_size: 20, compaction_yield_after_n_updates: 100_000, @@ -270,6 +274,18 @@ impl PersistConfig { self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp); } + pub fn disable_compaction(&self) { + tracing::info!("Disabling Persist Compaction"); + self.compaction_process_requests + .store(false, std::sync::atomic::Ordering::Relaxed); + } + + pub fn enable_compaction(&self) { + tracing::info!("Enabling Persist Compaction"); + self.compaction_process_requests + .store(true, std::sync::atomic::Ordering::Relaxed); + } + /// Returns a new instance of [PersistConfig] for tests. pub fn new_for_tests() -> Self { use mz_build_info::DUMMY_BUILD_INFO; @@ -330,6 +346,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES) .add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT) .add(&crate::internal::compact::COMPACTION_USE_MOST_RECENT_SCHEMA) + .add(&crate::internal::compact::COMPACTION_CHECK_PROCESS_FLAG) .add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS) .add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT) .add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION) diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs index 455566e941e5d..7fb2b0097e165 100644 --- a/src/persist-client/src/internal/compact.rs +++ b/src/persist-client/src/internal/compact.rs @@ -144,6 +144,13 @@ pub(crate) const COMPACTION_USE_MOST_RECENT_SCHEMA: Config = Config::new( ", ); +pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new( + "persist_compaction_check_process_flag", + true, + "Whether Compactor will obey the process_requests flag in 
PersistConfig, \ + which allows dynamically disabling compaction. If false, all compaction requests will be processed.", +); + impl<K, V, T, D> Compactor<K, V, T, D> where K: Debug + Codec, @@ -166,6 +173,8 @@ where let concurrency_limit = Arc::new(tokio::sync::Semaphore::new( cfg.compaction_concurrency_limit, )); + let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs); + let process_requests = Arc::clone(&cfg.compaction_process_requests); // spin off a single task responsible for executing compaction requests. // work is enqueued into the task through a channel @@ -175,6 +184,19 @@ where assert_eq!(req.shard_id, machine.shard_id()); let metrics = Arc::clone(&machine.applier.metrics); + // Only allow skipping compaction requests if the dyncfg is enabled. + if check_process_requests.get() + && !process_requests.load(std::sync::atomic::Ordering::Relaxed) + { + // Respond to the requester, track in our metrics, and log + // that compaction is disabled. + let _ = completer.send(Err(anyhow::anyhow!("compaction disabled"))); + metrics.compaction.disabled.inc(); + tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor."); + + continue; + } + let permit = { let inner = Arc::clone(&concurrency_limit); // perform a non-blocking attempt to acquire a permit so we can @@ -980,6 +1002,7 @@ impl Timings { #[cfg(test)] mod tests { use mz_dyncfg::ConfigUpdates; + use mz_ore::{assert_contains, assert_err}; use mz_persist_types::codec_impls::StringSchema; use timely::order::Product; use timely::progress::Antichain; @@ -1136,4 +1159,98 @@ mod tests { assert_eq!(part.desc, res.output.desc); assert_eq!(updates, all_ok(&data, Product::new(10, 0))); } + + #[mz_persist_proc::test(tokio::test)] + #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented + async fn disable_compaction(dyncfgs: ConfigUpdates) { + let data = [ + (("0".to_owned(), "zero".to_owned()), 0, 1), + (("0".to_owned(), 
"zero".to_owned()), 1, -1), + (("1".to_owned(), "one".to_owned()), 1, 1), + ]; + + let cache = new_test_client_cache(&dyncfgs); + cache.cfg.set_config(&BLOB_TARGET_SIZE, 100); + let (mut write, _) = cache + .open(PersistLocation::new_in_mem()) + .await + .expect("client construction failed") + .expect_open::(ShardId::new()) + .await; + let b0 = write + .expect_batch(&data[..1], 0, 1) + .await + .into_hollow_batch(); + let b1 = write + .expect_batch(&data[1..], 1, 2) + .await + .into_hollow_batch(); + + let req = CompactReq { + shard_id: write.machine.shard_id(), + desc: Description::new( + b0.desc.lower().clone(), + b1.desc.upper().clone(), + Antichain::from_elem(10u64), + ), + inputs: vec![b0, b1], + }; + write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1); + let compactor = write.compact.as_ref().expect("compaction hard disabled"); + + write.cfg.disable_compaction(); + let result = compactor + .compact_and_apply_background(req.clone(), &write.machine) + .expect("listener") + .await + .expect("channel closed"); + assert_err!(result); + assert_contains!(result.unwrap_err().to_string(), "compaction disabled"); + + write.cfg.enable_compaction(); + compactor + .compact_and_apply_background(req, &write.machine) + .expect("listener") + .await + .expect("channel closed") + .expect("compaction success"); + + // Make sure our CYA dyncfg works. 
+ let data2 = [ + (("2".to_owned(), "two".to_owned()), 2, 1), + (("2".to_owned(), "two".to_owned()), 3, -1), + (("3".to_owned(), "three".to_owned()), 3, 1), + ]; + + let b2 = write + .expect_batch(&data2[..1], 2, 3) + .await + .into_hollow_batch(); + let b3 = write + .expect_batch(&data2[1..], 3, 4) + .await + .into_hollow_batch(); + + let req = CompactReq { + shard_id: write.machine.shard_id(), + desc: Description::new( + b2.desc.lower().clone(), + b3.desc.upper().clone(), + Antichain::from_elem(20u64), + ), + inputs: vec![b2, b3], + }; + let compactor = write.compact.as_ref().expect("compaction hard disabled"); + + // When the dyncfg is set to false we should ignore the process flag. + write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false); + write.cfg.disable_compaction(); + // Compaction still succeeded! + compactor + .compact_and_apply_background(req, &write.machine) + .expect("listener") + .await + .expect("channel closed") + .expect("compaction success"); + } } diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs index 818cd4712e960..f667b030c0966 100644 --- a/src/persist-client/src/internal/metrics.rs +++ b/src/persist-client/src/internal/metrics.rs @@ -792,6 +792,7 @@ impl BatchWriteMetrics { pub struct CompactionMetrics { pub(crate) requested: IntCounter, pub(crate) dropped: IntCounter, + pub(crate) disabled: IntCounter, pub(crate) skipped: IntCounter, pub(crate) started: IntCounter, pub(crate) applied: IntCounter, @@ -843,6 +844,10 @@ impl CompactionMetrics { name: "mz_persist_compaction_dropped", help: "count of total compaction requests dropped due to a full queue", )), + disabled: registry.register(metric!( + name: "mz_persist_compaction_disabled", + help: "count of total compaction requests dropped because compaction was disabled", + )), skipped: registry.register(metric!( name: "mz_persist_compaction_skipped", help: "count of compactions skipped due to heuristics", diff --git 
a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs index dd729d5368139..12bfa3a44390c 100644 --- a/src/storage/src/storage_state.rs +++ b/src/storage/src/storage_state.rs @@ -1100,6 +1100,7 @@ impl StorageState { self.read_only_tx .send(false) .expect("we're holding one other end"); + self.persist_clients.cfg().enable_compaction(); } StorageCommand::UpdateConfiguration(params) => { // These can be done from all workers safely.