Skip to content

Commit

Permalink
add flag to prevent processing compaction requests
Browse files Browse the repository at this point in the history
  • Loading branch information
ParkMyCar committed Dec 3, 2024
1 parent 497a477 commit b8ca9ba
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 4 deletions.
6 changes: 3 additions & 3 deletions misc/python/materialize/checks/all_checks/continual_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class AuditLogCT(Check):
"""Continual Task for audit logging"""

def _can_run(self, e: Executor) -> bool:
return self.base_version >= MzVersion.parse_mz("v0.120.0-dev")
return self.base_version > MzVersion.parse_mz("v0.127.0-dev")

def initialize(self) -> Testdrive:
return Testdrive(
Expand Down Expand Up @@ -70,7 +70,7 @@ class StreamTableJoinCT(Check):
"""Continual Task for stream table join"""

def _can_run(self, e: Executor) -> bool:
return self.base_version >= MzVersion.parse_mz("v0.120.0-dev")
return self.base_version > MzVersion.parse_mz("v0.127.0-dev")

def initialize(self) -> Testdrive:
return Testdrive(
Expand Down Expand Up @@ -136,7 +136,7 @@ class UpsertCT(Check):
"""Continual Task for upserts"""

def _can_run(self, e: Executor) -> bool:
return self.base_version >= MzVersion.parse_mz("v0.120.0-dev")
return self.base_version > MzVersion.parse_mz("v0.127.0-dev")

def initialize(self) -> Testdrive:
return Testdrive(
Expand Down
3 changes: 3 additions & 0 deletions src/clusterd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
persist_cfg.is_cc_active = args.is_cc;
persist_cfg.announce_memory_limit = args.announce_memory_limit;
// Start with compaction disabled, will get enabled once a cluster receives AllowWrites.
persist_cfg.disable_compaction();

let persist_clients = Arc::new(PersistClientCache::new(
persist_cfg,
&metrics_registry,
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
.read_only_tx
.send(false)
.expect("we're holding one other end");
self.compute_state.persist_clients.cfg().enable_compaction();
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/environmentd/src/environmentd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,9 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {

let mut persist_config =
PersistConfig::new(&BUILD_INFO, now.clone(), mz_dyncfgs::all_dyncfgs());
// Start with compaction disabled, later enable it if we're not in read-only mode.
persist_config.disable_compaction();

let persist_pubsub_server = PersistGrpcPubSubServer::new(&persist_config, &metrics_registry);
let persist_pubsub_client = persist_pubsub_server.new_same_process_connection();

Expand Down
8 changes: 7 additions & 1 deletion src/environmentd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,6 @@ impl Listeners {
let (adapter_storage, audit_logs_handle) = openable_adapter_storage
.open_savepoint(boot_ts, &bootstrap_args)
.await?;

// In read-only mode, we intentionally do not call `set_is_leader`,
// because we are by definition not the leader if we are in
// read-only mode.
Expand All @@ -586,6 +585,13 @@ impl Listeners {
(adapter_storage, audit_logs_handle)
};

// Enable Persist compaction if we're not in read only.
if read_only {
config.controller.persist_clients.cfg().disable_compaction();
} else {
config.controller.persist_clients.cfg().enable_compaction();
}

info!(
"startup: envd serve: durable catalog open complete in {:?}",
catalog_open_start.elapsed()
Expand Down
16 changes: 16 additions & 0 deletions src/persist-client/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

//! The tunable knobs for persist.
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

Expand Down Expand Up @@ -108,6 +109,8 @@ pub struct PersistConfig {
configs_synced_once: Arc<watch::Sender<bool>>,
/// Whether to physically and logically compact batches in blob storage.
pub compaction_enabled: bool,
/// Whether to process compaction requests.
pub compaction_enabled2: Arc<AtomicBool>,
/// In Compactor::compact_and_apply_background, the maximum number of concurrent
/// compaction requests that can execute for a given shard.
pub compaction_concurrency_limit: usize,
Expand Down Expand Up @@ -180,6 +183,7 @@ impl PersistConfig {
configs: Arc::new(configs),
configs_synced_once: Arc::new(configs_synced_once),
compaction_enabled: !compaction_disabled,
compaction_enabled2: Arc::new(AtomicBool::new(true)),
compaction_concurrency_limit: 5,
compaction_queue_size: 20,
compaction_yield_after_n_updates: 100_000,
Expand Down Expand Up @@ -270,6 +274,18 @@ impl PersistConfig {
self.set_config(&NEXT_LISTEN_BATCH_RETRYER_CLAMP, val.clamp);
}

pub fn disable_compaction(&self) {
tracing::info!("Disabling Perist Compaction");
self.compaction_enabled2
.store(false, std::sync::atomic::Ordering::Relaxed);
}

pub fn enable_compaction(&self) {
tracing::info!("Enabling Perist Compaction");
self.compaction_enabled2
.store(true, std::sync::atomic::Ordering::Relaxed);
}

/// Returns a new instance of [PersistConfig] for tests.
pub fn new_for_tests() -> Self {
use mz_build_info::DUMMY_BUILD_INFO;
Expand Down
6 changes: 6 additions & 0 deletions src/persist-client/src/internal/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ where
let concurrency_limit = Arc::new(tokio::sync::Semaphore::new(
cfg.compaction_concurrency_limit,
));
let compaction_enabled = Arc::clone(&cfg.compaction_enabled2);

// spin off a single task responsible for executing compaction requests.
// work is enqueued into the task through a channel
Expand All @@ -175,6 +176,11 @@ where
assert_eq!(req.shard_id, machine.shard_id());
let metrics = Arc::clone(&machine.applier.metrics);

if !compaction_enabled.load(std::sync::atomic::Ordering::Relaxed) {
tracing::warn!(shard_id = ?req.shard_id, "Dropping compaction request on the floor.");
continue;
}

let permit = {
let inner = Arc::clone(&concurrency_limit);
// perform a non-blocking attempt to acquire a permit so we can
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/storage_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ impl StorageState {
self.read_only_tx
.send(false)
.expect("we're holding one other end");
self.persist_clients.cfg().enable_compaction();
}
StorageCommand::UpdateConfiguration(params) => {
// These can be done from all workers safely.
Expand Down

0 comments on commit b8ca9ba

Please sign in to comment.