Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

persist: Stabilize Schema Evolution #30205

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions misc/python/materialize/checks/all_checks/continual_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion


def schemas() -> str:
Expand All @@ -20,6 +22,9 @@ def schemas() -> str:
class AuditLogCT(Check):
"""Continual Task for audit logging"""

def _can_run(self, e: Executor) -> bool:
    # Continual-task builtins are only available starting with v0.127.0-dev,
    # so skip this check on any older base version.
    min_version = MzVersion.parse_mz("v0.127.0-dev")
    return min_version < self.base_version

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
Expand Down Expand Up @@ -64,6 +69,9 @@ def validate(self) -> Testdrive:
class StreamTableJoinCT(Check):
"""Continual Task for stream table join"""

def _can_run(self, e: Executor) -> bool:
    # Skip unless the base version already supports continual tasks
    # (introduced in v0.127.0-dev).
    required = MzVersion.parse_mz("v0.127.0-dev")
    return required < self.base_version

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
Expand Down Expand Up @@ -127,6 +135,9 @@ def validate(self) -> Testdrive:
class UpsertCT(Check):
"""Continual Task for upserts"""

def _can_run(self, e: Executor) -> bool:
    # Only run when the base version is newer than v0.127.0-dev, the
    # first version with continual-task support.
    cutoff = MzVersion.parse_mz("v0.127.0-dev")
    return cutoff < self.base_version

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
Expand Down
7 changes: 6 additions & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def get_default_system_parameters(
"enable_columnation_lgalloc": "true",
"enable_compute_chunked_stack": "true",
"enable_connection_validation_syntax": "true",
"enable_continual_task_builtins": "true",
"enable_continual_task_builtins": (
"true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
),
"enable_continual_task_create": "true",
"enable_continual_task_retain": "true",
"enable_continual_task_transform": "true",
Expand All @@ -115,6 +117,9 @@ def get_default_system_parameters(
"enable_table_keys": "true",
"enable_variadic_left_join_lowering": "true",
"enable_worker_core_affinity": "true",
"persist_record_schema_id": (
"true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
),
"persist_batch_columnar_format": "both_v2",
"persist_batch_columnar_format_percent": "100",
"persist_batch_delete_enabled": "true",
Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,7 @@ def __init__(
"75",
"100",
]
self.flags_with_values["persist_record_schema_id"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_structured_order"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["persist_batch_structured_key_lower_len"] = [
"0",
Expand Down
4 changes: 4 additions & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,10 @@ impl CatalogState {
self.get_entry(item_id)
}

/// Returns an iterator over every catalog entry, paired with its
/// [`CatalogItemId`], in the order of the underlying `entry_by_id` map.
pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
    self.entry_by_id.iter()
}

pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
let schema = self
.temporary_schemas
Expand Down
13 changes: 13 additions & 0 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2601,6 +2601,7 @@ impl Coordinator {
}
};

let mut compute_collections = vec![];
let mut collections = vec![];
let mut new_builtin_continual_tasks = vec![];
for entry in catalog.entries() {
Expand Down Expand Up @@ -2640,6 +2641,7 @@ impl Coordinator {
status_collection_id: None,
timeline: None,
};
compute_collections.push((mv.global_id(), mv.desc.clone()));
collections.push((mv.global_id(), collection_desc));
}
CatalogItem::ContinualTask(ct) => {
Expand All @@ -2656,6 +2658,7 @@ impl Coordinator {
// `create_collections_for_bootstrap`.
new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
} else {
compute_collections.push((ct.global_id(), ct.desc.clone()));
collections.push((ct.global_id(), collection_desc));
}
}
Expand All @@ -2677,6 +2680,16 @@ impl Coordinator {
.flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
.collect();

// Before possibly creating collections, make sure their schemas are correct.
//
// Across different versions of Materialize the nullability of columns can change based on
// updates to our optimizer.
self.controller
.storage
.evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand this correctly:

  • This is a write on the read path: starting the read-only replica irrevocably migrates the collection.
  • This is fine in the short term because we're only making things more nullable at the data type level, so all the old writes are valid with the new schema.
  • This is fine in the longer term because it should be a noop at the data type level.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly! Also, we only migrate collections for Materialized Views and Continual Tasks, not Tables or Sources

.await
.unwrap_or_terminate("cannot fail to evolve collections");

self.controller
.storage
.create_collections_for_bootstrap(
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ impl<T> ShouldTerminateGracefully for StorageError<T> {
StorageError::ResourceExhausted(_)
| StorageError::CollectionMetadataAlreadyExists(_)
| StorageError::PersistShardAlreadyInUse(_)
| StorageError::PersistSchemaEvolveRace { .. }
| StorageError::PersistInvalidSchemaEvolve { .. }
| StorageError::TxnWalShardAlreadyExists
| StorageError::UpdateBeyondUpper(_)
| StorageError::ReadBeforeSince(_)
Expand Down
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ breaking:
- compute-types/src/sinks.proto
# reason: Ignore because plans are currently not persisted.
- expr/src/relation.proto
# reason: we very carefully evolve these protobuf definitions
- persist-client/src/internal/state.proto
# reason: does currently not require backward-compatibility
- storage-client/src/client.proto
# reason: does currently not require backward-compatibility
Expand Down
1 change: 1 addition & 0 deletions src/catalog-debug/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rust_binary(
"//src/orchestrator-tracing:mz_orchestrator_tracing",
"//src/ore:mz_ore",
"//src/persist-client:mz_persist_client",
"//src/persist-types:mz_persist_types",
"//src/repr:mz_repr",
"//src/service:mz_service",
"//src/sql:mz_sql",
Expand Down
2 changes: 2 additions & 0 deletions src/catalog-debug/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true

[dependencies]
anyhow = "1.0.66"
arrow = { version = "51.0.0", default-features = false }
clap = { version = "3.2.24", features = ["derive", "env"] }
futures = "0.3.25"
mz-adapter = { path = "../adapter" }
Expand All @@ -21,6 +22,7 @@ mz-orchestrator-tracing = { path = "../orchestrator-tracing" }
mz-ore = { path = "../ore" }
mz-storage-types = { path = "../storage-types" }
mz-persist-client = { path = "../persist-client" }
mz-persist-types = { path = "../persist-types" }
mz-tls-util = { path = "../tls-util" }
mz-repr = { path = "../repr" }
mz-service = { path = "../service" }
Expand Down
67 changes: 62 additions & 5 deletions src/catalog-debug/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use mz_catalog::durable::debug::{
use mz_catalog::durable::{
persist_backed_catalog_state, BootstrapArgs, OpenableDurableCatalogState,
};
use mz_catalog::memory::objects::CatalogItem;
use mz_cloud_resources::AwsExternalIdPrefix;
use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
use mz_ore::cli::{self, CliConfig};
Expand All @@ -49,11 +50,12 @@ use mz_ore::url::SensitiveUrl;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::rpc::PubSubClientConnection;
use mz_persist_client::{PersistClient, PersistLocation};
use mz_persist_client::{Diagnostics, PersistClient, PersistLocation};
use mz_repr::{Diff, Timestamp};
use mz_service::secrets::SecretsReaderCliArgs;
use mz_sql::catalog::EnvironmentId;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::sources::SourceData;
use serde::{Deserialize, Serialize};
use tracing::{error, Instrument};

Expand Down Expand Up @@ -254,9 +256,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
upgrade_check(
args,
openable_state,
persist_client,
secrets,
cluster_replica_sizes,
persist_client,
start,
)
.await
Expand Down Expand Up @@ -533,9 +535,9 @@ async fn epoch(
async fn upgrade_check(
args: Args,
openable_state: Box<dyn OpenableDurableCatalogState>,
persist_client: PersistClient,
secrets: SecretsReaderCliArgs,
cluster_replica_sizes: ClusterReplicaSizeMap,
persist_client: PersistClient,
start: Instant,
) -> Result<(), anyhow::Error> {
let secrets_reader = secrets.load().await.context("loading secrets reader")?;
Expand Down Expand Up @@ -568,7 +570,7 @@ async fn upgrade_check(
// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
// Because of that we purposefully move this Future onto the heap (i.e. Box it).
let InitializeStateResult {
state: _state,
state,
storage_collections_to_drop: _,
migrated_storage_collections_0dt: _,
new_builtin_collections: _,
Expand Down Expand Up @@ -609,7 +611,7 @@ async fn upgrade_check(
None,
),
builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy,
persist_client,
persist_client: persist_client.clone(),
enable_expression_cache_override: None,
enable_0dt_deployment: true,
helm_chart_version: None,
Expand All @@ -628,6 +630,61 @@ async fn upgrade_check(
dur.as_millis(),
);
println!("{msg}");

// Check that we can evolve the schema for all Persist shards.
let storage_entries = state
.get_entries()
.filter_map(|(_item_id, entry)| match entry.item() {
// TODO(alter_table): Handle multiple versions of tables.
CatalogItem::Table(table) => Some((table.global_id_writes(), &table.desc)),
CatalogItem::Source(source) => Some((source.global_id(), &source.desc)),
CatalogItem::ContinualTask(ct) => Some((ct.global_id(), &ct.desc)),
CatalogItem::MaterializedView(mv) => Some((mv.global_id(), &mv.desc)),
CatalogItem::Log(_)
| CatalogItem::View(_)
| CatalogItem::Sink(_)
| CatalogItem::Index(_)
| CatalogItem::Type(_)
| CatalogItem::Func(_)
| CatalogItem::Secret(_)
| CatalogItem::Connection(_) => None,
});
for (gid, item_desc) in storage_entries {
let shard_id = state
.storage_metadata()
.get_collection_shard::<Timestamp>(gid)
.context("getting shard_id")?;
let diagnostics = Diagnostics {
shard_name: gid.to_string(),
handle_purpose: "catalog upgrade check".to_string(),
};
let persisted_schema = persist_client
.latest_schema::<SourceData, (), Timestamp, Diff>(shard_id, diagnostics)
.await
.expect("invalid persist usage");
// We should always have schemas registered for Shards, unless their environment happened
// to crash after running DDL and hasn't come back up yet.
let Some((_schema_id, persisted_relation_desc, _)) = persisted_schema else {
anyhow::bail!("no schema found for {gid}, did their environment crash?");
};

let persisted_data_type =
mz_persist_types::columnar::data_type::<SourceData>(&persisted_relation_desc)?;
let new_data_type = mz_persist_types::columnar::data_type::<SourceData>(item_desc)?;

let migration =
mz_persist_types::schema::backward_compatible(&persisted_data_type, &new_data_type);
if migration.is_none() {
anyhow::bail!(
"invalid Persist schema migration!\npersisted: {:?}\n{:?}\nnew: {:?}\n{:?}",
persisted_relation_desc,
persisted_data_type,
item_desc,
new_data_type,
);
}
}

Ok(())
}

Expand Down
Loading
Loading