Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: Lazily deserialize the audit log #30782

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 92 additions & 20 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_adapter_types::dyncfgs::WITH_0DT_DEPLOYMENT_CAUGHT_UP_CHECK_INTERVAL;
use mz_build_info::BuildInfo;
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_AUDIT_EVENTS, MZ_STORAGE_USAGE_BY_SHARD};
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
use mz_catalog::durable::OpenableDurableCatalogState;
use mz_catalog::durable::{AuditLogIterator, OpenableDurableCatalogState};
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
DataSourceDesc, StateUpdate, Table, TableDataSource,
DataSourceDesc, StateDiff, StateUpdate, StateUpdateKind, Table, TableDataSource,
};
use mz_cloud_resources::{CloudResourceController, VpcEndpointConfig, VpcEndpointEvent};
use mz_compute_client::as_of_selection;
Expand Down Expand Up @@ -977,7 +977,7 @@ pub struct Config {
pub controller_config: ControllerConfig,
pub controller_envd_epoch: NonZeroI64,
pub storage: Box<dyn mz_catalog::durable::DurableCatalogState>,
pub audit_logs_handle: thread::JoinHandle<Vec<StateUpdate>>,
pub audit_logs_iterator: AuditLogIterator,
pub timestamp_oracle_url: Option<SensitiveUrl>,
pub unsafe_mode: bool,
pub all_features: bool,
Expand Down Expand Up @@ -1788,7 +1788,7 @@ impl Coordinator {
mut builtin_table_updates: Vec<BuiltinTableUpdate>,
cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
audit_logs_handle: std::thread::JoinHandle<Vec<StateUpdate>>,
audit_logs_iterator: AuditLogIterator,
) -> Result<(), AdapterError> {
let bootstrap_start = Instant::now();
info!("startup: coordinator init: bootstrap beginning");
Expand Down Expand Up @@ -2247,29 +2247,35 @@ impl Coordinator {

let builtin_update_start = Instant::now();
info!("startup: coordinator init: bootstrap: generate builtin updates beginning");

if self.controller.read_only() {
info!("coordinator init: bootstrap: stashing builtin table updates while in read-only mode");

// TODO(jkosh44) Optimize deserializing the audit log in read-only mode.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't contribute to downtime, so it's not that important.

let audit_join_start = Instant::now();
info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
let audit_log_updates = audit_logs_handle
.join()
.expect("unable to deserialize audit log");
info!("startup: coordinator init: bootstrap: audit log deserialization beginning");
let audit_log_updates: Vec<_> = audit_logs_iterator
.map(|(audit_log, ts)| StateUpdate {
kind: StateUpdateKind::AuditLog(audit_log),
ts,
diff: StateDiff::Addition,
})
.collect();
let audit_log_builtin_table_updates = self
.catalog()
.state()
.generate_builtin_table_updates(audit_log_updates);
builtin_table_updates.extend(audit_log_builtin_table_updates);
info!(
"startup: coordinator init: bootstrap: join audit log deserialization complete in {:?}",
"startup: coordinator init: bootstrap: audit log deserialization complete in {:?}",
audit_join_start.elapsed()
);
self.buffered_builtin_table_updates
.as_mut()
.expect("in read-only mode")
.append(&mut builtin_table_updates);
} else {
self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_handle)
self.bootstrap_tables(&entries, builtin_table_updates, audit_logs_iterator)
.await;
};
info!(
Expand Down Expand Up @@ -2384,7 +2390,7 @@ impl Coordinator {
&mut self,
entries: &[CatalogEntry],
mut builtin_table_updates: Vec<BuiltinTableUpdate>,
audit_logs_handle: thread::JoinHandle<Vec<StateUpdate>>,
audit_logs_iterator: AuditLogIterator,
) {
/// Smaller helper struct of metadata for bootstrapping tables.
struct TableMetadata<'a> {
Expand Down Expand Up @@ -2442,9 +2448,26 @@ impl Coordinator {
};

let mut retraction_tasks = Vec::new();
let system_tables = table_metas
let mut system_tables: Vec<_> = table_metas
.iter()
.filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta))
.collect();

// Special case audit events because it's append only.
let (audit_events_idx, _) = system_tables
.iter()
.filter(|meta| meta.id.is_system() && !is_storage_usage_by_shard(meta));
.find_position(|table| {
table.id == self.catalog().resolve_builtin_table(&MZ_AUDIT_EVENTS)
})
.expect("mz_audit_events must exist");
let audit_events = system_tables.remove(audit_events_idx);
let audit_log_task = self.bootstrap_audit_log_table(
audit_events.id,
audit_events.name,
audit_events.table,
audit_logs_iterator,
read_ts,
);

for system_table in system_tables {
let table_id = system_table.id;
Expand All @@ -2462,7 +2485,7 @@ impl Coordinator {
.await
.unwrap_or_terminate("cannot fail to fetch snapshot");
let contents_len = current_contents.len();
debug!("coordinator init: table ({table_id}) size {contents_len}",);
debug!("coordinator init: table ({table_id}) size {contents_len}");

// Retract the current contents.
current_contents
Expand All @@ -2485,9 +2508,9 @@ impl Coordinator {

let audit_join_start = Instant::now();
info!("startup: coordinator init: bootstrap: join audit log deserialization beginning");
let audit_log_updates = audit_logs_handle
.join()
.expect("unable to deserialize audit log");
let audit_log_updates = audit_log_task
.await
.expect("cannot fail to fetch audit log updates");
let audit_log_builtin_table_updates = self
.catalog()
.state()
Expand All @@ -2514,6 +2537,55 @@ impl Coordinator {
}
}

/// Prepare updates to the audit log table. The audit log table append only and very large, so
/// we only need to find the events present in `audit_logs_iterator` but not in the audit log
/// table.
#[instrument]
fn bootstrap_audit_log_table<'a>(
&mut self,
table_id: CatalogItemId,
name: &'a QualifiedItemName,
table: &'a Table,
audit_logs_iterator: AuditLogIterator,
read_ts: Timestamp,
) -> JoinHandle<Vec<StateUpdate>> {
let full_name = self.catalog().resolve_full_name(name, None);
debug!("coordinator init: reconciling audit log: {full_name} ({table_id})");
let current_contents_fut = self
.controller
.storage
.snapshot(table.global_id_writes(), read_ts);
spawn(|| format!("snapshot-audit-log-{table_id}"), async move {
let current_contents = current_contents_fut
.await
.unwrap_or_terminate("cannot fail to fetch snapshot");
let contents_len = current_contents.len();
debug!("coordinator init: audit log table ({table_id}) size {contents_len}");

// Fetch the largest audit log event ID that has been written to the table.
let max_table_id = current_contents
.into_iter()
.filter(|(_, diff)| *diff == 1)
.map(|(row, _diff)| row.unpack_first().unwrap_uint64())
.sorted()
.rev()
.next();

// Filter audit log catalog updates to those that are not present in the table.
audit_logs_iterator
.take_while(|(audit_log, _)| match max_table_id {
Some(id) => audit_log.event.sortable_id() > id,
None => true,
})
.map(|(audit_log, ts)| StateUpdate {
kind: StateUpdateKind::AuditLog(audit_log),
ts,
diff: StateDiff::Addition,
})
.collect::<Vec<_>>()
})
}

/// Initializes all storage collections required by catalog objects in the storage controller.
///
/// This method takes care of collection creation, as well as migration of existing
Expand Down Expand Up @@ -3715,7 +3787,7 @@ pub fn serve(
controller_config,
controller_envd_epoch,
mut storage,
audit_logs_handle,
audit_logs_iterator,
timestamp_oracle_url,
unsafe_mode,
all_features,
Expand Down Expand Up @@ -4099,7 +4171,7 @@ pub fn serve(
builtin_table_updates,
cached_global_exprs,
uncached_local_exprs,
audit_logs_handle,
audit_logs_iterator,
)
.await?;
coord
Expand Down
79 changes: 63 additions & 16 deletions src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use itertools::Itertools;
use mz_audit_log::VersionedEvent;
use mz_controller_types::ClusterId;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::MetricsRegistry;
use mz_persist_client::PersistClient;
use mz_repr::{CatalogItemId, GlobalId};
use mz_repr::{CatalogItemId, Diff, GlobalId};
use mz_sql::catalog::CatalogError as SqlCatalogError;
use uuid::Uuid;

Expand All @@ -29,7 +30,8 @@ use crate::durable::debug::{DebugCatalogState, Trace};
pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
pub use crate::durable::metrics::Metrics;
pub use crate::durable::objects::state_update::StateUpdate;
use crate::durable::objects::Snapshot;
use crate::durable::objects::state_update::{StateUpdateKindJson, TryIntoStateUpdateKind};
use crate::durable::objects::{AuditLog, Snapshot};
pub use crate::durable::objects::{
Cluster, ClusterConfig, ClusterReplica, ClusterVariant, ClusterVariantManaged, Comment,
Database, DefaultPrivilege, IntrospectionSourceIndex, Item, NetworkPolicy, ReplicaConfig,
Expand Down Expand Up @@ -105,13 +107,7 @@ pub trait OpenableDurableCatalogState: Debug + Send {
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
>;
) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

/// Opens the catalog in read only mode. All mutating methods
/// will return an error.
Expand All @@ -134,13 +130,7 @@ pub trait OpenableDurableCatalogState: Debug + Send {
mut self: Box<Self>,
initial_ts: Timestamp,
bootstrap_args: &BootstrapArgs,
) -> Result<
(
Box<dyn DurableCatalogState>,
std::thread::JoinHandle<Vec<memory::objects::StateUpdate>>,
),
CatalogError,
>;
) -> Result<(Box<dyn DurableCatalogState>, AuditLogIterator), CatalogError>;

/// Opens the catalog for manual editing of the underlying data. This is helpful for
/// fixing a corrupt catalog.
Expand Down Expand Up @@ -362,6 +352,63 @@ pub trait DurableCatalogState: ReadOnlyDurableCatalogState {
}
}

trait AuditLogIteratorTrait: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug {}
impl<T: Iterator<Item = (AuditLog, Timestamp)> + Send + Sync + Debug> AuditLogIteratorTrait for T {}

/// An iterator that returns audit log events in reverse ID order.
#[derive(Debug)]
pub struct AuditLogIterator {
// We store an interator instead of a sorted `Vec`, so we can lazily sort the contents on the
// first call to `next`, instead of sorting the contents on initialization.
audit_logs: Box<dyn AuditLogIteratorTrait>,
}

impl AuditLogIterator {
fn new(audit_logs: Vec<(StateUpdateKindJson, Timestamp, Diff)>) -> Self {
let audit_logs = audit_logs
.into_iter()
.map(|(kind, ts, diff)| {
assert_eq!(
diff, 1,
"audit log is append only: ({kind:?}, {ts:?}, {diff:?})"
);
assert!(
kind.is_audit_log(),
"unexpected update kind: ({kind:?}, {ts:?}, {diff:?})"
);
let id = kind.audit_log_id();
(kind, ts, id)
})
.sorted_by_key(|(_, ts, id)| (*ts, *id))
.map(|(kind, ts, _id)| (kind, ts))
.rev()
.map(|(kind, ts)| {
// Each event will be deserialized lazily on a call to `next`.
let kind = TryIntoStateUpdateKind::try_into(kind).expect("kind decoding error");
let kind: Option<memory::objects::StateUpdateKind> = (&kind)
.try_into()
.expect("invalid persisted update: {update:#?}");
let kind = kind.expect("audit log always produces im-memory updates");
let audit_log = match kind {
memory::objects::StateUpdateKind::AuditLog(audit_log) => audit_log,
kind => unreachable!("invalid kind: {kind:?}"),
};
(audit_log, ts)
});
Self {
audit_logs: Box::new(audit_logs),
}
}
}

impl Iterator for AuditLogIterator {
type Item = (AuditLog, Timestamp);

fn next(&mut self) -> Option<Self::Item> {
self.audit_logs.next()
}
}

/// A builder to help create an [`OpenableDurableCatalogState`] for tests.
#[derive(Debug, Clone)]
pub struct TestCatalogStateBuilder {
Expand Down
Loading
Loading