Commit 7716773

tests: 2 replicas in more places

1 parent 4e144df commit 7716773

53 files changed: +221 −188 lines changed

ci/nightly/pipeline.template.yml (3 additions, 3 deletions)

@@ -194,7 +194,7 @@ steps:
   timeout_in_minutes: 120
   parallelism: 2
   agents:
-    queue: hetzner-aarch64-4cpu-8gb
+    queue: hetzner-aarch64-8cpu-16gb
   plugins:
     - ./ci/plugins/mzcompose:
         composition: kafka-matrix

@@ -1854,13 +1854,13 @@ steps:

 - id: 0dt
   label: Zero downtime
-  depends_on: build-aarch64
+  depends_on: build-x86_64
   timeout_in_minutes: 180
   plugins:
     - ./ci/plugins/mzcompose:
         composition: 0dt
   agents:
-    queue: hetzner-aarch64-16cpu-32gb
+    queue: hetzner-x86-64-dedi-16cpu-64gb

 - id: emulator
   label: Materialize Emulator

ci/test/pipeline.template.yml (6 additions, 1 deletion)

@@ -613,6 +613,7 @@ steps:
         args:
           [
             --scenario=RestartEnvironmentdClusterdStorage,
+            --default-replication-factor=1, # faster
            "--seed=$BUILDKITE_JOB_ID",
          ]

@@ -645,7 +646,11 @@ steps:
   plugins:
     - ./ci/plugins/mzcompose:
         composition: platform-checks
-        args: [--scenario=NoRestartNoUpgrade, "--seed=$BUILDKITE_JOB_ID"]
+        args: [
+            --scenario=NoRestartNoUpgrade,
+            --default-replication-factor=1, # faster
+            "--seed=$BUILDKITE_JOB_ID"
+          ]

 - id: source-sink-errors
   label: "Source/Sink Error Reporting"

misc/dbt-materialize/mzcompose.py (4 additions, 0 deletions)

@@ -63,6 +63,10 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
             options=test_case.materialized_options,
             image=test_case.materialized_image,
             volumes_extra=["secrets:/secrets"],
+            default_replication_factor=1,
+            additional_system_parameter_defaults={
+                "default_cluster_replication_factor": "1"
+            },
         )
         test_args = ["dbt-materialize/tests"]
         if args.k:
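
The two knobs above are complementary: default_replication_factor feeds the service's bootstrap environment variables, while the explicit system parameter default pins the runtime default for clusters created later (the constructor now also derives the latter from the former, so passing both is belt and suspenders). A minimal sketch of the pattern for a composition that wants fast single-replica clusters (Materialized, Composition.up, and Composition.sql are real helpers from this repo; the workflow body itself is illustrative only):

    from materialize.mzcompose.composition import Composition
    from materialize.mzcompose.services.materialized import Materialized

    SERVICES = [
        # Pin every cluster to a single replica; tests run faster this way.
        Materialized(
            default_replication_factor=1,
            additional_system_parameter_defaults={
                "default_cluster_replication_factor": "1"
            },
        ),
    ]

    def workflow_default(c: Composition) -> None:
        c.up("materialized")
        c.sql("SELECT 1")  # stand-in for the real test workload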

misc/python/materialize/checks/all_checks/cluster.py (1 addition, 1 deletion)

@@ -28,7 +28,7 @@ def manipulate(self) -> list[Testdrive]:
                $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}
                GRANT CREATECLUSTER ON SYSTEM TO materialize

-               > CREATE CLUSTER create_cluster2 (SIZE '2-2');
+               > CREATE CLUSTER create_cluster2 (SIZE '2-2', REPLICATION FACTOR 1);
                """,
            ]
        ]

misc/python/materialize/checks/all_checks/sink.py (4 additions, 3 deletions)

@@ -1268,7 +1268,8 @@ def initialize(self) -> Testdrive:
         return Testdrive(
             dedent(
                 """
-                > CREATE SOURCE webhook_alter1 FROM WEBHOOK BODY FORMAT TEXT;
+                > CREATE CLUSTER sink_webhook_cluster SIZE '1', REPLICATION FACTOR 1;
+                > CREATE SOURCE webhook_alter1 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;
                 > CREATE SINK sink_alter_wh FROM webhook_alter1
                   INTO KAFKA CONNECTION kafka_conn (TOPIC 'sink-alter-wh')
                   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn

@@ -1284,7 +1285,7 @@ def manipulate(self) -> list[Testdrive]:
             Testdrive(dedent(s))
             for s in [
                 """
-                > CREATE SOURCE webhook_alter2 FROM WEBHOOK BODY FORMAT TEXT;
+                > CREATE SOURCE webhook_alter2 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;

                 $ set-from-sql var=running_count
                 SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_wh';

@@ -1298,7 +1299,7 @@ def manipulate(self) -> list[Testdrive]:
                 2
                 """,
                 """
-                > CREATE SOURCE webhook_alter3 FROM WEBHOOK BODY FORMAT TEXT;
+                > CREATE SOURCE webhook_alter3 IN CLUSTER sink_webhook_cluster FROM WEBHOOK BODY FORMAT TEXT;

                 $ set-from-sql var=running_count
                 SELECT COUNT(*)::text FROM mz_internal.mz_sink_status_history JOIN mz_sinks ON mz_internal.mz_sink_status_history.sink_id = mz_sinks.id WHERE name = 'sink_alter_wh';

misc/python/materialize/checks/all_checks/webhook.py (7 additions, 1 deletion)

@@ -9,7 +9,7 @@
 from textwrap import dedent

 from materialize.checks.actions import Testdrive
-from materialize.checks.checks import Check
+from materialize.checks.checks import Check, disabled
 from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
 from materialize.checks.executors import Executor
 from materialize.mz_version import MzVersion

@@ -132,6 +132,9 @@ def validate(self) -> Testdrive:
         )


+@disabled(
+    "Reenable when database-issues#9184 is fixed and there is a way to set the cluster"
+)
 class WebhookTable(Check):
     def _can_run(self, e: Executor) -> bool:
         return self.base_version >= MzVersion.parse_mz("v0.130.0-dev")

@@ -141,7 +144,10 @@ def initialize(self) -> Testdrive:
             schemas()
             + dedent(
                 """
+                > CREATE CLUSTER webhook_table_cluster REPLICAS (r1 (SIZE '1'));
+                > SET cluster = webhook_table_cluster
                 > CREATE TABLE webhook_table_text FROM WEBHOOK BODY FORMAT TEXT;
+                > SET cluster = quickstart

                 $ webhook-append database=materialize schema=public name=webhook_table_text
                 hello_world

misc/python/materialize/checks/mzcompose_actions.py (1 addition, 0 deletions)

@@ -87,6 +87,7 @@ def execute(self, e: Executor) -> None:
                 force_migrations=self.force_migrations,
                 publish=self.publish,
                 metadata_store="cockroach",
+                default_replication_factor=2,
             )

         # Don't fail since we are careful to explicitly kill and collect logs

misc/python/materialize/mzcompose/composition.py (0 additions, 18 deletions)

@@ -1083,24 +1083,6 @@ def blob_store(self) -> str:
                 return name
         raise RuntimeError(f"No external blob store found: {self.compose['services']}")

-    def setup_quickstart_cluster(self, replicas: int = 2) -> None:
-        replica_names = [f"r{replica_id}" for replica_id in range(0, 2)]
-        replica_string = ",".join(
-            f"{replica_name} (SIZE '4')" for replica_name in replica_names
-        )
-        self.sql(
-            f"""
-            DROP CLUSTER quickstart CASCADE;
-            CREATE CLUSTER quickstart REPLICAS ({replica_string});
-            GRANT ALL PRIVILEGES ON CLUSTER quickstart TO materialize;
-            DROP CLUSTER IF EXISTS singlereplica;
-            CREATE CLUSTER singlereplica SIZE '4', REPLICATION FACTOR 2;
-            GRANT ALL PRIVILEGES ON CLUSTER singlereplica TO materialize;
-            """,
-            user="mz_system",
-            port=6877,
-        )
-
     def capture_logs(self, *services: str) -> None:
         # Capture logs into services.log since they will be lost otherwise
         # after dowing a composition.

misc/python/materialize/mzcompose/services/materialized.py (7 additions, 4 deletions)

@@ -94,7 +94,7 @@ def __init__(
         metadata_store: str = METADATA_STORE,
         cluster_replica_size: dict[str, dict[str, Any]] | None = None,
         bootstrap_replica_size: str | None = None,
-        default_cluster_replicas: int = 2,
+        default_replication_factor: int = 1,
     ) -> None:
         if name is None:
             name = "materialized"

@@ -141,9 +141,9 @@ def __init__(
             f"MZ_BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
             # Note(SangJunBak): mz_system and mz_probe have no replicas by default in materialized
             # but we re-enable them here since many of our tests rely on them.
-            f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR={default_cluster_replicas}",
-            f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR={default_cluster_replicas}",
-            f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR={default_cluster_replicas}",
+            f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
+            f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
+            f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
             *environment_extra,
             *DEFAULT_CRDB_ENVIRONMENT,
         ]

@@ -161,6 +161,9 @@ def __init__(
             system_parameter_version or image_version
         )

+        system_parameter_defaults["default_cluster_replication_factor"] = str(
+            default_replication_factor
+        )
         if additional_system_parameter_defaults is not None:
             system_parameter_defaults.update(additional_system_parameter_defaults)
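
Condensed, the renamed knob threads a single integer through both the bootstrap and runtime defaults. A simplified sketch of just that wiring, extracted from the constructor above (the real service assembles many more settings):

    def replication_settings(
        default_replication_factor: int = 1,
    ) -> tuple[list[str], dict[str, str]]:
        # Bootstrap-time: replication factor for the built-in system and
        # probe clusters, and for the default cluster created at startup.
        environment = [
            f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
            f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
            f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICATION_FACTOR={default_replication_factor}",
        ]
        # Runtime: default used by CREATE CLUSTER and by
        # ALTER CLUSTER ... RESET (REPLICATION FACTOR).
        system_parameter_defaults = {
            "default_cluster_replication_factor": str(default_replication_factor)
        }
        return environment, system_parameter_defaults

Note that the default also flips from 2 to 1 here; call sites that still want two replicas (the checks framework, the parallel workload, and Zippy) now opt in explicitly with default_replication_factor=2.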

misc/python/materialize/parallel_workload/action.py (8 additions, 48 deletions)

@@ -38,7 +38,6 @@
 from materialize.parallel_workload.database import (
     DATA_TYPES,
     DB,
-    MAX_CLUSTER_REPLICAS,
     MAX_CLUSTERS,
     MAX_COLUMNS,
     MAX_DBS,

@@ -1349,22 +1348,13 @@ def run(self, exe: Executor) -> bool:


 class CreateClusterReplicaAction(Action):
-    def errors_to_ignore(self, exe: Executor) -> list[str]:
-        result = [
-            "cannot create more than one replica of a cluster containing sources or sinks",
-        ] + super().errors_to_ignore(exe)
-
-        return result
-
     def run(self, exe: Executor) -> bool:
         with exe.db.lock:
             # Keep cluster 0 with 1 replica for sources/sinks
             unmanaged_clusters = [c for c in exe.db.clusters[1:] if not c.managed]
             if not unmanaged_clusters:
                 return False
             cluster = self.rng.choice(unmanaged_clusters)
-            if len(cluster.replicas) >= MAX_CLUSTER_REPLICAS:
-                return False
             cluster.replica_id += 1
         with cluster.lock:
             if cluster not in exe.db.clusters or not cluster.managed:

@@ -1652,6 +1642,7 @@ def run(self, exe: Executor) -> bool:
                 sanity_restart=self.sanity_restart,
                 additional_system_parameter_defaults=self.system_parameters,
                 metadata_store="cockroach",
+                default_replication_factor=2,
             )
         ):
             self.composition.up("materialized", detach=True)

@@ -1702,6 +1693,7 @@ def run(self, exe: Executor) -> bool:
                 restart="on-failure",
                 healthcheck=LEADER_STATUS_HEALTHCHECK,
                 metadata_store="cockroach",
+                default_replication_factor=2,
             ),
         ):
             self.composition.up(mz_service, detach=True)

@@ -1829,14 +1821,6 @@ def run(self, exe: Executor) -> bool:


 class CreateKafkaSourceAction(Action):
-    def errors_to_ignore(self, exe: Executor) -> list[str]:
-        result = super().errors_to_ignore(exe)
-        if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
-            result.extend(
-                ["cannot create source in cluster with more than one replica"]
-            )
-        return result
-
     def run(self, exe: Executor) -> bool:
         with exe.db.lock:
             if len(exe.db.kafka_sources) >= MAX_KAFKA_SOURCES:

@@ -1848,7 +1832,7 @@ def run(self, exe: Executor) -> bool:
         with schema.lock, cluster.lock:
             if schema not in exe.db.schemas:
                 return False
-            if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
+            if cluster not in exe.db.clusters:
                 return False

         try:

@@ -1894,14 +1878,6 @@ def run(self, exe: Executor) -> bool:


 class CreateMySqlSourceAction(Action):
-    def errors_to_ignore(self, exe: Executor) -> list[str]:
-        result = super().errors_to_ignore(exe)
-        if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
-            result.extend(
-                ["cannot create source in cluster with more than one replica"]
-            )
-        return result
-
     def run(self, exe: Executor) -> bool:
         # See database-issues#6881, not expected to work
         if exe.db.scenario == Scenario.BackupRestore:

@@ -1917,7 +1893,7 @@ def run(self, exe: Executor) -> bool:
         with schema.lock, cluster.lock:
             if schema not in exe.db.schemas:
                 return False
-            if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
+            if cluster not in exe.db.clusters:
                 return False

         try:

@@ -1963,14 +1939,6 @@ def run(self, exe: Executor) -> bool:


 class CreatePostgresSourceAction(Action):
-    def errors_to_ignore(self, exe: Executor) -> list[str]:
-        result = super().errors_to_ignore(exe)
-        if exe.db.scenario in (Scenario.Kill, Scenario.ZeroDowntimeDeploy):
-            result.extend(
-                ["cannot create source in cluster with more than one replica"]
-            )
-        return result
-
     def run(self, exe: Executor) -> bool:
         # See database-issues#6881, not expected to work
         if exe.db.scenario == Scenario.BackupRestore:

@@ -1982,14 +1950,11 @@ def run(self, exe: Executor) -> bool:
         source_id = exe.db.postgres_source_id
         exe.db.postgres_source_id += 1
         schema = self.rng.choice(exe.db.schemas)
-        potential_clusters = [c for c in exe.db.clusters if len(c.replicas) == 1]
-        if not potential_clusters:
-            return False
-        cluster = self.rng.choice(potential_clusters)
+        cluster = self.rng.choice(exe.db.clusters)
         with schema.lock, cluster.lock:
             if schema not in exe.db.schemas:
                 return False
-            if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
+            if cluster not in exe.db.clusters:
                 return False

         try:

@@ -2037,8 +2002,6 @@ def run(self, exe: Executor) -> bool:
 class CreateKafkaSinkAction(Action):
     def errors_to_ignore(self, exe: Executor) -> list[str]:
         return [
-            # Another replica can be created in parallel
-            "cannot create sink in cluster with more than one replica",
             "BYTES format with non-encodable type",
         ] + super().errors_to_ignore(exe)

@@ -2048,15 +2011,12 @@ def run(self, exe: Executor) -> bool:
             return False
         sink_id = exe.db.kafka_sink_id
         exe.db.kafka_sink_id += 1
-        potential_clusters = [c for c in exe.db.clusters if len(c.replicas) == 1]
-        if not potential_clusters:
-            return False
-        cluster = self.rng.choice(potential_clusters)
+        cluster = self.rng.choice(exe.db.clusters)
         schema = self.rng.choice(exe.db.schemas)
         with schema.lock, cluster.lock:
             if schema not in exe.db.schemas:
                 return False
-            if cluster not in exe.db.clusters or len(cluster.replicas) != 1:
+            if cluster not in exe.db.clusters:
                 return False

         sink = KafkaSink(

misc/python/materialize/zippy/backup_and_restore_actions.py (1 addition, 0 deletions)

@@ -40,6 +40,7 @@ def run(self, c: Composition, state: State) -> None:
                 sanity_restart=False,
                 restart="on-failure",
                 metadata_store="cockroach",
+                default_replication_factor=2,
             )
         ):
             c.restore(state.mz_service)

misc/python/materialize/zippy/mz_actions.py (3 additions, 0 deletions)

@@ -90,6 +90,7 @@ def run(self, c: Composition, state: State) -> None:
                 restart="on-failure",
                 additional_system_parameter_defaults=self.additional_system_parameter_defaults,
                 metadata_store="cockroach",
+                default_replication_factor=2,
             )
         ):
             c.up(state.mz_service)

@@ -167,6 +168,7 @@ def run(self, c: Composition, state: State) -> None:
                 sanity_restart=False,
                 restart="on-failure",
                 metadata_store="cockroach",
+                default_replication_factor=2,
             )
         ):
             c.kill(state.mz_service)

@@ -201,6 +203,7 @@ def run(self, c: Composition, state: State) -> None:
                 restart="on-failure",
                 healthcheck=LEADER_STATUS_HEALTHCHECK,
                 metadata_store="cockroach",
+                default_replication_factor=2,
             ),
         ):
             c.up(state.mz_service, detach=True)

src/adapter/src/coord/sequencer/inner/cluster.rs (6 additions, 1 deletion)

@@ -224,7 +224,12 @@ impl Coordinator {
         }
         match &options.replication_factor {
             Set(rf) => *replication_factor = *rf,
-            Reset => *replication_factor = 1,
+            Reset => {
+                *replication_factor = self
+                    .catalog
+                    .system_config()
+                    .default_cluster_replication_factor()
+            }
             Unchanged => {}
         }
         match &options.schedule {
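
The effect of this change: ALTER CLUSTER ... RESET (REPLICATION FACTOR) now falls back to the default_cluster_replication_factor system parameter instead of a hardcoded 1. A hypothetical mzcompose-style check, not part of this commit, assuming a server started with default_replication_factor=2 and the Composition.sql/sql_query helpers as used elsewhere in this repo:

    from materialize.mzcompose.composition import Composition

    def check_reset_uses_configured_default(c: Composition) -> None:
        # Create a cluster that deviates from the default, then reset it.
        c.sql("CREATE CLUSTER rf_check (SIZE '1', REPLICATION FACTOR 1)")
        c.sql("ALTER CLUSTER rf_check RESET (REPLICATION FACTOR)")
        # Before this commit the reset value was always 1; afterwards it
        # should be the configured default (2 under the assumption above).
        rows = c.sql_query(
            "SELECT replication_factor FROM mz_clusters WHERE name = 'rf_check'"
        )
        assert int(rows[0][0]) == 2, f"unexpected replication factor: {rows[0][0]}"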

src/catalog/src/durable.rs (1 addition, 1 deletion)

@@ -530,7 +530,7 @@ pub async fn persist_backed_catalog_state(
 pub fn test_bootstrap_args() -> BootstrapArgs {
     BootstrapArgs {
         default_cluster_replica_size: "1".into(),
-        default_cluster_replication_factor: 2,
+        default_cluster_replication_factor: 1,
         bootstrap_role: None,
         cluster_replica_size_map: ClusterReplicaSizeMap::for_tests(),
     }
