Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jkosh44 committed Nov 27, 2024
1 parent 14cc787 commit 8cca855
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 62 deletions.
5 changes: 3 additions & 2 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,8 @@ impl Catalog {
let mut storage = openable_storage
.open(now().into(), &bootstrap_args)
.await
.expect("can open durable catalog");
.expect("can open durable catalog")
.0;
// Drain updates.
let _ = storage
.sync_to_current_updates()
Expand All @@ -578,7 +579,7 @@ impl Catalog {
.with_default_deploy_generation()
.build()
.await?;
let storage = openable_storage.open(now().into(), bootstrap_args).await?;
let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
let system_parameter_defaults = BTreeMap::default();
Self::open_debug_catalog_inner(
persist_client,
Expand Down
21 changes: 20 additions & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ use mz_catalog::durable::OpenableDurableCatalogState;
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
DataSourceDesc, Table, TableDataSource,
DataSourceDesc, StateUpdate, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
Expand Down Expand Up @@ -977,6 +977,7 @@ pub struct Config {
pub controller_config: ControllerConfig,
pub controller_envd_epoch: NonZeroI64,
pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
pub audit_logs_handle: thread::JoinHandle<Vec<StateUpdate>>,
pub timestamp_oracle_url: Option<SensitiveUrl>,
pub unsafe_mode: bool,
pub all_features: bool,
Expand Down Expand Up @@ -1787,6 +1788,7 @@ impl Coordinator {
mut builtin_table_updates: Vec<BuiltinTableUpdate>,
cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
audit_logs_handle: std::thread::JoinHandle<Vec<StateUpdate>>,
) -> Result<(), AdapterError> {
let bootstrap_start = Instant::now();
info!("startup: coordinator init: bootstrap beginning");
Expand Down Expand Up @@ -2243,6 +2245,21 @@ impl Coordinator {
postamble_start.elapsed()
);

let audit_join_start = Instant::now();
info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
let audit_log_updates = audit_logs_handle
.join()
.expect("unable to deserialize audit log");
let audit_log_builtin_table_updates = self
.catalog()
.state()
.generate_builtin_table_updates(audit_log_updates);
builtin_table_updates.extend(audit_log_builtin_table_updates);
info!(
"startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
audit_join_start.elapsed()
);

let builtin_update_start = Instant::now();
info!("startup: coordinator init: bootstrap: generate builtin updates beginning");
if self.controller.read_only() {
Expand Down Expand Up @@ -3682,6 +3699,7 @@ pub fn serve(
controller_config,
controller_envd_epoch,
mut storage,
audit_logs_handle,
timestamp_oracle_url,
unsafe_mode,
all_features,
Expand Down Expand Up @@ -4065,6 +4083,7 @@ pub fn serve(
builtin_table_updates,
cached_global_exprs,
uncached_local_exprs,
audit_logs_handle,
)
.await?;
coord
Expand Down
3 changes: 2 additions & 1 deletion src/catalog-debug/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,8 @@ async fn upgrade_check(
cluster_replica_size_map: cluster_replica_sizes.clone(),
},
)
.await?;
.await?
.0;

// If this upgrade has new builtin replicas, then we need to assign some size to it. It doesn't
// really matter what size since it's not persisted, so we pick a random valid one.
Expand Down
20 changes: 18 additions & 2 deletions src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,19 @@ pub trait OpenableDurableCatalogState: Debug + Send {
/// - Catalog migrations fail.
///
/// `initial_ts` is used as the initial timestamp for new environments.
///
/// Also returns a handle to a thread that is deserializing all of the audit logs.
async fn open_savepoint(
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
>;

/// Opens the catalog in read only mode. All mutating methods
/// will return an error.
Expand All @@ -120,11 +128,19 @@ pub trait OpenableDurableCatalogState: Debug + Send {
/// needed.
///
/// `initial_ts` is used as the initial timestamp for new environments.
///
/// Also returns a handle to a thread that is deserializing all of the audit logs.
async fn open(
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
>;

/// Opens the catalog for manual editing of the underlying data. This is helpful for
/// fixing a corrupt catalog.
Expand Down
13 changes: 13 additions & 0 deletions src/catalog/src/durable/objects/state_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ impl StateUpdateKindJson {
value: String::new(),
},
),
StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ()),
]
.into_iter()
.map(|kind| {
Expand All @@ -342,6 +343,18 @@ impl StateUpdateKindJson {
});
DESERIALIZABLE_KINDS.contains(self.kind())
}

/// Returns true if this is an audit log update. Otherwise, returns false.
pub(crate) fn is_audit_log(&self) -> bool {
// Construct a fake audit log so we can extract exactly what the kind field will serialize
// as.
static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
let audit_log = StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ());
let json_kind: StateUpdateKindJson = audit_log.into();
json_kind.kind().to_string()
});
&*AUDIT_LOG_KIND == self.kind()
}
}

/// Version of [`StateUpdateKind`] that is stored directly in persist.
Expand Down
56 changes: 52 additions & 4 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,13 @@ impl UnopenedPersistCatalogState {
mode: Mode,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
> {
// It would be nice to use `initial_ts` here, but it comes from the system clock, not the
// timestamp oracle.
let mut commit_ts = self.upper;
Expand Down Expand Up @@ -1198,6 +1204,35 @@ impl UnopenedPersistCatalogState {
}
soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

// Remove all audit log entries.
let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
.snapshot
.into_iter()
.partition(|(update, _, _)| update.is_audit_log());
self.snapshot = snapshot;

// Create thread to deserialize audit logs.
let audit_log_handle = std::thread::spawn(move || {
let updates: Vec<_> = audit_logs
.into_iter()
.map(|(kind, ts, diff)| {
assert_eq!(
diff, 1,
"audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
);
let diff = memory::objects::StateDiff::Addition;

let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
let kind: Option<memory::objects::StateUpdateKind> = (&kind)
.try_into()
.expect("invalid persisted update: {update:#?}");
let kind = kind.expect("audit log always produces im-memory updates");
memory::objects::StateUpdate { kind, ts, diff }
})
.collect();
updates
});

// Perform data migrations.
if is_initialized && !read_only {
commit_ts = upgrade(&mut self, commit_ts).await?;
Expand Down Expand Up @@ -1303,7 +1338,7 @@ impl UnopenedPersistCatalogState {
});
}

Ok(Box::new(catalog))
Ok((Box::new(catalog), audit_log_handle))
}

/// Reports if the catalog state has been initialized.
Expand Down Expand Up @@ -1367,7 +1402,13 @@ impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
> {
self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
.boxed()
.await
Expand All @@ -1381,14 +1422,21 @@ impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
.boxed()
.await
.map(|(catalog, _)| catalog)
}

#[mz_ore::instrument]
async fn open(
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
> {
self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
.boxed()
.await
Expand Down
15 changes: 10 additions & 5 deletions src/catalog/src/durable/persist/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ async fn test_upgrade_shard() {
let _persist_state = persist_openable_state
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open persist catalog");
.expect("failed to open persist catalog")
.0;

assert_eq!(
Some(first_version.clone()),
Expand Down Expand Up @@ -111,7 +112,8 @@ async fn test_upgrade_shard() {
let _persist_state = persist_openable_state
.open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open savepoint persist catalog");
.expect("failed to open savepoint persist catalog")
.0;

assert_eq!(
Some(first_version.clone()),
Expand Down Expand Up @@ -140,7 +142,8 @@ async fn test_upgrade_shard() {
let _persist_state = persist_openable_state
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open readonly persist catalog");
.expect("failed to open readonly persist catalog")
.0;

assert_eq!(
Some(second_version),
Expand Down Expand Up @@ -180,7 +183,8 @@ async fn test_version_regression() {
let _persist_state = persist_openable_state
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open persist catalog");
.expect("failed to open persist catalog")
.0;

assert_eq!(
Some(first_version.clone()),
Expand All @@ -201,7 +205,8 @@ async fn test_version_regression() {
let _persist_state = persist_openable_state
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open readonly persist catalog");
.expect("failed to open readonly persist catalog")
.0;

assert_eq!(
Some(second_version.clone()),
Expand Down
6 changes: 4 additions & 2 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3643,13 +3643,15 @@ mod tests {
.await
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.unwrap();
.unwrap()
.0;
let mut savepoint_state = state_builder
.unwrap_build()
.await
.open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.unwrap();
.unwrap()
.0;

let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
assert!(!initial_snapshot.is_empty());
Expand Down
10 changes: 8 additions & 2 deletions src/catalog/src/durable/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! - Config
//! - Setting
//! - FenceToken
//! - AuditLog
//!
//! When you want to make a change to the `Catalog` you need to follow these steps:
//!
Expand Down Expand Up @@ -57,15 +58,14 @@ mod tests;

use mz_ore::{soft_assert_eq_or_log, soft_assert_ne_or_log};
use mz_repr::Diff;
use timely::progress::Timestamp as TimelyTimestamp;

use paste::paste;
#[cfg(test)]
use proptest::prelude::*;
#[cfg(test)]
use proptest::strategy::ValueTree;
#[cfg(test)]
use proptest_derive::Arbitrary;
use timely::progress::Timestamp as TimelyTimestamp;

use crate::durable::initialize::USER_VERSION_KEY;
use crate::durable::objects::serialization::proto;
Expand Down Expand Up @@ -377,6 +377,12 @@ async fn run_versioned_upgrade<V1: IntoStateUpdateKindJson, V2: IntoStateUpdateK
.into_iter()
.flat_map(|action| action.into_updates().into_iter())
.collect();
// Validate that we're not migrating an un-migratable collection.
for (update, _) in &updates {
if update.is_always_deserializable() {
panic!("migration to un-migratable collection: {update:?}\nall updates: {updates:?}");
}
}

// 3. Add a retraction for old version and insertion for new version into updates.
let next_version = current_version + 1;
Expand Down
Loading

0 comments on commit 8cca855

Please sign in to comment.