Skip to content

Commit

Permalink
catalog: Deserialize audit log in background
Browse files Browse the repository at this point in the history
This commit optimizes the startup process to deserialize the audit log
in the background. Opening the durable catalog involves deserializing
all updates, migrating them, and then storing them in memory. Later the
in-memory catalog will take each update and generate a builtin table
update and apply that update in-memory. Throughout the startup process
audit logs are heavily special cased and only used to generate builtin
table updates. Additionally, audit logs are by far the largest catalog
collection and can take a long time to deserialize.

This commit creates a new thread at the beginning of the startup
process that deserializes all audit log updates. A handle for the
thread is plumbed throughout startup and then joined only when we
actually need the builtin table updates. By deserializing these updates
in the background we can reduce the total time spent in startup.

In order for this all to work, we have to disallow migrating audit log
updates, because now the audit log updates skip over migrations. In
practice this is OK, because the audit log has its own versioning
scheme that allows us to add new audit log variants without a
migration. The only thing we lose is the ability to re-write old audit
logs.

This speedup works only because other parts of startup take long enough
to hide the time spent deserializing the audit log. As other parts of
startup get faster, joining on the deserializing thread will get
slower. Some additional optimizations we can make in the future are:

  - Remove the audit log from the catalog.
  - Make the catalog shard queryable and remove the need to generate
    builtin table updates.
  - Now that the audit log is truly append-only, we could lazily
    deserialize the audit log updates in order by ID (how to order them
    is something we'd need to figure out), and stop once we've found
    the latest audit log update that is already in the builtin table.
    In general that will usually be the first audit log update we
    deserialize, unless we crashed after committing a catalog
    transaction but before updating the builtin tables.

Works towards resolving #MaterializeInc/database-issues/issues/8384
  • Loading branch information
jkosh44 committed Nov 27, 2024
1 parent 14cc787 commit e093826
Showing 14 changed files with 229 additions and 63 deletions.
5 changes: 3 additions & 2 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -552,7 +552,8 @@ impl Catalog {
let mut storage = openable_storage
.open(now().into(), &bootstrap_args)
.await
.expect("can open durable catalog");
.expect("can open durable catalog")
.0;
// Drain updates.
let _ = storage
.sync_to_current_updates()
@@ -578,7 +579,7 @@ impl Catalog {
.with_default_deploy_generation()
.build()
.await?;
let storage = openable_storage.open(now().into(), bootstrap_args).await?;
let storage = openable_storage.open(now().into(), bootstrap_args).await?.0;
let system_parameter_defaults = BTreeMap::default();
Self::open_debug_catalog_inner(
persist_client,
39 changes: 37 additions & 2 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
@@ -98,7 +98,7 @@ use mz_catalog::durable::OpenableDurableCatalogState;
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
DataSourceDesc, Table, TableDataSource,
DataSourceDesc, StateUpdate, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
@@ -977,6 +977,7 @@ pub struct Config {
pub controller_config: ControllerConfig,
pub controller_envd_epoch: NonZeroI64,
pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
pub audit_logs_handle: thread::JoinHandle<Vec<StateUpdate>>,
pub timestamp_oracle_url: Option<SensitiveUrl>,
pub unsafe_mode: bool,
pub all_features: bool,
@@ -1787,6 +1788,7 @@ impl Coordinator {
mut builtin_table_updates: Vec<BuiltinTableUpdate>,
cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
audit_logs_handle: std::thread::JoinHandle<Vec<StateUpdate>>,
) -> Result<(), AdapterError> {
let bootstrap_start = Instant::now();
info!("startup: coordinator init: bootstrap beginning");
@@ -2248,12 +2250,27 @@ impl Coordinator {
if self.controller.read_only() {
info!("coordinator init: bootstrap: stashing builtin table updates while in read-only mode");

let audit_join_start = Instant::now();
info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
let audit_log_updates = audit_logs_handle
.join()
.expect("unable to deserialize audit log");
let audit_log_builtin_table_updates = self
.catalog()
.state()
.generate_builtin_table_updates(audit_log_updates);
builtin_table_updates.extend(audit_log_builtin_table_updates);
info!(
"startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
audit_join_start.elapsed()
);
self.buffered_builtin_table_updates
.as_mut()
.expect("in read-only mode")
.append(&mut builtin_table_updates);
} else {
self.bootstrap_tables(&entries, builtin_table_updates).await;
self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_handle)
.await;
};
info!(
"startup: coordinator init: bootstrap: generate builtin updates complete in {:?}",
@@ -2367,6 +2384,7 @@ impl Coordinator {
&mut self,
entries: &[CatalogEntry],
mut builtin_table_updates: Vec<BuiltinTableUpdate>,
audit_logs_handle: thread::JoinHandle<Vec<StateUpdate>>,
) {
/// Smaller helper struct of metadata for bootstrapping tables.
struct TableMetadata<'a> {
@@ -2465,6 +2483,21 @@ impl Coordinator {
builtin_table_updates.extend(retractions);
}

let audit_join_start = Instant::now();
info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
let audit_log_updates = audit_logs_handle
.join()
.expect("unable to deserialize audit log");
let audit_log_builtin_table_updates = self
.catalog()
.state()
.generate_builtin_table_updates(audit_log_updates);
builtin_table_updates.extend(audit_log_builtin_table_updates);
info!(
"startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
audit_join_start.elapsed()
);

// Now that the snapshots are complete, the appends must also be complete.
table_fence_rx
.await
@@ -3682,6 +3715,7 @@ pub fn serve(
controller_config,
controller_envd_epoch,
mut storage,
audit_logs_handle,
timestamp_oracle_url,
unsafe_mode,
all_features,
@@ -4065,6 +4099,7 @@ pub fn serve(
builtin_table_updates,
cached_global_exprs,
uncached_local_exprs,
audit_logs_handle,
)
.await?;
coord
3 changes: 2 additions & 1 deletion src/catalog-debug/src/main.rs
Original file line number Diff line number Diff line change
@@ -551,7 +551,8 @@ async fn upgrade_check(
cluster_replica_size_map: cluster_replica_sizes.clone(),
},
)
.await?;
.await?
.0;

// If this upgrade has new builtin replicas, then we need to assign some size to it. It doesn't
// really matter what size since it's not persisted, so we pick a random valid one.
20 changes: 18 additions & 2 deletions src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
@@ -99,11 +99,19 @@ pub trait OpenableDurableCatalogState: Debug + Send {
/// - Catalog migrations fail.
///
/// `initial_ts` is used as the initial timestamp for new environments.
///
/// Also returns a handle to a thread that is deserializing all of the audit logs.
async fn open_savepoint(
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
>;

/// Opens the catalog in read only mode. All mutating methods
/// will return an error.
@@ -120,11 +128,19 @@ pub trait OpenableDurableCatalogState: Debug + Send {
/// needed.
///
/// `initial_ts` is used as the initial timestamp for new environments.
///
/// Also returns a handle to a thread that is deserializing all of the audit logs.
async fn open(
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError>;
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
>;

/// Opens the catalog for manual editing of the underlying data. This is helpful for
/// fixing a corrupt catalog.
13 changes: 13 additions & 0 deletions src/catalog/src/durable/objects/state_update.rs
Original file line number Diff line number Diff line change
@@ -332,6 +332,7 @@ impl StateUpdateKindJson {
value: String::new(),
},
),
StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ()),
]
.into_iter()
.map(|kind| {
@@ -342,6 +343,18 @@ impl StateUpdateKindJson {
});
DESERIALIZABLE_KINDS.contains(self.kind())
}

/// Returns true if this is an audit log update. Otherwise, returns false.
pub(crate) fn is_audit_log(&self) -> bool {
// Construct a fake audit log so we can extract exactly what the kind field will serialize
// as.
static AUDIT_LOG_KIND: LazyLock<String> = LazyLock::new(|| {
let audit_log = StateUpdateKind::AuditLog(proto::AuditLogKey { event: None }, ());
let json_kind: StateUpdateKindJson = audit_log.into();
json_kind.kind().to_string()
});
&*AUDIT_LOG_KIND == self.kind()
}
}

/// Version of [`StateUpdateKind`] that is stored directly in persist.
56 changes: 52 additions & 4 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
@@ -1118,7 +1118,13 @@ impl UnopenedPersistCatalogState {
mode: Mode,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
> {
// It would be nice to use `initial_ts` here, but it comes from the system clock, not the
// timestamp oracle.
let mut commit_ts = self.upper;
@@ -1198,6 +1204,35 @@ impl UnopenedPersistCatalogState {
}
soft_assert_ne_or_log!(self.upper, Timestamp::minimum());

// Remove all audit log entries.
let (audit_logs, snapshot): (Vec<_>, Vec<_>) = self
.snapshot
.into_iter()
.partition(|(update, _, _)| update.is_audit_log());
self.snapshot = snapshot;

// Create thread to deserialize audit logs.
let audit_log_handle = std::thread::spawn(move || {
let updates: Vec<_> = audit_logs
.into_iter()
.map(|(kind, ts, diff)| {
assert_eq!(
diff, 1,
"audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
);
let diff = memory::objects::StateDiff::Addition;

let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
let kind: Option<memory::objects::StateUpdateKind> = (&kind)
.try_into()
.expect("invalid persisted update: {update:#?}");
let kind = kind.expect("audit log always produces im-memory updates");
memory::objects::StateUpdate { kind, ts, diff }
})
.collect();
updates
});

// Perform data migrations.
if is_initialized && !read_only {
commit_ts = upgrade(&mut self, commit_ts).await?;
@@ -1303,7 +1338,7 @@ impl UnopenedPersistCatalogState {
});
}

Ok(Box::new(catalog))
Ok((Box::new(catalog), audit_log_handle))
}

/// Reports if the catalog state has been initialized.
@@ -1367,7 +1402,13 @@ impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
> {
self.open_inner(Mode::Savepoint, initial_ts, bootstrap_args)
.boxed()
.await
@@ -1381,14 +1422,21 @@ impl OpenableDurableCatalogState for UnopenedPersistCatalogState {
self.open_inner(Mode::Readonly, EpochMillis::MIN.into(), bootstrap_args)
.boxed()
.await
.map(|(catalog, _)| catalog)
}

#[mz_ore::instrument]
async fn open(
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<Box<dyn DurableCatalogState>, CatalogError> {
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
> {
self.open_inner(Mode::Writable, initial_ts, bootstrap_args)
.boxed()
.await
15 changes: 10 additions & 5 deletions src/catalog/src/durable/persist/tests.rs
Original file line number Diff line number Diff line change
@@ -51,7 +51,8 @@ async fn test_upgrade_shard() {
let _persist_state = persist_openable_state
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open persist catalog");
.expect("failed to open persist catalog")
.0;

assert_eq!(
Some(first_version.clone()),
@@ -111,7 +112,8 @@ async fn test_upgrade_shard() {
let _persist_state = persist_openable_state
.open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open savepoint persist catalog");
.expect("failed to open savepoint persist catalog")
.0;

assert_eq!(
Some(first_version.clone()),
@@ -140,7 +142,8 @@ async fn test_upgrade_shard() {
let _persist_state = persist_openable_state
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open readonly persist catalog");
.expect("failed to open readonly persist catalog")
.0;

assert_eq!(
Some(second_version),
@@ -180,7 +183,8 @@ async fn test_version_regression() {
let _persist_state = persist_openable_state
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open persist catalog");
.expect("failed to open persist catalog")
.0;

assert_eq!(
Some(first_version.clone()),
@@ -201,7 +205,8 @@ async fn test_version_regression() {
let _persist_state = persist_openable_state
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.expect("failed to open readonly persist catalog");
.expect("failed to open readonly persist catalog")
.0;

assert_eq!(
Some(second_version.clone()),
6 changes: 4 additions & 2 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
@@ -3643,13 +3643,15 @@ mod tests {
.await
.open(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.unwrap();
.unwrap()
.0;
let mut savepoint_state = state_builder
.unwrap_build()
.await
.open_savepoint(SYSTEM_TIME().into(), &test_bootstrap_args())
.await
.unwrap();
.unwrap()
.0;

let initial_snapshot = savepoint_state.sync_to_current_updates().await.unwrap();
assert!(!initial_snapshot.is_empty());
Loading

0 comments on commit e093826

Please sign in to comment.