Skip to content

tests: 2 replicas in more places #32203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ steps:
timeout_in_minutes: 120
parallelism: 2
agents:
queue: hetzner-aarch64-4cpu-8gb
queue: hetzner-aarch64-8cpu-16gb
plugins:
- ./ci/plugins/mzcompose:
composition: kafka-matrix
Expand Down Expand Up @@ -1854,13 +1854,13 @@ steps:

- id: 0dt
label: Zero downtime
depends_on: build-aarch64
depends_on: build-x86_64
timeout_in_minutes: 180
plugins:
- ./ci/plugins/mzcompose:
composition: 0dt
agents:
queue: hetzner-aarch64-16cpu-32gb
queue: hetzner-x86-64-dedi-16cpu-64gb

- id: emulator
label: Materialize Emulator
Expand Down
7 changes: 6 additions & 1 deletion ci/test/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ steps:
args:
[
--scenario=RestartEnvironmentdClusterdStorage,
--default-replication-factor=1, # faster
"--seed=$BUILDKITE_JOB_ID",
]

Expand Down Expand Up @@ -645,7 +646,11 @@ steps:
plugins:
- ./ci/plugins/mzcompose:
composition: platform-checks
args: [--scenario=NoRestartNoUpgrade, "--seed=$BUILDKITE_JOB_ID"]
args: [
--scenario=NoRestartNoUpgrade,
--default-replication-factor=1, # faster
"--seed=$BUILDKITE_JOB_ID"
]

- id: source-sink-errors
label: "Source/Sink Error Reporting"
Expand Down
4 changes: 4 additions & 0 deletions misc/dbt-materialize/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
options=test_case.materialized_options,
image=test_case.materialized_image,
volumes_extra=["secrets:/secrets"],
default_replication_factor=1,
additional_system_parameter_defaults={
"default_cluster_replication_factor": "1"
},
)
test_args = ["dbt-materialize/tests"]
if args.k:
Expand Down
2 changes: 1 addition & 1 deletion misc/python/materialize/checks/all_checks/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def manipulate(self) -> list[Testdrive]:
$ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
GRANT CREATECLUSTER ON SYSTEM TO materialize

> CREATE CLUSTER create_cluster2 (SIZE '2-2');
> CREATE CLUSTER create_cluster2 (SIZE '2-2', REPLICATION FACTOR 1);
""",
]
]
Expand Down
7 changes: 4 additions & 3 deletions misc/python/materialize/checks/all_checks/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,8 @@ def initialize(self) -> Testdrive:
return Testdrive(
dedent(
"""
> CREATE SOURCE webhook_alter1 FROM WEBHOOK BODY FORMAT TEXT;
> CREATE CLUSTER sink_webhook_cluster SIZE '1', REPLICATION FACTOR 1;
> CREATE SOURCE webhook_alter1 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;
> CREATE SINK sink_alter_wh FROM webhook_alter1
INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-wh')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
Expand All @@ -1284,7 +1285,7 @@ def manipulate(self) -> list[Testdrive]:
Testdrive(dedent(s))
for s in [
"""
> CREATE SOURCE webhook_alter2 FROM WEBHOOK BODY FORMAT TEXT;
> CREATE SOURCE webhook_alter2 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;

$ set-from-sql var=running_count
SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_wh';
Expand All @@ -1298,7 +1299,7 @@ def manipulate(self) -> list[Testdrive]:
2
""",
"""
> CREATE SOURCE webhook_alter3 FROM WEBHOOK BODY FORMAT TEXT;
> CREATE SOURCE webhook_alter3 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;

$ set-from-sql var=running_count
SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_wh';
Expand Down
8 changes: 7 additions & 1 deletion misc/python/materialize/checks/all_checks/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from textwrap import dedent

from materialize.checks.actions import Testdrive
from materialize.checks.checks import Check
from materialize.checks.checks import Check, disabled
from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
from materialize.checks.executors import Executor
from materialize.mz_version import MzVersion
Expand Down Expand Up @@ -132,6 +132,9 @@ def validate(self) -> Testdrive:
)


@disabled(
"Reenable when database-issues#9184 is fixed and there is a way to set the cluster"
)
class WebhookTable(Check):
def _can_run(self, e: Executor) -> bool:
    # Webhook tables were introduced in v0.130.0; skip on older versions.
    minimum_version = MzVersion.parse_mz("v0.130.0-dev")
    return self.base_version >= minimum_version
Expand All @@ -141,7 +144,10 @@ def initialize(self) -> Testdrive:
schemas()
+ dedent(
"""
> CREATE CLUSTER webhook_table_cluster REPLICAS (r1 (SIZE '1'));
> SET cluster = webhook_table_cluster
> CREATE TABLE webhook_table_text FROM WEBHOOK BODY FORMAT TEXT;
> SET cluster = quickstart

$ webhook-append database=materialize schema=public name=webhook_table_text
hello_world
Expand Down
1 change: 1 addition & 0 deletions misc/python/materialize/checks/mzcompose_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def execute(self, e: Executor) -> None:
force_migrations=self.force_migrations,
publish=self.publish,
metadata_store="cockroach",
default_replication_factor=2,
)

