diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 65fa2f08847c8..db4a00712e1ca 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -2643,6 +2643,7 @@ mod tests { &create_sql, &BTreeMap::new(), &mut LocalExpressionCache::Closed, + None, ) .expect("unable to parse view"); let commit_ts = catalog.current_upper().await; @@ -3549,7 +3550,9 @@ mod tests { .deserialize_item( mv_gid, &format!("CREATE MATERIALIZED VIEW {database_name}.{schema_name}.{mv_name} AS SELECT name FROM mz_tables"), - &BTreeMap::new(), &mut LocalExpressionCache::Closed + &BTreeMap::new(), + &mut LocalExpressionCache::Closed, + None, ) .expect("unable to deserialize item"); let commit_ts = catalog.current_upper().await; diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs index 4b03c9124f8f7..0e549130b31a3 100644 --- a/src/adapter/src/catalog/apply.rs +++ b/src/adapter/src/catalog/apply.rs @@ -684,7 +684,9 @@ impl CatalogState { &versions, None, index.is_retained_metrics_object, - custom_logical_compaction_window,local_expression_cache, + custom_logical_compaction_window, + local_expression_cache, + None, ) .unwrap_or_else(|e| { panic!( @@ -826,6 +828,7 @@ impl CatalogState { false, None, local_expression_cache, + None, ) .unwrap_or_else(|e| { panic!( @@ -862,6 +865,7 @@ impl CatalogState { false, None, local_expression_cache, + None, ) .unwrap_or_else(|e| { panic!( @@ -986,6 +990,7 @@ impl CatalogState { &create_sql, &extra_versions, local_expression_cache, + Some(retraction.item), ) .unwrap_or_else(|e| { panic!("{e:?}: invalid persisted SQL: {create_sql}") @@ -1007,6 +1012,7 @@ impl CatalogState { &create_sql, &extra_versions, local_expression_cache, + None, ) .unwrap_or_else(|e| { panic!("{e:?}: invalid persisted SQL: {create_sql}") @@ -1370,6 +1376,7 @@ impl CatalogState { false, None, cached_expr, + None, ); (id, global_id, res) } diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index 53480a4287e24..2a1cb4c74411d 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -168,6 +168,7 @@ impl CatalogItemRebuilder { is_retained_metrics_object, custom_logical_compaction_window, &mut LocalExpressionCache::Closed, + None, ) .unwrap_or_else(|error| panic!("invalid persisted create sql ({error:?}): {sql}")), } diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index 23d34cfc233d8..d1659fd44a6b0 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -949,6 +949,7 @@ impl CatalogState { create_sql: &str, extra_versions: &BTreeMap, local_expression_cache: &mut LocalExpressionCache, + previous_item: Option, ) -> Result { self.parse_item( global_id, @@ -958,6 +959,7 @@ impl CatalogState { false, None, local_expression_cache, + previous_item, ) } @@ -972,6 +974,7 @@ impl CatalogState { is_retained_metrics_object: bool, custom_logical_compaction_window: Option, local_expression_cache: &mut LocalExpressionCache, + previous_item: Option, ) -> Result { let cached_expr = local_expression_cache.remove_cached_expression(&global_id); match self.parse_item_inner( @@ -982,6 +985,7 @@ impl CatalogState { is_retained_metrics_object, custom_logical_compaction_window, cached_expr, + previous_item, ) { Ok((item, uncached_expr)) => { if let Some((uncached_expr, optimizer_features)) = uncached_expr { @@ -1018,6 +1022,7 @@ impl CatalogState { is_retained_metrics_object: bool, custom_logical_compaction_window: Option, cached_expr: Option, + previous_item: Option, ) -> Result< ( CatalogItem, @@ -1166,15 +1171,23 @@ impl CatalogState { // Collect optimizer parameters. let optimizer_config = optimize::OptimizerConfig::from(session_catalog.system_vars()); + let previous_exprs = previous_item.map(|item| match item { + CatalogItem::View(view) => (view.raw_expr, view.optimized_expr), + item => unreachable!("expected view, found: {item:#?}"), + }); - let (raw_expr, optimized_expr) = match cached_expr { - Some(local_expr) + let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) { + (Some(local_expr), _) if local_expr.optimizer_features == optimizer_config.features => { debug!("local expression cache hit for {global_id:?}"); - (view.expr, local_expr.local_mir) + (Arc::new(view.expr), Arc::new(local_expr.local_mir)) } - Some(_) | None => { + // If the new expr is equivalent to the old expr, then we don't need to re-optimize. + (_, Some((raw_expr, optimized_expr))) if *raw_expr == view.expr => { + (Arc::clone(&raw_expr), Arc::clone(&optimized_expr)) + } + (cached_expr, _) => { let optimizer_features = optimizer_config.features.clone(); // Build an optimizer for this VIEW. let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None); @@ -1188,7 +1201,7 @@ impl CatalogState { uncached_expr = Some((optimized_expr.clone(), optimizer_features)); - (raw_expr, optimized_expr) + (Arc::new(raw_expr), Arc::new(optimized_expr)) } }; @@ -1202,9 +1215,9 @@ impl CatalogState { CatalogItem::View(View { create_sql: view.create_sql, global_id, - raw_expr: raw_expr.into(), + raw_expr, desc: RelationDesc::new(optimized_expr.typ(), view.column_names), - optimized_expr: optimized_expr.into(), + optimized_expr, conn_id: None, resolved_ids, dependencies: DependencyIds(dependencies), @@ -1216,15 +1229,30 @@ impl CatalogState { // Collect optimizer parameters. let optimizer_config = optimize::OptimizerConfig::from(session_catalog.system_vars()); + let previous_exprs = previous_item.map(|item| match item { + CatalogItem::MaterializedView(materialized_view) => { + (materialized_view.raw_expr, materialized_view.optimized_expr) + } + item => unreachable!("expected materialized view, found: {item:#?}"), + }); - let (raw_expr, optimized_expr) = match cached_expr { - Some(local_expr) + let (raw_expr, optimized_expr) = match (cached_expr, previous_exprs) { + (Some(local_expr), _) if local_expr.optimizer_features == optimizer_config.features => { debug!("local expression cache hit for {global_id:?}"); - (materialized_view.expr, local_expr.local_mir) + ( + Arc::new(materialized_view.expr), + Arc::new(local_expr.local_mir), + ) + } + // If the new expr is equivalent to the old expr, then we don't need to re-optimize. + (_, Some((raw_expr, optimized_expr))) + if *raw_expr == materialized_view.expr => + { + (Arc::clone(&raw_expr), Arc::clone(&optimized_expr)) } - Some(_) | None => { + (cached_expr, _) => { let optimizer_features = optimizer_config.features.clone(); // TODO(aalexandrov): ideally this should be a materialized_view::Optimizer. let mut optimizer = optimize::view::Optimizer::new(optimizer_config, None); @@ -1237,7 +1265,7 @@ impl CatalogState { uncached_expr = Some((optimized_expr.clone(), optimizer_features)); - (raw_expr, optimized_expr) + (Arc::new(raw_expr), Arc::new(optimized_expr)) } }; let mut typ = optimized_expr.typ(); @@ -1258,8 +1286,8 @@ impl CatalogState { CatalogItem::MaterializedView(MaterializedView { create_sql: materialized_view.create_sql, global_id, - raw_expr: raw_expr.into(), - optimized_expr: optimized_expr.into(), + raw_expr, + optimized_expr, desc, resolved_ids, dependencies,