Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

persist: disable compaction in read-only mode #30725

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/clusterd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
persist_cfg.is_cc_active = args.is_cc;
persist_cfg.announce_memory_limit = args.announce_memory_limit;
// Start with compaction disabled; it will get enabled once a cluster receives AllowWrites.
persist_cfg.disable_compaction();

let persist_clients = Arc::new(PersistClientCache::new(
persist_cfg,
&metrics_registry,
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
.read_only_tx
.send(false)
.expect("we're holding one other end");
self.compute_state.persist_clients.cfg().enable_compaction();
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/environmentd/src/environmentd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,9 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {

let mut persist_config =
PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
// Start with compaction disabled, later enable it if we're not in read-only mode.
persist_config.disable_compaction();

let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();

Expand Down
6 changes: 5 additions & 1 deletion src/environmentd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,6 @@ impl Listeners {
let (adapter_storage, audit_logs_handle) = openable_adapter_storage
.open_savepoint(boot_ts, &bootstrap_args)
.await?;

// In read-only mode, we intentionally do not call `set_is_leader`,
// because we are by definition not the leader if we are in
// read-only mode.
Expand All @@ -586,6 +585,11 @@ impl Listeners {
(adapter_storage, audit_logs_handle)
};

// Enable Persist compaction if we're not in read-only mode.
if !read_only {
config.controller.persist_clients.cfg().enable_compaction();
}

info!(
"startup: envd serve: durable catalog open complete in {:?}",
catalog_open_start.elapsed()
Expand Down
17 changes: 17 additions & 0 deletions src/persist-client/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

//! The tunable knobs for persist.

use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

Expand Down Expand Up @@ -108,6 +109,8 @@ pub struct PersistConfig {
configs_synced_once: Arc<watch::Sender<bool>>,
/// Whether to physically and logically compact batches in blob storage.
pub compaction_enabled: bool,
/// Whether the `Compactor` will process compaction requests, or drop them on the floor.
pub compaction_process_requests: Arc<AtomicBool>,
/// In Compactor::compact_and_apply_background, the maximum number of concurrent
/// compaction requests that can execute for a given shard.
pub compaction_concurrency_limit: usize,
Expand Down Expand Up @@ -180,6 +183,7 @@ impl PersistConfig {
configs: Arc::new(configs),
configs_synced_once: Arc::new(configs_synced_once),
compaction_enabled: !compaction_disabled,
compaction_process_requests: Arc::new(AtomicBool::new(true)),
compaction_concurrency_limit: 5,
compaction_queue_size: 20,
compaction_yield_after_n_updates: 100_000,
Expand Down Expand Up @@ -270,6 +274,18 @@ impl PersistConfig {
self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
}

/// Turns off processing of compaction requests for every holder of this config.
///
/// Logs the transition and then clears the shared `compaction_process_requests`
/// flag (relaxed store). The flag lives behind an `Arc`, so the change is
/// visible through clones of the flag handle as well.
pub fn disable_compaction(&self) {
    use std::sync::atomic::Ordering;

    tracing::info!("Disabling Persist Compaction");
    let process_requests = &self.compaction_process_requests;
    process_requests.store(false, Ordering::Relaxed);
}

/// Turns on processing of compaction requests for every holder of this config.
///
/// Logs the transition and then sets the shared `compaction_process_requests`
/// flag (relaxed store). The flag lives behind an `Arc`, so the change is
/// visible through clones of the flag handle as well.
pub fn enable_compaction(&self) {
    use std::sync::atomic::Ordering;

    tracing::info!("Enabling Persist Compaction");
    let process_requests = &self.compaction_process_requests;
    process_requests.store(true, Ordering::Relaxed);
}

/// Returns a new instance of [PersistConfig] for tests.
pub fn new_for_tests() -> Self {
use mz_build_info::DUMMY_BUILD_INFO;
Expand Down Expand Up @@ -330,6 +346,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&crate::internal::cache::BLOB_CACHE_MEM_LIMIT_BYTES)
.add(&crate::internal::compact::COMPACTION_MINIMUM_TIMEOUT)
.add(&crate::internal::compact::COMPACTION_USE_MOST_RECENT_SCHEMA)
.add(&crate::internal::compact::COMPACTION_CHECK_PROCESS_FLAG)
.add(&crate::internal::machine::CLAIM_UNCLAIMED_COMPACTIONS)
.add(&crate::internal::machine::CLAIM_COMPACTION_PERCENT)
.add(&crate::internal::machine::CLAIM_COMPACTION_MIN_VERSION)
Expand Down
117 changes: 117 additions & 0 deletions src/persist-client/src/internal/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ pub(crate) const COMPACTION_USE_MOST_RECENT_SCHEMA: Config<bool> = Config::new(
",
);

/// Dyncfg that decides whether the `Compactor` consults the
/// `compaction_process_requests` flag in `PersistConfig` before handling a
/// compaction request.
///
/// Defaults to `true`. When set to `false`, the process flag is ignored and
/// every compaction request is processed.
pub(crate) const COMPACTION_CHECK_PROCESS_FLAG: Config<bool> = Config::new(
"persist_compaction_check_process_flag",
true,
"Whether Compactor will obey the process_requests flag in PersistConfig, \
which allows dynamically disabling compaction. If false, all compaction requests will be processed.",
);

impl<K, V, T, D> Compactor<K, V, T, D>
where
K: Debug + Codec,
Expand All @@ -166,6 +173,8 @@ where
let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
cfg.compaction_concurrency_limit,
));
let check_process_requests = COMPACTION_CHECK_PROCESS_FLAG.handle(&cfg.configs);
let process_requests = Arc::clone(&cfg.compaction_process_requests);

// spin off a single task responsible for executing compaction requests.
// work is enqueued into the task through a channel
Expand All @@ -175,6 +184,19 @@ where
assert_eq!(req.shard_id, machine.shard_id());
let metrics = Arc::clone(&machine.applier.metrics);

// Only allow skipping compaction requests if the dyncfg is enabled.
if check_process_requests.get()
&& !process_requests.load(std::sync::atomic::Ordering::Relaxed)
{
// Respond to the requester, track in our metrics, and log
// that compaction is disabled.
let _ = completer.send(Err(anyhow::anyhow!("compaction disabled")));
metrics.compaction.disabled.inc();
tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");

continue;
}

let permit = {
let inner = Arc::clone(&concurrency_limit);
// perform a non-blocking attempt to acquire a permit so we can
Expand Down Expand Up @@ -980,6 +1002,7 @@ impl Timings {
#[cfg(test)]
mod tests {
use mz_dyncfg::ConfigUpdates;
use mz_ore::{assert_contains, assert_err};
use mz_persist_types::codec_impls::StringSchema;
use timely::order::Product;
use timely::progress::Antichain;
Expand Down Expand Up @@ -1136,4 +1159,98 @@ mod tests {
assert_eq!(part.desc, res.output.desc);
assert_eq!(updates, all_ok(&data, Product::new(10, 0)));
}

// Exercises the runtime compaction toggle on `PersistConfig`:
//   1. with the toggle off, a compaction request fails with "compaction disabled";
//   2. with the toggle back on, the same request succeeds;
//   3. with `COMPACTION_CHECK_PROCESS_FLAG` set to false, the toggle is ignored
//      and compaction succeeds even while "disabled".
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn disable_compaction(dyncfgs: ConfigUpdates) {
let data = [
(("0".to_owned(), "zero".to_owned()), 0, 1),
(("0".to_owned(), "zero".to_owned()), 1, -1),
(("1".to_owned(), "one".to_owned()), 1, 1),
];

let cache = new_test_client_cache(&dyncfgs);
cache.cfg.set_config(&BLOB_TARGET_SIZE, 100);
let (mut write, _) = cache
.open(PersistLocation::new_in_mem())
.await
.expect("client construction failed")
.expect_open::<String, String, u64, i64>(ShardId::new())
.await;
// Write two adjacent batches that will serve as the compaction inputs.
let b0 = write
.expect_batch(&data[..1], 0, 1)
.await
.into_hollow_batch();
let b1 = write
.expect_batch(&data[1..], 1, 2)
.await
.into_hollow_batch();

// A single request spanning from the lower of `b0` to the upper of `b1`.
let req = CompactReq {
shard_id: write.machine.shard_id(),
desc: Description::new(
b0.desc.lower().clone(),
b1.desc.upper().clone(),
Antichain::from_elem(10u64),
),
inputs: vec![b0, b1],
};
// NOTE(review): presumably lowers the heuristic threshold so this small
// request is not skipped before reaching the process-flag check — confirm.
write.cfg.set_config(&COMPACTION_HEURISTIC_MIN_INPUTS, 1);
let compactor = write.compact.as_ref().expect("compaction hard disabled");

// Toggle off: the request must come back as an error mentioning
// "compaction disabled".
write.cfg.disable_compaction();
let result = compactor
.compact_and_apply_background(req.clone(), &write.machine)
.expect("listener")
.await
.expect("channel closed");
assert_err!(result);
assert_contains!(result.unwrap_err().to_string(), "compaction disabled");

// Toggle back on: the very same request now succeeds.
write.cfg.enable_compaction();
compactor
.compact_and_apply_background(req, &write.machine)
.expect("listener")
.await
.expect("channel closed")
.expect("compaction success");

// Make sure the escape-hatch dyncfg (`COMPACTION_CHECK_PROCESS_FLAG`) works,
// using a fresh pair of batches.
let data2 = [
(("2".to_owned(), "two".to_owned()), 2, 1),
(("2".to_owned(), "two".to_owned()), 3, -1),
(("3".to_owned(), "three".to_owned()), 3, 1),
];

let b2 = write
.expect_batch(&data2[..1], 2, 3)
.await
.into_hollow_batch();
let b3 = write
.expect_batch(&data2[1..], 3, 4)
.await
.into_hollow_batch();

let req = CompactReq {
shard_id: write.machine.shard_id(),
desc: Description::new(
b2.desc.lower().clone(),
b3.desc.upper().clone(),
Antichain::from_elem(20u64),
),
inputs: vec![b2, b3],
};
// Re-acquire the compactor handle after the writes to `write` above.
let compactor = write.compact.as_ref().expect("compaction hard disabled");

// When the dyncfg is set to false we should ignore the process flag.
write.cfg.set_config(&COMPACTION_CHECK_PROCESS_FLAG, false);
write.cfg.disable_compaction();
// Compaction still succeeded!
compactor
.compact_and_apply_background(req, &write.machine)
.expect("listener")
.await
.expect("channel closed")
.expect("compaction success");
}
}
5 changes: 5 additions & 0 deletions src/persist-client/src/internal/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,7 @@ impl BatchWriteMetrics {
pub struct CompactionMetrics {
pub(crate) requested: IntCounter,
pub(crate) dropped: IntCounter,
pub(crate) disabled: IntCounter,
pub(crate) skipped: IntCounter,
pub(crate) started: IntCounter,
pub(crate) applied: IntCounter,
Expand Down Expand Up @@ -843,6 +844,10 @@ impl CompactionMetrics {
name: "mz_persist_compaction_dropped",
help: "count of total compaction requests dropped due to a full queue",
)),
disabled: registry.register(metric!(
name: "mz_persist_compaction_disabled",
help: "count of total compaction requests dropped because compaction was disabled",
)),
skipped: registry.register(metric!(
name: "mz_persist_compaction_skipped",
help: "count of compactions skipped due to heuristics",
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/storage_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ impl StorageState {
self.read_only_tx
.send(false)
.expect("we're holding one other end");
self.persist_clients.cfg().enable_compaction();
}
StorageCommand::UpdateConfiguration(params) => {
// These can be done from all workers safely.
Expand Down
Loading