# Don't fail since we are careful to explicitly kill and collect logs
Expand Down
18 changes: 0 additions & 18 deletions misc/python/materialize/mzcompose/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,24 +1083,6 @@ def blob_store(self) -> str:
return name
raise RuntimeError(f"No external blob store found: {self.compose['services']}")

def setup_quickstart_cluster(self, replicas: int = 2) -> None:
    """Recreate the ``quickstart`` cluster with the requested replica count.

    Drops and recreates the ``quickstart`` cluster with ``replicas``
    unmanaged replicas of size ``'4'`` (named ``r0``, ``r1``, ...), and
    recreates the ``singlereplica`` cluster, granting ``materialize`` full
    privileges on both. Statements run as ``mz_system`` on the internal
    SQL port (6877), since regular users may not recreate system clusters.

    Args:
        replicas: number of replicas to create for ``quickstart``.
            Bug fix: the previous implementation hardcoded ``range(0, 2)``
            and silently ignored this parameter.
    """
    replica_names = [f"r{replica_id}" for replica_id in range(replicas)]
    replica_string = ",".join(
        f"{replica_name} (SIZE '4')" for replica_name in replica_names
    )
    # NOTE(review): "singlereplica" is created with REPLICATION FACTOR 2,
    # which contradicts its name — presumably intentional for these tests,
    # but worth confirming with the original author.
    self.sql(
        f"""
        DROP CLUSTER quickstart CASCADE;
        CREATE CLUSTER quickstart REPLICAS ({replica_string});
        GRANT ALL PRIVILEGES ON CLUSTER quickstart TO materialize;
        DROP CLUSTER IF EXISTS singlereplica;
        CREATE CLUSTER singlereplica SIZE '4', REPLICATION FACTOR 2;
        GRANT ALL PRIVILEGES ON CLUSTER singlereplica TO materialize;
        """,
        user="mz_system",
        port=6877,
    )

def capture_logs(self, *services: str) -> None:
# Capture logs into services.log since they will be lost otherwise
# after dowing a composition.
Expand Down
11 changes: 7 additions & 4 deletions misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def __init__(
metadata_store: str = METADATA_STORE,
cluster_replica_size: dict[str, dict[str, Any]] | None = None,
bootstrap_replica_size: str | None = None,
default_cluster_replicas: int = 2,
default_replication_factor: int = 1,
) -> None:
if name is None:
name = "materialized"
Expand Down Expand Up @@ -141,9 +141,9 @@ def __init__(
f"MZ_BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
# Note(SangJunBak): mz_system and mz_probe have no replicas by default in materialized
# but we re-enable them here since many of our tests rely on them.
f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR={default_cluster_replicas}",
f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR={default_cluster_replicas}",
f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR={default_cluster_replicas}",
f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
*environment_extra,
*DEFAULT_CRDB_ENVIRONMENT,
]
Expand All @@ -161,6 +161,9 @@ def __init__(
system_parameter_version or image_version
)

system_parameter_defaults["default_cluster_replication_factor"] = str(
default_replication_factor
)
if additional_system_parameter_defaults is not None:
system_parameter_defaults.update(additional_system_parameter_defaults)

