diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml
index 4bf89a157f341..9679fc459bc1c 100644
--- a/ci/nightly/pipeline.template.yml
+++ b/ci/nightly/pipeline.template.yml
@@ -194,7 +194,7 @@ steps:
     timeout_in_minutes: 120
     parallelism: 2
     agents:
-      queue: hetzner-aarch64-4cpu-8gb
+      queue: hetzner-aarch64-8cpu-16gb
     plugins:
       - ./ci/plugins/mzcompose:
          composition: kafka-matrix
@@ -1854,13 +1854,13 @@ steps:
   - id: 0dt
     label: Zero downtime
-    depends_on: build-aarch64
+    depends_on: build-x86_64
     timeout_in_minutes: 180
     plugins:
       - ./ci/plugins/mzcompose:
          composition: 0dt
     agents:
-      queue: hetzner-aarch64-16cpu-32gb
+      queue: hetzner-x86-64-dedi-16cpu-64gb
 
   - id: emulator
     label: Materialize Emulator
diff --git a/ci/test/pipeline.template.yml b/ci/test/pipeline.template.yml
index 8c58f74d93fd8..99fa27796e222 100644
--- a/ci/test/pipeline.template.yml
+++ b/ci/test/pipeline.template.yml
@@ -613,6 +613,7 @@ steps:
          args: [
              --scenario=RestartEnvironmentdClusterdStorage,
+              --default-replication-factor=1, # faster
              "--seed=$BUILDKITE_JOB_ID",
            ]
 
@@ -645,7 +646,11 @@ steps:
     plugins:
       - ./ci/plugins/mzcompose:
          composition: platform-checks
-          args: [--scenario=NoRestartNoUpgrade, "--seed=$BUILDKITE_JOB_ID"]
+          args: [
+              --scenario=NoRestartNoUpgrade,
+              --default-replication-factor=1, # faster
+              "--seed=$BUILDKITE_JOB_ID"
+            ]
 
   - id: source-sink-errors
     label: "Source/Sink Error Reporting"
diff --git a/misc/dbt-materialize/mzcompose.py b/misc/dbt-materialize/mzcompose.py
index 4385f51e3569e..e841ded286669 100644
--- a/misc/dbt-materialize/mzcompose.py
+++ b/misc/dbt-materialize/mzcompose.py
@@ -63,6 +63,10 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
             options=test_case.materialized_options,
             image=test_case.materialized_image,
             volumes_extra=["secrets:/secrets"],
+            default_replication_factor=1,
+            additional_system_parameter_defaults={
+                "default_cluster_replication_factor": "1"
+            },
         )
         test_args = ["dbt-materialize/tests"]
         if args.k:
diff --git a/misc/python/materialize/checks/all_checks/cluster.py b/misc/python/materialize/checks/all_checks/cluster.py
index 445744ddf5132..f8aca7c186d86 100644
--- a/misc/python/materialize/checks/all_checks/cluster.py
+++ b/misc/python/materialize/checks/all_checks/cluster.py
@@ -28,7 +28,7 @@ def manipulate(self) -> list[Testdrive]:
                 $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                 GRANT CREATECLUSTER ON SYSTEM TO materialize
 
-                > CREATE CLUSTER create_cluster2 (SIZE '2-2');
+                > CREATE CLUSTER create_cluster2 (SIZE '2-2', REPLICATION FACTOR 1);
                 """,
             ]
         ]
diff --git a/misc/python/materialize/checks/all_checks/sink.py b/misc/python/materialize/checks/all_checks/sink.py
index c3ab9a7d56207..0aa4973ef5fbf 100644
--- a/misc/python/materialize/checks/all_checks/sink.py
+++ b/misc/python/materialize/checks/all_checks/sink.py
@@ -1268,7 +1268,8 @@ def initialize(self) -> Testdrive:
         return Testdrive(
             dedent(
                 """
-                > CREATE SOURCE webhook_alter1 FROM WEBHOOK BODY FORMAT TEXT;
+                > CREATE CLUSTER sink_webhook_cluster SIZE '1', REPLICATION FACTOR 1;
+                > CREATE SOURCE webhook_alter1 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;
 
                 > CREATE SINK sink_alter_wh FROM webhook_alter1
                   INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-wh')
                   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
@@ -1284,7 +1285,7 @@ def manipulate(self) -> list[Testdrive]:
             Testdrive(dedent(s))
             for s in [
                 """
-                > CREATE SOURCE webhook_alter2 FROM WEBHOOK BODY FORMAT TEXT;
+                > CREATE SOURCE webhook_alter2 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;
 
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history
                  JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id
                  WHERE name = 'sink_alter_wh';
@@ -1298,7 +1299,7 @@ def manipulate(self) -> list[Testdrive]:
                2
                """,
                """
-                > CREATE SOURCE webhook_alter3 FROM WEBHOOK BODY FORMAT TEXT;
+                > CREATE SOURCE webhook_alter3 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;
 
                $ set-from-sql var=running_count
                SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history
                  JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id
                  WHERE name = 'sink_alter_wh';
diff --git a/misc/python/materialize/checks/all_checks/webhook.py b/misc/python/materialize/checks/all_checks/webhook.py
index 73f9bbd87390f..0607158be5a89 100644
--- a/misc/python/materialize/checks/all_checks/webhook.py
+++ b/misc/python/materialize/checks/all_checks/webhook.py
@@ -9,7 +9,7 @@
 from textwrap import dedent
 
 from materialize.checks.actions import Testdrive
-from materialize.checks.checks import Check
+from materialize.checks.checks import Check, disabled
 from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
 from materialize.checks.executors import Executor
 from materialize.mz_version import MzVersion
@@ -132,6 +132,9 @@ def validate(self) -> Testdrive:
         )
 
 
+@disabled(
+    "Reenable when database-issues#9184 is fixed and there is a way to set the cluster"
+)
 class WebhookTable(Check):
     def _can_run(self, e: Executor) -> bool:
         return self.base_version >= MzVersion.parse_mz("v0.130.0-dev")
@@ -141,7 +144,10 @@ def initialize(self) -> Testdrive:
             schemas()
             + dedent(
                 """
+                > CREATE CLUSTER webhook_table_cluster REPLICAS (r1 (SIZE '1'));
+                > SET cluster = webhook_table_cluster
                 > CREATE TABLE webhook_table_text FROM WEBHOOK BODY FORMAT TEXT;
+                > SET cluster = quickstart
 
                 $ webhook-append database=materialize schema=public name=webhook_table_text
                 hello_world
diff --git a/misc/python/materialize/checks/mzcompose_actions.py b/misc/python/materialize/checks/mzcompose_actions.py
index bf673f0f20ecc..f2bf86a9af61a 100644
--- a/misc/python/materialize/checks/mzcompose_actions.py
+++ b/misc/python/materialize/checks/mzcompose_actions.py
@@ -87,6 +87,7 @@ def execute(self, e: Executor) -> None:
             force_migrations=self.force_migrations,
             publish=self.publish,
             metadata_store="cockroach",
+            default_replication_factor=2,
         )
 
         # Don't fail since we are careful to explicitly kill and collect logs
diff --git a/misc/python/materialize/mzcompose/composition.py b/misc/python/materialize/mzcompose/composition.py
index f36d0b797b1a6..0316d265761d5 100644
--- a/misc/python/materialize/mzcompose/composition.py
+++ b/misc/python/materialize/mzcompose/composition.py
@@ -1083,24 +1083,6 @@ def blob_store(self) -> str:
                 return name
         raise RuntimeError(f"No external blob store found: {self.compose['services']}")
 
-    def setup_quickstart_cluster(self, replicas: int = 2) -> None:
-        replica_names = [f"r{replica_id}" for replica_id in range(0, 2)]
-        replica_string = ",".join(
-            f"{replica_name} (SIZE '4')" for replica_name in replica_names
-        )
-        self.sql(
-            f"""
-            DROP CLUSTER quickstart CASCADE;
-            CREATE CLUSTER quickstart REPLICAS ({replica_string});
-            GRANT ALL PRIVILEGES ON CLUSTER quickstart TO materialize;
-            DROP CLUSTER IF EXISTS singlereplica;
-            CREATE CLUSTER singlereplica SIZE '4', REPLICATION FACTOR 2;
-            GRANT ALL PRIVILEGES ON CLUSTER singlereplica TO materialize;
-            """,
-            user="mz_system",
-            port=6877,
-        )
-
     def capture_logs(self, *services: str) -> None:
         # Capture logs into services.log since they will be lost otherwise
         # after dowing a composition.
diff --git a/misc/python/materialize/mzcompose/services/materialized.py b/misc/python/materialize/mzcompose/services/materialized.py
index 1cf8b40fd9391..1b11debf1d227 100644
--- a/misc/python/materialize/mzcompose/services/materialized.py
+++ b/misc/python/materialize/mzcompose/services/materialized.py
@@ -94,7 +94,7 @@ def __init__(
         metadata_store: str = METADATA_STORE,
         cluster_replica_size: dict[str, dict[str, Any]] | None = None,
         bootstrap_replica_size: str | None = None,
-        default_cluster_replicas: int = 2,
+        default_replication_factor: int = 1,
     ) -> None:
         if name is None:
             name = "materialized"
@@ -141,9 +141,9 @@ def __init__(
             f"MZ_BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
             # Note(SangJunBak): mz_system and mz_probe have no replicas by default in materialized
             # but we re-enable them here since many of our tests rely on them.
-            f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR={default_cluster_replicas}",
-            f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR={default_cluster_replicas}",
-            f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR={default_cluster_replicas}",
+            f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
+            f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
+            f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
             *environment_extra,
             *DEFAULT_CRDB_ENVIRONMENT,
         ]
@@ -161,6 +161,9 @@ def __init__(
             system_parameter_version or image_version
         )
 
+        system_parameter_defaults["default_cluster_replication_factor"] = str(
+            default_replication_factor
+        )
         if additional_system_parameter_defaults is not None:
             system_parameter_defaults.update(additional_system_parameter_defaults)
diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py
index d78d90d884377..cc0c10d80c035 100644
--- a/misc/python/materialize/parallel_workload/action.py
+++ b/misc/python/materialize/parallel_workload/action.py
@@ -38,7 +38,6 @@
 from materialize.parallel_workload.database import (
     DATA_TYPES,
     DB,
-    MAX_CLUSTER_REPLICAS,
     MAX_CLUSTERS,
     MAX_COLUMNS,
     MAX_DBS,
@@ -1349,13 +1348,6 @@ def run(self, exe: Executor) -> bool:
 
 
 class CreateClusterReplicaAction(Action):
-    def errors_to_ignore(self, exe: Executor) -> list[str]:
-        result = [
-            "cannot create more than one replica of a cluster containing sources or sinks",
-        ] + super().errors_to_ignore(exe)
-
-        return result
-
     def run(self, exe: Executor) -> bool:
         with exe.db.lock:
             # Keep cluster 0 with 1 replica for sources/sinks
@@ -1363,8 +1355,6 @@ def run(self, exe: Executor) -> bool:
             if not unmanaged_clusters:
                 return False
             cluster = self.rng.choice(unmanaged_clusters)
-            if len(cluster.replicas) >= MAX_CLUSTER_REPLICAS:
-                return False
             cluster.replica_id += 1
         with cluster.lock:
             if cluster not in exe.db.clusters or not cluster.managed:
@@ -1652,6 +1642,7 @@ def run(self, exe: Executor) -> bool:
                     sanity_restart=self.sanity_restart,
                     additional_system_parameter_defaults=self.system_parameters,
                     metadata_store="cockroach",
+                    default_replication_factor=2,
                 )
             ):
                 self.composition.up("materialized", detach=True)
@@ -1702,6 +1693,7 @@ def run(self, exe: Executor) -> bool:
                 restart="on-failure",
                 healthcheck=LEADER_STATUS_HEALTHCHECK,
                 metadata_store="cockroach",
+                default_replication_factor=2,
             ),
         ):
            self.composition.up(mz_service, detach=True)
@@ -1829,14 +1821,6 @@ def run(self, exe: Executor) -> bool:
 
 
 class CreateKafkaSourceAction(Action):
-    def errors_to_ignore(self, exe: Executor) -> list[str]:
-        result = super().errors_to_ignore(exe)
-        if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
-            result.extend(
-                ["cannot create source in cluster with more than one replica"]
-            )
-        return result
-
     def run(self, exe: Executor) -> bool:
         with exe.db.lock:
             if len(exe.db.kafka_sources) >= MAX_KAFKA_SOURCES:
@@ -1848,7 +1832,7 @@ def run(self, exe: Executor) -> bool:
         with schema.lock, cluster.lock:
             if schema not in exe.db.schemas:
                 return False
-            if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
+            if cluster not in exe.db.clusters:
                 return False
 
         try:
@@ -1894,14 +1878,6 @@ def run(self, exe: Executor) -> bool:
 
 
 class CreateMySqlSourceAction(Action):
-    def errors_to_ignore(self, exe: Executor) -> list[str]:
-        result = super().errors_to_ignore(exe)
-        if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
-            result.extend(
-                ["cannot create source in cluster with more than one replica"]
-            )
-        return result
-
     def run(self, exe: Executor) -> bool:
         # See database-issues#6881, not expected to work
         if exe.db.scenario == Scenario.BackupRestore:
@@ -1917,7 +1893,7 @@ def run(self, exe: Executor) -> bool:
         with schema.lock, cluster.lock:
             if schema not in exe.db.schemas:
                 return False
-            if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
+            if cluster not in exe.db.clusters:
                 return False
 
         try:
@@ -1963,14 +1939,6 @@ def run(self, exe: Executor) -> bool:
 
 
 class CreatePostgresSourceAction(Action):
-    def errors_to_ignore(self, exe: Executor) -> list[str]:
-        result = super().errors_to_ignore(exe)
-        if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
-            result.extend(
-                ["cannot create source in cluster with more than one replica"]
-            )
-        return result
-
     def run(self, exe: Executor) -> bool:
         # See database-issues#6881, not expected to work
         if exe.db.scenario == Scenario.BackupRestore:
@@ -1982,14 +1950,11 @@ def run(self, exe: Executor) -> bool:
         source_id = exe.db.postgres_source_id
         exe.db.postgres_source_id += 1
         schema = self.rng.choice(exe.db.schemas)
-        potential_clusters = [c for c in exe.db.clusters if len(c.replicas) == 1]
-        if not potential_clusters:
-            return False
-        cluster = self.rng.choice(potential_clusters)
+        cluster = self.rng.choice(exe.db.clusters)
         with schema.lock, cluster.lock:
             if schema not in exe.db.schemas:
                 return False
-            if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
+            if cluster not in exe.db.clusters:
                 return False
 
         try:
@@ -2037,8 +2002,6 @@ def run(self, exe: Executor) -> bool:
 
 
 class CreateKafkaSinkAction(Action):
     def errors_to_ignore(self, exe: Executor) -> list[str]:
         return [
-            # Another replica can be created in parallel
-            "cannot create sink in cluster with more than one replica",
             "BYTES format with non-encodable type",
         ] + super().errors_to_ignore(exe)
 
@@ -2048,15 +2011,12 @@ def run(self, exe: Executor) -> bool:
             return False
         sink_id = exe.db.kafka_sink_id
         exe.db.kafka_sink_id += 1
-        potential_clusters = [c for c in exe.db.clusters if len(c.replicas) == 1]
-        if not potential_clusters:
-            return False
-        cluster = self.rng.choice(potential_clusters)
+        cluster = self.rng.choice(exe.db.clusters)
         schema = self.rng.choice(exe.db.schemas)
         with schema.lock, cluster.lock:
             if schema not in exe.db.schemas:
                 return False
-            if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
+            if cluster not in exe.db.clusters:
                 return False
 
         sink = KafkaSink(
diff --git a/misc/python/materialize/zippy/backup_and_restore_actions.py b/misc/python/materialize/zippy/backup_and_restore_actions.py
index 18185700fe7e4..130aac4576952 100644
--- a/misc/python/materialize/zippy/backup_and_restore_actions.py
+++ b/misc/python/materialize/zippy/backup_and_restore_actions.py
@@ -40,6 +40,7 @@ def run(self, c: Composition, state: State) -> None:
             sanity_restart=False,
             restart="on-failure",
             metadata_store="cockroach",
+            default_replication_factor=2,
         )
     ):
         c.restore(state.mz_service)
diff --git a/misc/python/materialize/zippy/mz_actions.py b/misc/python/materialize/zippy/mz_actions.py
index d41adbc7954f6..782d980e886e8 100644
--- a/misc/python/materialize/zippy/mz_actions.py
+++ b/misc/python/materialize/zippy/mz_actions.py
@@ -90,6 +90,7 @@ def run(self, c: Composition, state: State) -> None:
                 restart="on-failure",
                 additional_system_parameter_defaults=self.additional_system_parameter_defaults,
                 metadata_store="cockroach",
+                default_replication_factor=2,
             )
         ):
             c.up(state.mz_service)
@@ -167,6 +168,7 @@ def run(self, c: Composition, state: State) -> None:
                 sanity_restart=False,
                 restart="on-failure",
                 metadata_store="cockroach",
+                default_replication_factor=2,
             )
         ):
             c.kill(state.mz_service)
@@ -201,6 +203,7 @@ def run(self, c: Composition, state: State) -> None:
                 restart="on-failure",
                 healthcheck=LEADER_STATUS_HEALTHCHECK,
                 metadata_store="cockroach",
+                default_replication_factor=2,
             ),
         ):
             c.up(state.mz_service, detach=True)
diff --git a/src/adapter/src/coord/sequencer/inner/cluster.rs b/src/adapter/src/coord/sequencer/inner/cluster.rs
index 5871c73480880..880ae7a3fc247 100644
--- a/src/adapter/src/coord/sequencer/inner/cluster.rs
+++ b/src/adapter/src/coord/sequencer/inner/cluster.rs
@@ -224,7 +224,12 @@ impl Coordinator {
         }
         match &options.replication_factor {
             Set(rf) => *replication_factor = *rf,
-            Reset => *replication_factor = 1,
+            Reset => {
+                *replication_factor = self
+                    .catalog
+                    .system_config()
+                    .default_cluster_replication_factor()
+            }
             Unchanged => {}
         }
         match &options.schedule {
diff --git a/src/catalog/src/durable.rs b/src/catalog/src/durable.rs
index 3f896331a02b2..45e252ee4860a 100644
--- a/src/catalog/src/durable.rs
+++ b/src/catalog/src/durable.rs
@@ -530,7 +530,7 @@ pub async fn persist_backed_catalog_state(
 pub fn test_bootstrap_args() -> BootstrapArgs {
     BootstrapArgs {
         default_cluster_replica_size: "1".into(),
-        default_cluster_replication_factor: 2,
+        default_cluster_replication_factor: 1,
         bootstrap_role: None,
         cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
     }
diff --git a/src/catalog/src/durable/initialize.rs b/src/catalog/src/durable/initialize.rs
index 4e62d9801ce80..7e8f54f79cf7c 100644
--- a/src/catalog/src/durable/initialize.rs
+++ b/src/catalog/src/durable/initialize.rs
@@ -96,7 +96,6 @@ const DEFAULT_USER_CLUSTER_ID: ClusterId = ClusterId::User(1);
 const DEFAULT_USER_CLUSTER_NAME: &str = "quickstart";
 
 const DEFAULT_USER_REPLICA_ID: ReplicaId = ReplicaId::User(1);
-const DEFAULT_USER_REPLICA_NAME: &str = "r1";
 
 const MATERIALIZE_DATABASE_ID_VAL: u64 = 1;
 const MATERIALIZE_DATABASE_ID: DatabaseId = DatabaseId::User(MATERIALIZE_DATABASE_ID_VAL);
@@ -260,7 +259,8 @@ pub(crate) async fn initialize(
         ),
         (
             USER_REPLICA_ID_ALLOC_KEY.to_string(),
-            DEFAULT_USER_REPLICA_ID.inner_id() + 1,
+            DEFAULT_USER_REPLICA_ID.inner_id()
+                + u64::from(options.default_cluster_replication_factor),
         ),
         (
             SYSTEM_REPLICA_ID_ALLOC_KEY.to_string(),
@@ -667,35 +667,41 @@ pub(crate) async fn initialize(
         ));
     }
 
-    tx.insert_cluster_replica_with_id(
-        DEFAULT_USER_CLUSTER_ID,
-        DEFAULT_USER_REPLICA_ID,
-        DEFAULT_USER_REPLICA_NAME,
-        default_replica_config(options)?,
-        MZ_SYSTEM_ROLE_ID,
-    )?;
-    audit_events.push((
-        mz_audit_log::EventType::Create,
-        mz_audit_log::ObjectType::ClusterReplica,
-        mz_audit_log::EventDetails::CreateClusterReplicaV2(mz_audit_log::CreateClusterReplicaV2 {
-            cluster_id: DEFAULT_USER_CLUSTER_ID.to_string(),
-            cluster_name: DEFAULT_USER_CLUSTER_NAME.to_string(),
-            replica_name: DEFAULT_USER_REPLICA_NAME.to_string(),
-            replica_id: Some(DEFAULT_USER_REPLICA_ID.to_string()),
-            logical_size: options.default_cluster_replica_size.to_string(),
-            disk: {
-                let cluster_size = options.default_cluster_replica_size.to_string();
-                let cluster_allocation = options
-                    .cluster_replica_size_map
-                    .get_allocation_by_name(&cluster_size)?;
-                cluster_allocation.is_cc
-            },
-            billed_as: None,
-            internal: false,
-            reason: CreateOrDropClusterReplicaReasonV1::System,
-            scheduling_policies: None,
-        }),
-    ));
+    for i in 0..options.default_cluster_replication_factor {
+        let replica_id = ReplicaId::User(DEFAULT_USER_REPLICA_ID.inner_id() + u64::from(i));
+        let replica_name = format!("r{}", i + 1);
+        tx.insert_cluster_replica_with_id(
+            DEFAULT_USER_CLUSTER_ID,
+            replica_id,
+            &replica_name,
+            default_replica_config(options)?,
+            MZ_SYSTEM_ROLE_ID,
+        )?;
+        audit_events.push((
+            mz_audit_log::EventType::Create,
+            mz_audit_log::ObjectType::ClusterReplica,
+            mz_audit_log::EventDetails::CreateClusterReplicaV2(
+                mz_audit_log::CreateClusterReplicaV2 {
+                    cluster_id: DEFAULT_USER_CLUSTER_ID.to_string(),
+                    cluster_name: DEFAULT_USER_CLUSTER_NAME.to_string(),
+                    replica_name,
+                    replica_id: Some(replica_id.to_string()),
+                    logical_size: options.default_cluster_replica_size.to_string(),
+                    disk: {
+                        let cluster_size = options.default_cluster_replica_size.to_string();
+                        let cluster_allocation = options
+                            .cluster_replica_size_map
+                            .get_allocation_by_name(&cluster_size)?;
+                        cluster_allocation.is_cc
+                    },
+                    billed_as: None,
+                    internal: false,
+                    reason: CreateOrDropClusterReplicaReasonV1::System,
+                    scheduling_policies: None,
+                },
+            ),
+        ));
+    }
 
     let system_privileges = [MzAclItem {
         grantee: MZ_SYSTEM_ROLE_ID,
diff --git a/src/catalog/tests/snapshots/debug__opened_trace.snap b/src/catalog/tests/snapshots/debug__opened_trace.snap
index c81d818c0d813..3a3ba17548ee3 100644
--- a/src/catalog/tests/snapshots/debug__opened_trace.snap
+++ b/src/catalog/tests/snapshots/debug__opened_trace.snap
@@ -568,7 +568,7 @@ Trace {
                 Managed(
                     ManagedCluster {
                         size: "1",
-                        replication_factor: 2,
+                        replication_factor: 1,
                         availability_zones: [],
                         logging: Some(
                             ReplicaLogging {
diff --git a/src/catalog/tests/snapshots/open__initial_snapshot.snap b/src/catalog/tests/snapshots/open__initial_snapshot.snap
index 82d9918d3e11f..47ecfbce1e7b5 100644
--- a/src/catalog/tests/snapshots/open__initial_snapshot.snap
+++ b/src/catalog/tests/snapshots/open__initial_snapshot.snap
@@ -1220,7 +1220,7 @@ Snapshot {
                 Managed(
                     ManagedCluster {
                         size: "1",
-                        replication_factor: 2,
+                        replication_factor: 1,
                         availability_zones: [],
                         logging: Some(
                             ReplicaLogging {
diff --git a/src/environmentd/src/test_util.rs b/src/environmentd/src/test_util.rs
index b2dcb7759a173..0b564a4e4178a 100644
--- a/src/environmentd/src/test_util.rs
+++ b/src/environmentd/src/test_util.rs
@@ -139,7 +139,7 @@ impl Default for TestHarness {
             storage_usage_collection_interval: Duration::from_secs(3600),
             storage_usage_retention_period: None,
             default_cluster_replica_size: "1".to_string(),
-            default_cluster_replication_factor: 2,
+            default_cluster_replication_factor: 1,
             builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                 size: "1".to_string(),
                 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs
index e3cd16b51718d..9f660135bbe26 100644
--- a/src/sql/src/plan/statement/ddl.rs
+++ b/src/sql/src/plan/statement/ddl.rs
@@ -4616,7 +4616,11 @@ pub fn plan_create_cluster_inner(
     )?;
 
     let replication_factor = if matches!(schedule, ClusterScheduleOptionValue::Manual) {
-        replication_factor.unwrap_or(1)
+        replication_factor.unwrap_or(
+            scx.catalog
+                .system_vars()
+                .default_cluster_replication_factor(),
+        )
     } else {
         scx.require_feature_flag(&ENABLE_CLUSTER_SCHEDULE_REFRESH)?;
         if replication_factor.is_some() {
diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs
index d659a5f2d94bc..69d06e62f1e6a 100644
--- a/src/sql/src/session/vars.rs
+++ b/src/sql/src/session/vars.rs
@@ -478,6 +478,7 @@ impl SessionVars {
             // size of a cluster, what indexes are present, etc.
             &CLUSTER,
             &CLUSTER_REPLICA,
+            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
             &DATABASE,
             &SEARCH_PATH,
         ]
@@ -1650,6 +1651,11 @@ impl SystemVars {
             .collect()
     }
 
+    /// Returns the value of the `default_cluster_replication_factor` configuration parameter.
+    pub fn default_cluster_replication_factor(&self) -> u32 {
+        *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
+    }
+
     /// Returns the `disk_cluster_replicas_default` configuration parameter.
     pub fn disk_cluster_replicas_default(&self) -> bool {
         *self.expect_value(&DISK_CLUSTER_REPLICAS_DEFAULT)
@@ -2318,6 +2324,7 @@ static SESSION_SYSTEM_VARS: LazyLock
+    &DEFAULT_CLUSTER_REPLICATION_FACTOR,
diff --git a/src/sqllogictest/src/runner.rs b/src/sqllogictest/src/runner.rs
--- a/src/sqllogictest/src/runner.rs
+++ b/src/sqllogictest/src/runner.rs
@@ ... @@ impl<'a> Runner<'a> {
                 .batch_execute("CREATE CLUSTER quickstart REPLICAS ()")
                 .await?;
         }
-        let mut needs_default_replica = true;
-        for row in inner
-            .system_client
-            .query(
-                "SELECT name, size FROM mz_cluster_replicas
-                WHERE cluster_id = (SELECT id FROM mz_clusters WHERE name = 'quickstart')",
-                &[],
-            )
-            .await?
-        {
-            let name: &str = row.get("name");
-            let size: &str = row.get("size");
-            if name == "r1" && size == self.config.replicas.to_string() {
-                needs_default_replica = false;
-            } else {
-                inner
-                    .system_client
-                    .batch_execute(&format!("DROP CLUSTER REPLICA quickstart.{}", name))
-                    .await?;
-            }
-        }
-        if needs_default_replica {
-            inner
-                .system_client
-                .batch_execute(&format!(
-                    "CREATE CLUSTER REPLICA quickstart.r1 SIZE '{}'",
-                    self.config.replicas
-                ))
-                .await?;
-        }
 
         // Grant initial privileges.
         inner
@@ -1082,7 +1052,7 @@ impl<'a> RunnerInner<'a> {
             environment_id,
             cluster_replica_sizes: ClusterReplicaSizeMap::for_tests(),
             bootstrap_default_cluster_replica_size: config.replicas.to_string(),
-            bootstrap_default_cluster_replication_factor: 2,
+            bootstrap_default_cluster_replication_factor: 1,
             bootstrap_builtin_system_cluster_config: BootstrapBuiltinClusterConfig {
                 replication_factor: SYSTEM_CLUSTER_DEFAULT_REPLICATION_FACTOR,
                 size: config.replicas.to_string(),
diff --git a/test/0dt/mzcompose.py b/test/0dt/mzcompose.py
index ca27d3100201c..000b7b600ea55 100644
--- a/test/0dt/mzcompose.py
+++ b/test/0dt/mzcompose.py
@@ -57,6 +57,7 @@
         deploy_generation=0,
         system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
         external_metadata_store=True,
+        default_replication_factor=2,
     ),
     Materialized(
         name="mz_new",
@@ -65,6 +66,7 @@
         system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
         restart="on-failure",
         external_metadata_store=True,
+        default_replication_factor=2,
     ),
     Testdrive(
         materialize_url="postgres://materialize@mz_old:6875",
@@ -105,6 +107,8 @@ def workflow_read_only(c: Composition) -> None:
         CREATE CLUSTER cluster SIZE '2-1';
         GRANT ALL ON CLUSTER cluster TO materialize;
         ALTER SYSTEM SET cluster = cluster;
+        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
+        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
         """,
         service="mz_old",
         port=6877,
@@ -212,7 +216,7 @@ def workflow_read_only(c: Composition) -> None:
         1 2
 
         > CREATE SOURCE webhook_source
-          IN CLUSTER cluster
+          IN CLUSTER cluster_singlereplica
           FROM WEBHOOK BODY FORMAT TEXT
 
         $ webhook-append database=materialize schema=public name=webhook_source
@@ -231,6 +235,7 @@ def workflow_read_only(c: Composition) -> None:
             deploy_generation=1,
             external_metadata_store=True,
             system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
+            default_replication_factor=2,
         )
     ):
         c.up("mz_old")
@@ -310,6 +315,7 @@ def workflow_read_only(c: Composition) -> None:
             deploy_generation=1,
             system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
             external_metadata_store=True,
+            default_replication_factor=2,
         )
     ):
         c.up("mz_old")
@@ -393,6 +399,8 @@ def workflow_basic(c: Composition) -> None:
         CREATE CLUSTER cluster SIZE '2-1';
         GRANT ALL ON CLUSTER cluster TO materialize;
         ALTER SYSTEM SET cluster = cluster;
+        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
+        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
         """,
         service="mz_old",
         port=6877,
@@ -500,7 +508,7 @@ def workflow_basic(c: Composition) -> None:
         1 2
 
         > CREATE SOURCE webhook_source
-          IN CLUSTER cluster
+          IN CLUSTER cluster_singlereplica
           FROM WEBHOOK BODY FORMAT TEXT
 
         $ webhook-append database=materialize schema=public name=webhook_source
@@ -891,6 +899,8 @@ def workflow_kafka_source_rehydration(c: Composition) -> None:
         CREATE CLUSTER cluster SIZE '1';
         GRANT ALL ON CLUSTER cluster TO materialize;
         ALTER SYSTEM SET cluster = cluster;
+        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
+        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
         """,
         service="mz_old",
         port=6877,
@@ -948,6 +958,7 @@ def workflow_kafka_source_rehydration(c: Composition) -> None:
             system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
             restart="on-failure",
             external_metadata_store=True,
+            default_replication_factor=2,
         )
     ):
         c.up("mz_new")
@@ -998,6 +1009,8 @@ def workflow_pg_source_rehydration(c: Composition) -> None:
         CREATE CLUSTER cluster SIZE '1';
         GRANT ALL ON CLUSTER cluster TO materialize;
         ALTER SYSTEM SET cluster = cluster;
+        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
+        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
         """,
         service="mz_old",
         port=6877,
@@ -1070,6 +1083,7 @@ def workflow_pg_source_rehydration(c: Composition) -> None:
             system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
             restart="on-failure",
             external_metadata_store=True,
+            default_replication_factor=2,
         )
     ):
         c.up("mz_new")
@@ -1112,6 +1126,8 @@ def workflow_mysql_source_rehydration(c: Composition) -> None:
         CREATE CLUSTER cluster SIZE '1';
         GRANT ALL ON CLUSTER cluster TO materialize;
         ALTER SYSTEM SET cluster = cluster;
+        CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
+        GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
         """,
         service="mz_old",
         port=6877,
@@ -1185,6 +1201,7 @@ def workflow_mysql_source_rehydration(c: Composition) -> None:
             system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
             restart="on-failure",
             external_metadata_store=True,
+            default_replication_factor=2,
         )
     ):
         c.up("mz_new")
@@ -1233,6 +1250,7 @@ def workflow_kafka_source_failpoint(c: Composition) -> None:
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            external_metadata_store=True,
            environment_extra=["FAILPOINTS=fail_state_multi_put=return"],
+           default_replication_factor=2,
        )
    ):
        c.up("mz_old")
@@ -1246,6 +1264,8 @@ def workflow_kafka_source_failpoint(c: Composition) -> None:
            CREATE CLUSTER cluster SIZE '1';
            GRANT ALL ON CLUSTER cluster TO materialize;
            ALTER SYSTEM SET cluster = cluster;
+           CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
+           GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
            """
        ),
        service="mz_old",
@@ -1287,6 +1307,7 @@ def workflow_kafka_source_failpoint(c: Composition) -> None:
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            restart="on-failure",
            external_metadata_store=True,
+           default_replication_factor=2,
        ),
        Testdrive(
            materialize_url="postgres://materialize@mz_new:6875",
@@ -1387,6 +1408,7 @@ def workflow_builtin_item_migrations(c: Composition) -> None:
            external_metadata_store=True,
            force_migrations="all",
            healthcheck=LEADER_STATUS_HEALTHCHECK,
+           default_replication_factor=2,
        ),
    ):
        c.up("mz_new")
@@ -1524,6 +1546,8 @@ def workflow_upsert_sources(c: Composition) -> None:
        CREATE CLUSTER cluster SIZE '2-1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
+       CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
+       GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
        ALTER SYSTEM SET max_sources = {num_threads * 2};
        ALTER SYSTEM SET max_materialized_views = {num_threads * 2};
        """,
@@ -1598,6 +1622,7 @@ def worker(i: int) -> None:
            system_parameter_defaults=SYSTEM_PARAMETER_DEFAULTS,
            restart="on-failure",
            external_metadata_store=True,
+           default_replication_factor=2,
        ),
        Testdrive(
            materialize_url=f"postgres://materialize@{mz1}:6875",
@@ -1636,6 +1661,8 @@ def workflow_ddl(c: Composition) -> None:
        CREATE CLUSTER cluster SIZE '2-1';
        GRANT ALL ON CLUSTER cluster TO materialize;
        ALTER SYSTEM SET cluster = cluster;
+       CREATE CLUSTER cluster_singlereplica SIZE '1', REPLICATION FACTOR 1;
+       GRANT ALL ON CLUSTER cluster_singlereplica TO materialize;
        """,
        service="mz_old",
        port=6877,
@@ -1743,7 +1770,7 @@ def workflow_ddl(c: Composition) -> None:
        1 2
 
        > CREATE SOURCE webhook_source
-         IN CLUSTER cluster
+         IN CLUSTER cluster_singlereplica
         FROM WEBHOOK BODY FORMAT TEXT
 
        $ webhook-append database=materialize schema=public name=webhook_source
diff --git a/test/kafka-auth/mzcompose.py b/test/kafka-auth/mzcompose.py
index 4689d644ea323..139060e6aae65 100644
--- a/test/kafka-auth/mzcompose.py
+++ b/test/kafka-auth/mzcompose.py
@@ -159,6 +159,7 @@
     ),
     Materialized(
         volumes_extra=["secrets:/share/secrets"],
+        default_replication_factor=2,
     ),
     Testdrive(
         volumes_extra=["secrets:/share/secrets"],
diff --git a/test/kafka-exactly-once/mzcompose.py b/test/kafka-exactly-once/mzcompose.py
index 984fe01da6732..ffc299ef3aab1 100644
--- a/test/kafka-exactly-once/mzcompose.py
+++ b/test/kafka-exactly-once/mzcompose.py
@@ -23,7 +23,7 @@
     Zookeeper(),
     Kafka(),
     SchemaRegistry(),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Testdrive(),
 ]
diff --git a/test/kafka-matrix/mzcompose.py b/test/kafka-matrix/mzcompose.py
index d226826c49bb7..5b1131a0fcf3f 100644
--- a/test/kafka-matrix/mzcompose.py
+++ b/test/kafka-matrix/mzcompose.py
@@ -47,7 +47,7 @@
 ]
 
 SERVICES = [
-    Materialized(),
+    Materialized(default_replication_factor=2),
     # Occasional timeouts in CI with 60s timeout
     Testdrive(
         volumes_extra=["../testdrive:/workdir/testdrive"], default_timeout="120s"
diff --git a/test/kafka-multi-broker/mzcompose.py b/test/kafka-multi-broker/mzcompose.py
index 6e40c6ddd80a1..4dfb1cd0a9e29 100644
--- a/test/kafka-multi-broker/mzcompose.py
+++ b/test/kafka-multi-broker/mzcompose.py
@@ -29,7 +29,7 @@
     SchemaRegistry(
         kafka_servers=[("kafka1", "9092"), ("kafka2", "9092"), ("kafka3", "9092")]
     ),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Testdrive(
         entrypoint_extra=[
             "--kafka-option=acks=all",
diff --git a/test/kafka-resumption/mzcompose.py b/test/kafka-resumption/mzcompose.py
index 041aaed6c8a9a..ceda9aa2da967 100644
--- a/test/kafka-resumption/mzcompose.py
+++ b/test/kafka-resumption/mzcompose.py
@@ -32,7 +32,7 @@
     SchemaRegistry(),
     Redpanda(),
     Mz(app_password=""),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Clusterd(),
     Toxiproxy(),
     Testdrive(default_timeout="120s"),
@@ -105,7 +105,8 @@ def workflow_sink_kafka_restart(c: Composition, parser: WorkflowArgumentParser)
     # producer ID are properly aborted after a broker restart.
     with c.override(
         Materialized(
-            environment_extra=["FAILPOINTS=kafka_sink_commit_transaction=sleep(5000)"]
+            environment_extra=["FAILPOINTS=kafka_sink_commit_transaction=sleep(5000)"],
+            default_replication_factor=2,
         )
     ):
         c.up(*(["materialized"] + get_kafka_services(args.redpanda)))
diff --git a/test/kafka-rtr/mzcompose.py b/test/kafka-rtr/mzcompose.py
index 2f71cc1bc9182..70e494b061a57 100644
--- a/test/kafka-rtr/mzcompose.py
+++ b/test/kafka-rtr/mzcompose.py
@@ -35,7 +35,7 @@
     Kafka(),
     SchemaRegistry(),
     Mz(app_password=""),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Toxiproxy(),
     Testdrive(no_reset=True, seed=1),
 ]
diff --git a/test/mysql-cdc-old-syntax/mzcompose.py b/test/mysql-cdc-old-syntax/mzcompose.py
index 32984edeb98e4..0c616829ee43e 100644
--- a/test/mysql-cdc-old-syntax/mzcompose.py
+++ b/test/mysql-cdc-old-syntax/mzcompose.py
@@ -62,6 +62,7 @@ def create_mysql_replica(mysql_version: str) -> MySql:
         additional_system_parameter_defaults={
             "log_filter": "mz_storage::source::mysql=trace,info"
         },
+        default_replication_factor=2,
     ),
     create_mysql(MySql.DEFAULT_VERSION),
     create_mysql_replica(MySql.DEFAULT_VERSION),
@@ -328,6 +329,7 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
         additional_system_parameter_defaults={
             "log_filter": "mz_storage::source::mysql=trace,info"
         },
+        default_replication_factor=2,
     )
 
     mz_new = Materialized(
@@ -339,6 +341,7 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
             "log_filter": "mz_storage::source::mysql=trace,info",
             "force_source_table_syntax": "true",
         },
+        default_replication_factor=2,
     )
 
     with c.override(mz_old, create_mysql(mysql_version)):
diff --git a/test/mysql-cdc-resumption-old-syntax/mzcompose.py b/test/mysql-cdc-resumption-old-syntax/mzcompose.py
index 228ecbab04cbb..79e8af104a5c5 100644
--- a/test/mysql-cdc-resumption-old-syntax/mzcompose.py
+++ b/test/mysql-cdc-resumption-old-syntax/mzcompose.py
@@ -31,7 +31,7 @@
 SERVICES = [
     Alpine(),
     Mz(app_password=""),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     MySql(),
     MySql(
         name="mysql-replica-1",
@@ -122,7 +122,7 @@ def workflow_disruptions(c: Composition) -> None:
 
 def workflow_backup_restore(c: Composition) -> None:
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
     ):
         scenario = backup_restore_mysql
         print(f"--- Running scenario {scenario.__name__}")
@@ -133,7 +133,7 @@ def workflow_backup_restore(c: Composition) -> None:
 
 def workflow_bin_log_manipulations(c: Composition) -> None:
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
     ):
         scenarios = [
             reset_master_gtid,
@@ -158,7 +158,10 @@ def workflow_short_bin_log_retention(c: Composition) -> None:
     args = MySql.DEFAULT_ADDITIONAL_ARGS.copy()
     args.append(f"--binlog_expire_logs_seconds={bin_log_expiration_in_sec}")
 
-    with c.override(Materialized(sanity_restart=False), MySql(additional_args=args)):
+    with c.override(
+        Materialized(sanity_restart=False, default_replication_factor=2),
+        MySql(additional_args=args),
+    ):
         scenarios = [logs_expiration_while_mz_down, create_source_after_logs_expiration]
 
         scenarios = buildkite.shard_list(scenarios, lambda s: s.__name__)
@@ -183,7 +186,7 @@ def workflow_master_changes(c: Composition) -> None:
     """
 
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
         MySql(
             name="mysql-replica-1",
             version=MySql.DEFAULT_VERSION,
@@ -270,7 +273,7 @@ def workflow_switch_to_replica_and_kill_master(c: Composition) -> None:
     """
 
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
         MySql(
             name="mysql-replica-1",
             version=MySql.DEFAULT_VERSION,
diff --git a/test/mysql-cdc-resumption/mzcompose.py b/test/mysql-cdc-resumption/mzcompose.py
index b8db16bd37a6c..f5ddf2e6d788c 100644
--- a/test/mysql-cdc-resumption/mzcompose.py
+++ b/test/mysql-cdc-resumption/mzcompose.py
@@ -31,7 +31,7 @@
 SERVICES = [
     Alpine(),
     Mz(app_password=""),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     MySql(),
     MySql(
         name="mysql-replica-1",
@@ -122,7 +122,7 @@ def workflow_disruptions(c: Composition) -> None:
 
 def workflow_backup_restore(c: Composition) -> None:
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
     ):
         scenario = backup_restore_mysql
         print(f"--- Running scenario {scenario.__name__}")
@@ -133,7 +133,7 @@ def workflow_backup_restore(c: Composition) -> None:
 
 def workflow_bin_log_manipulations(c: Composition) -> None:
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
     ):
         scenarios = [
             reset_master_gtid,
@@ -158,7 +158,10 @@ def workflow_short_bin_log_retention(c: Composition) -> None:
     args = MySql.DEFAULT_ADDITIONAL_ARGS.copy()
     args.append(f"--binlog_expire_logs_seconds={bin_log_expiration_in_sec}")
 
-    with c.override(Materialized(sanity_restart=False), MySql(additional_args=args)):
+    with c.override(
+        Materialized(sanity_restart=False, default_replication_factor=2),
+        MySql(additional_args=args),
+    ):
         scenarios = [logs_expiration_while_mz_down, create_source_after_logs_expiration]
 
         scenarios = buildkite.shard_list(scenarios, lambda s: s.__name__)
@@ -183,7 +186,7 @@ def workflow_master_changes(c: Composition) -> None:
     """
 
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
         MySql(
             name="mysql-replica-1",
             version=MySql.DEFAULT_VERSION,
@@ -270,7 +273,7 @@ def workflow_switch_to_replica_and_kill_master(c: Composition) -> None:
     """
 
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
         MySql(
             name="mysql-replica-1",
             version=MySql.DEFAULT_VERSION,
diff --git a/test/mysql-cdc/mzcompose.py b/test/mysql-cdc/mzcompose.py
index 4265e6d5d5dd2..22185d60fc7ed 100644
--- a/test/mysql-cdc/mzcompose.py
+++ b/test/mysql-cdc/mzcompose.py
@@ -52,6 +52,7 @@ def create_mysql_replica(mysql_version: str) -> MySql:
         additional_system_parameter_defaults={
             "log_filter": "mz_storage::source::mysql=trace,info"
         },
+        default_replication_factor=2,
     ),
     create_mysql(MySql.DEFAULT_VERSION),
     create_mysql_replica(MySql.DEFAULT_VERSION),
@@ -378,6 +379,7 @@ def workflow_source_timeouts(c: Composition, parser: WorkflowArgumentParser) -> None:
         additional_system_parameter_defaults={
             "log_filter": "mz_storage::source::mysql=trace,info"
         },
+        default_replication_factor=2,
     ),
     Toxiproxy(),
     create_mysql(mysql_version),
diff --git a/test/mysql-rtr-old-syntax/mzcompose.py b/test/mysql-rtr-old-syntax/mzcompose.py
index 8db8140779077..2662c62d55fb4 100644
--- a/test/mysql-rtr-old-syntax/mzcompose.py
+++ b/test/mysql-rtr-old-syntax/mzcompose.py
@@ -22,7 +22,7 @@
 
 SERVICES = [
     MySql(),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Toxiproxy(),
     Testdrive(
         entrypoint_extra=[
diff --git a/test/mysql-rtr/mzcompose.py b/test/mysql-rtr/mzcompose.py
index 8db8140779077..2662c62d55fb4 100644
--- a/test/mysql-rtr/mzcompose.py
+++ b/test/mysql-rtr/mzcompose.py
@@ -22,7 +22,7 @@
 
 SERVICES = [
     MySql(),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Toxiproxy(),
     Testdrive(
         entrypoint_extra=[
diff --git a/test/parallel-workload/mzcompose.py b/test/parallel-workload/mzcompose.py
index 8db1b20a141c8..86e6aa533cc33 100644
--- a/test/parallel-workload/mzcompose.py
+++ b/test/parallel-workload/mzcompose.py
@@ -51,8 +51,8 @@
     Minio(setup_materialize=True, additional_directories=["copytos3"]),
     Azurite(),
     Mc(),
-    Materialized(),
-    Materialized(name="materialized2"),
+    Materialized(default_replication_factor=2),
+    Materialized(name="materialized2", default_replication_factor=2),
     Service("sqlsmith", {"mzbuild": "sqlsmith"}),
     Service(
         name="persistcli",
@@ -93,6 +93,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
             ports=["6975:6875", "6976:6876", "6977:6877"],
             sanity_restart=sanity_restart,
             metadata_store="cockroach",
+            default_replication_factor=2,
         ),
         Toxiproxy(seed=random.randrange(2**63)),
     ):
diff --git a/test/pg-cdc-old-syntax/mzcompose.py b/test/pg-cdc-old-syntax/mzcompose.py
index 6595a547654d7..4c827b53efe2e 100644
--- a/test/pg-cdc-old-syntax/mzcompose.py
+++ b/test/pg-cdc-old-syntax/mzcompose.py
@@ -101,6 +101,7 @@ def create_postgres(
         additional_system_parameter_defaults={
             "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error"
         },
         external_blob_store=True,
+        default_replication_factor=2,
     ),
     Testdrive(),
     CockroachOrPostgresMetadata(),
@@ -417,6 +418,7 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
         additional_system_parameter_defaults={
             "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error"
         },
+        default_replication_factor=2,
     )
 
     mz_new = Materialized(
@@ -429,6 +431,7 @@ def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
             "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error",
             "force_source_table_syntax": "true",
         },
+        default_replication_factor=2,
     )
 
     with c.override(mz_old, create_postgres(pg_version=pg_version)):
         c.up("materialized", "test-certs", "postgres")
diff --git a/test/pg-cdc-resumption-old-syntax/mzcompose.py b/test/pg-cdc-resumption-old-syntax/mzcompose.py
index 0e01c024be121..f2d5c16e13b29 100644
--- a/test/pg-cdc-resumption-old-syntax/mzcompose.py
+++ b/test/pg-cdc-resumption-old-syntax/mzcompose.py
@@ -25,7 +25,7 @@
 SERVICES = [
     Alpine(),
     Mz(app_password=""),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Postgres(),
     Toxiproxy(),
     Testdrive(no_reset=True, default_timeout="300s"),
@@ -99,7 +99,7 @@ def workflow_backup_restore(c: Composition) -> None:
     )
 
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
         Alpine(volumes=["pgdata:/var/lib/postgresql/data", "tmp:/scratch"]),
         Postgres(volumes=["pgdata:/var/lib/postgresql/data", "tmp:/scratch"]),
     ):
diff --git a/test/pg-cdc-resumption/mzcompose.py b/test/pg-cdc-resumption/mzcompose.py
index cb2ff714b7f10..9f8eff38a0c61 100644
--- a/test/pg-cdc-resumption/mzcompose.py
+++ b/test/pg-cdc-resumption/mzcompose.py
@@ -29,7 +29,7 @@
 SERVICES = [
     Alpine(),
     Mz(app_password=""),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Postgres(),
     Toxiproxy(),
     Testdrive(no_reset=True, default_timeout="300s"),
@@ -106,7 +106,7 @@ def workflow_backup_restore(c: Composition) -> None:
     )
 
     with c.override(
-        Materialized(sanity_restart=False),
+        Materialized(sanity_restart=False, default_replication_factor=2),
         Alpine(volumes=["pgdata:/var/lib/postgresql/data", "tmp:/scratch"]),
         Postgres(volumes=["pgdata:/var/lib/postgresql/data", "tmp:/scratch"]),
     ):
diff --git a/test/pg-cdc/mzcompose.py b/test/pg-cdc/mzcompose.py
index 9cd4b29c6724b..7d4e1f6690cb1 100644
--- a/test/pg-cdc/mzcompose.py
+++ b/test/pg-cdc/mzcompose.py
@@ -91,6 +91,7 @@ def create_postgres(
         additional_system_parameter_defaults={
             "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error"
         },
+        default_replication_factor=2,
     ),
     Testdrive(),
     TestCerts(),
diff --git a/test/pg-rtr-old-syntax/mzcompose.py b/test/pg-rtr-old-syntax/mzcompose.py
index 943d32b878f75..9fafe9fed4f2e 100644
--- a/test/pg-rtr-old-syntax/mzcompose.py
+++ b/test/pg-rtr-old-syntax/mzcompose.py
@@ -23,7 +23,7 @@
 
 SERVICES = [
     Postgres(),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Toxiproxy(),
     Testdrive(),
 ]
diff --git a/test/pg-rtr/mzcompose.py b/test/pg-rtr/mzcompose.py
index 943d32b878f75..9fafe9fed4f2e 100644
--- a/test/pg-rtr/mzcompose.py
+++ b/test/pg-rtr/mzcompose.py
@@ -23,7 +23,7 @@
 
 SERVICES = [
     Postgres(),
-    Materialized(),
+    Materialized(default_replication_factor=2),
     Toxiproxy(),
     Testdrive(),
 ]
diff --git a/test/platform-checks/mzcompose.py b/test/platform-checks/mzcompose.py
index 64319b4ab62a2..6b5f2acb8dfe4 100644
--- a/test/platform-checks/mzcompose.py
+++ b/test/platform-checks/mzcompose.py
@@ -52,6 +52,7 @@
 
 def create_mzs(
     azurite: bool,
+    default_replication_factor: int,
     additional_system_parameter_defaults: dict[str, str] | None = None,
 ) -> list[TestdriveService | Materialized]:
     return [
@@ -64,6 +65,7 @@ def create_mzs(
             volumes_extra=["secrets:/share/secrets"],
             metadata_store="cockroach",
             additional_system_parameter_defaults=additional_system_parameter_defaults,
+            default_replication_factor=default_replication_factor,
         )
         for mz_name in ["materialized", "mz_1", "mz_2", "mz_3", "mz_4", "mz_5"]
     ] + [
@@ -140,7 +142,7 @@ def create_mzs(
     Clusterd(
         name="clusterd_compute_1"
     ),  # Started by some Scenarios, defined here only for the teardown
-    *create_mzs(azurite=False),
+    *create_mzs(azurite=False, default_replication_factor=1),
     Persistcli(),
     SshBastionHost(),
 ]
@@ -227,6 +229,13 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         help="A list of features (e.g. azurite, sql_server), to enable.",
azurite, sql_server), to enable.", ) + parser.add_argument( + "--default-replication-factor", + type=int, + default=2, + help="Default replication factor for clusters", + ) + args = parser.parse_args() features = Features(args.features) @@ -262,7 +271,11 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: additional_system_parameter_defaults[x[0]] = x[1] with c.override( - *create_mzs(features.azurite_enabled(), additional_system_parameter_defaults) + *create_mzs( + features.azurite_enabled(), + args.default_replication_factor, + additional_system_parameter_defaults, + ) ): executor = MzcomposeExecutor(composition=c) for scenario_class in scenarios: diff --git a/test/race-condition/mzcompose.py b/test/race-condition/mzcompose.py index ba52c1597380e..86e408e45bfae 100644 --- a/test/race-condition/mzcompose.py +++ b/test/race-condition/mzcompose.py @@ -45,7 +45,7 @@ Minio(setup_materialize=True, additional_directories=["copytos3"]), Testdrive(no_reset=True, consistent_seed=True, default_timeout="600s"), Mc(), - Materialized(), + Materialized(default_replication_factor=2), ] SERVICE_NAMES = [ @@ -375,10 +375,20 @@ def __init__(self, name: str, references: "Object | None", rng: random.Random): self.body_format = rng.choice(["TEXT", "JSON", "JSON ARRAY", "BYTES"]) def create(self) -> str: - return f"> CREATE SOURCE {self.name} IN CLUSTER quickstart FROM WEBHOOK BODY FORMAT {self.body_format}" + return dedent( + f""" + > DROP CLUSTER IF EXISTS {self.name}_cluster + > CREATE CLUSTER {self.name}_cluster SIZE '1', REPLICATION FACTOR 1 + > CREATE SOURCE {self.name} IN CLUSTER {self.name}_cluster FROM WEBHOOK BODY FORMAT {self.body_format} + """ + ) def destroy(self) -> str: - return f"> DROP SOURCE {self.name} CASCADE" + return dedent( + f""" + > DROP CLUSTER {self.name}_cluster CASCADE + """ + ) def manipulate(self, kind: int) -> str: manipulations = [ diff --git a/test/rqg/mzcompose.py b/test/rqg/mzcompose.py index 26a07f6a1b38c..7a80e9c171e74 100644 --- a/test/rqg/mzcompose.py +++ b/test/rqg/mzcompose.py @@ -32,8 +32,8 @@ SERVICES = [ RQG(), - Materialized(name="mz_this"), - Materialized(name="mz_other"), + Materialized(name="mz_this", default_replication_factor=2), + Materialized(name="mz_other", default_replication_factor=2), Postgres(), ] @@ -247,6 +247,7 @@ def materialize_image(tag: str | None) -> str | None: ports=["16875:6875", "16876:6876", "16877:6877", "16878:6878"], image=materialize_image(args.this_tag), use_default_volumes=False, + default_replication_factor=2, ), ] @@ -268,6 +269,7 @@ def materialize_image(tag: str | None) -> str | None: image=materialize_image(args.other_tag), ports=["26875:6875", "26876:6876", "26877:6877", "26878:6878"], use_default_volumes=False, + default_replication_factor=2, ) ) psql_urls.append("postgresql://materialize@mz_other:6875/materialize") diff --git a/test/sqlancer/mzcompose.py b/test/sqlancer/mzcompose.py index 6ed65536973e6..1e1bbad26babc 100644 --- a/test/sqlancer/mzcompose.py +++ b/test/sqlancer/mzcompose.py @@ -26,9 +26,7 @@ SERVICES = [ # Auto-restart so we can keep testing even after we ran into a panic - Materialized( - restart="on-failure", - ), + Materialized(restart="on-failure", default_replication_factor=2), Service( "sqlancer", { diff --git a/test/sqllogictest/managed_cluster.slt b/test/sqllogictest/managed_cluster.slt index 6aa26b556cfae..41289abd15bcb 100644 --- a/test/sqllogictest/managed_cluster.slt +++ b/test/sqllogictest/managed_cluster.slt @@ -55,7 +55,7 @@ s2 mz_catalog_server true 1 2 s3 mz_probe 
 s3 mz_probe true 1 2
 s4 mz_support true 0 2
 s5 mz_analytics true 0 2
-u1 quickstart true 2 2
+u1 quickstart true 1 2
 
 query T rowsort
diff --git a/test/sqlsmith/mzcompose.py b/test/sqlsmith/mzcompose.py
index 96ebe9ea9e988..b01f9cf7b3de6 100644
--- a/test/sqlsmith/mzcompose.py
+++ b/test/sqlsmith/mzcompose.py
@@ -41,6 +41,7 @@
         restart="on-failure",
         memory=f"{TOTAL_MEMORY / len(MZ_SERVERS)}GB",
         use_default_volumes=False,
+        default_replication_factor=2,
     )
     for mz_server in MZ_SERVERS
 ] + [
diff --git a/test/testdrive-old-kafka-src-syntax/mzcompose.py b/test/testdrive-old-kafka-src-syntax/mzcompose.py
index 9960e7674977e..dd5a5f4a970c9 100644
--- a/test/testdrive-old-kafka-src-syntax/mzcompose.py
+++ b/test/testdrive-old-kafka-src-syntax/mzcompose.py
@@ -128,7 +128,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     if not args.system_param:
         sysparams = []
 
-    additional_system_parameter_defaults = {}
+    additional_system_parameter_defaults = {"default_cluster_replication_factor": "1"}
     for val in sysparams:
         x = val[0].split("=", maxsplit=1)
         assert len(x) == 2, f"--system-param '{val}' should be the format <key>=<val>"
@@ -142,6 +142,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         external_blob_store=True,
         blob_store_is_azure=args.azurite,
         additional_system_parameter_defaults=additional_system_parameter_defaults,
+        default_replication_factor=1,
     )
 
     testdrive = Testdrive(
diff --git a/test/testdrive/mzcompose.py b/test/testdrive/mzcompose.py
index 1fed925142b39..b5d0a7ffd50e5 100644
--- a/test/testdrive/mzcompose.py
+++ b/test/testdrive/mzcompose.py
@@ -116,7 +116,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     else:
         dependencies += ["zookeeper", "kafka", "schema-registry"]
 
-    additional_system_parameter_defaults = {}
+    additional_system_parameter_defaults = {"default_cluster_replication_factor": "1"}
     for val in args.system_param or []:
         x = val[0].split("=", maxsplit=1)
         assert len(x) == 2, f"--system-param '{val}' should be the format <key>=<val>"
@@ -127,6 +127,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         external_blob_store=True,
         blob_store_is_azure=args.azurite,
         additional_system_parameter_defaults=additional_system_parameter_defaults,
+        default_replication_factor=1,
    )
 
    testdrive = Testdrive(
diff --git a/test/testdrive/session.td b/test/testdrive/session.td
index dda8d2a5b3e79..dbc3faffd316d 100644
--- a/test/testdrive/session.td
+++ b/test/testdrive/session.td
@@ -26,6 +26,7 @@
 cluster_replica "" "Sets a target cluster replica for SQL queries (Materialize)."
 current_object_missing_warnings on "Whether to emit warnings when the current database, schema, or cluster is missing (Materialize)."
 database materialize "Sets the current database (CockroachDB)."
 DateStyle "ISO, MDY" "Sets the display format for date and time values (PostgreSQL)."
+default_cluster_replication_factor 1 "Default cluster replication factor (Materialize)."
 emit_introspection_query_notice on "Whether to print a notice when querying per-replica introspection sources."
 emit_plan_insights_notice off "Boolean flag indicating whether to send a NOTICE with JSON-formatted plan insights before executing a SELECT statement (Materialize)."
 emit_timestamp_notice off "Boolean flag indicating whether to send a NOTICE with timestamp explanations of queries (Materialize)."
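The session.td hunk above documents the variable that the rest of this patch threads through: `CREATE CLUSTER` without an explicit `REPLICATION FACTOR` now inherits `default_cluster_replication_factor` (see the ddl.rs hunk), and `ALTER CLUSTER ... RESET (REPLICATION FACTOR)` resets to the same default (see the sequencer hunk). That is why the compositions above either pass `default_replication_factor` to `Materialized` or pin `REPLICATION FACTOR 1` on clusters that host webhook sources. A minimal testdrive-style sketch of the resulting behavior (cluster names here are hypothetical and assume a size '1' replica allocation exists):

    # With the shipped default of 1, an unqualified CREATE CLUSTER gets one replica.
    > SHOW default_cluster_replication_factor
    1

    > CREATE CLUSTER rf_from_default SIZE '1'

    > SELECT replication_factor FROM mz_clusters WHERE name = 'rf_from_default'
    1

    # An explicit REPLICATION FACTOR still overrides the default.
    > CREATE CLUSTER rf_explicit SIZE '1', REPLICATION FACTOR 2

    > SELECT replication_factor FROM mz_clusters WHERE name = 'rf_explicit'
    2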
diff --git a/test/upsert/mzcompose.py b/test/upsert/mzcompose.py
index 85db96586856a..f8bee9387b7ea 100644
--- a/test/upsert/mzcompose.py
+++ b/test/upsert/mzcompose.py
@@ -46,6 +46,7 @@
             "upsert_rocksdb_auto_spill_to_disk": "false",
         },
         environment_extra=materialized_environment_extra,
+        default_replication_factor=2,
     ),
     Testdrive(),
     Clusterd(
@@ -119,6 +120,7 @@ def workflow_testdrive(c: Composition, parser: WorkflowArgumentParser) -> None:
             "disk_cluster_replicas_default": "true",
         },
         environment_extra=materialized_environment_extra,
+        default_replication_factor=2,
    )
 
    with c.override(testdrive, materialized):
@@ -193,6 +195,7 @@ def workflow_rehydration(c: Composition) -> None:
                 "storage_shrink_upsert_unused_buffers_by_ratio": "4",
             },
             environment_extra=materialized_environment_extra,
+            default_replication_factor=2,
         ),
         Clusterd(
             name="clusterd1",
@@ -225,6 +228,7 @@ def workflow_rehydration(c: Composition) -> None:
                 "storage_rocksdb_use_merge_operator": "true",
             },
             environment_extra=materialized_environment_extra,
+            default_replication_factor=2,
         ),
         Clusterd(
             name="clusterd1",
@@ -250,6 +254,7 @@ def workflow_rehydration(c: Composition) -> None:
                 "storage_dataflow_delay_sources_past_rehydration": "true",
             },
             environment_extra=materialized_environment_extra,
+            default_replication_factor=2,
         ),
         Clusterd(
             name="clusterd1",
@@ -352,6 +357,7 @@ def workflow_incident_49(c: Composition) -> None:
                     "storage_dataflow_delay_sources_past_rehydration": "true",
                 },
                 environment_extra=materialized_environment_extra,
+                default_replication_factor=2,
             ),
         ),
         (
@@ -362,6 +368,7 @@ def workflow_incident_49(c: Composition) -> None:
                     "storage_dataflow_delay_sources_past_rehydration": "true",
                 },
                 environment_extra=materialized_environment_extra,
+                default_replication_factor=2,
             ),
         ),
     ]:
@@ -500,6 +507,7 @@ def fetch_auto_spill_metric() -> int | None:
                     "unsafe_enable_unorchestrated_cluster_replicas": "true",
                     "storage_dataflow_delay_sources_past_rehydration": "true",
                 },
+                default_replication_factor=2,
             ),
         ),
         (
@@ -517,6 +525,7 @@ def fetch_auto_spill_metric() -> int | None:
                     # Enable the RocksDB merge operator
                     "storage_rocksdb_use_merge_operator": "true",
                 },
+                default_replication_factor=2,
             ),
         ),
     ]:
@@ -570,6 +579,7 @@ def workflow_load_test(c: Composition, parser: WorkflowArgumentParser) -> None:
                 "storage_dataflow_max_inflight_bytes_disk_only": "true",
             },
             environment_extra=materialized_environment_extra,
+            default_replication_factor=2,
         ),
         Clusterd(
             name="clusterd1",
@@ -671,6 +681,7 @@ def workflow_load_test(c: Composition, parser: WorkflowArgumentParser) -> None:
             ],
             additional_system_parameter_defaults=mz_configs,
             environment_extra=materialized_environment_extra,
+            default_replication_factor=2,
         ),
     ):
         c.kill("materialized", "clusterd1")
diff --git a/test/zippy/mzcompose.py b/test/zippy/mzcompose.py
index cb3475b77f26b..3b1291a72a436 100644
--- a/test/zippy/mzcompose.py
+++ b/test/zippy/mzcompose.py
@@ -57,6 +57,7 @@ def create_mzs(
         sanity_restart=False,
         metadata_store="cockroach",
         additional_system_parameter_defaults=additional_system_parameter_defaults,
+        default_replication_factor=2,
     )
     for mz_name in ["materialized", "materialized2"]
 ] + [