Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/adapter: Opt-in migration of sources to the new table model #30483

Merged
merged 58 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
b6eda03
Opt-in catalog migration for converting subsources to source tables
rjobanp Sep 16, 2024
7d9bc26
Add migration logic for source statements
rjobanp Oct 23, 2024
b4f9f8f
Rename the feature flag
rjobanp Oct 23, 2024
ecb16af
Add new table to audit log
rjobanp Oct 23, 2024
d70ab15
Add platform check scenario to test migration
rjobanp Oct 23, 2024
35236b0
Switch to using system vars instead of flags to allow console access …
rjobanp Oct 24, 2024
969e722
Fixes to migration based on testing
rjobanp Oct 24, 2024
0d0bf92
Also test the migration in the legacy upgrade tests
rjobanp Oct 24, 2024
947d528
platform checks: unique source name
nrainer-materialize Oct 25, 2024
06765d2
Migration structure cleanup from feedback
rjobanp Oct 25, 2024
b652d13
Address more feedback; ensure new source name is unique
rjobanp Oct 25, 2024
c089849
Fix legacy-upgrade checks
rjobanp Oct 25, 2024
491ac18
Fixes caused by rebase on main
rjobanp Oct 25, 2024
8c98350
ci: print source table migration issues
nrainer-materialize Oct 28, 2024
da5d733
migration tests: pg-cdc-old-syntax
nrainer-materialize Oct 25, 2024
9451a6b
migration tests: extract logic
nrainer-materialize Oct 28, 2024
cfd9da3
migration tests: improve verification
nrainer-materialize Oct 28, 2024
b737c30
migration tests: mysql-cdc-old-syntax
nrainer-materialize Oct 28, 2024
e8fd48b
migration tests: testdrive-old-kafka-syntax
nrainer-materialize Oct 28, 2024
2bba8bd
migration tests: improve output
nrainer-materialize Oct 29, 2024
27652cd
migration tests: fixes
nrainer-materialize Oct 29, 2024
bb49cf5
migration tests: fixes
nrainer-materialize Oct 29, 2024
acdb421
Fix for mysql source being restarted after new table added
rjobanp Oct 29, 2024
62d7ef7
Avoid needing to rewrite ids of dependent statements by changing the …
rjobanp Oct 29, 2024
1bcc902
Fix merge skew
jkosh44 Nov 14, 2024
30aa14b
Fix dependency tracking
jkosh44 Nov 14, 2024
cc0a868
Fix lint
jkosh44 Nov 14, 2024
635e988
Fix some issues
jkosh44 Nov 14, 2024
7e04286
Fix dependency tracking
jkosh44 Nov 14, 2024
21b4d51
Fix merge skew
jkosh44 Nov 15, 2024
a6c67e8
Update test versions
jkosh44 Nov 15, 2024
3144e92
More merge skew fixes
jkosh44 Dec 12, 2024
01619dc
Update item sorting to only sort within item groups
jkosh44 Dec 16, 2024
ec3842d
Experiment for migrate
jkosh44 Dec 17, 2024
622e63e
Fix migration idempotency
jkosh44 Dec 18, 2024
5a1135a
Fixup
jkosh44 Dec 18, 2024
a79183a
resolve merge conflicts
jkosh44 Jan 2, 2025
a81d461
Update versions
jkosh44 Jan 14, 2025
67aec32
Fix more merge skew
jkosh44 Jan 16, 2025
a3f06d2
Remove prints
jkosh44 Jan 28, 2025
7650c39
fixup
jkosh44 Jan 28, 2025
5e8db25
Merge branch 'main' of github.com:MaterializeInc/materialize into sou…
jkosh44 Jan 28, 2025
b75f237
Merge branch 'main' of github.com:MaterializeInc/materialize into sou…
jkosh44 Jan 28, 2025
e6c9316
Fixup
jkosh44 Jan 28, 2025
7acae30
Fix migration test
jkosh44 Jan 28, 2025
84360a9
Update test/testdrive-old-kafka-src-syntax/mzcompose.py
jkosh44 Jan 28, 2025
583408a
Fixup
jkosh44 Jan 28, 2025
766c2b0
Merge branch 'main' of github.com:MaterializeInc/materialize into sou…
jkosh44 Jan 28, 2025
6cdddc9
Fix and work around test failures
def- Jan 29, 2025
26d1ea6
Merge remote-tracking branch 'upstream/main' into source-table-migration
def- Jan 29, 2025
853f226
Try and fix test
jkosh44 Jan 29, 2025
7b93cb3
Try and fix test
jkosh44 Jan 29, 2025
1961373
Merge branch 'main' of github.com:MaterializeInc/materialize into sou…
jkosh44 Jan 29, 2025
c033fe1
Fix kafka tests
jkosh44 Jan 29, 2025
3420277
ci: Larger agent for td migration test
def- Jan 30, 2025
e2b56b4
Fix mz config
jkosh44 Jan 30, 2025
663f881
Merge branch 'source-table-migration' of github.com:jkosh44/materiali…
jkosh44 Jan 30, 2025
1286010
Fix kafka tests
jkosh44 Jan 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,17 @@ steps:
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: mysql-cdc-migration
label: MySQL CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: mysql-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: mysql-cdc-resumption-old-syntax
label: MySQL CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -646,6 +657,17 @@ steps:
queue: hetzner-aarch64-4cpu-8gb
# the mzbuild postgres version will be used, which depends on the Dockerfile specification

