diff --git a/Cargo.lock b/Cargo.lock index 5b56f836f3b4d..88bf174d1dd42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4438,8 +4438,10 @@ dependencies = [ "mz-build-tools", "mz-cloud-resources", "mz-compute-client", + "mz-compute-types", "mz-controller", "mz-controller-types", + "mz-durable-cache", "mz-expr", "mz-orchestrator", "mz-ore", @@ -4456,6 +4458,7 @@ dependencies = [ "mz-storage-client", "mz-storage-controller", "mz-storage-types", + "mz-transform", "paste", "postgres-openssl", "prometheus", @@ -4470,6 +4473,7 @@ dependencies = [ "serde_plain", "sha2", "similar-asserts", + "smallvec", "static_assertions", "thiserror", "timely", @@ -6977,6 +6981,9 @@ dependencies = [ "ordered-float 4.2.0", "paste", "proc-macro2", + "proptest", + "proptest-derive", + "serde", "serde_json", "tracing", "workspace-hack", diff --git a/doc/developer/design/20241008_expression_cache.md b/doc/developer/design/20241008_expression_cache.md index b91b0d97b53db..ddafadb32e1f4 100644 --- a/doc/developer/design/20241008_expression_cache.md +++ b/doc/developer/design/20241008_expression_cache.md @@ -70,16 +70,7 @@ struct Expressions { physical_plan: DataflowDescription, dataflow_metainfos: DataflowMetainfo>, notices: SmallVec<[Arc; 4]>, - optimizer_feature_overrides: OptimizerFeatures, -} - -struct NewEntry { - /// `GlobalId` of the new expression. - id: GlobalId, - /// New `Expressions` to cache. - expressions: Expressions, - /// `GlobalId`s to invalidate as a result of the new entry. - invalidate_ids: BTreeSet, + optimizer_features: OptimizerFeatures, } struct ExpressionCache { @@ -100,13 +91,12 @@ impl ExpressionCache { /// Returns all cached expressions in the current deploy generation, after reconciliation. fn open(&mut self, current_ids: &BTreeSet, optimizer_features: &OptimizerFeatures, remove_prior_gens: bool) -> Vec<(GlobalId, Expressions)>; - /// Durably inserts `expressions` into current deploy generation. This may also invalidate - /// entries giving by `expressions`. 
- /// - /// Returns a [`Future`] that completes once the changes have been made durable. + /// Durably removes all entries given by `invalidate_ids` and inserts `new_entries` into + /// current deploy generation. /// - /// Panics if any `GlobalId` already exists in the cache. - fn insert_expressions(&mut self, expressions: Vec) -> impl Future; + /// If there is a duplicate ID in both `invalidate_ids` and `new_entries`, then the final value + /// will be taken from `new_entries`. + fn insert_expressions(&mut self, new_entries: Vec<(GlobalId, Expressions)>, invalidate_ids: BTreeSet); /// Durably remove and return all entries in current deploy generation that depend on an ID in /// `dropped_ids` . diff --git a/src/catalog/BUILD.bazel b/src/catalog/BUILD.bazel index 9834aa4c38a92..c2affcc4b4472 100644 --- a/src/catalog/BUILD.bazel +++ b/src/catalog/BUILD.bazel @@ -36,8 +36,10 @@ rust_library( "//src/build-info:mz_build_info", "//src/cloud-resources:mz_cloud_resources", "//src/compute-client:mz_compute_client", + "//src/compute-types:mz_compute_types", "//src/controller:mz_controller", "//src/controller-types:mz_controller_types", + "//src/durable-cache:mz_durable_cache", "//src/expr:mz_expr", "//src/orchestrator:mz_orchestrator", "//src/ore:mz_ore", @@ -53,6 +55,7 @@ rust_library( "//src/storage-client:mz_storage_client", "//src/storage-controller:mz_storage_controller", "//src/storage-types:mz_storage_types", + "//src/transform:mz_transform", ] + all_crate_deps(normal = True), ) @@ -87,8 +90,10 @@ rust_test( "//src/build-tools:mz_build_tools", "//src/cloud-resources:mz_cloud_resources", "//src/compute-client:mz_compute_client", + "//src/compute-types:mz_compute_types", "//src/controller:mz_controller", "//src/controller-types:mz_controller_types", + "//src/durable-cache:mz_durable_cache", "//src/expr:mz_expr", "//src/orchestrator:mz_orchestrator", "//src/ore:mz_ore", @@ -105,6 +110,7 @@ rust_test( "//src/storage-client:mz_storage_client", 
"//src/storage-controller:mz_storage_controller", "//src/storage-types:mz_storage_types", + "//src/transform:mz_transform", ] + all_crate_deps( normal = True, normal_dev = True, @@ -121,8 +127,10 @@ rust_doc_test( "//src/build-tools:mz_build_tools", "//src/cloud-resources:mz_cloud_resources", "//src/compute-client:mz_compute_client", + "//src/compute-types:mz_compute_types", "//src/controller:mz_controller", "//src/controller-types:mz_controller_types", + "//src/durable-cache:mz_durable_cache", "//src/expr:mz_expr", "//src/orchestrator:mz_orchestrator", "//src/ore:mz_ore", @@ -139,6 +147,7 @@ rust_doc_test( "//src/storage-client:mz_storage_client", "//src/storage-controller:mz_storage_controller", "//src/storage-types:mz_storage_types", + "//src/transform:mz_transform", ] + all_crate_deps( normal = True, normal_dev = True, @@ -193,8 +202,10 @@ rust_test( "//src/build-tools:mz_build_tools", "//src/cloud-resources:mz_cloud_resources", "//src/compute-client:mz_compute_client", + "//src/compute-types:mz_compute_types", "//src/controller:mz_controller", "//src/controller-types:mz_controller_types", + "//src/durable-cache:mz_durable_cache", "//src/expr:mz_expr", "//src/orchestrator:mz_orchestrator", "//src/ore:mz_ore", @@ -211,6 +222,7 @@ rust_test( "//src/storage-client:mz_storage_client", "//src/storage-controller:mz_storage_controller", "//src/storage-types:mz_storage_types", + "//src/transform:mz_transform", ] + all_crate_deps( normal = True, normal_dev = True, @@ -247,8 +259,10 @@ rust_test( "//src/build-tools:mz_build_tools", "//src/cloud-resources:mz_cloud_resources", "//src/compute-client:mz_compute_client", + "//src/compute-types:mz_compute_types", "//src/controller:mz_controller", "//src/controller-types:mz_controller_types", + "//src/durable-cache:mz_durable_cache", "//src/expr:mz_expr", "//src/orchestrator:mz_orchestrator", "//src/ore:mz_ore", @@ -265,6 +279,7 @@ rust_test( "//src/storage-client:mz_storage_client", 
"//src/storage-controller:mz_storage_controller", "//src/storage-types:mz_storage_types", + "//src/transform:mz_transform", ] + all_crate_deps( normal = True, normal_dev = True, @@ -301,8 +316,10 @@ rust_test( "//src/build-tools:mz_build_tools", "//src/cloud-resources:mz_cloud_resources", "//src/compute-client:mz_compute_client", + "//src/compute-types:mz_compute_types", "//src/controller:mz_controller", "//src/controller-types:mz_controller_types", + "//src/durable-cache:mz_durable_cache", "//src/expr:mz_expr", "//src/orchestrator:mz_orchestrator", "//src/ore:mz_ore", @@ -319,6 +336,7 @@ rust_test( "//src/storage-client:mz_storage_client", "//src/storage-controller:mz_storage_controller", "//src/storage-types:mz_storage_types", + "//src/transform:mz_transform", ] + all_crate_deps( normal = True, normal_dev = True, diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 2720be308d0eb..ce3dbc520af0e 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -27,8 +27,10 @@ mz-audit-log = { path = "../audit-log" } mz-build-info = { path = "../build-info" } mz-cloud-resources = { path = "../cloud-resources" } mz-compute-client = { path = "../compute-client" } +mz-compute-types = { path = "../compute-types" } mz-controller = { path = "../controller" } mz-controller-types = { path = "../controller-types" } +mz-durable-cache = { path = "../durable-cache" } mz-expr = { path = "../expr" } mz-orchestrator = { path = "../orchestrator" } mz-ore = { path = "../ore", features = ["chrono", "async", "tracing_"] } @@ -44,6 +46,7 @@ mz-sql-parser = { path = "../sql-parser" } mz-storage-client = { path = "../storage-client" } mz-storage-controller = { path = "../storage-controller" } mz-storage-types = { path = "../storage-types" } +mz-transform = { path = "../transform" } paste = "1.0.11" prometheus = { version = "0.13.3", default-features = false } proptest = { version = "1.0.0", default-features = false, features = ["std"] } @@ -55,11 +58,13 @@ semver = { 
version = "1.0.16" } serde = "1.0.152" serde_json = "1.0.125" serde_plain = "1.0.1" +smallvec = { version = "1.10.0", features = ["union"] } static_assertions = "1.1" sha2 = "0.10.6" +thiserror = "1.0.37" timely = "0.12.0" +tokio = { version = "1.38.0" } tracing = "0.1.37" -thiserror = "1.0.37" uuid = "1.2.2" workspace-hack = { version = "0.0.0", path = "../workspace-hack" } @@ -69,7 +74,6 @@ insta = "1.32" mz-build-tools = { path = "../build-tools", default-features = false } mz-postgres-util = { path = "../postgres-util" } similar-asserts = "1.4" -tokio = { version = "1.38.0" } tokio-postgres = { version = "0.7.8" } [build-dependencies] diff --git a/src/catalog/src/durable.rs b/src/catalog/src/durable.rs index f3b8eb239b758..8f16a1f9624fe 100644 --- a/src/catalog/src/durable.rs +++ b/src/catalog/src/durable.rs @@ -36,7 +36,7 @@ pub use crate::durable::objects::{ Role, Schema, SourceReference, SourceReferences, StorageCollectionMetadata, SystemConfiguration, SystemObjectDescription, SystemObjectMapping, UnfinalizedShard, }; -pub use crate::durable::persist::builtin_migration_shard_id; +pub use crate::durable::persist::{builtin_migration_shard_id, expression_cache_shard_id}; use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState}; pub use crate::durable::transaction::Transaction; use crate::durable::transaction::TransactionBatch; diff --git a/src/catalog/src/durable/persist.rs b/src/catalog/src/durable/persist.rs index 2c53eb2e055ae..666355ce419a1 100644 --- a/src/catalog/src/durable/persist.rs +++ b/src/catalog/src/durable/persist.rs @@ -117,6 +117,8 @@ const CATALOG_SEED: usize = 1; const UPGRADE_SEED: usize = 2; /// Seed used to generate the persist shard ID for builtin table migrations. const BUILTIN_MIGRATION_SEED: usize = 3; +/// Seed used to generate the persist shard ID for the expression cache. +const EXPRESSION_CACHE_SEED: usize = 4; /// Durable catalog mode that dictates the effect of mutable operations. 
#[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -1701,6 +1703,12 @@ pub fn builtin_migration_shard_id(organization_id: Uuid) -> ShardId { shard_id(organization_id, BUILTIN_MIGRATION_SEED) } +/// Deterministically generate an expression cache shard ID for the given +/// `organization_id`. +pub fn expression_cache_shard_id(organization_id: Uuid) -> ShardId { + shard_id(organization_id, EXPRESSION_CACHE_SEED) +} + /// Deterministically generate a shard ID for the given `organization_id` and `seed`. fn shard_id(organization_id: Uuid, seed: usize) -> ShardId { let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec(); diff --git a/src/catalog/src/expr_cache.rs b/src/catalog/src/expr_cache.rs new file mode 100644 index 0000000000000..f858695476d23 --- /dev/null +++ b/src/catalog/src/expr_cache.rs @@ -0,0 +1,594 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! A cache for optimized expressions. 
+ +#![allow(dead_code)] + +use std::borrow::Cow; +use std::collections::{BTreeMap, BTreeSet}; +use std::future::Future; +use std::sync::Arc; + +use mz_compute_types::dataflows::DataflowDescription; +use mz_durable_cache::{DurableCache, DurableCacheCodec}; +use mz_expr::OptimizedMirRelationExpr; +use mz_ore::channel::trigger; +use mz_ore::task::spawn; +use mz_persist_client::PersistClient; +use mz_persist_types::codec_impls::UnitSchema; +use mz_persist_types::Codec; +use mz_repr::adt::jsonb::Jsonb; +use mz_repr::optimize::OptimizerFeatures; +use mz_repr::{GlobalId, RelationDesc, ScalarType}; +use mz_storage_types::sources::SourceData; +use mz_transform::dataflow::DataflowMetainfo; +use mz_transform::notice::OptimizerNotice; +use proptest_derive::Arbitrary; +use serde::{Deserialize, Serialize}; +use smallvec::SmallVec; +use timely::Container; +use tokio::sync::mpsc; +use tracing::debug; +use uuid::Uuid; + +use crate::durable::expression_cache_shard_id; + +/// The data that is cached per catalog object. 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct Expressions { + local_mir: OptimizedMirRelationExpr, + global_mir: DataflowDescription, + physical_plan: DataflowDescription, + dataflow_metainfos: DataflowMetainfo>, + notices: SmallVec<[Arc; 4]>, + optimizer_features: OptimizerFeatures, +} + +/// Identifies a cached entry by deploy generation and object ID. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)] +struct CacheKey { + deploy_generation: u64, + id: GlobalId, +} + +/// Serialized form of a cache entry. The cache key and the expressions are bundled into one value +/// so that [`ExpressionCodec`] can encode both into its `KeyCodec`; `Cow` lets encoding borrow the +/// inputs while decoding produces owned values. +#[derive(Debug, Serialize, Deserialize)] +struct CodecKey<'a> { + cache_key: Cow<'a, CacheKey>, + expressions: Cow<'a, Expressions>, +} + +/// [`DurableCacheCodec`] for the expression cache. Each entry is encoded as a single JSONB row +/// wrapped in [`SourceData`]; the `ValCodec` half is always `()`. +#[derive(Debug)] +struct ExpressionCodec; + +impl DurableCacheCodec for ExpressionCodec { + type Key = CacheKey; + type Val = Expressions; + type KeyCodec = SourceData; + type ValCodec = (); + + fn schemas() -> ( + ::Schema, + ::Schema, + ) { + ( + RelationDesc::builder() + .with_column("data", ScalarType::Jsonb.nullable(false)) + .finish(), + UnitSchema::default(), + ) + } + + /// Serializes `key` and `val` together as JSON and stores the result as a one-column JSONB row. + /// + /// Panics if the pair cannot be serialized to JSON or converted to JSONB. 
+ fn encode(key: &Self::Key, val: &Self::Val) -> (Self::KeyCodec, Self::ValCodec) { + let codec_key = CodecKey { + cache_key: Cow::Borrowed(key), + expressions: Cow::Borrowed(val), + }; + let serde_value = serde_json::to_value(codec_key).expect("valid json"); + let jsonb = + Jsonb::from_serde_json(serde_value).expect("contained integers should fit in f64"); + let row = jsonb.into_row(); + let source_data = SourceData(Ok(row)); + (source_data, ()) + } + + /// Inverse of `encode`: reads the JSONB row back into a [`CodecKey`] and splits it into the + /// cache key and the expressions. + /// + /// Panics if `key` holds an `Err` or if the JSON does not deserialize into a [`CodecKey`]. + fn decode(key: Self::KeyCodec, _val: Self::ValCodec) -> (Self::Key, Self::Val) { + let row = key.0.expect("only Ok values stored in expression cache"); + let jsonb = Jsonb::from_row(row); + let serde_value = jsonb.as_ref().to_serde_json(); + let codec_key: CodecKey = + serde_json::from_value(serde_value).expect("jsonb should roundtrip"); + ( + codec_key.cache_key.into_owned(), + codec_key.expressions.into_owned(), + ) + } +} + +/// Configuration needed to initialize an [`ExpressionCache`]. 
+#[derive(Debug, Clone)] +pub struct ExpressionCacheConfig<'a> { + deploy_generation: u64, + persist: &'a PersistClient, + organization_id: Uuid, + current_ids: &'a BTreeSet, + optimizer_features: &'a OptimizerFeatures, + remove_prior_gens: bool, +} + +/// A durable cache of optimized expressions. +struct ExpressionCache { + deploy_generation: u64, + durable_cache: DurableCache, +} + +impl ExpressionCache { + /// Creates a new [`ExpressionCache`] and reconciles all entries in the current deploy generation. + /// Reconciliation will remove all entries that are not in `current_ids` and remove all + /// entries that have optimizer features that are not equal to `optimizer_features`. + /// + /// If `remove_prior_gens` is `true`, all previous generations are durably removed from the + /// cache. + /// + /// Returns all cached expressions in the current deploy generation, after reconciliation. + async fn open( + ExpressionCacheConfig { + deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }: ExpressionCacheConfig<'_>, + ) -> (Self, BTreeMap) { + let shard_id = expression_cache_shard_id(organization_id); + let durable_cache = DurableCache::new(persist, shard_id, "expressions").await; + let mut cache = Self { + deploy_generation, + durable_cache, + }; + + while let Err(err) = cache + .try_open(current_ids, optimizer_features, remove_prior_gens) + .await + { + debug!("failed to open cache: {err} ... 
retrying"); + } + + let entries = cache + .durable_cache + .entries_local() + .filter(|(key, _)| key.deploy_generation == cache.deploy_generation) + .map(|(key, expressions)| (key.id.clone(), expressions.clone())) + .collect(); + + (cache, entries) + } + + async fn try_open( + &mut self, + current_ids: &BTreeSet, + optimizer_features: &OptimizerFeatures, + remove_prior_gens: bool, + ) -> Result<(), mz_durable_cache::Error> { + let mut keys_to_remove = Vec::new(); + + for (key, expressions) in self.durable_cache.entries_local() { + if key.deploy_generation == self.deploy_generation { + // Remove dropped IDs. + if !current_ids.contains(&key.id) { + keys_to_remove.push((key.clone(), None)); + } + + // Remove expressions that were cached with different features. + if expressions.optimizer_features != *optimizer_features { + keys_to_remove.push((key.clone(), None)); + } + } else if remove_prior_gens { + // Remove expressions from previous generations. + keys_to_remove.push((key.clone(), None)); + } + } + + let keys_to_remove: Vec<_> = keys_to_remove + .iter() + .map(|(key, expressions)| (key, expressions.as_ref())) + .collect(); + self.durable_cache.try_set_many(&keys_to_remove).await + } + + /// Durably removes all entries given by `invalidate_ids` and inserts `new_entries` into + /// current deploy generation. + /// + /// If there is a duplicate ID in both `invalidate_ids` and `new_entries`, then the final value + /// will be taken from `new_entries`. + async fn insert_expressions( + &mut self, + new_entries: Vec<(GlobalId, Expressions)>, + invalidate_ids: BTreeSet, + ) { + let mut entries = BTreeMap::new(); + // Important to do `invalidate_ids` first, so that `new_entries` overwrites duplicate keys. 
+ for id in invalidate_ids { + entries.insert( + CacheKey { + id, + deploy_generation: self.deploy_generation, + }, + None, + ); + } + for (id, expressions) in new_entries { + entries.insert( + CacheKey { + id, + deploy_generation: self.deploy_generation, + }, + Some(expressions), + ); + } + let entries: Vec<_> = entries + .iter() + .map(|(key, expressions)| (key, expressions.as_ref())) + .collect(); + self.durable_cache.set_many(&entries).await + } +} + +/// Operations to perform on the cache. +enum CacheOperation { + /// See [`ExpressionCache::insert_expressions`]. + Insert { + new_entries: Vec<(GlobalId, Expressions)>, + invalidate_ids: BTreeSet, + trigger: trigger::Trigger, + }, +} + +/// Handle used to send [`CacheOperation`]s to the task that owns the [`ExpressionCache`]. +struct ExpressionCacheHandle { + tx: mpsc::UnboundedSender, +} + +impl ExpressionCacheHandle { + /// Spawns a task responsible for managing the expression cache. See [`ExpressionCache::open`]. + /// + /// The task processes operations in the order they are received and exits once every sender + /// for its channel has been dropped. + /// + /// Returns a handle to interact with the cache and the initial contents of the cache. 
+ async fn spawn_expression_cache( + config: ExpressionCacheConfig<'_>, + ) -> (Self, BTreeMap) { + let (mut cache, entries) = ExpressionCache::open(config).await; + let (tx, mut rx) = mpsc::unbounded_channel(); + spawn(|| "expression-cache-task", async move { + // `recv` yields `None` once every sender has been dropped; exit then rather than + // polling a closed channel in a busy loop. + while let Some(op) = rx.recv().await { + match op { + CacheOperation::Insert { + new_entries, + invalidate_ids, + // Bound to a name (rather than `_`) so the trigger is held until the insert + // has completed. + trigger: _trigger, + } => cache.insert_expressions(new_entries, invalidate_ids).await, + } + } + }); + + (Self { tx }, entries) + } + + fn insert_expressions( + &self, + new_entries: Vec<(GlobalId, Expressions)>, + invalidate_ids: BTreeSet, + ) -> impl Future { + let (trigger, trigger_rx) = trigger::channel(); + let op = CacheOperation::Insert { + new_entries, + invalidate_ids, + trigger, + }; + // If the send fails, then we must be shutting down. 
+ let _ = self.tx.send(op); + trigger_rx + } +} + +#[cfg(test)] +mod tests { + use std::collections::{BTreeMap, BTreeSet}; + use std::sync::Arc; + + use mz_compute_types::dataflows::{DataflowDescription, DataflowExpirationDesc}; + use mz_durable_cache::DurableCacheCodec; + use mz_expr::{MirRelationExpr, OptimizedMirRelationExpr}; + use mz_persist_client::PersistClient; + use mz_repr::optimize::OptimizerFeatures; + use mz_repr::{GlobalId, RelationType}; + use mz_transform::dataflow::DataflowMetainfo; + use mz_transform::notice::OptimizerNotice; + use proptest::arbitrary::{any, Arbitrary}; + use proptest::prelude::{BoxedStrategy, ProptestConfig}; + use proptest::proptest; + use proptest::strategy::Strategy; + use proptest::test_runner::TestRunner; + use smallvec::SmallVec; + use timely::progress::Antichain; + use uuid::Uuid; + + use crate::expr_cache::{ + CacheKey, ExpressionCacheConfig, ExpressionCacheHandle, ExpressionCodec, Expressions, + }; + + impl Arbitrary for Expressions { + type Parameters = (); + fn arbitrary_with((): Self::Parameters) -> Self::Strategy { + // It would be better to implement `Arbitrary` for these types, but that would be + // extremely painful, so we just manually construct very simple instances. 
+ let local_mir = OptimizedMirRelationExpr(MirRelationExpr::Constant { + rows: Ok(Vec::new()), + typ: RelationType::empty(), + }); + let global_mir = DataflowDescription::new("gmir".to_string()); + let physical_plan = DataflowDescription { + source_imports: Default::default(), + index_imports: Default::default(), + objects_to_build: Vec::new(), + index_exports: Default::default(), + sink_exports: Default::default(), + as_of: Default::default(), + until: Antichain::new(), + initial_storage_as_of: None, + refresh_schedule: None, + debug_name: "pp".to_string(), + dataflow_expiration_desc: DataflowExpirationDesc::default(), + }; + + let dataflow_metainfos = any::>>(); + let notices = any::<[Arc; 4]>(); + let optimizer_feature = any::(); + + (dataflow_metainfos, notices, optimizer_feature) + .prop_map(move |(dataflow_metainfos, notices, optimizer_feature)| { + let local_mir = local_mir.clone(); + let global_mir = global_mir.clone(); + let physical_plan = physical_plan.clone(); + let notices = SmallVec::from_const(notices); + Expressions { + local_mir, + global_mir, + physical_plan, + dataflow_metainfos, + notices, + optimizer_features: optimizer_feature, + } + }) + .boxed() + } + + type Strategy = BoxedStrategy; + } + + fn generate_expressions() -> Expressions { + Expressions::arbitrary() + .new_tree(&mut TestRunner::default()) + .expect("valid expression") + .current() + } + + #[mz_ore::test(tokio::test)] + async fn expression_cache() { + let first_deploy_generation = 0; + let second_deploy_generation = 1; + let persist = &PersistClient::new_for_tests().await; + let organization_id = Uuid::new_v4(); + + let current_ids = &mut BTreeSet::new(); + let optimizer_features = &mut OptimizerFeatures::default(); + let mut remove_prior_gens = false; + + let mut next_id = 0; + + let mut exps = { + // Open a new empty cache. 
+ let (cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!(entries, BTreeMap::new(), "new cache should be empty"); + + // Insert some expressions into the cache. + let mut exps = BTreeMap::new(); + for _ in 0..5 { + let mut exp = (GlobalId::User(next_id), generate_expressions()); + next_id += 1; + exp.1.optimizer_features = optimizer_features.clone(); + cache + .insert_expressions(vec![exp.clone()], BTreeSet::new()) + .await; + current_ids.insert(exp.0); + exps.insert(exp.0, exp.1); + } + exps + }; + + { + // Re-open the cache. + let (cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, exps, + "re-opening the cache should recover the expressions" + ); + + // Insert an expression with non-matching optimizer features. + let mut exp = (GlobalId::User(next_id), generate_expressions()); + next_id += 1; + let mut optimizer_features = optimizer_features.clone(); + optimizer_features.enable_eager_delta_joins = + !optimizer_features.enable_eager_delta_joins; + exp.1.optimizer_features = optimizer_features.clone(); + cache + .insert_expressions(vec![exp.clone()], BTreeSet::new()) + .await; + current_ids.insert(exp.0); + } + + { + // Re-open the cache. + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, exps, + "expression with non-matching optimizer features should be removed during reconciliation" + ); + } + + { + // Simulate dropping an object. 
+ let id_to_remove = exps.keys().next().expect("not empty").clone(); + current_ids.remove(&id_to_remove); + let _removed_exp = exps.remove(&id_to_remove); + + // Re-open the cache. + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, exps, + "dropped objects should be removed during reconciliation" + ); + } + + let new_gen_exps = { + // Open the cache at a new generation. + let (cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: second_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!(entries, BTreeMap::new(), "new generation should be empty"); + + // Insert some expressions at the new generation. + let mut exps = BTreeMap::new(); + for _ in 0..5 { + let mut exp = (GlobalId::User(next_id), generate_expressions()); + next_id += 1; + exp.1.optimizer_features = optimizer_features.clone(); + cache + .insert_expressions(vec![exp.clone()], BTreeSet::new()) + .await; + current_ids.insert(exp.0); + exps.insert(exp.0, exp.1); + } + exps + }; + + { + // Re-open the cache at the first generation. + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, exps, + "Previous generation expressions should still exist" + ); + } + + { + // Open the cache at a new generation and clear previous generations. 
+ remove_prior_gens = true; + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: second_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, new_gen_exps, + "new generation expressions should be persisted" + ); + } + + { + // Re-open the cache at the first generation. + let (_cache, entries) = + ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { + deploy_generation: first_deploy_generation, + persist, + organization_id, + current_ids, + optimizer_features, + remove_prior_gens, + }) + .await; + assert_eq!( + entries, + BTreeMap::new(), + "Previous generation expressions should be cleared" + ); + } + } + + proptest! { + #![proptest_config(ProptestConfig::with_cases(32))] + + #[mz_ore::test] + #[cfg_attr(miri, ignore)] + fn expr_cache_roundtrip((key, val) in any::<(CacheKey, Expressions)>()) { + let (encoded_key, encoded_val) = ExpressionCodec::encode(&key, &val); + let (decoded_key, decoded_val) = ExpressionCodec::decode(encoded_key, encoded_val); + + assert_eq!(key, decoded_key); + assert_eq!(val, decoded_val); + } + } +} diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index be9e43d4575d3..d75b4e2db06f1 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -17,6 +17,7 @@ use mz_adapter_types::connection::ConnectionId; pub mod builtin; pub mod config; pub mod durable; +mod expr_cache; pub mod memory; pub static SYSTEM_CONN_ID: ConnectionId = ConnectionId::Static(0); diff --git a/src/durable-cache/src/lib.rs b/src/durable-cache/src/lib.rs index 34d5360fc8ccb..9e4392192f892 100644 --- a/src/durable-cache/src/lib.rs +++ b/src/durable-cache/src/lib.rs @@ -10,7 +10,7 @@ //! The crate provides a durable key-value cache abstraction implemented by persist. 
use std::collections::BTreeMap; -use std::fmt::Debug; +use std::fmt::{Debug, Formatter}; use std::hash::Hash; use std::sync::Arc; @@ -22,6 +22,7 @@ use mz_persist_client::write::WriteHandle; use mz_persist_client::{Diagnostics, PersistClient}; use mz_persist_types::{Codec, ShardId}; use timely::progress::Antichain; +use tracing::debug; pub trait DurableCacheCodec { type Key: Ord + Hash + Clone + Debug; @@ -37,9 +38,17 @@ pub trait DurableCacheCodec { fn decode(key: Self::KeyCodec, val: Self::ValCodec) -> (Self::Key, Self::Val); } -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum Error { - WriteConflict, + WriteConflict(UpperMismatch), +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Error::WriteConflict(err) => write!(f, "{err}"), + } + } } #[derive(Debug)] @@ -61,7 +70,10 @@ impl DurableCache { shard_id, Arc::new(key_schema), Arc::new(val_schema), - Diagnostics::from_purpose(&format!("durable persist cache: {purpose}")), + Diagnostics { + shard_name: format!("{purpose}_cache"), + handle_purpose: format!("durable persist cache: {purpose}"), + }, use_critical_since, ) .await @@ -186,7 +198,9 @@ impl DurableCache { /// /// Failures will update the cache and retry until the cache is written successfully. pub async fn set(&mut self, key: &C::Key, value: Option<&C::Val>) { - while self.try_set(key, value).await.is_err() {} + while let Err(err) = self.try_set(key, value).await { + debug!("failed to set entry: {err} ... retrying"); + } } /// Durably set multiple key-value pairs in `entries`. Values of `None` deletes the @@ -194,7 +208,9 @@ impl DurableCache { /// /// Failures will update the cache and retry until the cache is written successfully. pub async fn set_many(&mut self, entries: &[(&C::Key, Option<&C::Val>)]) { - while self.try_set_many(entries).await.is_err() {} + while let Err(err) = self.try_set_many(entries).await { + debug!("failed to set entries: {err} ... 
retrying"); + } } /// Tries to durably set `key` to `value`. A `value` of `None` deletes the entry from the cache. @@ -247,8 +263,8 @@ impl DurableCache { Ok(()) } Err(err) => { - self.sync_to(err.current.into_option()).await; - Err(Error::WriteConflict) + self.sync_to(err.current.clone().into_option()).await; + Err(Error::WriteConflict(err)) } } } diff --git a/src/ore/src/serde.rs b/src/ore/src/serde.rs index 22ac46d9eea52..ab158b5cc3a12 100644 --- a/src/ore/src/serde.rs +++ b/src/ore/src/serde.rs @@ -32,3 +32,51 @@ where } s.end() } + +/// Used to deserialize fields of [`std::collections::BTreeMap`] whose key type is not a native +/// string. Annotate the field with +/// `#[serde(deserialize_with = "mz_ore::serde::string_key_to_btree_map")]`. +pub fn string_key_to_btree_map<'de, K, V, D>( + deserializer: D, +) -> Result, D::Error> +where + K: std::str::FromStr + Ord + std::fmt::Debug, + K::Err: std::fmt::Display, + V: serde::Deserialize<'de>, + D: serde::Deserializer<'de>, +{ + struct BTreeMapVisitor { + _phantom: std::marker::PhantomData<(K, V)>, + } + + impl<'de, K, V> serde::de::Visitor<'de> for BTreeMapVisitor + where + K: std::str::FromStr + Ord, + V: serde::Deserialize<'de>, + K::Err: std::fmt::Display, + { + type Value = std::collections::BTreeMap; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(formatter, "a map with keys that implement FromStr") + } + + fn visit_map(self, mut access: A) -> Result + where + A: serde::de::MapAccess<'de>, + { + let mut map = std::collections::BTreeMap::new(); + + while let Some((key, value)) = access.next_entry::()? 
{ + let key = K::from_str(&key).map_err(serde::de::Error::custom)?; + map.insert(key, value); + } + + Ok(map) + } + } + + deserializer.deserialize_map(BTreeMapVisitor { + _phantom: Default::default(), + }) +} diff --git a/src/repr/src/global_id.rs b/src/repr/src/global_id.rs index 12dc3a44b0a4f..0b971eae22c5f 100644 --- a/src/repr/src/global_id.rs +++ b/src/repr/src/global_id.rs @@ -84,6 +84,9 @@ impl FromStr for GlobalId { if s.len() < 2 { return Err(anyhow!("couldn't parse id {}", s)); } + if s == "Explained Query" { + return Ok(GlobalId::Explain); + } let val: u64 = s[1..].parse()?; match s.chars().next().unwrap() { 's' => Ok(GlobalId::System(val)), diff --git a/src/repr/src/optimize.rs b/src/repr/src/optimize.rs index ac5b0304a75df..c016aa968900b 100644 --- a/src/repr/src/optimize.rs +++ b/src/repr/src/optimize.rs @@ -11,12 +11,13 @@ use std::collections::BTreeMap; +use proptest_derive::Arbitrary; use serde::{Deserialize, Serialize}; /// A macro for feature flags managed by the optimizer. macro_rules! 
optimizer_feature_flags { ({ $($feature:ident: $type:ty,)* }) => { - #[derive(Clone, Debug, Default)] + #[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, Arbitrary)] pub struct OptimizerFeatures { $(pub $feature: $type),* } diff --git a/src/transform/Cargo.toml b/src/transform/Cargo.toml index f65b42470478d..e6a22422b7218 100644 --- a/src/transform/Cargo.toml +++ b/src/transform/Cargo.toml @@ -20,6 +20,9 @@ mz-repr = { path = "../repr", features = ["tracing_"] } num-traits = "0.2" ordered-float = { version = "4.2.0", features = ["serde"] } paste = "1.0.11" +proptest = { version = "1.0.0", default-features = false, features = ["std"] } +proptest-derive = { version = "0.3.0" } +serde = "1.0.152" tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack" } diff --git a/src/transform/src/dataflow.rs b/src/transform/src/dataflow.rs index c82f290f0d4ea..6544d167616ec 100644 --- a/src/transform/src/dataflow.rs +++ b/src/transform/src/dataflow.rs @@ -26,6 +26,8 @@ use mz_ore::stack::{CheckedRecursion, RecursionGuard, RecursionLimitError}; use mz_ore::{assert_none, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log}; use mz_repr::explain::{DeltaJoinIndexUsageType, IndexUsageType, UsedIndexes}; use mz_repr::GlobalId; +use proptest_derive::Arbitrary; +use serde::{Deserialize, Serialize}; use crate::monotonic::MonotonicFlag; use crate::notice::RawOptimizerNotice; @@ -1223,13 +1225,15 @@ impl IndexUsageContext { /// Extra information about the dataflow. This is not going to be shipped, but has to be processed /// in other ways, e.g., showing notices to the user, or saving meta-information to the catalog. -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)] pub struct DataflowMetainfo { /// Notices that the optimizer wants to show to users. /// For pushing a new element, use [`Self::push_optimizer_notice_dedup`]. 
pub optimizer_notices: Vec, /// What kind of operation (full scan, lookup, ...) will access each index. Computed by /// `prune_and_annotate_dataflow_index_imports`. + #[serde(serialize_with = "mz_ore::serde::map_key_to_string")] + #[serde(deserialize_with = "mz_ore::serde::string_key_to_btree_map")] pub index_usage_types: BTreeMap>, } diff --git a/src/transform/src/notice.rs b/src/transform/src/notice.rs index 1ddfbdc394c0d..66f2043fdb314 100644 --- a/src/transform/src/notice.rs +++ b/src/transform/src/notice.rs @@ -44,8 +44,10 @@ use std::{concat, stringify}; use enum_kinds::EnumKind; use mz_repr::explain::ExprHumanizer; use mz_repr::GlobalId; +use proptest_derive::Arbitrary; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)] /// An long lived in-memory representation of a [`RawOptimizerNotice`] that is /// meant to be kept as part of the hydrated catalog state. pub struct OptimizerNotice { @@ -121,7 +123,9 @@ impl OptimizerNotice { } } -#[derive(EnumKind, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[derive( + EnumKind, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary, +)] #[enum_kind(ActionKind)] /// An action attached to an [`OptimizerNotice`] pub enum Action { @@ -282,7 +286,7 @@ macro_rules! raw_optimizer_notices { paste::paste!{ /// Notices that the optimizer wants to show to users. #[derive(EnumKind, Clone, Debug, Eq, PartialEq, Hash)] - #[enum_kind(OptimizerNoticeKind, derive(PartialOrd, Ord, Hash))] + #[enum_kind(OptimizerNoticeKind, derive(PartialOrd, Ord, Hash, Serialize, Deserialize,Arbitrary))] pub enum RawOptimizerNotice { $( #[doc = concat!("See [`", stringify!($ty), "`].")]