Skip to content

DNM: cc wip for testing #30446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
@@ -2020,6 +2020,14 @@ impl SessionCatalog for ConnCatalog<'_> {
let comment_id = self.state.get_comment_id(ObjectId::Item(*id));
self.state.comments.get_object_comments(comment_id)
}

fn is_cluster_size_cc(&self, size: &str) -> bool {
self.state
.cluster_replica_sizes
.0
.get(size)
.map_or(false, |a| a.is_cc)
}
}

#[cfg(test)]
1 change: 1 addition & 0 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
@@ -2006,6 +2006,7 @@ impl CatalogState {
workers,
credits_per_hour,
cpu_exclusive: _,
is_cc: _,
disabled: _,
selectors: _,
},
39 changes: 30 additions & 9 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@ use mz_catalog::builtin::{
BUILTIN_PREFIXES, BUILTIN_ROLES, MZ_STORAGE_USAGE_BY_SHARD_DESCRIPTION,
RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
};
use mz_catalog::config::StateConfig;
use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig};
use mz_catalog::durable::objects::{
SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
};
@@ -38,8 +38,8 @@ use mz_catalog::memory::objects::{
};
use mz_catalog::SYSTEM_CONN_ID;
use mz_compute_client::logging::LogVariant;
use mz_controller::clusters::ReplicaLogging;
use mz_controller_types::{is_cluster_size_v2, ClusterId};
use mz_controller::clusters::{ReplicaAllocation, ReplicaLogging};
use mz_controller_types::ClusterId;
use mz_ore::cast::usize_to_u64;
use mz_ore::collections::{CollectionExt, HashSet};
use mz_ore::now::to_datetime;
@@ -297,9 +297,17 @@ impl Catalog {
// TODO(jkosh44) These functions should clean up old clusters, replicas, and
// roles like they do for builtin items and introspection sources, but they
// don't.
add_new_builtin_clusters_migration(&mut txn, &cluster_sizes)?;
add_new_builtin_clusters_migration(
&mut txn,
&cluster_sizes,
&state.cluster_replica_sizes,
)?;
add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
add_new_builtin_cluster_replicas_migration(&mut txn, &cluster_sizes)?;
add_new_builtin_cluster_replicas_migration(
&mut txn,
&cluster_sizes,
&state.cluster_replica_sizes,
)?;
add_new_builtin_roles_migration(&mut txn)?;
remove_invalid_config_param_role_defaults_migration(&mut txn)?;
(migrated_builtins, new_builtins)
@@ -1046,11 +1054,17 @@ fn add_new_remove_old_builtin_items_migration(
fn add_new_builtin_clusters_migration(
txn: &mut mz_catalog::durable::Transaction<'_>,
builtin_cluster_sizes: &BuiltinBootstrapClusterSizes,
cluster_sizes: &ClusterReplicaSizeMap,
) -> Result<(), mz_catalog::durable::CatalogError> {
let cluster_names: BTreeSet<_> = txn.get_clusters().map(|cluster| cluster.name).collect();
for builtin_cluster in BUILTIN_CLUSTERS {
if !cluster_names.contains(builtin_cluster.name) {
let cluster_size = builtin_cluster_sizes.get_size(builtin_cluster.name)?;
let cluster_allocation = cluster_sizes.0.get(&cluster_size).ok_or_else(|| {
mz_catalog::durable::CatalogError::Catalog(
SqlCatalogError::UnknownClusterReplicaSize(cluster_size.clone()),
)
})?;
let id = txn.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
let id = ClusterId::System(id);
txn.insert_system_cluster(
@@ -1061,10 +1075,10 @@ fn add_new_builtin_clusters_migration(
builtin_cluster.owner_id.to_owned(),
mz_catalog::durable::ClusterConfig {
variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
size: cluster_size.clone(),
size: cluster_size,
availability_zones: vec![],
replication_factor: builtin_cluster.replication_factor,
disk: is_cluster_size_v2(&cluster_size),
disk: cluster_allocation.is_cc,
logging: default_logging_config(),
optimizer_feature_overrides: Default::default(),
schedule: Default::default(),
@@ -1135,6 +1149,7 @@ fn add_new_builtin_roles_migration(
fn add_new_builtin_cluster_replicas_migration(
txn: &mut Transaction<'_>,
builtin_cluster_sizes: &BuiltinBootstrapClusterSizes,
cluster_sizes: &ClusterReplicaSizeMap,
) -> Result<(), AdapterError> {
let cluster_lookup: BTreeMap<_, _> = txn
.get_clusters()
@@ -1164,8 +1179,13 @@ fn add_new_builtin_cluster_replicas_migration(
builtin_cluster_sizes.get_size(builtin_replica.cluster_name)?
}
};
let replica_allocation = cluster_sizes.0.get(&replica_size).ok_or_else(|| {
mz_catalog::durable::CatalogError::Catalog(
SqlCatalogError::UnknownClusterReplicaSize(replica_size.clone()),
)
})?;

let config = builtin_cluster_replica_config(replica_size);
let config = builtin_cluster_replica_config(replica_size, replica_allocation);
txn.insert_cluster_replica(
cluster.id,
builtin_replica.name,
@@ -1247,12 +1267,13 @@ fn remove_pending_cluster_replicas_migration(tx: &mut Transaction) -> Result<(),

pub(crate) fn builtin_cluster_replica_config(
replica_size: String,
replica_allocation: &ReplicaAllocation,
) -> mz_catalog::durable::ReplicaConfig {
mz_catalog::durable::ReplicaConfig {
location: mz_catalog::durable::ReplicaLocation::Managed {
availability_zone: None,
billed_as: None,
disk: is_cluster_size_v2(&replica_size),
disk: replica_allocation.is_cc,
pending: false,
internal: false,
size: replica_size,
1 change: 1 addition & 0 deletions src/catalog-debug/src/main.rs
Original file line number Diff line number Diff line change
@@ -542,6 +542,7 @@ async fn upgrade_check(
default_cluster_replica_size:
"DEFAULT CLUSTER REPLICA SIZE IS ONLY USED FOR NEW ENVIRONMENTS".into(),
bootstrap_role: None,
cluster_replica_size_map: ClusterReplicaSizeMap::default(),
},
)
.await?;
7 changes: 7 additions & 0 deletions src/catalog/src/config.rs
Original file line number Diff line number Diff line change
@@ -156,6 +156,7 @@ impl Default for ClusterReplicaSizeMap {
workers: workers.into(),
credits_per_hour: 1.into(),
cpu_exclusive: false,
is_cc: false,
disabled: false,
selectors: BTreeMap::default(),
},
@@ -176,6 +177,7 @@ impl Default for ClusterReplicaSizeMap {
workers: 1,
credits_per_hour: scale.into(),
cpu_exclusive: false,
is_cc: false,
disabled: false,
selectors: BTreeMap::default(),
},
@@ -191,6 +193,7 @@ impl Default for ClusterReplicaSizeMap {
workers: scale.into(),
credits_per_hour: scale.into(),
cpu_exclusive: false,
is_cc: false,
disabled: false,
selectors: BTreeMap::default(),
},
@@ -206,6 +209,7 @@ impl Default for ClusterReplicaSizeMap {
workers: 8,
credits_per_hour: 1.into(),
cpu_exclusive: false,
is_cc: false,
disabled: false,
selectors: BTreeMap::default(),
},
@@ -222,6 +226,7 @@ impl Default for ClusterReplicaSizeMap {
workers: 4,
credits_per_hour: 2.into(),
cpu_exclusive: false,
is_cc: false,
disabled: false,
selectors: BTreeMap::default(),
},
@@ -237,6 +242,7 @@ impl Default for ClusterReplicaSizeMap {
workers: 0,
credits_per_hour: 0.into(),
cpu_exclusive: false,
is_cc: true,
disabled: true,
selectors: BTreeMap::default(),
},
@@ -253,6 +259,7 @@ impl Default for ClusterReplicaSizeMap {
workers: 1,
credits_per_hour: 1.into(),
cpu_exclusive: false,
is_cc: true,
disabled: false,
selectors: BTreeMap::default(),
},
3 changes: 3 additions & 0 deletions src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ use mz_ore::now::EpochMillis;
use mz_persist_client::PersistClient;
use mz_repr::GlobalId;

use crate::config::ClusterReplicaSizeMap;
use crate::durable::debug::{DebugCatalogState, Trace};
pub use crate::durable::error::{CatalogError, DurableCatalogError, FenceError};
pub use crate::durable::metrics::Metrics;
@@ -69,6 +70,7 @@ pub(crate) const CATALOG_CONTENT_VERSION_KEY: &str = "catalog_content_version";

#[derive(Clone, Debug)]
pub struct BootstrapArgs {
pub cluster_replica_size_map: ClusterReplicaSizeMap,
pub default_cluster_replica_size: String,
pub bootstrap_role: Option<String>,
}
@@ -403,5 +405,6 @@ pub fn test_bootstrap_args() -> BootstrapArgs {
BootstrapArgs {
default_cluster_replica_size: "1".into(),
bootstrap_role: None,
cluster_replica_size_map: ClusterReplicaSizeMap::default(),
}
}
64 changes: 48 additions & 16 deletions src/catalog/src/durable/initialize.rs
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@ use ipnet::IpNet;
use itertools::max;
use mz_audit_log::{CreateOrDropClusterReplicaReasonV1, EventV1, VersionedEvent};
use mz_controller::clusters::ReplicaLogging;
use mz_controller_types::{is_cluster_size_v2, ClusterId, ReplicaId};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::collections::HashSet;
use mz_ore::now::EpochMillis;
use mz_pgrepr::oid::{
@@ -30,8 +30,8 @@ use mz_repr::adt::mz_acl_item::{AclMode, MzAclItem};
use mz_repr::network_policy_id::NetworkPolicyId;
use mz_repr::role_id::RoleId;
use mz_sql::catalog::{
DefaultPrivilegeAclItem, DefaultPrivilegeObject, ObjectType, RoleAttributes, RoleMembership,
RoleVars, SystemObjectType,
CatalogError as SqlCatalogError, DefaultPrivilegeAclItem, DefaultPrivilegeObject, ObjectType,
RoleAttributes, RoleMembership, RoleVars, SystemObjectType,
};
use mz_sql::names::{
DatabaseId, ObjectId, ResolvedDatabaseSpecifier, SchemaId, SchemaSpecifier, PUBLIC_ROLE_NAME,
@@ -619,7 +619,7 @@ pub(crate) async fn initialize(
Vec::new(),
MZ_SYSTEM_ROLE_ID,
cluster_privileges,
default_cluster_config(options),
default_cluster_config(options)?,
&HashSet::new(),
)?;
audit_events.extend([
@@ -665,7 +665,7 @@ pub(crate) async fn initialize(
DEFAULT_USER_CLUSTER_ID,
DEFAULT_USER_REPLICA_ID,
DEFAULT_USER_REPLICA_NAME,
default_replica_config(options),
default_replica_config(options)?,
MZ_SYSTEM_ROLE_ID,
)?;
audit_events.push((
@@ -677,7 +677,19 @@ pub(crate) async fn initialize(
replica_name: DEFAULT_USER_REPLICA_NAME.to_string(),
replica_id: Some(DEFAULT_USER_REPLICA_ID.to_string()),
logical_size: options.default_cluster_replica_size.to_string(),
disk: is_cluster_size_v2(&options.default_cluster_replica_size),
disk: {
let cluster_size = options.default_cluster_replica_size.to_string();
let cluster_allocation = options
.cluster_replica_size_map
.0
.get(&cluster_size)
.ok_or_else(|| {
CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(
cluster_size,
))
})?;
cluster_allocation.is_cc
},
billed_as: None,
internal: false,
reason: CreateOrDropClusterReplicaReasonV1::System,
@@ -753,31 +765,51 @@ pub fn resolve_system_schema(name: &str) -> &Schema {
}

/// Defines the default config for a Cluster.
fn default_cluster_config(args: &BootstrapArgs) -> ClusterConfig {
ClusterConfig {
fn default_cluster_config(args: &BootstrapArgs) -> Result<ClusterConfig, CatalogError> {
let cluster_size = args.default_cluster_replica_size.to_string();
let cluster_allocation = args
.cluster_replica_size_map
.0
.get(&cluster_size)
.ok_or_else(|| {
CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(
cluster_size.clone(),
))
})?;
Ok(ClusterConfig {
variant: ClusterVariant::Managed(ClusterVariantManaged {
size: args.default_cluster_replica_size.to_string(),
size: cluster_size,
replication_factor: 1,
availability_zones: vec![],
logging: ReplicaLogging {
log_logging: false,
interval: Some(Duration::from_secs(1)),
},
disk: is_cluster_size_v2(&args.default_cluster_replica_size),
disk: cluster_allocation.is_cc,
optimizer_feature_overrides: Default::default(),
schedule: Default::default(),
}),
workload_class: None,
}
})
}

/// Defines the default config for a Cluster Replica.
fn default_replica_config(args: &BootstrapArgs) -> ReplicaConfig {
ReplicaConfig {
fn default_replica_config(args: &BootstrapArgs) -> Result<ReplicaConfig, CatalogError> {
let cluster_size = args.default_cluster_replica_size.to_string();
let cluster_allocation = args
.cluster_replica_size_map
.0
.get(&cluster_size)
.ok_or_else(|| {
CatalogError::Catalog(SqlCatalogError::UnknownClusterReplicaSize(
cluster_size.clone(),
))
})?;
Ok(ReplicaConfig {
location: ReplicaLocation::Managed {
size: args.default_cluster_replica_size.to_string(),
size: cluster_size,
availability_zone: None,
disk: is_cluster_size_v2(&args.default_cluster_replica_size),
disk: cluster_allocation.is_cc,
internal: false,
billed_as: None,
pending: false,
@@ -786,5 +818,5 @@ fn default_replica_config(args: &BootstrapArgs) -> ReplicaConfig {
log_logging: false,
interval: Some(Duration::from_secs(1)),
},
}
})
}
7 changes: 4 additions & 3 deletions src/clusterd/src/bin/clusterd.rs
Original file line number Diff line number Diff line change
@@ -127,9 +127,10 @@ struct Args {
#[clap(long)]
announce_memory_limit: Option<usize>,

/// Whether the cluster is using a v2 (cc/C) size or not.
/// Whether this size represents a modern "cc" size rather than a legacy
/// T-shirt size.
#[clap(long)]
is_cluster_size_v2: bool,
is_cc: bool,

/// Set core affinity for Timely workers.
///
@@ -269,7 +270,7 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
.unwrap_or_default();
let mut persist_cfg =
PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), mz_dyncfgs::all_dyncfgs());
persist_cfg.is_cc_active = args.is_cluster_size_v2;
persist_cfg.is_cc_active = args.is_cc;
persist_cfg.announce_memory_limit = args.announce_memory_limit;
let persist_clients = Arc::new(PersistClientCache::new(
persist_cfg,
6 changes: 0 additions & 6 deletions src/controller-types/src/lib.rs
Original file line number Diff line number Diff line change
@@ -28,9 +28,3 @@ impl From<u64> for WatchSetId {
}

pub use mz_compute_types::DEFAULT_COMPUTE_REPLICA_LOGGING_INTERVAL as DEFAULT_REPLICA_LOGGING_INTERVAL;

/// Reports whether a given size name is a "v2" cluster size--i.e., a cluster
/// size that ends in "cc" or "C".
pub fn is_cluster_size_v2(size: &str) -> bool {
size.ends_with("cc") || size.ends_with('C') || size == "mz_probe"
}
Loading