- id: pg-cdc-migration
label: Postgres CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: pg-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: pg-cdc-resumption-old-syntax
label: Postgres CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -677,6 +699,16 @@ steps:
agents:
queue: hetzner-aarch64-8cpu-16gb

- id: testdrive-kafka-migration
label: "Testdrive (before Kafka source versioning) migration tests"
depends_on: build-aarch64
timeout_in_minutes: 180
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive-old-kafka-src-syntax
run: migration
agents:
queue: hetzner-aarch64-16cpu-32gb

- group: AWS
key: aws
Expand Down
1 change: 1 addition & 0 deletions ci/test/lint-main/checks/check-mzcompose-files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ check_default_workflow_references_others() {
-not -wholename "./test/canary-environment/mzcompose.py" `# Only run manually` \
-not -wholename "./test/ssh-connection/mzcompose.py" `# Handled differently` \
-not -wholename "./test/scalability/mzcompose.py" `# Other workflows are for manual usage` \
-not -wholename "./test/testdrive-old-kafka-src-syntax/mzcompose.py" `# Other workflow is run separately` \
-not -wholename "./test/terraform/mzcompose.py" `# Handled differently` \
)

Expand Down
56 changes: 56 additions & 0 deletions misc/python/materialize/checks/all_checks/upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,59 @@ def validate(self) -> Testdrive:
"""
)
)


class UpsertLegacy(Check):
"""
An upsert source test that uses the legacy syntax to create the source
on all versions to ensure the source is properly migrated with the
ActivateSourceVersioningMigration scenario
"""

def initialize(self) -> Testdrive:
return Testdrive(
schemas()
+ dedent(
"""
$ kafka-create-topic topic=upsert-legacy-syntax

$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

> CREATE SOURCE upsert_insert_legacy
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-legacy-syntax-${testdrive.seed}')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE UPSERT

