Skip to content

Commit

Permalink
Revert "adapter: Create deterministic log index IDs (MaterializeInc#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
ggevay committed Nov 28, 2024
1 parent 101bc31 commit b9866a8
Show file tree
Hide file tree
Showing 30 changed files with 145 additions and 1,756 deletions.
63 changes: 11 additions & 52 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,51 +518,14 @@ impl Catalog {
Fut: Future<Output = T>,
{
let persist_client = PersistClient::new_for_tests().await;
let organization_id = Uuid::new_v4();
let environmentd_id = Uuid::new_v4();
let bootstrap_args = test_bootstrap_args();
let catalog = Self::open_debug_catalog(persist_client, organization_id, &bootstrap_args)
let catalog = Self::open_debug_catalog(persist_client, environmentd_id, &bootstrap_args)
.await
.expect("can open debug catalog");
f(catalog).await
}

/// Like [`Catalog::with_debug`], but the catalog created believes that bootstrap is still
/// in progress.
pub async fn with_debug_in_bootstrap<F, Fut, T>(f: F) -> T
where
F: FnOnce(Catalog) -> Fut,
Fut: Future<Output = T>,
{
let persist_client = PersistClient::new_for_tests().await;
let organization_id = Uuid::new_v4();
let bootstrap_args = test_bootstrap_args();
let mut catalog =
Self::open_debug_catalog(persist_client.clone(), organization_id, &bootstrap_args)
.await
.expect("can open debug catalog");

// Replace `storage` in `catalog` with one that doesn't think bootstrap is over.
let now = SYSTEM_TIME.clone();
let openable_storage = TestCatalogStateBuilder::new(persist_client)
.with_organization_id(organization_id)
.with_default_deploy_generation()
.build()
.await
.expect("can create durable catalog");
let mut storage = openable_storage
.open(now().into(), &bootstrap_args)
.await
.expect("can open durable catalog");
// Drain updates.
let _ = storage
.sync_to_current_updates()
.await
.expect("can sync to current updates");
catalog.storage = Arc::new(tokio::sync::Mutex::new(storage));

f(catalog).await
}

/// Opens a debug catalog.
///
/// See [`Catalog::with_debug`].
Expand Down Expand Up @@ -768,17 +731,13 @@ impl Catalog {
commit_ts: mz_repr::Timestamp,
) -> Result<(CatalogItemId, GlobalId), Error> {
use mz_ore::collections::CollectionExt;

let mut storage = self.storage().await;
let mut txn = storage.transaction().await?;
let id = txn
.allocate_system_item_ids(1)
.maybe_terminate("allocating system ids")?
.into_element();
// Drain transaction.
let _ = txn.get_and_commit_op_updates();
txn.commit(commit_ts).await?;
Ok(id)
self.storage()
.await
.allocate_system_ids(1, commit_ts)
.await
.maybe_terminate("allocating system ids")
.map(|ids| ids.into_element())
.err_into()
}

/// Get the next system item ID without allocating it.
Expand Down Expand Up @@ -2690,7 +2649,7 @@ mod tests {
assert_eq!(
mz_sql::catalog::ObjectType::ClusterReplica,
conn_catalog.get_object_type(&ObjectId::ClusterReplica((
ClusterId::user(1).expect("1 is a valid ID"),
ClusterId::User(1),
ReplicaId::User(1)
)))
);
Expand All @@ -2712,7 +2671,7 @@ mod tests {
assert_eq!(
None,
conn_catalog.get_privileges(&SystemObjectId::Object(ObjectId::ClusterReplica((
ClusterId::user(1).expect("1 is a valid ID"),
ClusterId::User(1),
ReplicaId::User(1),
))))
);
Expand Down
29 changes: 16 additions & 13 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use mz_catalog::config::{ClusterReplicaSizeMap, StateConfig};
use mz_catalog::durable::objects::{
SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
};
use mz_catalog::durable::{ClusterVariant, ClusterVariantManaged, Transaction};
use mz_catalog::durable::{
ClusterVariant, ClusterVariantManaged, Transaction, SYSTEM_CLUSTER_ID_ALLOC_KEY,
};
use mz_catalog::expr_cache::{
ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
};
Expand Down Expand Up @@ -633,8 +635,6 @@ impl Catalog {
);
}

catalog.storage().await.mark_bootstrap_complete();

Ok(OpenCatalogResult {
catalog,
storage_collections_to_drop,
Expand Down Expand Up @@ -773,10 +773,6 @@ impl Catalog {

let (new_item_id, new_global_id) = match id {
CatalogItemId::System(_) => txn.allocate_system_item_ids(1)?.into_element(),
CatalogItemId::IntrospectionSourceIndex(id) => (
CatalogItemId::IntrospectionSourceIndex(id),
GlobalId::IntrospectionSourceIndex(id),
),
CatalogItemId::User(_) => txn.allocate_user_item_ids(1)?.into_element(),
_ => unreachable!("can't migrate id: {id}"),
};
Expand Down Expand Up @@ -1227,7 +1223,10 @@ fn add_new_builtin_clusters_migration(
if !cluster_names.contains(builtin_cluster.name) {
let cluster_size = builtin_cluster_sizes.get_size(builtin_cluster.name)?;
let cluster_allocation = cluster_sizes.get_allocation_by_name(&cluster_size)?;
let id = txn.get_and_increment_id(SYSTEM_CLUSTER_ID_ALLOC_KEY.to_string())?;
let id = ClusterId::System(id);
txn.insert_system_cluster(
id,
builtin_cluster.name,
vec![],
builtin_cluster.privileges.to_vec(),
Expand Down Expand Up @@ -1267,9 +1266,13 @@ fn add_new_remove_old_builtin_introspection_source_migration(
}
}

for log in new_logs {
let (item_id, gid) =
Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
let new_ids = txn.allocate_system_item_ids(usize_to_u64(new_logs.len()))?;
let new_entries = new_logs
.into_iter()
.zip_eq(new_ids)
.map(|(log, (item_id, gid))| (log, item_id, gid));

for (log, item_id, gid) in new_entries {
new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
}

Expand Down Expand Up @@ -1640,7 +1643,7 @@ mod builtin_migration_tests {
.with_key(vec![0])
.finish(),
resolved_ids: resolved_ids.into_iter().collect(),
cluster_id: ClusterId::user(1).expect("1 is a valid ID"),
cluster_id: ClusterId::User(1),
non_null_assertions: vec![],
custom_logical_compaction_window: None,
refresh_schedule: None,
Expand All @@ -1657,7 +1660,7 @@ mod builtin_migration_tests {
keys: Default::default(),
conn_id: None,
resolved_ids: [(on_item_id, on_gid)].into_iter().collect(),
cluster_id: ClusterId::user(1).expect("1 is a valid ID"),
cluster_id: ClusterId::User(1),
custom_logical_compaction_window: None,
is_retained_metrics_object: false,
})
Expand Down Expand Up @@ -1767,7 +1770,7 @@ mod builtin_migration_tests {
}

async fn run_test_case(test_case: BuiltinMigrationTestCase) {
Catalog::with_debug_in_bootstrap(|mut catalog| async move {
Catalog::with_debug(|mut catalog| async move {
let mut item_id_mapping = BTreeMap::new();
let mut name_mapping = BTreeMap::new();

Expand Down
14 changes: 4 additions & 10 deletions src/adapter/src/catalog/transact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use mz_catalog::memory::objects::{
use mz_catalog::SYSTEM_CONN_ID;
use mz_controller::clusters::{ManagedReplicaLocation, ReplicaConfig, ReplicaLocation};
use mz_controller_types::{ClusterId, ReplicaId};
use mz_ore::cast::usize_to_u64;
use mz_ore::collections::HashSet;
use mz_ore::instrument;
use mz_ore::now::EpochMillis;
Expand Down Expand Up @@ -902,19 +903,12 @@ impl Catalog {
let privileges: Vec<_> =
merge_mz_acl_items(owner_privileges.into_iter().chain(default_privileges))
.collect();
let introspection_source_ids: Vec<_> = introspection_sources
.iter()
.map(|introspection_source| {
Transaction::allocate_introspection_source_index_id(
&id,
introspection_source.variant,
)
})
.collect();
let introspection_source_ids =
tx.allocate_system_item_ids(usize_to_u64(introspection_sources.len()))?;

let introspection_sources = introspection_sources
.into_iter()
.zip_eq(introspection_source_ids)
.zip_eq(introspection_source_ids.into_iter())
.map(|(log, (item_id, gid))| (log, item_id, gid))
.collect();

Expand Down
5 changes: 2 additions & 3 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2301,8 +2301,7 @@ impl Coordinator {
let id_too_large = match id {
CatalogItemId::System(id) => *id >= next_system_item_id,
CatalogItemId::User(id) => *id >= next_user_item_id,
CatalogItemId::IntrospectionSourceIndex(_)
| CatalogItemId::Transient(_) => false,
CatalogItemId::Transient(_) => false,
};
if id_too_large {
info!(
Expand Down Expand Up @@ -3625,7 +3624,7 @@ impl Coordinator {

// An arbitrary compute instance ID to satisfy the function calls below. Note that
// this only works because this function will never run.
let compute_instance = ComputeInstanceId::user(1).expect("1 is a valid ID");
let compute_instance = ComputeInstanceId::User(1);

let _: () = self.ship_dataflow(dataflow, compute_instance, None).await;
}
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/validity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ mod tests {
let PlanValidity::Checks { cluster_id, .. } = validity else {
panic!();
};
*cluster_id = Some(ClusterId::user(3).expect("3 is a valid ID"));
*cluster_id = Some(ClusterId::User(3));
}),
Box::new(|res| {
assert_contains!(
Expand Down
2 changes: 0 additions & 2 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ breaking:
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v72.proto
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v73.proto
# reason: does currently not require backward-compatibility
- cluster-client/src/client.proto
# reason: does currently not require backward-compatibility
- compute-client/src/logging.proto
Expand Down
6 changes: 1 addition & 5 deletions src/catalog/protos/hashes.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"name": "objects.proto",
"md5": "65c8ec9661c8a207bc9eb5af098fa98f"
"md5": "2d781c72c4a56b13dfb1b4215f3614f0"
},
{
"name": "objects_v67.proto",
Expand All @@ -26,9 +26,5 @@
{
"name": "objects_v72.proto",
"md5": "b21cb2b1b41649c78405731e53560d59"
},
{
"name": "objects_v73.proto",
"md5": "d5d1a8c6b1aa8212245cfd343a3b8417"
}
]
18 changes: 3 additions & 15 deletions src/catalog/protos/objects.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ message ClusterIntrospectionSourceIndexKey {
}

message ClusterIntrospectionSourceIndexValue {
// TODO(parkmycar): Ideally this is a IntrospectionSourceCatalogItemId but making this change panics 0dt
// TODO(parkmycar): Ideally this is a SystemCatalogItemId but making this change panics 0dt
// upgrades if there were new builtin objects added since the older version of Materialize
// doesn't know how to read the new IntrospectionSourceCatalogItemId type.
// doesn't know how to read the new SystemCatalogItemId type.
uint64 index_id = 1;
uint32 oid = 2;
IntrospectionSourceIndexGlobalId global_id = 3;
SystemGlobalId global_id = 3;
}

message ClusterReplicaKey {
Expand Down Expand Up @@ -307,7 +307,6 @@ message CatalogItemId {
uint64 system = 1;
uint64 user = 2;
uint64 transient = 3;
uint64 introspection_source_index = 4;
}
}

Expand All @@ -316,18 +315,12 @@ message SystemCatalogItemId {
uint64 value = 1;
}

/// A newtype wrapper for a `CatalogItemId` that is always in the "introspection source index" namespace.
message IntrospectionSourceIndexCatalogItemId {
uint64 value = 1;
}

message GlobalId {
oneof value {
uint64 system = 1;
uint64 user = 2;
uint64 transient = 3;
Empty explain = 4;
uint64 introspection_source_index = 5;
}
}

Expand All @@ -336,11 +329,6 @@ message SystemGlobalId {
uint64 value = 1;
}

/// A newtype wrapper for a `GlobalId` that is always in the "introspection source index" namespace.
message IntrospectionSourceIndexGlobalId {
uint64 value = 1;
}

message ClusterId {
oneof value {
uint64 system = 1;
Expand Down
Loading

0 comments on commit b9866a8

Please sign in to comment.