Skip to content

Commit e8d3a28

Browse files
committed
WIP
1 parent fc4678d commit e8d3a28

File tree

9 files changed

+188
-102
lines changed

9 files changed

+188
-102
lines changed

src/adapter/src/catalog.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,13 @@ pub struct CatalogPlans {
154154
notices_by_dep_id: BTreeMap<GlobalId, SmallVec<[Arc<OptimizerNotice>; 4]>>,
155155
}
156156

157+
pub(crate) struct OptimizedPlan {
158+
global_mir: DataflowDescription<OptimizedMirRelationExpr>,
159+
physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
160+
dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
161+
notices: SmallVec<[Arc<OptimizerNotice>; 4]>,
162+
}
163+
157164
impl Catalog {
158165
/// Set the optimized plan for the item identified by `id`.
159166
#[mz_ore::instrument(level = "trace")]
@@ -563,6 +570,7 @@ impl Catalog {
563570
} = Catalog::open(Config {
564571
storage,
565572
metrics_registry,
573+
expr_cache_handle: None,
566574
state: StateConfig {
567575
unsafe_mode: true,
568576
all_features: false,

src/adapter/src/catalog/apply.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use mz_catalog::SYSTEM_CONN_ID;
3636
use mz_compute_client::controller::ComputeReplicaConfig;
3737
use mz_controller::clusters::{ReplicaConfig, ReplicaLogging};
3838
use mz_controller_types::ClusterId;
39-
use mz_expr::MirScalarExpr;
39+
use mz_expr::{MirScalarExpr, OptimizedMirRelationExpr};
4040
use mz_ore::tracing::OpenTelemetryContext;
4141
use mz_ore::{instrument, soft_assert_no_log};
4242
use mz_pgrepr::oid::INVALID_OID;

src/adapter/src/catalog/migrate.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use mz_repr::{GlobalId, Timestamp};
1919
use mz_sql::ast::display::AstDisplay;
2020
use mz_sql_parser::ast::{Raw, Statement};
2121
use semver::Version;
22+
use timely::Container;
2223
use tracing::info;
2324
// DO NOT add any more imports from `crate` outside of `crate::catalog`.
2425
use crate::catalog::open::into_consolidatable_updates_startup;

src/adapter/src/catalog/open.rs

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use mz_catalog::SYSTEM_CONN_ID;
4040
use mz_compute_client::logging::LogVariant;
4141
use mz_controller::clusters::ReplicaLogging;
4242
use mz_controller_types::{is_cluster_size_v2, ClusterId};
43+
use mz_expr::OptimizedMirRelationExpr;
4344
use mz_ore::cast::usize_to_u64;
4445
use mz_ore::collections::{CollectionExt, HashSet};
4546
use mz_ore::now::to_datetime;
@@ -59,16 +60,16 @@ use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
5960
use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
6061
use mz_sql_parser::ast::display::AstDisplay;
6162
use mz_storage_client::controller::StorageController;
62-
use timely::Container;
63+
use timely::{execute, Container};
6364
use tracing::{error, info, warn, Instrument};
6465
use uuid::Uuid;
65-
6666
// DO NOT add any more imports from `crate` outside of `crate::catalog`.
6767
use crate::catalog::open::builtin_item_migration::{
6868
migrate_builtin_items, BuiltinItemMigrationResult,
6969
};
7070
use crate::catalog::{
7171
is_reserved_name, migrate, BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config,
72+
OptimizedPlan,
7273
};
7374
use crate::AdapterError;
7475

@@ -266,6 +267,7 @@ impl Catalog {
266267
aws_principal_context: config.aws_principal_context,
267268
aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
268269
http_host_name: config.http_host_name,
270+
cached_local_exprs,
269271
};
270272

271273
let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
@@ -431,6 +433,9 @@ impl Catalog {
431433

432434
cleanup_action.await;
433435

436+
// Clear out the expressions cache now that we're done initializing.
437+
state.cached_local_exprs.clear();
438+
434439
Ok(InitializeStateResult {
435440
state,
436441
storage_collections_to_drop,
@@ -455,6 +460,32 @@ impl Catalog {
455460
pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
456461
async move {
457462
let mut storage = config.storage;
463+
464+
let (expr_cache_handle, expr_cache_contents) =
465+
if let Some(handle) = config.expr_cache_handle {
466+
let (expr_cache_handle, expr_cache_contents) = handle.await?;
467+
(Some(expr_cache_handle), expr_cache_contents)
468+
} else {
469+
(None, BTreeMap::new())
470+
};
471+
let (local_exprs, global_exprs): (BTreeMap<_, _>, BTreeMap<_, _>) = expr_cache_contents
472+
.into_iter()
473+
.map(|(id, exprs)| {
474+
(
475+
(id, exprs.local_mir),
476+
(
477+
id,
478+
OptimizedPlan {
479+
global_mir: exprs.global_mir,
480+
physical_plan: exprs.physical_plan,
481+
dataflow_metainfos: exprs.dataflow_metainfos,
482+
notices: exprs.notices,
483+
},
484+
),
485+
)
486+
})
487+
.unzip();
488+
458489
let InitializeStateResult {
459490
state,
460491
storage_collections_to_drop,
@@ -466,7 +497,7 @@ impl Catalog {
466497
// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
467498
// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
468499
// Because of that we purposefully move this Future onto the heap (i.e. Box it).
469-
Self::initialize_state(config.state, &mut storage)
500+
Self::initialize_state(config.state, local_exprs, &mut storage)
470501
.instrument(tracing::info_span!("catalog::initialize_state"))
471502
.boxed()
472503
.await?;

src/adapter/src/catalog/state.rs

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use mz_controller::clusters::{
3838
UnmanagedReplicaLocation,
3939
};
4040
use mz_controller_types::{ClusterId, ReplicaId};
41+
use mz_expr::OptimizedMirRelationExpr;
4142
use mz_ore::collections::CollectionExt;
4243
use mz_ore::now::NOW_ZERO;
4344
use mz_ore::soft_assert_no_log;
@@ -84,7 +85,6 @@ use serde::Serialize;
8485
use timely::progress::Antichain;
8586
use tokio::sync::mpsc;
8687
use tracing::{debug, warn};
87-
8888
// DO NOT add any more imports from `crate` outside of `crate::catalog`.
8989
use crate::catalog::{Catalog, ConnCatalog};
9090
use crate::coord::ConnMeta;
@@ -149,6 +149,9 @@ pub struct CatalogState {
149149
pub(super) aws_principal_context: Option<AwsPrincipalContext>,
150150
pub(super) aws_privatelink_availability_zones: Option<BTreeSet<String>>,
151151
pub(super) http_host_name: Option<String>,
152+
153+
// Mutable state that's only used during startup.
154+
pub(super) cached_local_exprs: BTreeMap<GlobalId, OptimizedMirRelationExpr>,
152155
}
153156

154157
fn skip_temp_items<S>(
@@ -831,7 +834,7 @@ impl CatalogState {
831834
/// Parses the given SQL string into a `CatalogItem`.
832835
#[mz_ore::instrument]
833836
pub(crate) fn parse_item(
834-
&self,
837+
&mut self,
835838
id: GlobalId,
836839
create_sql: &str,
837840
pcx: Option<&PlanContext>,
@@ -935,16 +938,21 @@ impl CatalogState {
935938
is_retained_metrics_object,
936939
}),
937940
Plan::CreateView(CreateViewPlan { view, .. }) => {
938-
// Collect optimizer parameters.
939-
let optimizer_config =
940-
optimize::OptimizerConfig::from(session_catalog.system_vars());
941-
942-
// Build an optimizer for this VIEW.
943-
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
944-
945-
// HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
946-
let raw_expr = view.expr;
947-
let optimized_expr = optimizer.optimize(raw_expr.clone())?;
941+
let optimized_expr = match self.cached_local_exprs.remove(&id) {
942+
Some(optimized_expr) => optimized_expr,
943+
None => {
944+
// Collect optimizer parameters.
945+
let optimizer_config =
946+
optimize::OptimizerConfig::from(session_catalog.system_vars());
947+
948+
// Build an optimizer for this VIEW.
949+
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
950+
951+
// HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local)
952+
let raw_expr = view.expr;
953+
optimizer.optimize(raw_expr.clone())?
954+
}
955+
};
948956

949957
CatalogItem::View(View {
950958
create_sql: view.create_sql,
@@ -958,15 +966,20 @@ impl CatalogState {
958966
Plan::CreateMaterializedView(CreateMaterializedViewPlan {
959967
materialized_view, ..
960968
}) => {
961-
// Collect optimizer parameters.
962-
let optimizer_config =
963-
optimize::OptimizerConfig::from(session_catalog.system_vars());
964-
// Build an optimizer for this VIEW.
965-
// TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
966-
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
967-
968-
let raw_expr = materialized_view.expr;
969-
let optimized_expr = optimizer.optimize(raw_expr.clone())?;
969+
let optimized_expr = match self.cached_local_exprs.remove(&id) {
970+
Some(optimized_expr) => optimized_expr,
971+
None => {
972+
// Collect optimizer parameters.
973+
let optimizer_config =
974+
optimize::OptimizerConfig::from(session_catalog.system_vars());
975+
// Build an optimizer for this VIEW.
976+
// TODO(aalexandrov): ideally this should be a materialized_view::Optimizer.
977+
let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None);
978+
979+
let raw_expr = materialized_view.expr;
980+
optimizer.optimize(raw_expr.clone())?
981+
}
982+
};
970983
let mut typ = optimized_expr.typ();
971984
for &i in &materialized_view.non_null_assertions {
972985
typ.column_types[i].nullable = false;

src/adapter/src/coord.rs

Lines changed: 61 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,36 @@ use std::thread;
8989
use std::time::{Duration, Instant};
9090
use thiserror::Error;
9191

92+
use self::statement_logging::{StatementLogging, StatementLoggingId};
93+
use crate::active_compute_sink::ActiveComputeSink;
94+
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
95+
use crate::client::{Client, Handle};
96+
use crate::command::{Command, ExecuteResponse};
97+
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
98+
use crate::coord::appends::{
99+
BuiltinTableAppendNotify, Deferred, GroupCommitPermit, PendingWriteTxn,
100+
};
101+
use crate::coord::caught_up::CaughtUpCheckContext;
102+
use crate::coord::cluster_scheduling::SchedulingDecision;
103+
use crate::coord::id_bundle::CollectionIdBundle;
104+
use crate::coord::introspection::IntrospectionSubscribe;
105+
use crate::coord::peek::PendingPeek;
106+
use crate::coord::timeline::{TimelineContext, TimelineState};
107+
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
108+
use crate::coord::validity::PlanValidity;
109+
use crate::error::AdapterError;
110+
use crate::explain::insights::PlanInsightsContext;
111+
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
112+
use crate::metrics::Metrics;
113+
use crate::optimize::dataflows::{
114+
dataflow_import_id_bundle, ComputeInstanceSnapshot, DataflowBuilder,
115+
};
116+
use crate::optimize::{self, Optimize, OptimizerConfig};
117+
use crate::session::{EndTransactionAction, Session};
118+
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
119+
use crate::util::{ClientTransmitter, ResultExt};
120+
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
121+
use crate::{flags, AdapterNotice, ReadHolds};
92122
use derivative::Derivative;
93123
use differential_dataflow::lattice::Lattice;
94124
use fail::fail_point;
@@ -102,6 +132,7 @@ use mz_build_info::BuildInfo;
102132
use mz_catalog::builtin::{BUILTINS, BUILTINS_STATIC, MZ_STORAGE_USAGE_BY_SHARD};
103133
use mz_catalog::config::{AwsPrincipalContext, BuiltinItemMigrationConfig, ClusterReplicaSizeMap};
104134
use mz_catalog::durable::OpenableDurableCatalogState;
135+
use mz_catalog::expr_cache::{ExpressionCacheConfig, ExpressionCacheHandle};
105136
use mz_catalog::memory::objects::{
106137
CatalogEntry, CatalogItem, ClusterReplicaProcessStatus, ClusterVariantManaged, Connection,
107138
DataSourceDesc, TableDataSource,
@@ -128,6 +159,7 @@ use mz_ore::{assert_none, instrument, soft_assert_or_log, soft_panic_or_log, sta
128159
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
129160
use mz_repr::explain::{ExplainConfig, ExplainFormat};
130161
use mz_repr::global_id::TransientIdGen;
162+
use mz_repr::optimize::OptimizerFeatures;
131163
use mz_repr::role_id::RoleId;
132164
use mz_repr::{Diff, GlobalId, RelationDesc, Row, Timestamp};
133165
use mz_secrets::cache::CachingSecretsReader;
@@ -165,38 +197,6 @@ use tracing::{debug, info, info_span, span, warn, Instrument, Level, Span};
165197
use tracing_opentelemetry::OpenTelemetrySpanExt;
166198
use uuid::Uuid;
167199

168-
use crate::active_compute_sink::ActiveComputeSink;
169-
use crate::catalog::{BuiltinTableUpdate, Catalog, OpenCatalogResult};
170-
use crate::client::{Client, Handle};
171-
use crate::command::{Command, ExecuteResponse};
172-
use crate::config::{SynchronizedParameters, SystemParameterFrontend, SystemParameterSyncConfig};
173-
use crate::coord::appends::{
174-
BuiltinTableAppendNotify, Deferred, GroupCommitPermit, PendingWriteTxn,
175-
};
176-
use crate::coord::caught_up::CaughtUpCheckContext;
177-
use crate::coord::cluster_scheduling::SchedulingDecision;
178-
use crate::coord::id_bundle::CollectionIdBundle;
179-
use crate::coord::introspection::IntrospectionSubscribe;
180-
use crate::coord::peek::PendingPeek;
181-
use crate::coord::timeline::{TimelineContext, TimelineState};
182-
use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
183-
use crate::coord::validity::PlanValidity;
184-
use crate::error::AdapterError;
185-
use crate::explain::insights::PlanInsightsContext;
186-
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
187-
use crate::metrics::Metrics;
188-
use crate::optimize::dataflows::{
189-
dataflow_import_id_bundle, ComputeInstanceSnapshot, DataflowBuilder,
190-
};
191-
use crate::optimize::{self, Optimize, OptimizerConfig};
192-
use crate::session::{EndTransactionAction, Session};
193-
use crate::statement_logging::{StatementEndedExecutionReason, StatementLifecycleEvent};
194-
use crate::util::{ClientTransmitter, ResultExt};
195-
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
196-
use crate::{flags, AdapterNotice, ReadHolds};
197-
198-
use self::statement_logging::{StatementLogging, StatementLoggingId};
199-
200200
pub(crate) mod id_bundle;
201201
pub(crate) mod in_memory_oracle;
202202
pub(crate) mod peek;
@@ -3476,7 +3476,7 @@ pub fn serve(
34763476
Config {
34773477
controller_config,
34783478
controller_envd_epoch,
3479-
storage,
3479+
mut storage,
34803480
timestamp_oracle_url,
34813481
unsafe_mode,
34823482
all_features,
@@ -3547,6 +3547,33 @@ pub fn serve(
35473547
let aws_privatelink_availability_zones = aws_privatelink_availability_zones
35483548
.map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));
35493549

3550+
let expr_cache_handle = {
3551+
let persist = controller_config
3552+
.persist_clients
3553+
.open(controller_config.persist_location.clone())
3554+
.await
3555+
.context("opening expression cache")?;
3556+
let current_ids = storage
3557+
.transaction()
3558+
.await?
3559+
.get_items()
3560+
.into_iter()
3561+
.map(|item| item.id)
3562+
.collect();
3563+
let expr_cache_config = ExpressionCacheConfig {
3564+
deploy_generation: controller_config.deploy_generation,
3565+
persist,
3566+
organization_id: environment_id.organization_id(),
3567+
current_ids,
3568+
// TODO(jkosh44)
3569+
optimizer_features: OptimizerFeatures::default(),
3570+
remove_prior_gens: !read_only_controllers,
3571+
};
3572+
spawn(|| "expr-cache-handle-spawn", async move {
3573+
ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await
3574+
})
3575+
};
3576+
35503577
info!(
35513578
"startup: coordinator init: preamble complete in {:?}",
35523579
coord_start.elapsed()
@@ -3625,6 +3652,7 @@ pub fn serve(
36253652
} = Catalog::open(mz_catalog::config::Config {
36263653
storage,
36273654
metrics_registry: &metrics_registry,
3655+
expr_cache_handle: Some(expr_cache_handle),
36283656
state: mz_catalog::config::StateConfig {
36293657
unsafe_mode,
36303658
all_features,

src/catalog/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ use mz_controller::clusters::ReplicaAllocation;
1818
use mz_orchestrator::MemoryLimit;
1919
use mz_ore::cast::CastFrom;
2020
use mz_ore::metrics::MetricsRegistry;
21+
use mz_ore::task::JoinHandle;
2122
use mz_persist_client::PersistClient;
2223
use mz_repr::GlobalId;
2324
use mz_sql::catalog::EnvironmentId;
2425
use mz_sql::session::vars::ConnectionCounter;
2526
use serde::{Deserialize, Serialize};
2627

2728
use crate::durable::DurableCatalogState;
29+
use crate::expr_cache::{ExpressionCacheHandle, Expressions};
2830

2931
/// Configures a catalog.
3032
#[derive(Debug)]
@@ -33,6 +35,8 @@ pub struct Config<'a> {
3335
pub storage: Box<dyn DurableCatalogState>,
3436
/// The registry that catalog uses to report metrics.
3537
pub metrics_registry: &'a MetricsRegistry,
38+
pub expr_cache_handle:
39+
Option<JoinHandle<(ExpressionCacheHandle, BTreeMap<GlobalId, Expressions>)>>,
3640
pub state: StateConfig,
3741
}
3842

0 commit comments

Comments
 (0)