Expand Down
56 changes: 8 additions & 48 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from materialize.parallel_workload.database import (
DATA_TYPES,
DB,
MAX_CLUSTER_REPLICAS,
MAX_CLUSTERS,
MAX_COLUMNS,
MAX_DBS,
Expand Down Expand Up @@ -1349,22 +1348,13 @@ def run(self, exe: Executor) -> bool:


class CreateClusterReplicaAction(Action):
def errors_to_ignore(self, exe: Executor) -> list[str]:
    # Replica creation can race with source/sink creation on the same
    # cluster, so this error is expected and safe to ignore here.
    extra = [
        "cannot create more than one replica of a cluster containing sources or sinks",
    ]
    return extra + super().errors_to_ignore(exe)

def run(self, exe: Executor) -> bool:
with exe.db.lock:
# Keep cluster 0 with 1 replica for sources/sinks
unmanaged_clusters = [c for c in exe.db.clusters[1:] if not c.managed]
if not unmanaged_clusters:
return False
cluster = self.rng.choice(unmanaged_clusters)
if len(cluster.replicas) >= MAX_CLUSTER_REPLICAS:
return False
cluster.replica_id += 1
with cluster.lock:
if cluster not in exe.db.clusters or not cluster.managed:
Expand Down Expand Up @@ -1652,6 +1642,7 @@ def run(self, exe: Executor) -> bool:
sanity_restart=self.sanity_restart,
additional_system_parameter_defaults=self.system_parameters,
metadata_store="cockroach",
default_replication_factor=2,
)
):
self.composition.up("materialized", detach=True)
Expand Down Expand Up @@ -1702,6 +1693,7 @@ def run(self, exe: Executor) -> bool:
restart="on-failure",
healthcheck=LEADER_STATUS_HEALTHCHECK,
metadata_store="cockroach",
default_replication_factor=2,
),
):
self.composition.up(mz_service, detach=True)
Expand Down Expand Up @@ -1829,14 +1821,6 @@ def run(self, exe: Executor) -> bool:


class CreateKafkaSourceAction(Action):
def errors_to_ignore(self, exe: Executor) -> list[str]:
    # In the Kill / ZeroDowntimeDeploy scenarios, source creation can race
    # against replica changes, so tolerate the multi-replica error.
    ignorable = super().errors_to_ignore(exe)
    if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
        ignorable.append(
            "cannot create source in cluster with more than one replica"
        )
    return ignorable

def run(self, exe: Executor) -> bool:
with exe.db.lock:
if len(exe.db.kafka_sources) >= MAX_KAFKA_SOURCES:
Expand All @@ -1848,7 +1832,7 @@ def run(self, exe: Executor) -> bool:
with schema.lock, cluster.lock:
if schema not in exe.db.schemas:
return False
if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
if cluster not in exe.db.clusters:
return False

try:
Expand Down Expand Up @@ -1894,14 +1878,6 @@ def run(self, exe: Executor) -> bool:


class CreateMySqlSourceAction(Action):
def errors_to_ignore(self, exe: Executor) -> list[str]:
    # Source creation may race with replica changes when the scenario
    # kills or redeploys Materialize; ignore the resulting error.
    base_errors = super().errors_to_ignore(exe)
    racing_scenarios = (Scenario.Kill, Scenario.ZeroDowntimeDeploy)
    if exe.db.scenario in racing_scenarios:
        base_errors += [
            "cannot create source in cluster with more than one replica"
        ]
    return base_errors

def run(self, exe: Executor) -> bool:
# See database-issues#6881, not expected to work
if exe.db.scenario == Scenario.BackupRestore:
Expand All @@ -1917,7 +1893,7 @@ def run(self, exe: Executor) -> bool:
with schema.lock, cluster.lock:
if schema not in exe.db.schemas:
return False
if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
if cluster not in exe.db.clusters:
return False

try:
Expand Down Expand Up @@ -1963,14 +1939,6 @@ def run(self, exe: Executor) -> bool:


