Skip to content

Pr testt #30540

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed

Pr testt #30540

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion misc/python/materialize/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import argparse
import atexit
import getpass
import json
import os
import pathlib
import shlex
Expand All @@ -30,7 +31,10 @@
import psutil

from materialize import MZ_ROOT, rustc_flags, spawn, ui
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose import (
cluster_replica_size_map,
get_default_system_parameters,
)
from materialize.ui import UIError
from materialize.xcompile import Arch

Expand Down Expand Up @@ -270,6 +274,7 @@ def main() -> int:
f"--timestamp-oracle-url={args.postgres}?options=--search_path=tsoracle",
f"--environment-id={environment_id}",
"--bootstrap-role=materialize",
f"--cluster-replica-sizes={json.dumps(cluster_replica_size_map())}",
*args.args,
]
if args.monitoring:
Expand Down
35 changes: 34 additions & 1 deletion misc/python/materialize/cloudtest/k8s/environmentd.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import json
import operator
import urllib.parse
from collections.abc import Callable
Expand Down Expand Up @@ -37,7 +38,11 @@
from materialize.cloudtest.k8s.api.k8s_service import K8sService
from materialize.cloudtest.k8s.api.k8s_stateful_set import K8sStatefulSet
from materialize.mz_version import MzVersion
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose import (
bootstrap_cluster_replica_size,
cluster_replica_size_map,
get_default_system_parameters,
)


class EnvironmentdService(K8sService):
Expand Down Expand Up @@ -309,6 +314,34 @@ def env_vars(self) -> list[V1EnvVar]:
name="MZ_ADAPTER_STASH_URL",
value=f"postgres://root@cockroach.{self.cockroach_namespace}:26257?options=--search_path=adapter",
),
V1EnvVar(
name="MZ_CLUSTER_REPLICA_SIZES",
value=f"{json.dumps(cluster_replica_size_map())}",
),
V1EnvVar(
name="MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE",
value=bootstrap_cluster_replica_size(),
),
V1EnvVar(
name="MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE",
value=bootstrap_cluster_replica_size(),
),
V1EnvVar(
name="MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE",
value=bootstrap_cluster_replica_size(),
),
V1EnvVar(
name="MZ_BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE",
value=bootstrap_cluster_replica_size(),
),
V1EnvVar(
name="MZ_BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE",
value=bootstrap_cluster_replica_size(),
),
V1EnvVar(
name="MZ_BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE",
value=bootstrap_cluster_replica_size(),
),
]

if self._meets_minimum_version("0.118.0-dev"):
Expand Down
7 changes: 6 additions & 1 deletion misc/python/materialize/cloudtest/k8s/testdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import json
import os
import subprocess
import sys
Expand All @@ -16,7 +17,10 @@