> CREATE MATERIALIZED VIEW upsert_insert_legacy_view AS SELECT COUNT(DISTINCT key1 || ' ' || f1) FROM upsert_insert_legacy;
"""
)
)

def manipulate(self) -> list[Testdrive]:
return [
Testdrive(schemas() + dedent(s))
for s in [
"""
$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
""",
"""
$ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
{"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
""",
]
]

def validate(self) -> Testdrive:
return Testdrive(
dedent(
"""
> SELECT COUNT(*), COUNT(DISTINCT key1), COUNT(DISTINCT f1) FROM upsert_insert_legacy
10000 10000 10000

> SELECT * FROM upsert_insert_legacy_view;
10000
"""
)
)
46 changes: 46 additions & 0 deletions misc/python/materialize/checks/scenarios_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,49 @@ def actions(self) -> list[Action]:
),
Validate(self),
]


class ActivateSourceVersioningMigration(Scenario):
"""
Starts MZ, initializes and manipulates, then forces the migration
of sources to the new table model (introducing Source Versioning).
"""

def base_version(self) -> MzVersion:
return get_last_version()

def actions(self) -> list[Action]:
print(f"Upgrading from tag {self.base_version()}")
return [
StartMz(
self,
tag=self.base_version(),
),
Initialize(self),
Manipulate(self, phase=1),
KillMz(
capture_logs=True
), # We always use True here otherwise docker-compose will lose the pre-upgrade logs
StartMz(
self,
tag=None,
# Activate the `force_source_table_syntax` flag
# which should trigger the migration of sources
# using the old syntax to the new table model.
additional_system_parameter_defaults={
"force_source_table_syntax": "true",
},
),
Manipulate(self, phase=2),
Validate(self),
# A second restart while already on the new version
KillMz(capture_logs=True),
StartMz(
self,
tag=None,
additional_system_parameter_defaults={
"force_source_table_syntax": "true",
},
),
Validate(self),
]
2 changes: 2 additions & 0 deletions misc/python/materialize/cli/ci_annotate_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@
| (FAIL|TIMEOUT)\s+\[\s*\d+\.\d+s\]
# parallel-workload
| worker_.*\ still\ running: [\s\S]* Threads\ have\ not\ stopped\ within\ 5\ minutes,\ exiting\ hard
# source-table migration
| source-table-migration\ issue
)
.* $
""",
Expand Down
7 changes: 5 additions & 2 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@


def get_default_system_parameters(
version: MzVersion | None = None, zero_downtime: bool = False
version: MzVersion | None = None,
zero_downtime: bool = False,
force_source_table_syntax: bool = False,
) -> dict[str, str]:
"""For upgrade tests we only want parameters set when all environmentd /
clusterd processes have reached a specific version (or higher)
Expand Down Expand Up @@ -89,7 +91,7 @@ def get_default_system_parameters(
"enable_0dt_deployment": "true" if zero_downtime else "false",
"enable_0dt_deployment_panic_after_timeout": "true",
"enable_0dt_deployment_sources": (
"true" if version >= MzVersion.parse_mz("v0.125.0-dev") else "false"
"true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
),
"enable_alter_swap": "true",
"enable_columnation_lgalloc": "true",
Expand Down Expand Up @@ -125,6 +127,7 @@ def get_default_system_parameters(
"persist_record_schema_id": (
"true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
),
"force_source_table_syntax": "true" if force_source_table_syntax else "false",
"persist_batch_columnar_format": "both_v2",
"persist_batch_delete_enabled": "true",
"persist_batch_structured_order": "true",
Expand Down
69 changes: 69 additions & 0 deletions misc/python/materialize/source_table_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for testing the source table migration"""
from materialize.mz_version import MzVersion
from materialize.mzcompose.composition import Composition


def verify_sources_after_source_table_migration(
c: Composition, file: str, fail: bool = False
) -> None:
source_names_rows = c.sql_query(
"SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
)
source_names = [row[0] for row in source_names_rows]

print(f"Sources created in {file} are: {source_names}")

c.sql("SET statement_timeout = '20s'")

for source_name in source_names:
_verify_source(c, file, source_name, fail=fail)


def _verify_source(
c: Composition, file: str, source_name: str, fail: bool = False
) -> None:
try:
print(f"Checking source: {source_name}")

# must not crash
statement = f"SELECT count(*) FROM {source_name};"
c.sql_query(statement)

statement = f"SHOW CREATE SOURCE {source_name};"
result = c.sql_query(statement)
sql = result[0][1]
assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"

if not source_name.endswith("_progress"):
assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"

print("OK.")
except Exception as e:
print(f"source-table-migration issue in {file}: {str(e)}")

if fail:
raise e


def check_source_table_migration_test_sensible() -> None:
jkosh44 marked this conversation as resolved.
Show resolved Hide resolved
assert MzVersion.parse_cargo() < MzVersion.parse_mz(
"v0.139.0"
), "migration test probably no longer needed"


def get_old_image_for_source_table_migration_test() -> str:
return "materialize/materialized:v0.131.0"


def get_new_image_for_source_table_migration_test() -> str | None:
return None
6 changes: 6 additions & 0 deletions src/adapter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ rust_library(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -119,6 +120,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -165,6 +167,7 @@ rust_doc_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -231,6 +234,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -297,6 +301,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down Expand Up @@ -363,6 +368,7 @@ rust_test(
"//src/pgrepr:mz_pgrepr",
"//src/pgwire-common:mz_pgwire_common",
"//src/postgres-util:mz_postgres_util",
"//src/proto:mz_proto",
"//src/repr:mz_repr",
"//src/rocksdb-types:mz_rocksdb_types",
"//src/secrets:mz_secrets",
Expand Down
3 changes: 3 additions & 0 deletions src/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ enum-kinds = "0.5.1"
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.25"
governor = "0.6.0"
hex = "0.4.3"
http = "1.1.0"
ipnet = "2.5.0"
itertools = "0.12.1"
Expand Down Expand Up @@ -53,6 +54,7 @@ mz-pgcopy = { path = "../pgcopy" }
mz-pgrepr = { path = "../pgrepr" }
mz-pgwire-common = { path = "../pgwire-common" }
mz-postgres-util = { path = "../postgres-util" }
mz-proto = { path = "../proto" }
mz-repr = { path = "../repr", features = ["tracing_"] }
mz-rocksdb-types = { path = "../rocksdb-types" }
mz-secrets = { path = "../secrets" }
Expand All @@ -68,6 +70,7 @@ mz-transform = { path = "../transform" }
mz-timestamp-oracle = { path = "../timestamp-oracle" }
opentelemetry = { version = "0.24.0", features = ["trace"] }
prometheus = { version = "0.13.3", default-features = false }
prost = { version = "0.13.2", features = ["no-recursion-limit"] }
qcell = "0.5"
rand = "0.8.5"
rand_chacha = "0.3"
Expand Down
Loading