Skip to content

Commit 350f435

Browse files
committed
WIP
1 parent 26eed45 commit 350f435

File tree

8 files changed

+389
-157
lines changed

8 files changed

+389
-157
lines changed

src/adapter/src/catalog.rs

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
//! Persistent metadata storage for the coordinator.
1313
1414
use std::borrow::Cow;
15-
use std::collections::{BTreeMap, BTreeSet};
15+
use std::collections::{BTreeMap, BTreeSet, VecDeque};
1616
use std::convert;
1717
use std::sync::Arc;
1818

@@ -29,6 +29,7 @@ use mz_catalog::config::{BuiltinItemMigrationConfig, ClusterReplicaSizeMap, Conf
2929
#[cfg(test)]
3030
use mz_catalog::durable::CatalogError;
3131
use mz_catalog::durable::{test_bootstrap_args, DurableCatalogState, TestCatalogStateBuilder};
32+
use mz_catalog::expr_cache::{ExpressionCacheHandle, GlobalExpressions, LocalExpressions};
3233
use mz_catalog::memory::error::{Error, ErrorKind};
3334
use mz_catalog::memory::objects::{
3435
CatalogEntry, Cluster, ClusterReplica, Database, NetworkPolicy, Role, Schema,
@@ -37,6 +38,7 @@ use mz_compute_types::dataflows::DataflowDescription;
3738
use mz_controller::clusters::ReplicaLocation;
3839
use mz_controller_types::{ClusterId, ReplicaId};
3940
use mz_expr::OptimizedMirRelationExpr;
41+
use mz_ore::collections::HashSet;
4042
use mz_ore::metrics::MetricsRegistry;
4143
use mz_ore::now::{EpochMillis, NowFn, SYSTEM_TIME};
4244
use mz_ore::option::FallibleMapExt;
@@ -52,9 +54,9 @@ use mz_repr::{Diff, GlobalId, ScalarType};
5254
use mz_secrets::InMemorySecretsController;
5355
use mz_sql::catalog::{
5456
CatalogCluster, CatalogClusterReplica, CatalogDatabase, CatalogError as SqlCatalogError,
55-
CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogNetworkPolicy,
56-
CatalogRole, CatalogSchema, DefaultPrivilegeAclItem, DefaultPrivilegeObject, EnvironmentId,
57-
SessionCatalog, SystemObjectType,
57+
CatalogItem as SqlCatalogItem, CatalogItemType as SqlCatalogItemType, CatalogItemType,
58+
CatalogNetworkPolicy, CatalogRole, CatalogSchema, DefaultPrivilegeAclItem,
59+
DefaultPrivilegeObject, EnvironmentId, SessionCatalog, SystemObjectType,
5860
};
5961
use mz_sql::names::{
6062
CommentObjectId, DatabaseId, FullItemName, FullSchemaName, ItemQualifiers, ObjectId,
@@ -128,6 +130,7 @@ mod transact;
128130
pub struct Catalog {
129131
state: CatalogState,
130132
plans: CatalogPlans,
133+
expr_cache_handle: Option<ExpressionCacheHandle>,
131134
storage: Arc<tokio::sync::Mutex<Box<dyn mz_catalog::durable::DurableCatalogState>>>,
132135
transient_revision: u64,
133136
}
@@ -139,6 +142,7 @@ impl Clone for Catalog {
139142
Self {
140143
state: self.state.clone(),
141144
plans: self.plans.clone(),
145+
expr_cache_handle: self.expr_cache_handle.clone(),
142146
storage: Arc::clone(&self.storage),
143147
transient_revision: self.transient_revision,
144148
}
@@ -363,6 +367,43 @@ impl Catalog {
363367
}
364368
policies
365369
}
370+
371+
/// TODO(jkosh44)
372+
pub(crate) fn invalidate_for_index(&self, on: &GlobalId) -> Vec<GlobalId> {
373+
let mut dependencies = Vec::new();
374+
let mut queue = VecDeque::new();
375+
let mut seen = HashSet::new();
376+
{
377+
dependencies.push(*on);
378+
seen.insert(*on);
379+
let entry = self.get_entry(on);
380+
let uses = entry.uses();
381+
queue.extend(uses.clone());
382+
seen.extend(uses);
383+
}
384+
385+
while let Some(cur) = queue.pop_front() {
386+
if seen.insert(cur) {
387+
let entry = self.get_entry(&cur);
388+
match entry.item_type() {
389+
CatalogItemType::Table
390+
| CatalogItemType::Source
391+
| CatalogItemType::MaterializedView
392+
| CatalogItemType::Sink
393+
| CatalogItemType::Index
394+
| CatalogItemType::Type
395+
| CatalogItemType::Func
396+
| CatalogItemType::Secret
397+
| CatalogItemType::Connection
398+
| CatalogItemType::ContinualTask => dependencies.push(cur),
399+
CatalogItemType::View => {
400+
queue.extend(entry.uses());
401+
}
402+
}
403+
}
404+
}
405+
dependencies
406+
}
366407
}
367408

368409
#[derive(Debug)]
@@ -559,9 +600,11 @@ impl Catalog {
559600
migrated_storage_collections_0dt: _,
560601
new_builtins: _,
561602
builtin_table_updates: _,
603+
cached_global_exprs: _,
562604
} = Catalog::open(Config {
563605
storage,
564606
metrics_registry,
607+
expr_cache_handle: None,
565608
state: StateConfig {
566609
unsafe_mode: true,
567610
all_features: false,
@@ -1259,6 +1302,21 @@ impl Catalog {
12591302
.deserialize_plan_with_enable_for_item_parsing(create_sql, force_if_exists_skip)
12601303
}
12611304

1305+
pub(crate) fn update_expression_cache(
1306+
&self,
1307+
new_local_expressions: Vec<(GlobalId, LocalExpressions)>,
1308+
new_global_expressions: Vec<(GlobalId, GlobalExpressions)>,
1309+
invalidate_ids: BTreeSet<GlobalId>,
1310+
) {
1311+
if let Some(expr_cache) = &self.expr_cache_handle {
1312+
let _ = expr_cache.update(
1313+
new_local_expressions,
1314+
new_global_expressions,
1315+
invalidate_ids,
1316+
);
1317+
}
1318+
}
1319+
12621320
/// Listen for and apply all unconsumed updates to the durable catalog state.
12631321
// TODO(jkosh44) When this method is actually used outside of a test we can remove the
12641322
// `#[cfg(test)]` annotation.

src/adapter/src/catalog/open.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use mz_catalog::durable::objects::{
3131
use mz_catalog::durable::{
3232
ClusterVariant, ClusterVariantManaged, Transaction, SYSTEM_CLUSTER_ID_ALLOC_KEY,
3333
};
34+
use mz_catalog::expr_cache::{GlobalExpressions, LocalExpressions};
3435
use mz_catalog::memory::error::{Error, ErrorKind};
3536
use mz_catalog::memory::objects::{
3637
BootstrapStateUpdateKind, CatalogEntry, CatalogItem, CommentsMap, DefaultPrivileges,
@@ -62,7 +63,6 @@ use mz_storage_client::controller::StorageController;
6263
use timely::Container;
6364
use tracing::{error, info, warn, Instrument};
6465
use uuid::Uuid;
65-
6666
// DO NOT add any more imports from `crate` outside of `crate::catalog`.
6767
use crate::catalog::open::builtin_item_migration::{
6868
migrate_builtin_items, BuiltinItemMigrationResult,
@@ -188,6 +188,8 @@ pub struct OpenCatalogResult {
188188
pub new_builtins: BTreeSet<GlobalId>,
189189
/// A list of builtin table updates corresponding to the initialized state.
190190
pub builtin_table_updates: Vec<BuiltinTableUpdate>,
191+
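/// Global expressions loaded from the expression cache while opening the catalog.
/// Empty when no cache handle was configured or the cache could not be opened.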
192+
pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
191193
}
192194

193195
impl Catalog {
@@ -196,6 +198,7 @@ impl Catalog {
196198
/// (for example: no [mz_secrets::SecretsReader]).
197199
pub async fn initialize_state<'a>(
198200
config: StateConfig,
201+
cached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
199202
storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
200203
) -> Result<InitializeStateResult, AdapterError> {
201204
for builtin_role in BUILTIN_ROLES {
@@ -266,6 +269,7 @@ impl Catalog {
266269
aws_principal_context: config.aws_principal_context,
267270
aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
268271
http_host_name: config.http_host_name,
272+
cached_local_exprs,
269273
};
270274

271275
let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
@@ -431,6 +435,9 @@ impl Catalog {
431435

432436
cleanup_action.await;
433437

438+
// Clear out the expressions cache now that we're done initializing.
439+
state.cached_local_exprs.clear();
440+
434441
Ok(InitializeStateResult {
435442
state,
436443
storage_collections_to_drop,
@@ -455,6 +462,22 @@ impl Catalog {
455462
pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
456463
async move {
457464
let mut storage = config.storage;
465+
466+
let (expr_cache_handle, local_exprs, global_exprs) =
467+
if let Some(handle) = config.expr_cache_handle {
468+
match handle.await {
469+
Ok((expr_cache_handle, local_exprs, global_exprs)) => {
470+
(Some(expr_cache_handle), local_exprs, global_exprs)
471+
}
472+
Err(err) => {
473+
warn!("unable to open expression cache: {err:?}");
474+
(None, BTreeMap::new(), BTreeMap::new())
475+
}
476+
}
477+
} else {
478+
(None, BTreeMap::new(), BTreeMap::new())
479+
};
480+
458481
let InitializeStateResult {
459482
state,
460483
storage_collections_to_drop,
@@ -466,14 +489,15 @@ impl Catalog {
466489
// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
467490
// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
468491
// Because of that we purposefully move this Future onto the heap (i.e. Box it).
469-
Self::initialize_state(config.state, &mut storage)
492+
Self::initialize_state(config.state, local_exprs, &mut storage)
470493
.instrument(tracing::info_span!("catalog::initialize_state"))
471494
.boxed()
472495
.await?;
473496

474497
let catalog = Catalog {
475498
state,
476499
plans: CatalogPlans::default(),
500+
expr_cache_handle,
477501
transient_revision: 1,
478502
storage: Arc::new(tokio::sync::Mutex::new(storage)),
479503
};
@@ -507,6 +531,7 @@ impl Catalog {
507531
migrated_storage_collections_0dt,
508532
new_builtins,
509533
builtin_table_updates,
534+
cached_global_exprs: global_exprs,
510535
})
511536
}
512537
.instrument(tracing::info_span!("catalog::open"))

src/adapter/src/catalog/state.rs

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use mz_catalog::builtin::{
2626
Builtin, BuiltinCluster, BuiltinLog, BuiltinSource, BuiltinTable, BuiltinType, BUILTINS,
2727
};
2828
use mz_catalog::config::{AwsPrincipalContext, ClusterReplicaSizeMap};
29+
use mz_catalog::expr_cache::LocalExpressions;
2930
use mz_catalog::memory::error::{Error, ErrorKind};
3031
use mz_catalog::memory::objects::{
3132
CatalogEntry, CatalogItem, Cluster, ClusterReplica, CommentsMap, Connection, DataSourceDesc,
@@ -84,7 +85,6 @@ use serde::Serialize;
8485
use timely::progress::Antichain;
8586
use tokio::sync::mpsc;
8687
use tracing::{debug, warn};
87-
8888
// DO NOT add any more imports from `crate` outside of `crate::catalog`.
8989
use crate::catalog::{Catalog, ConnCatalog};
9090
use crate::coord::ConnMeta;
@@ -149,6 +149,9 @@ pub struct CatalogState {
149149
pub(super) aws_principal_context: Option<AwsPrincipalContext>,
150150
pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
151151
pub(super) http_host_name: Option<String>,
152+
153+
// Mutable state that's only used during startup.
154+
pub(super) cached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
152155
}
153156

154157
fn skip_temp_items<S>(
@@ -208,6 +211,7 @@ impl CatalogState {
208211
comments: Default::default(),
209212
source_references: Default::default(),
210213
storage_metadata: Default::default(),
214+
cached_local_exprs: Default::default(),
211215
}
212216
}
213217

@@ -959,16 +963,23 @@ impl CatalogState {
959963
is_retained_metrics_object,
960964
}),
961965
Plan::CreateView(CreateViewPlan { view, .. }) => {
962-
// Collect optimizer parameters.
963-
let optimizer_config =
964-
optimize::OptimizerConfig::from(session_catalog.system_vars());
966+
let (raw_expr, optimized_expr) = match self.cached_local_exprs.get(&id) {
967+
Some(local_expr) => (view.expr, local_expr.local_mir.clone()),
968+
None => {
969+
// Collect optimizer parameters.
970+
let optimizer_config =
971+
optimize::OptimizerConfig::from(session_catalog.system_vars());
965972

966-
// Build an optimizer for this VIEW.
967-
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
973+
// Build an optimizer for this VIEW.
974+
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
968975

969-
// HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
970-
let raw_expr = view.expr;
971-
let optimized_expr = optimizer.optimize(raw_expr.clone())?;
976+
// HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
977+
let raw_expr = view.expr;
978+
let optimized_expr = optimizer.optimize(raw_expr.clone())?;
979+
980+
(raw_expr, optimized_expr)
981+
}
982+
};
972983

973984
CatalogItem::View(View {
974985
create_sql: view.create_sql,
@@ -982,15 +993,22 @@ impl CatalogState {
982993
Plan::CreateMaterializedView(CreateMaterializedViewPlan {
983994
materialized_view, ..
984995
}) => {
985-
// Collect optimizer parameters.
986-
let optimizer_config =
987-
optimize::OptimizerConfig::from(session_catalog.system_vars());
988-
// Build an optimizer for this VIEW.
989-
// TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
990-
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
991-
992-
let raw_expr = materialized_view.expr;
993-
let optimized_expr = optimizer.optimize(raw_expr.clone())?;
996+
let (raw_expr, optimized_expr) = match self.cached_local_exprs.get(&id) {
997+
Some(local_expr) => (materialized_view.expr, local_expr.local_mir.clone()),
998+
None => {
999+
// Collect optimizer parameters.
1000+
let optimizer_config =
1001+
optimize::OptimizerConfig::from(session_catalog.system_vars());
1002+
// Build an optimizer for this VIEW.
1003+
// TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
1004+
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
1005+
1006+
let raw_expr = materialized_view.expr;
1007+
let optimized_expr = optimizer.optimize(raw_expr.clone())?;
1008+
1009+
(raw_expr, optimized_expr)
1010+
}
1011+
};
9941012
let mut typ = optimized_expr.typ();
9951013
for &i in &materialized_view.non_null_assertions {
9961014
typ.column_types[i].nullable = false;

0 commit comments

Comments
 (0)