from materialize.cloudtest import DEFAULT_K8S_NAMESPACE
from materialize.cloudtest.k8s.api.k8s_pod import K8sPod
from materialize.mzcompose import get_default_system_parameters
from materialize.mzcompose import (
cluster_replica_size_map,
get_default_system_parameters,
)
from materialize.mzcompose.test_result import (
extract_error_chunks_from_output,
)
Expand Down Expand Up @@ -77,6 +81,7 @@ def run(
"--var=single-replica-cluster=quickstart",
"--var=default-storage-size=1",
"--var=default-replica-size=1",
f"--cluster-replica-sizes={json.dumps(cluster_replica_size_map())}",
*([f"--aws-region={self.aws_region}"] if self.aws_region else []),
*(
[
Expand Down
50 changes: 50 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,53 @@ def _wait_for_pg(
error = e
ui.progress(finish=True)
raise UIError(f"never got correct result for {args}: {error}")


def bootstrap_cluster_replica_size() -> str:
return "bootstrap"


def cluster_replica_size_map() -> dict[str, dict[str, Any]]:
def replica_size(
workers: int,
scale: int,
disabled: bool = False,
is_cc: bool = False,
memory_limit: str | None = None,
) -> dict[str, Any]:
return {
"cpu_exclusive": False,
"cpu_limit": None,
"credits_per_hour": f"{workers * scale}",
"disabled": disabled,
"disk_limit": None,
"is_cc": is_cc,
"memory_limit": memory_limit,
"scale": scale,
"workers": workers,
# "selectors": {},
}

replica_sizes = {
bootstrap_cluster_replica_size(): replica_size(1, 1),
"2-4": replica_size(4, 2),
"free": replica_size(0, 0, disabled=True),
"1cc": replica_size(1, 1, is_cc=True),
"1C": replica_size(1, 1, is_cc=True),
}

for i in range(0, 6):
workers = 1 << i
replica_sizes[f"{workers}"] = replica_size(workers, 1)
for mem in [4, 8, 16, 32]:
replica_sizes[f"{workers}-{mem}G"] = replica_size(
workers, 1, memory_limit=f"{mem} GiB"
)

replica_sizes[f"{workers}-1"] = replica_size(1, workers)
replica_sizes[f"{workers}-{workers}"] = replica_size(workers, workers)
replica_sizes[f"mem-{workers}"] = replica_size(
workers, 1, memory_limit=f"{workers} GiB"
)

return replica_sizes
10 changes: 8 additions & 2 deletions misc/python/materialize/mzcompose/composition.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from psycopg import Connection, Cursor

from materialize import MZ_ROOT, mzbuild, spawn, ui
from materialize.mzcompose import loader
from materialize.mzcompose import cluster_replica_size_map, loader
from materialize.mzcompose.service import Service
from materialize.mzcompose.services.materialized import (
LEADER_STATUS_HEALTHCHECK,
Expand Down Expand Up @@ -297,6 +297,7 @@ def invoke(
check: bool = True,
max_tries: int = 1,
silent: bool = False,
environment: dict[str, str] | None = None,
) -> subprocess.CompletedProcess:
"""Invoke `docker compose` on the rendered composition.

Expand Down Expand Up @@ -363,6 +364,7 @@ def invoke(
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=environment,
)
if stdin is not None:
p.stdin.write(stdin) # type: ignore
Expand Down Expand Up @@ -415,6 +417,7 @@ def invoke(
input=stdin,
text=True,
bufsize=1,
env=environment,
)
except subprocess.CalledProcessError as e:
if e.stdout and not capture_and_print:
Expand Down Expand Up @@ -775,7 +778,7 @@ def run(
return self.invoke(
"run",
*(["--entrypoint", entrypoint] if entrypoint else []),
*(f"-e{k}={v}" for k, v in env_extra.items()),
*(f"-e{k}" for k in env_extra.keys()),
*(["--detach"] if detach else []),
*(["--rm"] if rm else []),
service,
Expand All @@ -785,6 +788,7 @@ def run(
capture_and_print=capture_and_print,
stdin=stdin,
check=check,
environment=env_extra,
)

def run_testdrive_files(
Expand All @@ -801,12 +805,14 @@ def run_testdrive_files(
f"--materialize-internal-url=postgres://mz_system@{mz_service}:6877",
]
)
environment = {"CLUST_REPLICA_SIZES": json.dumps(cluster_replica_size_map())}
return self.run(
"testdrive",
*args,
rm=rm,
# needed for sufficient error information in the junit.xml while still printing to stdout during execution
capture_and_print=True,
env_extra=environment,
)

def exec(
Expand Down
24 changes: 18 additions & 6 deletions misc/python/materialize/mzcompose/services/materialized.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,18 @@
# by the Apache License, Version 2.0.


import json
from enum import Enum
from typing import Any

from materialize import docker
from materialize.mz_version import MzVersion
from materialize.mzcompose import (
DEFAULT_CRDB_ENVIRONMENT,
DEFAULT_MZ_ENVIRONMENT_ID,
DEFAULT_MZ_VOLUMES,
bootstrap_cluster_replica_size,
cluster_replica_size_map,
get_default_system_parameters,
)
from materialize.mzcompose.service import (
Expand Down Expand Up @@ -62,6 +66,8 @@ def __init__(
publish: bool | None = None,
stop_grace_period: str = "120s",
metadata_store: str = METADATA_STORE,
cluster_replica_size: dict[str, dict[str, Any]] | None = None,
bootstrap_replica_size: str | None = None,
) -> None:
if name is None:
name = "materialized"
Expand All @@ -73,6 +79,11 @@ def __init__(
s: {"condition": "service_started"} for s in depends_on
}

if bootstrap_replica_size is None:
bootstrap_replica_size = bootstrap_cluster_replica_size()
if cluster_replica_size is None:
cluster_replica_size = cluster_replica_size_map()

environment = [
"MZ_NO_TELEMETRY=1",
f"MZ_SOFT_ASSERTIONS={int(soft_assertions)}",
Expand All @@ -94,6 +105,13 @@ def __init__(
# use Composition.override.
"MZ_LOG_FILTER",
"CLUSTERD_LOG_FILTER",
f"MZ_CLUSTER_REPLICA_SIZES={json.dumps(cluster_replica_size)}",
f"MZ_BOOTSTRAP_DEFAULT_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
f"MZ_BOOTSTRAP_BUILTIN_SYSTEM_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
f"MZ_BOOTSTRAP_BUILTIN_PROBE_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
f"MZ_BOOTSTRAP_BUILTIN_SUPPORT_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
f"MZ_BOOTSTRAP_BUILTIN_CATALOG_SERVER_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
f"MZ_BOOTSTRAP_BUILTIN_ANALYTICS_CLUSTER_REPLICA_SIZE={bootstrap_replica_size}",
*environment_extra,
*DEFAULT_CRDB_ENVIRONMENT,
]
Expand Down Expand Up @@ -166,12 +184,6 @@ def __init__(
else default_size
)
)
command += [
# Issue database-issues#4562 prevents the habitual use of large introspection
# clusters, so we leave the builtin cluster replica size as is.
# f"--bootstrap-builtin-cluster-replica-size={self.default_replica_size}",
f"--bootstrap-default-cluster-replica-size={self.default_replica_size}",
]

if external_metadata_store:
address = (
Expand Down
12 changes: 11 additions & 1 deletion misc/python/materialize/mzcompose/services/testdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import json
import random
from typing import Any

from materialize import buildkite
from materialize.mzcompose import DEFAULT_MZ_VOLUMES
from materialize.mzcompose import DEFAULT_MZ_VOLUMES, cluster_replica_size_map
from materialize.mzcompose.service import (
Service,
ServiceDependency,
Expand Down Expand Up @@ -56,9 +58,13 @@ def __init__(
mz_service: str = "materialized",
metadata_store: str = METADATA_STORE,
stop_grace_period: str = "120s",
cluster_replica_size: dict[str, dict[str, Any]] | None = None,
) -> None:
depends_graph: dict[str, ServiceDependency] = {}

if cluster_replica_size is None:
cluster_replica_size = cluster_replica_size_map()

if environment is None:
environment = [
"TMPDIR=/share/tmp",
Expand All @@ -75,6 +81,10 @@ def __init__(
"AWS_SESSION_TOKEN",
]

environment += [
f"CLUSTER_REPLICA_SIZES={json.dumps(cluster_replica_size)}",
]

volumes = [
volume_workdir,
*(v for v in DEFAULT_MZ_VOLUMES if v.startswith("tmp:")),
Expand Down
12 changes: 9 additions & 3 deletions src/adapter/benches/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use criterion::{criterion_group, criterion_main, Criterion};
use mz_adapter::catalog::{Catalog, Op};
use mz_catalog::durable::test_bootstrap_args;
use mz_persist_client::PersistClient;
use mz_sql::session::user::MZ_SYSTEM_ROLE_ID;
use tokio::runtime::Runtime;
Expand All @@ -18,10 +19,15 @@ fn bench_transact(c: &mut Criterion) {
c.bench_function("transact", |b| {
let runtime = Runtime::new().unwrap();

let bootstrap_args = test_bootstrap_args();
let mut catalog = runtime.block_on(async {
Catalog::open_debug_catalog(PersistClient::new_for_tests().await, Uuid::new_v4())
.await
.unwrap()
Catalog::open_debug_catalog(
PersistClient::new_for_tests().await,
Uuid::new_v4(),
&bootstrap_args,
)
.await
.unwrap()
});
let mut id = 0;
b.iter(|| {
Expand Down
Loading
Loading