Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adapter: Implement expression cache #30122

Merged
merged 4 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 6 additions & 16 deletions doc/developer/design/20241008_expression_cache.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,7 @@ struct Expressions {
physical_plan: DataflowDescription<mz_compute_types::plan::Plan>,
dataflow_metainfos: DataflowMetainfo<Arc<OptimizerNotice>>,
notices: SmallVec<[Arc<OptimizerNotice>; 4]>,
optimizer_feature_overrides: OptimizerFeatures,
}

struct NewEntry {
/// `GlobalId` of the new expression.
id: GlobalId,
/// New `Expressions` to cache.
expressions: Expressions,
/// `GlobalId`s to invalidate as a result of the new entry.
invalidate_ids: BTreeSet<GlobalId>,
optimizer_feature: OptimizerFeatures,
}

struct ExpressionCache {
Expand All @@ -100,13 +91,12 @@ impl ExpressionCache {
/// Returns all cached expressions in the current deploy generation, after reconciliation.
fn open(&mut self, current_ids: &BTreeSet<GlobalId>, optimizer_features: &OptimizerFeatures, remove_prior_gens: bool) -> Vec<(GlobalId, Expressions)>;

/// Durably inserts `expressions` into current deploy generation. This may also invalidate
/// entries given by `expressions`.
///
/// Returns a [`Future`] that completes once the changes have been made durable.
/// Durably removes all entries given by `invalidate_ids` and inserts `new_entries` into
/// current deploy generation.
///
/// Panics if any `GlobalId` already exists in the cache.
fn insert_expressions(&mut self, expressions: Vec<NewEntry>) -> impl Future<Output=()>;
/// If there is a duplicate ID in both `invalidate_ids` and `new_entries`, then the final value
/// will be taken from `new_entries`.
fn insert_expressions(&mut self, new_entries: Vec<(GlobalId, Expressions)>, invalidate_ids: BTreeSet<GlobalId>);

/// Durably remove and return all entries in current deploy generation that depend on an ID in
/// `dropped_ids`.
Expand Down
18 changes: 18 additions & 0 deletions src/catalog/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ rust_library(
"//src/build-info:mz_build_info",
"//src/cloud-resources:mz_cloud_resources",
"//src/compute-client:mz_compute_client",
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/expr:mz_expr",
"//src/orchestrator:mz_orchestrator",
"//src/ore:mz_ore",
Expand All @@ -53,6 +55,7 @@ rust_library(
"//src/storage-client:mz_storage_client",
"//src/storage-controller:mz_storage_controller",
"//src/storage-types:mz_storage_types",
"//src/transform:mz_transform",
] + all_crate_deps(normal = True),
)

Expand Down Expand Up @@ -87,8 +90,10 @@ rust_test(
"//src/build-tools:mz_build_tools",
"//src/cloud-resources:mz_cloud_resources",
"//src/compute-client:mz_compute_client",
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/expr:mz_expr",
"//src/orchestrator:mz_orchestrator",
"//src/ore:mz_ore",
Expand All @@ -105,6 +110,7 @@ rust_test(
"//src/storage-client:mz_storage_client",
"//src/storage-controller:mz_storage_controller",
"//src/storage-types:mz_storage_types",
"//src/transform:mz_transform",
] + all_crate_deps(
normal = True,
normal_dev = True,
Expand All @@ -121,8 +127,10 @@ rust_doc_test(
"//src/build-tools:mz_build_tools",
"//src/cloud-resources:mz_cloud_resources",
"//src/compute-client:mz_compute_client",
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/expr:mz_expr",
"//src/orchestrator:mz_orchestrator",
"//src/ore:mz_ore",
Expand All @@ -139,6 +147,7 @@ rust_doc_test(
"//src/storage-client:mz_storage_client",
"//src/storage-controller:mz_storage_controller",
"//src/storage-types:mz_storage_types",
"//src/transform:mz_transform",
] + all_crate_deps(
normal = True,
normal_dev = True,
Expand Down Expand Up @@ -193,8 +202,10 @@ rust_test(
"//src/build-tools:mz_build_tools",
"//src/cloud-resources:mz_cloud_resources",
"//src/compute-client:mz_compute_client",
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/expr:mz_expr",
"//src/orchestrator:mz_orchestrator",
"//src/ore:mz_ore",
Expand All @@ -211,6 +222,7 @@ rust_test(
"//src/storage-client:mz_storage_client",
"//src/storage-controller:mz_storage_controller",
"//src/storage-types:mz_storage_types",
"//src/transform:mz_transform",
] + all_crate_deps(
normal = True,
normal_dev = True,
Expand Down Expand Up @@ -247,8 +259,10 @@ rust_test(
"//src/build-tools:mz_build_tools",
"//src/cloud-resources:mz_cloud_resources",
"//src/compute-client:mz_compute_client",
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/expr:mz_expr",
"//src/orchestrator:mz_orchestrator",
"//src/ore:mz_ore",
Expand All @@ -265,6 +279,7 @@ rust_test(
"//src/storage-client:mz_storage_client",
"//src/storage-controller:mz_storage_controller",
"//src/storage-types:mz_storage_types",
"//src/transform:mz_transform",
] + all_crate_deps(
normal = True,
normal_dev = True,
Expand Down Expand Up @@ -301,8 +316,10 @@ rust_test(
"//src/build-tools:mz_build_tools",
"//src/cloud-resources:mz_cloud_resources",
"//src/compute-client:mz_compute_client",
"//src/compute-types:mz_compute_types",
"//src/controller:mz_controller",
"//src/controller-types:mz_controller_types",
"//src/durable-cache:mz_durable_cache",
"//src/expr:mz_expr",
"//src/orchestrator:mz_orchestrator",
"//src/ore:mz_ore",
Expand All @@ -319,6 +336,7 @@ rust_test(
"//src/storage-client:mz_storage_client",
"//src/storage-controller:mz_storage_controller",
"//src/storage-types:mz_storage_types",
"//src/transform:mz_transform",
] + all_crate_deps(
normal = True,
normal_dev = True,
Expand Down
8 changes: 6 additions & 2 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ mz-audit-log = { path = "../audit-log" }
mz-build-info = { path = "../build-info" }
mz-cloud-resources = { path = "../cloud-resources" }
mz-compute-client = { path = "../compute-client" }
mz-compute-types = { path = "../compute-types" }
mz-controller = { path = "../controller" }
mz-controller-types = { path = "../controller-types" }
mz-durable-cache = { path = "../durable-cache" }
mz-expr = { path = "../expr" }
mz-orchestrator = { path = "../orchestrator" }
mz-ore = { path = "../ore", features = ["chrono", "async", "tracing_"] }
Expand All @@ -44,6 +46,7 @@ mz-sql-parser = { path = "../sql-parser" }
mz-storage-client = { path = "../storage-client" }
mz-storage-controller = { path = "../storage-controller" }
mz-storage-types = { path = "../storage-types" }
mz-transform = { path = "../transform" }
paste = "1.0.11"
prometheus = { version = "0.13.3", default-features = false }
proptest = { version = "1.0.0", default-features = false, features = ["std"] }
Expand All @@ -55,11 +58,13 @@ semver = { version = "1.0.16" }
serde = "1.0.152"
serde_json = "1.0.125"
serde_plain = "1.0.1"
smallvec = { version = "1.10.0", features = ["union"] }
static_assertions = "1.1"
sha2 = "0.10.6"
thiserror = "1.0.37"
timely = "0.12.0"
tokio = { version = "1.38.0" }
tracing = "0.1.37"
thiserror = "1.0.37"
uuid = "1.2.2"
workspace-hack = { version = "0.0.0", path = "../workspace-hack" }

Expand All @@ -69,7 +74,6 @@ insta = "1.32"
mz-build-tools = { path = "../build-tools", default-features = false }
mz-postgres-util = { path = "../postgres-util" }
similar-asserts = "1.4"
tokio = { version = "1.38.0" }
tokio-postgres = { version = "0.7.8" }

[build-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use crate::durable::objects::{
Role, Schema, SourceReference, SourceReferences, StorageCollectionMetadata,
SystemConfiguration, SystemObjectDescription, SystemObjectMapping, UnfinalizedShard,
};
pub use crate::durable::persist::builtin_migration_shard_id;
pub use crate::durable::persist::{builtin_migration_shard_id, expression_cache_shard_id};
use crate::durable::persist::{Timestamp, UnopenedPersistCatalogState};
pub use crate::durable::transaction::Transaction;
use crate::durable::transaction::TransactionBatch;
Expand Down
8 changes: 8 additions & 0 deletions src/catalog/src/durable/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ const CATALOG_SEED: usize = 1;
const UPGRADE_SEED: usize = 2;
/// Seed used to generate the persist shard ID for builtin table migrations.
const BUILTIN_MIGRATION_SEED: usize = 3;
/// Seed used to generate the persist shard ID for the expression cache.
const EXPRESSION_CACHE_SEED: usize = 4;

/// Durable catalog mode that dictates the effect of mutable operations.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand Down Expand Up @@ -1701,6 +1703,12 @@ pub fn builtin_migration_shard_id(organization_id: Uuid) -> ShardId {
shard_id(organization_id, BUILTIN_MIGRATION_SEED)
}

/// Derives the persist shard ID of the expression cache for `organization_id`.
///
/// The result is deterministic: it is computed by `shard_id` from the
/// organization ID together with the fixed `EXPRESSION_CACHE_SEED`.
pub fn expression_cache_shard_id(organization_id: Uuid) -> ShardId {
    let seed = EXPRESSION_CACHE_SEED;
    shard_id(organization_id, seed)
}

/// Deterministically generate a shard ID for the given `organization_id` and `seed`.
fn shard_id(organization_id: Uuid, seed: usize) -> ShardId {
let hash = sha2::Sha256::digest(format!("{organization_id}{seed}")).to_vec();
Expand Down
Loading
Loading