From 8cca8555fc9e72e20a6a12c04ac7513f0a7dee5f Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Wed, 27 Nov 2024 14:38:05 -0500 Subject: [PATCH] WIP --- src/adapter/src/catalog.rs | 5 +- src/adapter/src/coord.rs | 21 +++++- src/catalog-debug/src/main.rs | 3 +- src/catalog/src/durable.rs | 20 +++++- .../src/durable/objects/state_update.rs | 13 ++++ src/catalog/src/durable/persist.rs | 56 ++++++++++++++-- src/catalog/src/durable/persist/tests.rs | 15 +++-- src/catalog/src/durable/transaction.rs | 6 +- src/catalog/src/durable/upgrade.rs | 10 ++- src/catalog/tests/debug.rs | 22 +++++-- src/catalog/tests/open.rs | 66 ++++++++++++------- src/catalog/tests/read-write.rs | 24 ++++--- src/environmentd/src/deployment/preflight.rs | 2 +- src/environmentd/src/lib.rs | 11 ++-- 14 files changed, 212 insertions(+), 62 deletions(-) diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 0afa71e0d3f98..844f4741f1bee 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -552,7 +552,8 @@ impl Catalog { let mut storage = openable_storage .open(now().into(), &bootstrap_args) .await - .expect("can open durable catalog"); + .expect("can open durable catalog") + .0; // Drain updates. let _ = storage .sync_to_current_updates() @@ -578,7 +579,7 @@ impl Catalog { .with_default_deploy_generation() .build() .await?; - let storage = openable_storage.open(now().into(), bootstrap_args).await?; + let storage = openable_storage.open(now().into(), bootstrap_args).await?.0; let system_parameter_defaults = BTreeMap::default(); Self::open_debug_catalog_inner( persist_client, diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index eb3049ebc995d..40f371aa44e61 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -98,7 +98,7 @@ use mz_catalog::durable::OpenableDurableCatalogState; use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions}; use mz_catalog::memory::objects::{ CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection, - DataSourceDesc, Table, TableDataSource, + DataSourceDesc, StateUpdate, Table, TableDataSource, }; use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent}; use mz_compute_client::as_of_selection; @@ -977,6 +977,7 @@ pub struct Config { pub controller_config: ControllerConfig, pub controller_envd_epoch: NonZeroI64, pub storage: Box, + pub audit_logs_handle: thread::JoinHandle>, pub timestamp_oracle_url: Option, pub unsafe_mode: bool, pub all_features: bool, @@ -1787,6 +1788,7 @@ impl Coordinator { mut builtin_table_updates: Vec, cached_global_exprs: BTreeMap, uncached_local_exprs: BTreeMap, + audit_logs_handle: std::thread::JoinHandle>, ) -> Result<(), AdapterError> { let bootstrap_start = Instant::now(); info!("startup: coordinator init: bootstrap beginning"); @@ -2243,6 +2245,21 @@ impl Coordinator { postamble_start.elapsed() ); + let audit_join_start = Instant::now(); + info!("startup: coordinator init: bootstrap: join audit log deserialization beginning"); + let audit_log_updates = audit_logs_handle + .join() + .expect("unable to deserialize audit log"); + let audit_log_builtin_table_updates = self + .catalog() + .state() + .generate_builtin_table_updates(audit_log_updates); + builtin_table_updates.extend(audit_log_builtin_table_updates); + info!( + "startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}", + audit_join_start.elapsed() + ); + let builtin_update_start = Instant::now(); info!("startup: coordinator init: bootstrap: generate builtin updates beginning"); if self.controller.read_only() { @@ -3682,6 +3699,7 @@ pub fn serve( controller_config, controller_envd_epoch, mut storage, + audit_logs_handle, timestamp_oracle_url, unsafe_mode, all_features, @@ -4065,6 +4083,7 @@ pub fn serve( builtin_table_updates, cached_global_exprs, uncached_local_exprs, + audit_logs_handle, ) .await?; coord diff --git a/src/catalog-debug/src/main.rs b/src/catalog-debug/src/main.rs index bef65fab13cdc..f4588d5888687 100644 --- a/src/catalog-debug/src/main.rs +++ b/src/catalog-debug/src/main.rs @@ -551,7 +551,8 @@ async fn upgrade_check( cluster_replica_size_map: cluster_replica_sizes.clone(), }, ) - .await?; + .await? + .0; // If this upgrade has new builtin replicas, then we need to assign some size to it. It doesn't // really matter what size since it's not persisted, so we pick a random valid one. diff --git a/src/catalog/src/durable.rs b/src/catalog/src/durable.rs index 5b1151a104a16..123e92a1604db 100644 --- a/src/catalog/src/durable.rs +++ b/src/catalog/src/durable.rs @@ -99,11 +99,19 @@ pub trait OpenableDurableCatalogState: Debug + Send { /// - Catalog migrations fail. /// /// `initial_ts` is used as the initial timestamp for new environments. + /// + /// Also returns a handle to a thread that is deserializing all of the audit logs. async fn open_savepoint( mut self: Box, initial_ts: Timestamp, bootstrap_args: &BootstrapArgs, - ) -> Result, CatalogError>; + ) -> Result< + ( + Box, + std::thread::JoinHandle>, + ), + CatalogError, + >; /// Opens the catalog in read only mode. All mutating methods /// will return an error. @@ -120,11 +128,19 @@ pub trait OpenableDurableCatalogState: Debug + Send { /// needed. /// /// `initial_ts` is used as the initial timestamp for new environments. + /// + /// Also returns a handle to a thread that is deserializing all of the audit logs. async fn open( mut self: Box, initial_ts: Timestamp, bootstrap_args: &BootstrapArgs, - ) -> Result, CatalogError>; + ) -> Result< + ( + Box, + std::thread::JoinHandle>, + ), + CatalogError, + >; /// Opens the catalog for manual editing of the underlying data. This is helpful for /// fixing a corrupt catalog. diff --git a/src/catalog/src/durable/objects/state_update.rs b/src/catalog/src/durable/objects/state_update.rs index 2a39c72c8aff4..e87cee2f4a02e 100644 --- a/src/catalog/src/durable/objects/state_update.rs +++ b/src/catalog/src/durable/objects/state_update.rs @@ -332,6 +332,7 @@ impl StateUpdateKindJson { value: String::new(), }, ), + StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ()), ] .into_iter() .map(|kind| { @@ -342,6 +343,18 @@ impl StateUpdateKindJson { }); DESERIALIZABLE_KINDS.contains(self.kind()) } + + /// Returns true if this is an audit log update. Otherwise, returns false. + pub(crate) fn is_audit_log(&self) -> bool { + // Construct a fake audit log so we can extract exactly what the kind field will serialize + // as. + static AUDIT_LOG_KIND: LazyLock = LazyLock::new(|| { + let audit_log = StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ()); + let json_kind: StateUpdateKindJson = audit_log.into(); + json_kind.kind().to_string() + }); + &*AUDIT_LOG_KIND == self.kind() + } } /// Version of [`StateUpdateKind`] that is stored directly in persist. diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs index f231538e8f705..7ede27cc5d215 100644 --- a/src/catalog/src/durable/persist.rs +++ b/src/catalog/src/durable/persist.rs @@ -1118,7 +1118,13 @@ impl UnopenedPersistCatalogState { mode: Mode, initial_ts: Timestamp, bootstrap_args: &BootstrapArgs, - ) -> Result, CatalogError> { + ) -> Result< + ( + Box, + std::thread::JoinHandle>, + ), + CatalogError, + > { // It would be nice to use `initial_ts` here, but it comes from the system clock, not the // timestamp oracle. let mut commit_ts = self.upper; @@ -1198,6 +1204,35 @@ impl UnopenedPersistCatalogState { } soft_assert_ne_or_log!(self.upper, Timestamp::minimum()); + // Remove all audit log entries. + let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self + .snapshot + .into_iter() + .partition(|(update, _, _)| update.is_audit_log()); + self.snapshot = snapshot; + + // Create thread to deserialize audit logs. + let audit_log_handle = std::thread::spawn(move || { + let updates: Vec<_> = audit_logs + .into_iter() + .map(|(kind, ts, diff)| { + assert_eq!( + diff, 1, + "audit log is append only: ({kind:?}, {ts:?}, {diff:?})" + ); + let diff = memory::objects::StateDiff::Addition; + + let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error"); + let kind: Option = (&kind) + .try_into() + .expect("invalid persisted update: {update:#?}"); + let kind = kind.expect("audit log always produces im-memory updates"); + memory::objects::StateUpdate { kind, ts, diff } + }) + .collect(); + updates + }); + // Perform data migrations. if is_initialized && !read_only { commit_ts = upgrade(&mut self, commit_ts).await?; @@ -1303,7 +1338,7 @@ impl UnopenedPersistCatalogState { }); } - Ok(Box::new(catalog)) + Ok((Box::new(catalog), audit_log_handle)) } /// Reports if the catalog state has been initialized. @@ -1367,7 +1402,13 @@ impl OpenableDurableCatalogState for UnopenedPersistCatalogState { mut self: Box, initial_ts: Timestamp, bootstrap_args: &BootstrapArgs, - ) -> Result, CatalogError> { + ) -> Result< + ( + Box, + std::thread::JoinHandle>, + ), + CatalogError, + > { self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args) .boxed() .await @@ -1381,6 +1422,7 @@ impl OpenableDurableCatalogState for UnopenedPersistCatalogState { self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args) .boxed() .await + .map(|(catalog, _)| catalog) } #[mz_ore::instrument] @@ -1388,7 +1430,13 @@ impl OpenableDurableCatalogState for UnopenedPersistCatalogState { mut self: Box, initial_ts: Timestamp, bootstrap_args: &BootstrapArgs, - ) -> Result, CatalogError> { + ) -> Result< + ( + Box, + std::thread::JoinHandle>, + ), + CatalogError, + > { self.open_inner(Mode::Writable, initial_ts, bootstrap_args) .boxed() .await diff --git a/src/catalog/src/durable/persist/tests.rs b/src/catalog/src/durable/persist/tests.rs index 5274664dda274..cb1111b7863a5 100644 --- a/src/catalog/src/durable/persist/tests.rs +++ b/src/catalog/src/durable/persist/tests.rs @@ -51,7 +51,8 @@ async fn test_upgrade_shard() { let _persist_state = persist_openable_state .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .expect("failed to open persist catalog"); + .expect("failed to open persist catalog") + .0; assert_eq!( Some(first_version.clone()), @@ -111,7 +112,8 @@ async fn test_upgrade_shard() { let _persist_state = persist_openable_state .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .expect("failed to open savepoint persist catalog"); + .expect("failed to open savepoint persist catalog") + .0; assert_eq!( Some(first_version.clone()), @@ -140,7 +142,8 @@ async fn test_upgrade_shard() { let _persist_state = persist_openable_state .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .expect("failed to open readonly persist catalog"); + .expect("failed to open readonly persist catalog") + .0; assert_eq!( Some(second_version), @@ -180,7 +183,8 @@ async fn test_version_regression() { let _persist_state = persist_openable_state .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .expect("failed to open persist catalog"); + .expect("failed to open persist catalog") + .0; assert_eq!( Some(first_version.clone()), @@ -201,7 +205,8 @@ async fn test_version_regression() { let _persist_state = persist_openable_state .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .expect("failed to open readonly persist catalog"); + .expect("failed to open readonly persist catalog") + .0; assert_eq!( Some(second_version.clone()), diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs index c42928461a197..2d34e0ae5f7d1 100644 --- a/src/catalog/src/durable/transaction.rs +++ b/src/catalog/src/durable/transaction.rs @@ -3643,13 +3643,15 @@ mod tests { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; let mut savepoint_state = state_builder .unwrap_build() .await .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap(); assert!(!initial_snapshot.is_empty()); diff --git a/src/catalog/src/durable/upgrade.rs b/src/catalog/src/durable/upgrade.rs index d94f1756b541e..19c8ac1de21a8 100644 --- a/src/catalog/src/durable/upgrade.rs +++ b/src/catalog/src/durable/upgrade.rs @@ -20,6 +20,7 @@ //! - Config //! - Setting //! - FenceToken +//! - AuditLog //! //! When you want to make a change to the `Catalog` you need to follow these steps: //! @@ -57,8 +58,6 @@ mod tests; use mz_ore::{soft_assert_eq_or_log, soft_assert_ne_or_log}; use mz_repr::Diff; -use timely::progress::Timestamp as TimelyTimestamp; - use paste::paste; #[cfg(test)] use proptest::prelude::*; @@ -66,6 +65,7 @@ use proptest::prelude::*; use proptest::strategy::ValueTree; #[cfg(test)] use proptest_derive::Arbitrary; +use timely::progress::Timestamp as TimelyTimestamp; use crate::durable::initialize::USER_VERSION_KEY; use crate::durable::objects::serialization::proto; @@ -377,6 +377,12 @@ async fn run_versioned_upgrade(state_builder: TestCatalogStateBuilder) { let _ = openable_state1 .open(NOW_ZERO().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Check epoch let mut openable_state2 = state_builder.clone().unwrap_build().await; @@ -347,7 +348,8 @@ async fn test_debug_edit_fencing<'a>(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; let mut debug_state = state_builder .clone() @@ -401,7 +403,8 @@ async fn test_debug_edit_fencing<'a>(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Now debug state should be fenced. let err = debug_state @@ -440,7 +443,8 @@ async fn test_debug_delete_fencing<'a>(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Drain state updates. let _ = state.sync_to_current_updates().await; @@ -496,7 +500,8 @@ async fn test_debug_delete_fencing<'a>(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Now debug state should be fenced. let err = debug_state @@ -551,7 +556,8 @@ async fn test_concurrent_debugs(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; let state_handle = mz_ore::task::spawn(|| "state", async move { // Eventually this state should get fenced by the edit below. let err = run_state(&mut state).await.unwrap_err(); @@ -583,7 +589,8 @@ async fn test_concurrent_debugs(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; let configs = state.snapshot().await.unwrap().configs; assert_eq!(configs.get(&key).unwrap(), &value); @@ -619,6 +626,7 @@ async fn test_concurrent_debugs(state_builder: TestCatalogStateBuilder) { .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await .unwrap() + .0 .snapshot() .await .unwrap() diff --git a/src/catalog/tests/open.rs b/src/catalog/tests/open.rs index 88e56a6409b20..1cf244111d463 100644 --- a/src/catalog/tests/open.rs +++ b/src/catalog/tests/open.rs @@ -141,7 +141,8 @@ async fn test_is_initialized(state_builder: TestCatalogStateBuilder) { let state = openable_state1 .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; state.expire().await; let mut openable_state2 = state_builder.unwrap_build().await; @@ -185,7 +186,8 @@ async fn test_get_deployment_generation(state_builder: TestCatalogStateBuilder) let state = openable_state .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; state.expire().await; } @@ -243,7 +245,8 @@ async fn test_open_savepoint(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; assert_eq!(state.epoch(), Epoch::new(2).expect("known to be non-zero")); Box::new(state).expire().await; } @@ -256,7 +259,8 @@ async fn test_open_savepoint(state_builder: TestCatalogStateBuilder) { .await .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Drain initial updates. let _ = state .sync_to_current_updates() @@ -361,7 +365,8 @@ async fn test_open_savepoint(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Write should not have persisted. let db = state .snapshot() @@ -406,7 +411,8 @@ async fn test_open_read_only(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Drain initial updates. let _ = state .sync_to_current_updates() @@ -481,7 +487,8 @@ async fn test_open(state_builder: TestCatalogStateBuilder) { // Use `NOW_ZERO` for consistent timestamps in the snapshots. .open(NOW_ZERO().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; assert_eq!(state.epoch(), Epoch::new(2).expect("known to be non-zero")); // Check initial snapshot. @@ -513,7 +520,8 @@ async fn test_open(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; assert_eq!(state.epoch(), Epoch::new(3).expect("known to be non-zero")); assert_eq!(state.snapshot().await.unwrap(), snapshot); @@ -528,7 +536,8 @@ async fn test_open(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; assert_eq!(state.epoch(), Epoch::new(4).expect("known to be non-zero")); assert_eq!(state.snapshot().await.unwrap(), snapshot); @@ -560,7 +569,8 @@ async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogState .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // drain catalog updates. let _ = state.sync_to_current_updates().await.unwrap(); let mut txn = state.transaction().await.unwrap(); @@ -589,7 +599,8 @@ async fn test_unopened_deploy_generation_fencing(state_builder: TestCatalogState .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Unopened catalog should be fenced now with a deploy generation fence. let err = openable_state @@ -659,7 +670,8 @@ async fn test_opened_epoch_fencing(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Open catalog, which will bump the epoch. let _state = state_builder @@ -668,7 +680,8 @@ async fn test_opened_epoch_fencing(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Opened catalog should be fenced now with an epoch fence. let err = state.snapshot().await.unwrap_err(); @@ -707,7 +720,8 @@ async fn test_opened_deploy_generation_fencing(state_builder: TestCatalogStateBu .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Open catalog, which will bump the epoch AND deploy generation. let _state = state_builder @@ -717,7 +731,8 @@ async fn test_opened_deploy_generation_fencing(state_builder: TestCatalogStateBu .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Opened catalog should be fenced now with an epoch fence. let err = state.snapshot().await.unwrap_err(); @@ -760,7 +775,8 @@ async fn test_fencing_during_write(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Drain updates. let _ = state.sync_to_current_updates().await; let mut txn = state.transaction().await.unwrap(); @@ -774,7 +790,8 @@ async fn test_fencing_during_write(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Drain updates. let _ = state.sync_to_current_updates().await; @@ -800,7 +817,8 @@ async fn test_fencing_during_write(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Committing results in a deploy generation fence error. let commit_ts = txn.upper(); @@ -840,7 +858,8 @@ async fn test_persist_version_fencing() { let _persist_state = persist_openable_state .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; persist_cache.cfg.build_version = reader_version.clone(); let persist_client = persist_cache @@ -903,7 +922,8 @@ async fn test_concurrent_open(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; let state_handle = mz_ore::task::spawn(|| "state", async move { // Eventually this state should get fenced by the open below. let err = run_state(&mut state).await.unwrap_err(); @@ -920,7 +940,8 @@ async fn test_concurrent_open(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; state_handle.await.unwrap(); @@ -930,5 +951,6 @@ async fn test_concurrent_open(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; } diff --git a/src/catalog/tests/read-write.rs b/src/catalog/tests/read-write.rs index d5b90e62cf94b..58b845aff8e6a 100644 --- a/src/catalog/tests/read-write.rs +++ b/src/catalog/tests/read-write.rs @@ -44,7 +44,8 @@ async fn test_confirm_leadership(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; assert_ok!(state1.confirm_leadership().await); let mut state2 = state_builder @@ -52,7 +53,8 @@ async fn test_confirm_leadership(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; assert_ok!(state2.confirm_leadership().await); let err = state1.confirm_leadership().await.unwrap_err(); @@ -91,7 +93,8 @@ async fn test_allocate_id(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; let start_id = state.get_next_id(id_type).await.unwrap(); let commit_ts = state.current_upper().await; @@ -169,7 +172,8 @@ async fn test_audit_logs(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Drain initial updates. let _ = state .sync_to_current_updates() @@ -231,7 +235,8 @@ async fn test_items(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Drain initial updates. let _ = state .sync_to_current_updates() @@ -288,7 +293,8 @@ async fn test_schemas(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; // Drain initial updates. let _ = state .sync_to_current_updates() @@ -353,14 +359,16 @@ async fn test_non_writer_commits(state_builder: TestCatalogStateBuilder) { .await .open(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; let mut savepoint_state = state_builder .clone() .unwrap_build() .await .open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args()) .await - .unwrap(); + .unwrap() + .0; let mut reader_state = state_builder .clone() .unwrap_build() diff --git a/src/environmentd/src/deployment/preflight.rs b/src/environmentd/src/deployment/preflight.rs index a66eded46cdf7..3734b25ce83f7 100644 --- a/src/environmentd/src/deployment/preflight.rs +++ b/src/environmentd/src/deployment/preflight.rs @@ -75,7 +75,7 @@ pub async fn preflight_legacy( .open_savepoint(boot_ts.clone(), &bootstrap_args) .await { - Ok(adapter_storage) => Box::new(adapter_storage).expire().await, + Ok((adapter_storage, _)) => Box::new(adapter_storage).expire().await, Err(CatalogError::Durable(e)) if e.can_recover_with_write_mode() => { // This is theoretically possible if catalog implementation A is // initialized, implementation B is uninitialized, and we are going to diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs index f9c213c25b03d..8360e3ce208b6 100644 --- a/src/environmentd/src/lib.rs +++ b/src/environmentd/src/lib.rs @@ -561,10 +561,10 @@ impl Listeners { }; // Load the adapter durable storage. - let adapter_storage = if read_only { + let (adapter_storage, audit_logs_handle) = if read_only { // TODO: behavior of migrations when booting in savepoint mode is // not well defined. - let adapter_storage = openable_adapter_storage + let (adapter_storage, audit_logs_handle) = openable_adapter_storage .open_savepoint(boot_ts, &bootstrap_args) .await?; @@ -572,9 +572,9 @@ impl Listeners { // because we are by definition not the leader if we are in // read-only mode. - adapter_storage + (adapter_storage, audit_logs_handle) } else { - let adapter_storage = openable_adapter_storage + let (adapter_storage, audit_logs_handle) = openable_adapter_storage .open(boot_ts, &bootstrap_args) .await?; @@ -583,7 +583,7 @@ impl Listeners { // fenced out all other environments using the adapter storage. deployment_state.set_is_leader(); - adapter_storage + (adapter_storage, audit_logs_handle) }; info!( @@ -633,6 +633,7 @@ impl Listeners { controller_config: config.controller, controller_envd_epoch: envd_epoch, storage: adapter_storage, + audit_logs_handle, timestamp_oracle_url: config.timestamp_oracle_url, unsafe_mode: config.unsafe_mode, all_features: config.all_features,