catalog: Hook up expression cache into startup #30227

Merged
11 commits merged on Nov 13, 2024
8 changes: 8 additions & 0 deletions src/adapter-types/src/dyncfgs.rs
@@ -104,6 +104,13 @@ pub const ENABLE_CONTINUAL_TASK_BUILTINS: Config<bool> = Config::new(
"Create system builtin continual tasks on boot.",
);

/// Whether to use an expression cache on boot.
pub const ENABLE_EXPRESSION_CACHE: Config<bool> = Config::new(
"enable_expression_cache",
true,
"Use a cache to store optimized expressions to help speed up start times.",
);

/// Adds the full set of all compute `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
@@ -120,4 +127,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&PLAN_INSIGHTS_NOTICE_FAST_PATH_CLUSTERS_OPTIMIZE_DURATION)
.add(&DEFAULT_SINK_PARTITION_STRATEGY)
.add(&ENABLE_CONTINUAL_TASK_BUILTINS)
.add(&ENABLE_EXPRESSION_CACHE)
}
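
For context on how a flag like this gets consumed: the coordinator can consult the dyncfg during bootstrap before deciding whether to open the expression cache. The sketch below is illustrative only and is not part of this diff; the call site name is hypothetical, and it assumes the `Config::get(&ConfigSet)` accessor that other dyncfgs in this crate are read with, with `ENABLE_EXPRESSION_CACHE` and `ConfigSet` in scope as in this module.

// Hypothetical call site (not part of this diff): decide at boot whether to
// open the expression cache. Assumes the usual `Config::get(&ConfigSet)` accessor.
fn should_open_expression_cache(configs: &ConfigSet) -> bool {
    // Returns the default (`true`) unless the flag has been overridden via
    // system parameters.
    ENABLE_EXPRESSION_CACHE.get(configs)
}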
134 changes: 120 additions & 14 deletions src/adapter/src/catalog.rs
@@ -12,11 +12,12 @@
//! Persistent metadata storage for the coordinator.

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::convert;
use std::sync::Arc;

use futures::Future;
use futures::future::BoxFuture;
use futures::{Future, FutureExt};
use itertools::Itertools;
use mz_adapter_types::connection::ConnectionId;
use mz_audit_log::{EventType, FullNameV1, ObjectType, VersionedStorageUsage};
@@ -29,6 +30,7 @@ use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Conf
#[cfg(test)]
use mz_catalog::durable::CatalogError;
use mz_catalog::durable::{test_bootstrap_args, DurableCatalogState, TestCatalogStateBuilder};
use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogEntry, CatalogItem, Cluster, ClusterReplica, Database, NetworkPolicy, Role, Schema,
@@ -37,6 +39,7 @@ use mz_compute_types::dataflows::DataflowDescription;
use mz_controller::clusters::ReplicaLocation;
use mz_controller_types::{ClusterId, ReplicaId};
use mz_expr::OptimizedMirRelationExpr;
use mz_ore::collections::HashSet;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
use mz_ore::result::ResultExt as _;
@@ -51,9 +54,9 @@ use mz_repr::{CatalogItemId, Diff, GlobalId, RelationVersionSelector, ScalarType
use mz_secrets::InMemorySecretsController;
use mz_sql::catalog::{
CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
SessionCatalog, SystemObjectType,
CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
};
use mz_sql::names::{
CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
@@ -127,6 +130,7 @@ mod transact;
pub struct Catalog {
state: CatalogState,
plans: CatalogPlans,
expr_cache_handle: Option<ExpressionCacheHandle>,
storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
transient_revision: u64,
}
@@ -138,6 +142,7 @@ impl Clone for Catalog {
Self {
state: self.state.clone(),
plans: self.plans.clone(),
expr_cache_handle: self.expr_cache_handle.clone(),
storage: Arc::clone(&self.storage),
transient_revision: self.transient_revision,
}
@@ -361,6 +366,58 @@ impl Catalog {
}
policies
}

/// Return a set of [`GlobalId`]s for items that need to have their cache entries invalidated
/// as a result of creating new indexes on the items in `ons`.
///
/// When creating and inserting a new index, we need to invalidate some entries that may
/// optimize to new expressions. When creating index `i` on object `o`, we need to invalidate
/// the following objects:
///
/// - `o`.
/// - All compute objects that depend directly on `o`.
/// - All compute objects that would directly depend on `o`, if all views were inlined.
pub(crate) fn invalidate_for_index(
&self,
ons: impl Iterator<Item = GlobalId>,
) -> BTreeSet<GlobalId> {
let mut dependencies = BTreeSet::new();
let mut queue = VecDeque::new();
let mut seen = HashSet::new();
for on in ons {
let entry = self.get_entry_by_global_id(&on);
dependencies.insert(on);
seen.insert(entry.id);
let uses = entry.uses();
queue.extend(uses.clone());
}

while let Some(cur) = queue.pop_front() {
let entry = self.get_entry(&cur);
if seen.insert(cur) {
let global_ids = entry.global_ids();
match entry.item_type() {
CatalogItemType::Table
| CatalogItemType::Source
| CatalogItemType::MaterializedView
| CatalogItemType::Sink
| CatalogItemType::Index
| CatalogItemType::Type
| CatalogItemType::Func
| CatalogItemType::Secret
| CatalogItemType::Connection
| CatalogItemType::ContinualTask => {
dependencies.extend(global_ids);
}
CatalogItemType::View => {
dependencies.extend(global_ids);
queue.extend(entry.uses());
}
}
}
}
dependencies
}
}
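
To make the traversal concrete, here is a small self-contained sketch of the same walk over a toy catalog: seed the result with the indexed objects, enqueue the items they use, and keep expanding only through views; every other item type is recorded and the walk stops there. All names and types below are illustrative stand-ins, not the real catalog API.

use std::collections::{BTreeMap, BTreeSet, VecDeque};

#[derive(Clone, Copy, PartialEq)]
enum ItemType {
    View,
    Other, // tables, sources, materialized views, sinks, indexes, ...
}

// Toy catalog: id -> (item type, ids that item uses).
type ToyCatalog = BTreeMap<u64, (ItemType, Vec<u64>)>;

fn invalidate_for_index(
    catalog: &ToyCatalog,
    ons: impl IntoIterator<Item = u64>,
) -> BTreeSet<u64> {
    let mut dependencies = BTreeSet::new();
    let mut queue = VecDeque::new();
    let mut seen = BTreeSet::new();
    // Seed with the indexed objects themselves and the items they use.
    for on in ons {
        dependencies.insert(on);
        seen.insert(on);
        queue.extend(catalog[&on].1.iter().copied());
    }
    while let Some(cur) = queue.pop_front() {
        if seen.insert(cur) {
            let (item_type, uses) = &catalog[&cur];
            dependencies.insert(cur);
            // Views are traversed transparently; all other item types are
            // invalidated but not expanded further.
            if *item_type == ItemType::View {
                queue.extend(uses.iter().copied());
            }
        }
    }
    dependencies
}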

#[derive(Debug)]
@@ -478,15 +535,21 @@ impl Catalog {
) -> Result<Catalog, anyhow::Error> {
let now = SYSTEM_TIME.clone();
let environment_id = None;
let openable_storage = TestCatalogStateBuilder::new(persist_client)
let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
.with_organization_id(organization_id)
.with_default_deploy_generation()
.build()
.await?;
let storage = openable_storage.open(now(), &test_bootstrap_args()).await?;
let system_parameter_defaults = BTreeMap::default();
Self::open_debug_catalog_inner(storage, now, environment_id, system_parameter_defaults)
.await
Self::open_debug_catalog_inner(
persist_client,
storage,
now,
environment_id,
system_parameter_defaults,
)
.await
}

/// Opens a read only debug persist backed catalog defined by `persist_client` and
@@ -499,16 +562,22 @@
) -> Result<Catalog, anyhow::Error> {
let now = SYSTEM_TIME.clone();
let environment_id = None;
let openable_storage = TestCatalogStateBuilder::new(persist_client)
let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
.with_organization_id(organization_id)
.build()
.await?;
let storage = openable_storage
.open_read_only(&test_bootstrap_args())
.await?;
let system_parameter_defaults = BTreeMap::default();
Self::open_debug_catalog_inner(storage, now, environment_id, system_parameter_defaults)
.await
Self::open_debug_catalog_inner(
persist_client,
storage,
now,
environment_id,
system_parameter_defaults,
)
.await
}

/// Opens a read only debug persist backed catalog defined by `persist_client` and
@@ -522,7 +591,7 @@
system_parameter_defaults: BTreeMap<String, String>,
version: semver::Version,
) -> Result<Catalog, anyhow::Error> {
let openable_storage = TestCatalogStateBuilder::new(persist_client)
let openable_storage = TestCatalogStateBuilder::new(persist_client.clone())
.with_organization_id(environment_id.organization_id())
.with_version(version)
.build()
@@ -531,6 +600,7 @@
.open_read_only(&test_bootstrap_args())
.await?;
Self::open_debug_catalog_inner(
persist_client,
storage,
now,
Some(environment_id),
@@ -540,6 +610,7 @@
}

async fn open_debug_catalog_inner(
persist_client: PersistClient,
storage: Box<dyn DurableCatalogState>,
now: NowFn,
environment_id: Option<EnvironmentId>,
@@ -557,6 +628,8 @@ impl Catalog {
migrated_storage_collections_0dt: _,
new_builtin_collections: _,
builtin_table_updates: _,
cached_global_exprs: _,
uncached_local_exprs: _,
} = Catalog::open(Config {
storage,
metrics_registry,
@@ -565,6 +638,7 @@
all_features: false,
build_info: &DUMMY_BUILD_INFO,
environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()),
read_only: false,
now,
boot_ts: previous_ts,
skip_migrations: true,
@@ -584,6 +658,7 @@
connection_context: ConnectionContext::for_tests(secrets_reader),
active_connection_count,
builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy,
persist_client,
helm_chart_version: None,
},
})
@@ -1286,6 +1361,31 @@ impl Catalog {
.deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
}

pub(crate) fn update_expression_cache<'a, 'b>(
&'a self,
new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
) -> BoxFuture<'b, ()> {
if let Some(expr_cache) = &self.expr_cache_handle {
let ons = new_local_expressions
.iter()
.map(|(id, _)| id)
.chain(new_global_expressions.iter().map(|(id, _)| id))
.filter_map(|id| self.get_entry_by_global_id(id).index())
.map(|index| index.on);
let invalidate_ids = self.invalidate_for_index(ons);
expr_cache
.update(
new_local_expressions,
new_global_expressions,
invalidate_ids,
)
.boxed()
} else {
async {}.boxed()
}
}
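
The `Option` handling above follows a common pattern: return a `BoxFuture` either way, so the caller can await the cache update without branching on whether a cache handle exists. Below is a minimal self-contained sketch of that pattern using only the `futures` crate; the `CacheHandle` type and `update_cache` function are stand-ins for illustration, not the real `ExpressionCacheHandle` API.

use futures::future::BoxFuture;
use futures::FutureExt;

#[derive(Clone)]
struct CacheHandle;

impl CacheHandle {
    async fn update(&self, _batch: Vec<(u64, String)>) {
        // Stand-in for sending new expressions to a background cache task.
    }
}

// Returns either real asynchronous work or an already-completed no-op, so the
// call site can `.await` the result unconditionally.
fn update_cache(
    handle: Option<&CacheHandle>,
    batch: Vec<(u64, String)>,
) -> BoxFuture<'static, ()> {
    match handle {
        Some(handle) => {
            let handle = handle.clone();
            async move { handle.update(batch).await }.boxed()
        }
        // No cache configured: hand back a completed future.
        None => async {}.boxed(),
    }
}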

/// Listen for and apply all unconsumed updates to the durable catalog state.
// TODO(jkosh44) When this method is actually used outside of a test we can remove the
// `#[cfg(test)]` annotation.
@@ -2124,6 +2224,7 @@ mod tests {
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use mz_sql::session::vars::VarInput;

use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{Catalog, Op};
use crate::optimize::dataflows::{prep_scalar_expr, EvalTime, ExprPrepStyle};
use crate::session::Session;
@@ -2443,7 +2544,12 @@
.expect("unable to open debug catalog");
let item = catalog
.state()
.deserialize_item(gid, &create_sql, &BTreeMap::new())
.deserialize_item(
gid,
&create_sql,
&BTreeMap::new(),
&mut LocalExpressionCache::Closed,
)
.expect("unable to parse view");
catalog
.transact(
@@ -3342,7 +3448,7 @@
.deserialize_item(
mv_gid,
&format!("CREATE MATERIALIZED VIEW {database_name}.{schema_name}.{mv_name} AS SELECT name FROM mz_tables"),
&BTreeMap::new(),
&BTreeMap::new(), &mut LocalExpressionCache::Closed
)
.expect("unable to deserialize item");
catalog