Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/adapter: Opt-in migration of sources to the new table model #30168

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
da94ed3
Opt-in catalog migration for converting subsources to source tables
rjobanp Sep 16, 2024
99ebcdd
Add migration logic for source statements
rjobanp Oct 23, 2024
e6f5f77
Rename the feature flag
rjobanp Oct 23, 2024
4d1aff4
Add new table to audit log
rjobanp Oct 23, 2024
a94504e
Add platform check scenario to test migration
rjobanp Oct 23, 2024
95b8a82
Switch to using system vars instead of flags to allow console access …
rjobanp Oct 24, 2024
7439daa
Fixes to migration based on testing
rjobanp Oct 24, 2024
18a2137
Also test the migration in the legacy upgrade tests
rjobanp Oct 24, 2024
876ba94
platform checks: unique source name
nrainer-materialize Oct 25, 2024
000e6d6
Migration structure cleanup from feedback
rjobanp Oct 25, 2024
30421e6
Address more feedback; ensure new source name is unique
rjobanp Oct 25, 2024
04f89b6
Fix legacy-upgrade checks
rjobanp Oct 25, 2024
5da93de
Fixes caused by rebase on main
rjobanp Oct 25, 2024
1bc0757
ci: print source table migration issues
nrainer-materialize Oct 28, 2024
a5dbaf2
migration tests: pg-cdc-old-syntax
nrainer-materialize Oct 25, 2024
2df472c
migration tests: extract logic
nrainer-materialize Oct 28, 2024
ce9b45a
migration tests: improve verification
nrainer-materialize Oct 28, 2024
50dc1a1
migration tests: mysql-cdc-old-syntax
nrainer-materialize Oct 28, 2024
c22a99f
migration tests: testdrive-old-kafka-syntax
nrainer-materialize Oct 28, 2024
81f3626
migration tests: improve output
nrainer-materialize Oct 29, 2024
4ad14bf
migration tests: fixes
nrainer-materialize Oct 29, 2024
721b09f
migration tests: fixes
nrainer-materialize Oct 29, 2024
d4aa1bd
Fix for mysql source being restarted after new table added
rjobanp Oct 29, 2024
a6cef8d
Avoid needing to rewrite ids of dependent statements by changing the …
rjobanp Oct 29, 2024
56c9cba
Merge branch 'main' of github.com:MaterializeInc/materialize into sou…
jkosh44 Nov 14, 2024
42c6af0
Fix merge skew
jkosh44 Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,17 @@ steps:
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: mysql-cdc-migration
label: MySQL CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: mysql-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: mysql-cdc-resumption-old-syntax
label: MySQL CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -599,6 +610,17 @@ steps:
queue: hetzner-aarch64-4cpu-8gb
# the mzbuild postgres version will be used, which depends on the Dockerfile specification

- id: pg-cdc-migration
label: Postgres CDC source-versioning migration tests
depends_on: build-aarch64
timeout_in_minutes: 360
plugins:
- ./ci/plugins/mzcompose:
composition: pg-cdc-old-syntax
run: migration
agents:
queue: hetzner-aarch64-4cpu-8gb

- id: pg-cdc-resumption-old-syntax
label: Postgres CDC resumption tests (before source versioning)
depends_on: build-aarch64
Expand Down Expand Up @@ -629,6 +651,17 @@ steps:
agents:
queue: hetzner-aarch64-8cpu-16gb

- id: testdrive-kafka-migration
label: "Testdrive %N migration tests"
depends_on: build-aarch64
timeout_in_minutes: 180
parallelism: 8
plugins:
- ./ci/plugins/mzcompose:
composition: testdrive-old-kafka-src-syntax
run: migration
agents:
queue: hetzner-aarch64-8cpu-16gb

- group: AWS
key: aws
Expand Down
2 changes: 0 additions & 2 deletions misc/python/materialize/checks/all_checks/source_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@


class TableFromSourceBase(Check):

def _can_run(self, e: Executor) -> bool:
return self.base_version >= MzVersion.parse_mz("v0.116.0-dev")

Expand All @@ -34,7 +33,6 @@ class TableFromPgSource(TableFromSourceBase):
suffix = "tbl_from_pg_source"

def initialize(self) -> Testdrive:

