Skip to content
Closed

dnr #30584

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,10 @@ impl CatalogState {
self.get_entry(item_id)
}

/// Returns an iterator over every catalog entry, keyed by its [`CatalogItemId`].
///
/// Iterates `entry_by_id` directly, so the order follows the underlying map's
/// iteration order — callers must not rely on any particular ordering.
pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
self.entry_by_id.iter()
}

pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
let schema = self
.temporary_schemas
Expand Down
16 changes: 16 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2567,6 +2567,7 @@ impl Coordinator {
}
};

let mut compute_collections = vec![];
let mut collections = vec![];
let mut new_builtin_continual_tasks = vec![];
for entry in catalog.entries() {
Expand All @@ -2576,6 +2577,7 @@ impl Coordinator {
source.global_id(),
source_desc(&source.data_source, &source.desc, &source.timeline),
));
compute_collections.push((source.global_id(), source.desc.clone()));
}
CatalogItem::Table(table) => {
match &table.data_source {
Expand All @@ -2584,6 +2586,7 @@ impl Coordinator {
(gid, CollectionDescription::for_table(desc.clone()))
});
collections.extend(collections_descs);
compute_collections.extend(table.collection_descs());
}
TableDataSource::DataSource {
desc: data_source_desc,
Expand All @@ -2606,6 +2609,7 @@ impl Coordinator {
status_collection_id: None,
timeline: None,
};
compute_collections.push((mv.global_id(), mv.desc.clone()));
collections.push((mv.global_id(), collection_desc));
}
CatalogItem::ContinualTask(ct) => {
Expand All @@ -2624,6 +2628,7 @@ impl Coordinator {
} else {
collections.push((ct.global_id(), collection_desc));
}
compute_collections.push((ct.global_id(), ct.desc.clone()));
}
_ => (),
}
Expand All @@ -2643,6 +2648,17 @@ impl Coordinator {
.flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
.collect();

// Before possibly creating collections, make sure their schemas are correct.
//
// Across different versions of Materialize the nullability of columns can change based on
// updates to our optimizer.
tracing::info!(?compute_collections, "EVOLVING COLLECTIONS");
self.controller
.storage
.evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
.await
.unwrap_or_terminate("cannot fail to evolve collections");

self.controller
.storage
.create_collections_for_bootstrap(
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ impl<T> ShouldTerminateGracefully for StorageError<T> {
StorageError::ResourceExhausted(_)
| StorageError::CollectionMetadataAlreadyExists(_)
| StorageError::PersistShardAlreadyInUse(_)
| StorageError::PersistSchemaEvolveRace { .. }
| StorageError::PersistInvalidSchemaEvolve { .. }
| StorageError::TxnWalShardAlreadyExists
| StorageError::UpdateBeyondUpper(_)
| StorageError::ReadBeforeSince(_)
Expand Down
4 changes: 4 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ breaking:
- compute-types/src/sinks.proto
# reason: Ignore because plans are currently not persisted.
- expr/src/relation.proto
# reason: we very carefully evolve these protobuf definitions
- persist-client/src/internal/state.proto
# reason: temporarily ignored by ParkMyCar
- repr/src/relation_and_scalar.proto
# reason: does currently not require backward-compatibility
- storage-client/src/client.proto
# reason: does currently not require backward-compatibility
Expand Down
1 change: 1 addition & 0 deletions src/catalog-debug/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rust_binary(
"//src/orchestrator-tracing:mz_orchestrator_tracing",
"//src/ore:mz_ore",
"//src/persist-client:mz_persist_client",
"//src/persist-types:mz_persist_types",
"//src/repr:mz_repr",
"//src/service:mz_service",
"//src/sql:mz_sql",
Expand Down
2 changes: 2 additions & 0 deletions src/catalog-debug/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true

[dependencies]
anyhow = "1.0.66"
arrow = { version = "51.0.0", default-features = false }
clap = { version = "3.2.24", features = ["derive", "env"] }
futures = "0.3.25"
mz-adapter = { path = "../adapter" }
Expand All @@ -21,6 +22,7 @@ mz-orchestrator-tracing = { path = "../orchestrator-tracing" }
mz-ore = { path = "../ore" }
mz-storage-types = { path = "../storage-types" }
mz-persist-client = { path = "../persist-client" }
mz-persist-types = { path = "../persist-types" }
mz-tls-util = { path = "../tls-util" }
mz-repr = { path = "../repr" }
mz-service = { path = "../service" }
Expand Down
67 changes: 62 additions & 5 deletions src/catalog-debug/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use mz_catalog::durable::debug::{
use mz_catalog::durable::{
persist_backed_catalog_state, BootstrapArgs, OpenableDurableCatalogState,
};
use mz_catalog::memory::objects::CatalogItem;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
use mz_ore::cli::{self, CliConfig};
Expand All @@ -49,12 +50,13 @@ use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation};
use mz_repr::{Diff, Timestamp};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_sql::catalog::EnvironmentId;
use mz_sql::session::vars::ConnectionCounter;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::sources::SourceData;
use serde::{Deserialize, Serialize};
use tracing::{error, Instrument};

Expand Down Expand Up @@ -255,9 +257,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
upgrade_check(
args,
openable_state,
persist_client,
secrets,
cluster_replica_sizes,
persist_client,
start,
)
.await
Expand Down Expand Up @@ -534,9 +536,9 @@ async fn epoch(
async fn upgrade_check(
args: Args,
openable_state: Box<dyn OpenableDurableCatalogState>,
persist_client: PersistClient,
secrets: SecretsReaderCliArgs,
cluster_replica_sizes: ClusterReplicaSizeMap,
persist_client: PersistClient,
start: Instant,
) -> Result<(), anyhow::Error> {
let secrets_reader = secrets.load().await.context("loading secrets reader")?;
Expand Down Expand Up @@ -568,7 +570,7 @@ async fn upgrade_check(
// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
// Because of that we purposefully move this Future onto the heap (i.e. Box it).
let InitializeStateResult {
state: _state,
state,
storage_collections_to_drop: _,
migrated_storage_collections_0dt: _,
new_builtin_collections: _,
Expand Down Expand Up @@ -610,7 +612,7 @@ async fn upgrade_check(
),
active_connection_count: Arc::new(Mutex::new(ConnectionCounter::new(0, 0))),
builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy,
persist_client,
persist_client: persist_client.clone(),
helm_chart_version: None,
},
&mut storage,
Expand All @@ -627,6 +629,61 @@ async fn upgrade_check(
dur.as_millis(),
);
println!("{msg}");

// Check that we can evolve the schema for all Persist shards.
let storage_entries = state
.get_entries()
.filter_map(|(_item_id, entry)| match entry.item() {
// TODO(alter_table): Handle multiple versions of tables.
CatalogItem::Table(table) => Some((table.global_id_writes(), &table.desc)),
CatalogItem::Source(source) => Some((source.global_id(), &source.desc)),
CatalogItem::ContinualTask(ct) => Some((ct.global_id(), &ct.desc)),
CatalogItem::MaterializedView(mv) => Some((mv.global_id(), &mv.desc)),
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => None,
});
for (gid, item_desc) in storage_entries {
let shard_id = state
.storage_metadata()
.get_collection_shard::<Timestamp>(gid)
.context("getting shard_id")?;
let diagnostics = Diagnostics {
shard_name: gid.to_string(),
handle_purpose: "catalog upgrade check".to_string(),
};
let persisted_schema = persist_client
.latest_schema::<SourceData, (), Timestamp, Diff>(shard_id, diagnostics)
.await
.expect("invalid persist usage");
// We should always have schemas registered for Shards, unless their environment happened
// to crash after running DDL and hasn't come back up yet.
let Some((_schema_id, persisted_relation_desc, _)) = persisted_schema else {
anyhow::bail!("no schema found for {gid}, did their environment crash?");
};

let persisted_data_type =
mz_persist_types::columnar::data_type::<SourceData>(&persisted_relation_desc)?;
let new_data_type = mz_persist_types::columnar::data_type::<SourceData>(item_desc)?;

let migration =
mz_persist_types::schema::backward_compatible(&persisted_data_type, &new_data_type);
if migration.is_none() {
anyhow::bail!(
"invalid Persist schema migration!\npersisted: {:?}\n{:?}\nnew: {:?}\n{:?}",
persisted_relation_desc,
persisted_data_type,
item_desc,
new_data_type,
);
}
}

Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9474,9 +9474,9 @@ pub static BUILTINS_STATIC: LazyLock<Vec<Builtin<NameReference>>> = LazyLock::ne
Builtin::View(&MZ_RECENT_STORAGE_USAGE),
Builtin::Index(&MZ_RECENT_STORAGE_USAGE_IND),
Builtin::Connection(&MZ_ANALYTICS),
Builtin::ContinualTask(&MZ_CLUSTER_REPLICA_METRICS_HISTORY_CT),
Builtin::ContinualTask(&MZ_CLUSTER_REPLICA_STATUS_HISTORY_CT),
Builtin::ContinualTask(&MZ_WALLCLOCK_LAG_HISTORY_CT),
// Builtin::ContinualTask(&MZ_CLUSTER_REPLICA_METRICS_HISTORY_CT),
// Builtin::ContinualTask(&MZ_CLUSTER_REPLICA_STATUS_HISTORY_CT),
// Builtin::ContinualTask(&MZ_WALLCLOCK_LAG_HISTORY_CT),
Builtin::View(&MZ_INDEX_ADVICE),
]);

Expand Down
5 changes: 5 additions & 0 deletions src/persist-client/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,11 @@ where
HollowBatch::new(desc, parts, self.num_updates, run_meta, runs),
);

// if !parts_empty {
// let backtrace = std::backtrace::Backtrace::capture();
// tracing::info!(schema = ?self.write_schemas, num_parts = %parts_len, "FOUND NON EMPTY BATCH\n\n{backtrace}\n\n");
// }

Ok(batch)
}

Expand Down
1 change: 0 additions & 1 deletion src/persist-client/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_PREFIX)
.add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_SUFFIX)
.add(&crate::fetch::PART_DECODE_FORMAT)
.add(&crate::DANGEROUS_ENABLE_SCHEMA_EVOLUTION)
}

impl PersistConfig {
Expand Down
2 changes: 2 additions & 0 deletions src/persist-client/src/internal/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// making your Proto changes in a release before you populate non-default values,
// or guard the code changes behind a feature flag.

// buf breaking: ignore (we very carefully evolve these protobuf definitions)

syntax = "proto3";

package mz_persist_client.internal.state;
Expand Down
Loading