class CreatePostgresSourceAction(Action):
def errors_to_ignore(self, exe: Executor) -> list[str]:
    # Under Kill / ZeroDowntimeDeploy, a concurrent replica change can make
    # Postgres-source creation fail with this error; treat it as benign.
    errors = list(super().errors_to_ignore(exe))
    if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
        errors.append(
            "cannot create source in cluster with more than one replica"
        )
    return errors

def run(self, exe: Executor) -> bool:
# See database-issues#6881, not expected to work
if exe.db.scenario == Scenario.BackupRestore:
Expand All @@ -1982,14 +1950,11 @@ def run(self, exe: Executor) -> bool:
source_id = exe.db.postgres_source_id
exe.db.postgres_source_id += 1
schema = self.rng.choice(exe.db.schemas)
potential_clusters = [c for c in exe.db.clusters if len(c.replicas) == 1]
if not potential_clusters:
return False
cluster = self.rng.choice(potential_clusters)
cluster = self.rng.choice(exe.db.clusters)
with schema.lock, cluster.lock:
if schema not in exe.db.schemas:
return False
if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
if cluster not in exe.db.clusters:
return False

try:
Expand Down Expand Up @@ -2037,8 +2002,6 @@ def run(self, exe: Executor) -> bool:
class CreateKafkaSinkAction(Action):
    def errors_to_ignore(self, exe: Executor) -> list[str]:
        ignorable = [
            # Another replica can be created in parallel
            "cannot create sink in cluster with more than one replica",
            # Some randomly-chosen column types cannot be encoded as BYTES.
            "BYTES format with non-encodable type",
        ]
        return ignorable + super().errors_to_ignore(exe)

Expand All @@ -2048,15 +2011,12 @@ def run(self, exe: Executor) -> bool:
return False
sink_id = exe.db.kafka_sink_id
exe.db.kafka_sink_id += 1
potential_clusters = [c for c in exe.db.clusters if len(c.replicas) == 1]
if not potential_clusters:
return False
cluster = self.rng.choice(potential_clusters)
cluster = self.rng.choice(exe.db.clusters)
schema = self.rng.choice(exe.db.schemas)
with schema.lock, cluster.lock:
if schema not in exe.db.schemas:
return False
if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
if cluster not in exe.db.clusters:
return False

sink = KafkaSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def run(self, c: Composition, state: State) -> None:
sanity_restart=False,
restart="on-failure",
metadata_store="cockroach",
default_replication_factor=2,
)
):
c.restore(state.mz_service)
Expand Down
3 changes: 3 additions & 0 deletions misc/python/materialize/zippy/mz_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def run(self, c: Composition, state: State) -> None:
restart="on-failure",
additional_system_parameter_defaults=self.additional_system_parameter_defaults,
metadata_store="cockroach",
default_replication_factor=2,
)
):
c.up(state.mz_service)
Expand Down Expand Up @@ -167,6 +168,7 @@ def run(self, c: Composition, state: State) -> None:
sanity_restart=False,
restart="on-failure",
metadata_store="cockroach",
default_replication_factor=2,
)
):
c.kill(state.mz_service)
Expand Down Expand Up @@ -201,6 +203,7 @@ def run(self, c: Composition, state: State) -> None:
restart="on-failure",
healthcheck=LEADER_STATUS_HEALTHCHECK,
metadata_store="cockroach",
default_replication_factor=2,
),
):
c.up(state.mz_service, detach=True)
Expand Down
7 changes: 6 additions & 1 deletion src/adapter/src/coord/sequencer/inner/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,12 @@ impl Coordinator {
}
match &options.replication_factor {
Set(rf) => *replication_factor = *rf,
Reset => *replication_factor = 1,
Reset => {
*replication_factor = self
.catalog
.system_config()
.default_cluster_replication_factor()
}
Unchanged => {}
}
match &options.schedule {
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ pub async fn persist_backed_catalog_state(
pub fn test_bootstrap_args() -> BootstrapArgs {
BootstrapArgs {
default_cluster_replica_size: "1".into(),
default_cluster_replication_factor: 2,
default_cluster_replication_factor: 1,
bootstrap_role: None,
cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
}
Expand Down
Loading