return Testdrive(
self.generic_setup()
+ dedent(
Expand Down
56 changes: 56 additions & 0 deletions misc/python/materialize/checks/all_checks/upsert.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,59 @@ def validate(self) -> Testdrive:
"""
)
)


class UpsertLegacy(Check):
    """
    An upsert source test that uses the legacy syntax to create the source
    on all versions, to ensure the source is properly migrated by the
    ActivateSourceVersioningMigration scenario.

    The same 10000 keys are ingested during initialization and in both
    manipulation phases, so validation expects exactly 10000 rows.
    """

    def initialize(self) -> Testdrive:
        """Create the topic, ingest the initial records and create the
        legacy-syntax upsert source plus a materialized view on top of it."""
        # NOTE: continuation lines of the multi-line CREATE SOURCE statement
        # are indented by two spaces, following the testdrive convention for
        # statements that span several lines.
        return Testdrive(
            schemas()
            + dedent(
                """
                $ kafka-create-topic topic=upsert-legacy-syntax

                $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
                {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}

                > CREATE SOURCE upsert_insert_legacy
                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-legacy-syntax-${testdrive.seed}')
                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
                  ENVELOPE UPSERT

                > CREATE MATERIALIZED VIEW upsert_insert_legacy_view AS SELECT COUNT(DISTINCT key1 || ' ' || f1) FROM upsert_insert_legacy;
                """
            )
        )

    def manipulate(self) -> list[Testdrive]:
        """Return the two manipulation phases; each one re-ingests the same
        10000 key/value pairs into the topic."""
        return [
            Testdrive(schemas() + dedent(s))
            for s in [
                """
                $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
                {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
                """,
                """
                $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
                {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
                """,
            ]
        ]

    def validate(self) -> Testdrive:
        """Check that the source and the materialized view both report
        exactly 10000 distinct records."""
        return Testdrive(
            dedent(
                """
                > SELECT COUNT(*), COUNT(DISTINCT key1), COUNT(DISTINCT f1) FROM upsert_insert_legacy
                10000 10000 10000

                > SELECT * FROM upsert_insert_legacy_view;
                10000
                """
            )
        )
46 changes: 46 additions & 0 deletions misc/python/materialize/checks/scenarios_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,3 +391,49 @@ def actions(self) -> list[Action]:
),
Validate(self),
]


class ActivateSourceVersioningMigration(Scenario):
    """
    Upgrade scenario that forces the migration of sources to the new table
    model (Source Versioning).

    Mz is started on the base version, where the checks are initialized and
    the first manipulation phase runs. It is then restarted twice with the
    `force_source_table_syntax` system parameter defaulted to "true"; the
    checks are validated after each of those restarts.
    """

    def base_version(self) -> MzVersion:
        # The scenario upgrades from whatever get_last_version() reports.
        return get_last_version()

    def actions(self) -> list[Action]:
        print(f"Upgrading from tag {self.base_version()}")

        # Stage 1: set everything up on the base version.
        on_base_version: list[Action] = [
            StartMz(self, tag=self.base_version()),
            Initialize(self),
            Manipulate(self, phase=1),
            # Logs are always captured here; otherwise docker-compose would
            # lose the pre-upgrade logs.
            KillMz(capture_logs=True),
        ]

        # Stage 2: restart with `force_source_table_syntax` activated, which
        # should trigger the migration of sources using the old syntax to
        # the new table model.
        after_migration: list[Action] = [
            StartMz(
                self,
                tag=None,
                additional_system_parameter_defaults={
                    "force_source_table_syntax": "true",
                },
            ),
            Manipulate(self, phase=2),
            Validate(self),
        ]

        # Stage 3: a second restart while already on the new version.
        after_second_restart: list[Action] = [
            KillMz(capture_logs=True),
            StartMz(
                self,
                tag=None,
                additional_system_parameter_defaults={
                    "force_source_table_syntax": "true",
                },
            ),
            Validate(self),
        ]

        return [*on_base_version, *after_migration, *after_second_restart]
2 changes: 2 additions & 0 deletions misc/python/materialize/cli/ci_annotate_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@
| (FAIL|TIMEOUT)\s+\[\s*\d+\.\d+s\]
# parallel-workload
| worker_.*\ still\ running: [\s\S]* Threads\ have\ not\ stopped\ within\ 5\ minutes,\ exiting\ hard
# source-table migration
| source-table-migration\ issue
)
.* $
""",
Expand Down
5 changes: 4 additions & 1 deletion misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@


def get_default_system_parameters(
version: MzVersion | None = None, zero_downtime: bool = False
version: MzVersion | None = None,
zero_downtime: bool = False,
force_source_table_syntax: bool = False,
) -> dict[str, str]:
"""For upgrade tests we only want parameters set when all environmentd /
clusterd processes have reached a specific version (or higher)
Expand Down Expand Up @@ -118,6 +120,7 @@ def get_default_system_parameters(
"enable_table_keys": "true",
"enable_variadic_left_join_lowering": "true",
"enable_worker_core_affinity": "true",
"force_source_table_syntax": "true" if force_source_table_syntax else "false",
"persist_batch_columnar_format": (
"both_v2" if version >= MzVersion.parse_mz("v0.112.0-dev") else "row"
),
Expand Down
71 changes: 71 additions & 0 deletions misc/python/materialize/source_table_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""Utilities for testing the source table migration"""
from materialize.mz_version import MzVersion
from materialize.mzcompose.composition import Composition


def verify_sources_after_source_table_migration(
    c: Composition, file: str, fail: bool = False
) -> None:
    """Verify every source listed in `mz_sources` whose id starts with 'u'.

    The schema-qualified names of those sources are queried and printed, a
    20s statement timeout is set, and each source is then checked
    individually.

    `file` is only used to label the printed output. `fail` is forwarded to
    the per-source check and decides whether a problem is re-raised or only
    reported.
    """
    source_names = []
    for row in c.sql_query(
        "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
    ):
        source_names.append(row[0])

    print(f"Sources created in {file} are: {source_names}")

    # Set before running the per-source checks below.
    c.sql("SET statement_timeout = '20s'")

    for source_name in source_names:
        _verify_source(c, file, source_name, fail=fail)


def _verify_source(
    c: Composition, file: str, source_name: str, fail: bool = False
) -> None:
    """Check a single source after the source table migration.

    The source must be queryable, and its `SHOW CREATE SOURCE` output must
    not contain the old-syntax fragments `FOR TABLE` / `FOR ALL TABLES`.
    Unless the name ends in `_progress`, it must not contain
    `CREATE SUBSOURCE` either.

    Any exception is reported with a `source-table-migration issue` line; it
    is re-raised only when `fail` is true.
    """
    try:
        print(f"Checking source: {source_name}")

        # must not crash
        count_statement = f"SELECT count(*) FROM {source_name};"
        print(count_statement)
        c.sql_query(count_statement)

        show_statement = f"SHOW CREATE SOURCE {source_name};"
        print(show_statement)
        rows = c.sql_query(show_statement)
        # The SQL text is read from the second column of the first row.
        sql = rows[0][1]

        for fragment in ("FOR TABLE", "FOR ALL TABLES"):
            assert fragment not in sql, f"{fragment} found in: {sql}"

        is_progress_source = source_name.endswith("_progress")
        if not is_progress_source:
            assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"

        print("OK.")
    except Exception as e:
        print(f"source-table-migration issue in {file}: {str(e)}")

        if not fail:
            return
        raise e


def check_source_table_migration_test_sensible() -> None:
    """Assert that the version parsed from cargo is still below v0.130.0.

    Once that version is reached the assertion fails with a hint that the
    migration test is probably no longer needed.
    """
    current_version = MzVersion.parse_cargo()
    cutoff_version = MzVersion.parse_mz("v0.130.0")
    assert (
        current_version < cutoff_version
    ), "migration test probably no longer needed"


def get_old_image_for_source_table_migration_test() -> str:
    """Return the pinned image reference used as the "old" side of the
    source table migration test."""
    # NOTE(review): presumably just an old version to upgrade from; the exact
    # version is not known to be significant.
    old_image = "materialize/materialized:v0.122.0"
    return old_image
Comment on lines +60 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@def- Any chance that you know what the significance of these two versions is?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably just supposed to be an old version to upgrade from; I don't think it actually matters.



def get_new_image_for_source_table_migration_test() -> str | None:
return None
Loading
Loading