diff --git a/Cargo.lock b/Cargo.lock
index db04d807601b6..513918b815913 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4606,6 +4606,7 @@ dependencies = [
  "fail",
  "futures",
  "governor",
+ "hex",
  "http 1.1.0",
  "ipnet",
  "itertools 0.12.1",
@@ -4634,6 +4635,7 @@ dependencies = [
  "mz-pgrepr",
  "mz-pgwire-common",
  "mz-postgres-util",
+ "mz-proto",
  "mz-repr",
  "mz-rocksdb-types",
  "mz-secrets",
@@ -4649,6 +4651,7 @@ dependencies = [
  "mz-transform",
  "opentelemetry",
  "prometheus",
+ "prost",
  "qcell",
  "rand 0.8.5",
  "rand_chacha 0.3.0",
diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml
index d6a9bae57793d..2d418175e193f 100644
--- a/ci/nightly/pipeline.template.yml
+++ b/ci/nightly/pipeline.template.yml
@@ -615,6 +615,17 @@ steps:
         agents:
           queue: hetzner-aarch64-4cpu-8gb
 
+      - id: mysql-cdc-migration
+        label: MySQL CDC source-versioning migration tests
+        depends_on: build-aarch64
+        timeout_in_minutes: 360
+        plugins:
+          - ./ci/plugins/mzcompose:
+              composition: mysql-cdc-old-syntax
+              run: migration
+        agents:
+          queue: hetzner-aarch64-4cpu-8gb
+
       - id: mysql-cdc-resumption-old-syntax
         label: MySQL CDC resumption tests (before source versioning)
         depends_on: build-aarch64
@@ -646,6 +657,17 @@ steps:
           queue: hetzner-aarch64-4cpu-8gb
         # the mzbuild postgres version will be used, which depends on the Dockerfile specification
 
+      - id: pg-cdc-migration
+        label: Postgres CDC source-versioning migration tests
+        depends_on: build-aarch64
+        timeout_in_minutes: 360
+        plugins:
+          - ./ci/plugins/mzcompose:
+              composition: pg-cdc-old-syntax
+              run: migration
+        agents:
+          queue: hetzner-aarch64-4cpu-8gb
+
       - id: pg-cdc-resumption-old-syntax
         label: Postgres CDC resumption tests (before source versioning)
         depends_on: build-aarch64
@@ -677,6 +699,16 @@ steps:
         agents:
           queue: hetzner-aarch64-8cpu-16gb
 
+      - id: testdrive-kafka-migration
+        label: "Testdrive (before Kafka source versioning) migration tests"
+        depends_on: build-aarch64
+        timeout_in_minutes: 180
+        plugins:
+          - ./ci/plugins/mzcompose:
+              composition: testdrive-old-kafka-src-syntax
+              run: migration
+        agents:
+          queue: hetzner-aarch64-16cpu-32gb
 
   - group: AWS
     key: aws
diff --git a/ci/test/lint-main/checks/check-mzcompose-files.sh b/ci/test/lint-main/checks/check-mzcompose-files.sh
index ed16e1ac22f96..c9b7ce1eab886 100755
--- a/ci/test/lint-main/checks/check-mzcompose-files.sh
+++ b/ci/test/lint-main/checks/check-mzcompose-files.sh
@@ -45,6 +45,7 @@ check_default_workflow_references_others() {
         -not -wholename "./test/canary-environment/mzcompose.py" `# Only run manually` \
         -not -wholename "./test/ssh-connection/mzcompose.py" `# Handled differently` \
         -not -wholename "./test/scalability/mzcompose.py" `# Other workflows are for manual usage` \
+        -not -wholename "./test/testdrive-old-kafka-src-syntax/mzcompose.py" `# Other workflow is run separately` \
         -not -wholename "./test/terraform/mzcompose.py" `# Handled differently` \
     )
 
diff --git a/misc/python/materialize/checks/all_checks/upsert.py b/misc/python/materialize/checks/all_checks/upsert.py
index dd7f92caf831e..543478f0a0fe8 100644
--- a/misc/python/materialize/checks/all_checks/upsert.py
+++ b/misc/python/materialize/checks/all_checks/upsert.py
@@ -164,3 +164,59 @@ def validate(self) -> Testdrive:
            """
             )
         )
+
+
+class UpsertLegacy(Check):
+    """
+    An upsert source test that uses the legacy syntax to create the source
+    on all versions, ensuring the source is properly migrated by the
+    ActivateSourceVersioningMigration scenario.
+    """
+
+    def initialize(self) -> Testdrive:
+        return Testdrive(
+            schemas()
+            + dedent(
+                """
+                $ kafka-create-topic topic=upsert-legacy-syntax
+
+                $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
+                {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
+
+                > CREATE SOURCE upsert_insert_legacy
+                  FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-upsert-legacy-syntax-${testdrive.seed}')
+                  FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
+                  ENVELOPE UPSERT
+
+                > CREATE MATERIALIZED VIEW upsert_insert_legacy_view AS SELECT COUNT(DISTINCT key1 || ' ' || f1) FROM upsert_insert_legacy;
+                """
+            )
+        )
+
+    def manipulate(self) -> list[Testdrive]:
+        return [
+            Testdrive(schemas() + dedent(s))
+            for s in [
+                """
+                $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
+                {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
+                """,
+                """
+                $ kafka-ingest format=avro key-format=avro topic=upsert-legacy-syntax key-schema=${keyschema} schema=${schema} repeat=10000
+                {"key1": "A${kafka-ingest.iteration}"} {"f1": "A${kafka-ingest.iteration}"}
+                """,
+            ]
+        ]
+
+    def validate(self) -> Testdrive:
+        return Testdrive(
+            dedent(
+                """
+                > SELECT COUNT(*), COUNT(DISTINCT key1), COUNT(DISTINCT f1) FROM upsert_insert_legacy
+                10000 10000 10000
+
+                > SELECT * FROM upsert_insert_legacy_view;
+                10000
+           """
+            )
+        )
diff --git a/misc/python/materialize/checks/scenarios_upgrade.py b/misc/python/materialize/checks/scenarios_upgrade.py
index 2c0b0aab80a0c..981e3e8e1b2db 100644
--- a/misc/python/materialize/checks/scenarios_upgrade.py
+++ b/misc/python/materialize/checks/scenarios_upgrade.py
@@ -421,3 +421,49 @@ def actions(self) -> list[Action]:
             ),
             Validate(self),
         ]
+
+
+class ActivateSourceVersioningMigration(Scenario):
+    """
+    Starts MZ, initializes and manipulates, then forces the migration
+    of sources to the new table model (introducing Source Versioning).
+    """
+
+    def base_version(self) -> MzVersion:
+        return get_last_version()
+
+    def actions(self) -> list[Action]:
+        print(f"Upgrading from tag {self.base_version()}")
+        return [
+            StartMz(
+                self,
+                tag=self.base_version(),
+            ),
+            Initialize(self),
+            Manipulate(self, phase=1),
+            KillMz(
+                capture_logs=True
+            ),  # We always use True here, otherwise docker-compose will lose the pre-upgrade logs
+            StartMz(
+                self,
+                tag=None,
+                # Activate the `force_source_table_syntax` flag
+                # which should trigger the migration of sources
+                # using the old syntax to the new table model.
+                additional_system_parameter_defaults={
+                    "force_source_table_syntax": "true",
+                },
+            ),
+            Manipulate(self, phase=2),
+            Validate(self),
+            # A second restart while already on the new version
+            KillMz(capture_logs=True),
+            StartMz(
+                self,
+                tag=None,
+                additional_system_parameter_defaults={
+                    "force_source_table_syntax": "true",
+                },
+            ),
+            Validate(self),
+        ]
diff --git a/misc/python/materialize/cli/ci_annotate_errors.py b/misc/python/materialize/cli/ci_annotate_errors.py
index 53b7fc82d272b..84865cbbcaff4 100644
--- a/misc/python/materialize/cli/ci_annotate_errors.py
+++ b/misc/python/materialize/cli/ci_annotate_errors.py
@@ -101,6 +101,8 @@
     | (FAIL|TIMEOUT)\s+\[\s*\d+\.\d+s\]
     # parallel-workload
     | worker_.*\ still\ running: [\s\S]* Threads\ have\ not\ stopped\ within\ 5\ minutes,\ exiting\ hard
+    # source-table migration
+    | source-table-migration\ issue
     )
     .* $
     """,
diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py
index 561e68719a92e..bc66525fcdd7c 100644
--- a/misc/python/materialize/mzcompose/__init__.py
+++ b/misc/python/materialize/mzcompose/__init__.py
@@ -53,7 +53,9 @@
 
 
 def get_default_system_parameters(
-    version: MzVersion | None = None, zero_downtime: bool = False
+    version: MzVersion | None = None,
+    zero_downtime: bool = False,
+    force_source_table_syntax: bool = False,
 ) -> dict[str, str]:
     """For upgrade tests we only want parameters set when all environmentd /
     clusterd processes have reached a specific version (or higher)
@@ -89,7 +91,7 @@ def get_default_system_parameters(
         "enable_0dt_deployment": "true" if zero_downtime else "false",
         "enable_0dt_deployment_panic_after_timeout": "true",
         "enable_0dt_deployment_sources": (
-            "true" if version >= MzVersion.parse_mz("v0.125.0-dev") else "false"
+            "true" if version >= MzVersion.parse_mz("v0.132.0-dev") else "false"
         ),
         "enable_alter_swap": "true",
         "enable_columnation_lgalloc": "true",
@@ -125,6 +127,7 @@ def get_default_system_parameters(
         "persist_record_schema_id": (
             "true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
         ),
+        "force_source_table_syntax": "true" if force_source_table_syntax else "false",
         "persist_batch_columnar_format": "both_v2",
         "persist_batch_delete_enabled": "true",
         "persist_batch_structured_order": "true",
diff --git a/misc/python/materialize/source_table_migration.py b/misc/python/materialize/source_table_migration.py
new file mode 100644
index 0000000000000..474665977698f
--- /dev/null
+++ b/misc/python/materialize/source_table_migration.py
@@ -0,0 +1,69 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+"""Utilities for testing the source table migration"""
+from materialize.mz_version import MzVersion
+from materialize.mzcompose.composition import Composition
+
+
+def verify_sources_after_source_table_migration(
+    c: Composition, file: str, fail: bool = False
+) -> None:
+    source_names_rows = c.sql_query(
+        "SELECT sm.name || '.' || src.name FROM mz_sources src INNER JOIN mz_schemas sm ON src.schema_id = sm.id WHERE src.id LIKE 'u%';"
+    )
+    source_names = [row[0] for row in source_names_rows]
+
+    print(f"Sources created in {file} are: {source_names}")
+
+    c.sql("SET statement_timeout = '20s'")
+
+    for source_name in source_names:
+        _verify_source(c, file, source_name, fail=fail)
+
+
+def _verify_source(
+    c: Composition, file: str, source_name: str, fail: bool = False
+) -> None:
+    try:
+        print(f"Checking source: {source_name}")
+
+        # must not crash
+        statement = f"SELECT count(*) FROM {source_name};"
+        c.sql_query(statement)
+
+        statement = f"SHOW CREATE SOURCE {source_name};"
+        result = c.sql_query(statement)
+        sql = result[0][1]
+        assert "FOR TABLE" not in sql, f"FOR TABLE found in: {sql}"
+        assert "FOR ALL TABLES" not in sql, f"FOR ALL TABLES found in: {sql}"
+
+        if not source_name.endswith("_progress"):
+            assert "CREATE SUBSOURCE" not in sql, f"CREATE SUBSOURCE found in: {sql}"
+
+        print("OK.")
+    except Exception as e:
+        print(f"source-table-migration issue in {file}: {str(e)}")
+
+        if fail:
+            raise e
+
+
+def check_source_table_migration_test_sensible() -> None:
+    assert MzVersion.parse_cargo() < MzVersion.parse_mz(
+        "v0.139.0"
+    ), "migration test probably no longer needed"
+
+
+def get_old_image_for_source_table_migration_test() -> str:
+    return "materialize/materialized:v0.131.0"
+
+
+def get_new_image_for_source_table_migration_test() -> str | None:
+    return None
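+
+
+# Illustrative usage sketch (hypothetical names; the real `migration` workflows live in the
+# compositions referenced by the nightly pipeline, e.g. `pg-cdc-old-syntax`):
+#
+#   def workflow_migration(c: Composition) -> None:
+#       check_source_table_migration_test_sensible()
+#       # 1. Start get_old_image_for_source_table_migration_test() and create sources
+#       #    using the legacy syntax.
+#       # 2. Restart on get_new_image_for_source_table_migration_test() with
+#       #    force_source_table_syntax enabled.
+#       # 3. Verify that every user source was rewritten to the new table model:
+#       verify_sources_after_source_table_migration(c, "legacy-sources.td")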
diff --git a/src/adapter/BUILD.bazel b/src/adapter/BUILD.bazel
index 65920e5ef9811..f833c66a2fd83 100644
--- a/src/adapter/BUILD.bazel
+++ b/src/adapter/BUILD.bazel
@@ -53,6 +53,7 @@ rust_library(
         "//src/pgrepr:mz_pgrepr",
         "//src/pgwire-common:mz_pgwire_common",
         "//src/postgres-util:mz_postgres_util",
+        "//src/proto:mz_proto",
         "//src/repr:mz_repr",
         "//src/rocksdb-types:mz_rocksdb_types",
         "//src/secrets:mz_secrets",
@@ -119,6 +120,7 @@ rust_test(
         "//src/pgrepr:mz_pgrepr",
         "//src/pgwire-common:mz_pgwire_common",
         "//src/postgres-util:mz_postgres_util",
+        "//src/proto:mz_proto",
         "//src/repr:mz_repr",
         "//src/rocksdb-types:mz_rocksdb_types",
         "//src/secrets:mz_secrets",
@@ -165,6 +167,7 @@ rust_doc_test(
         "//src/pgrepr:mz_pgrepr",
         "//src/pgwire-common:mz_pgwire_common",
         "//src/postgres-util:mz_postgres_util",
+        "//src/proto:mz_proto",
         "//src/repr:mz_repr",
         "//src/rocksdb-types:mz_rocksdb_types",
         "//src/secrets:mz_secrets",
@@ -231,6 +234,7 @@ rust_test(
         "//src/pgrepr:mz_pgrepr",
         "//src/pgwire-common:mz_pgwire_common",
         "//src/postgres-util:mz_postgres_util",
+        "//src/proto:mz_proto",
         "//src/repr:mz_repr",
         "//src/rocksdb-types:mz_rocksdb_types",
         "//src/secrets:mz_secrets",
@@ -297,6 +301,7 @@ rust_test(
         "//src/pgrepr:mz_pgrepr",
         "//src/pgwire-common:mz_pgwire_common",
         "//src/postgres-util:mz_postgres_util",
+        "//src/proto:mz_proto",
         "//src/repr:mz_repr",
         "//src/rocksdb-types:mz_rocksdb_types",
         "//src/secrets:mz_secrets",
@@ -363,6 +368,7 @@ rust_test(
         "//src/pgrepr:mz_pgrepr",
         "//src/pgwire-common:mz_pgwire_common",
         "//src/postgres-util:mz_postgres_util",
+        "//src/proto:mz_proto",
         "//src/repr:mz_repr",
         "//src/rocksdb-types:mz_rocksdb_types",
         "//src/secrets:mz_secrets",
diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml
index 92c489f39a4e5..273be043fa7a3 100644
--- a/src/adapter/Cargo.toml
+++ b/src/adapter/Cargo.toml
@@ -23,6 +23,7 @@ enum-kinds = "0.5.1"
 fail = { version = "0.5.1", features = ["failpoints"] }
 futures = "0.3.25"
 governor = "0.6.0"
+hex = "0.4.3"
 http = "1.1.0"
 ipnet = "2.5.0"
 itertools = "0.12.1"
@@ -53,6 +54,7 @@ mz-pgcopy = { path = "../pgcopy" }
 mz-pgrepr = { path = "../pgrepr" }
 mz-pgwire-common = { path = "../pgwire-common" }
 mz-postgres-util = { path = "../postgres-util" }
+mz-proto = { path = "../proto" }
 mz-repr = { path = "../repr", features = ["tracing_"] }
 mz-rocksdb-types = { path = "../rocksdb-types" }
 mz-secrets = { path = "../secrets" }
@@ -68,6 +70,7 @@ mz-transform = { path = "../transform" }
 mz-timestamp-oracle = { path = "../timestamp-oracle" }
 opentelemetry = { version = "0.24.0", features = ["trace"] }
 prometheus = { version = "0.13.3", default-features = false }
+prost = { version = "0.13.2", features = ["no-recursion-limit"] }
 qcell = "0.5"
 rand = "0.8.5"
 rand_chacha = "0.3"
diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs
index 745cf082bfe81..13e962576b7fe 100644
--- a/src/adapter/src/catalog/apply.rs
+++ b/src/adapter/src/catalog/apply.rs
@@ -25,7 +25,7 @@ use mz_catalog::builtin::{
 use mz_catalog::durable::objects::{
     ClusterKey, DatabaseKey, DurableType, ItemKey, NetworkPolicyKey, RoleKey, SchemaKey,
 };
-use mz_catalog::durable::{CatalogError, DurableCatalogError, SystemObjectMapping};
+use mz_catalog::durable::{CatalogError, SystemObjectMapping};
 use mz_catalog::memory::error::{Error, ErrorKind};
 use mz_catalog::memory::objects::{
     CatalogEntry, CatalogItem, Cluster, ClusterReplica, DataSourceDesc, Database, Func, Index, Log,
@@ -54,7 +54,7 @@ use mz_sql::session::vars::{VarError, VarInput};
 use mz_sql::{plan, rbac};
 use mz_sql_parser::ast::Expr;
 use mz_storage_types::sources::Timeline;
-use tracing::{error, info_span, warn, Instrument};
+use tracing::{info_span, warn, Instrument};
 
 use crate::catalog::state::LocalExpressionCache;
 use crate::catalog::{BuiltinTableUpdate, CatalogState};
@@ -1035,17 +1035,7 @@ impl CatalogState {
                         }
                     }
                 };
-                // We allow sinks to break this invariant due to a know issue with `ALTER SINK`.
-                // https://github.com/MaterializeInc/materialize/pull/28708.
-                if !entry.is_sink() && entry.uses().iter().any(|id| *id > entry.id) {
-                    let msg = format!(
-                        "item cannot depend on items with larger GlobalIds, item: {:?}, dependencies: {:?}",
-                        entry,
-                        entry.uses()
-                    );
-                    error!("internal catalog errr: {msg}");
-                    return Err(CatalogError::Durable(DurableCatalogError::Internal(msg)));
-                }
+
                 self.insert_entry(entry);
             }
             StateDiff::Retraction => {
@@ -1912,42 +1902,145 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
         }
     }
 
-    /// Sort item updates by [`CatalogItemId`].
+    /// Sort item updates by dependency.
+    ///
+    /// First we partition items into groups that are totally ordered by dependency. For example,
+    /// when sorting all items by dependency we know that all tables can come after all sources,
+    /// because a source can never depend on a table. Within each group, the ID order matches the
+    /// dependency order.
+    ///
+    /// It used to be the case that the ID order of ALL items matched the dependency order. However,
+    /// certain migrations shuffled item IDs around s.t. this was no longer true. A much better
+    /// approach would be to investigate each item, discover its exact dependencies, and then
+    /// perform a topological sort. This is non-trivial because we only have the CREATE SQL of each
+    /// item here, and within that SQL the dependent items are sometimes referred to by ID and
+    /// sometimes by name.
+    ///
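+    /// For example (illustrative): a source `s` (u1), a materialized view `mv` on `s` (u2), and an
+    /// unrelated table `t` (u3) sort as `s`, `t`, `mv`, because all tables are ordered before all
+    /// views/materialized views/indexes, even though `mv` has a smaller ID than `t`.
+    ///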
+    /// The logic of this function should match [`sort_temp_item_updates`].
     fn sort_item_updates(
         item_updates: Vec<(mz_catalog::durable::Item, Timestamp, StateDiff)>,
     ) -> VecDeque<(mz_catalog::durable::Item, Timestamp, StateDiff)> {
-        item_updates
-            .into_iter()
-            // HACK: due to `ALTER SINK`, sinks can appear before the objects they
-            // depend upon. Fortunately, because sinks can never have dependencies
-            // and can never depend upon one another, to fix the topological sort,
-            // we can just always move sinks to the end.
-            .sorted_by_key(|(item, _ts, _diff)| {
-                if item.create_sql.starts_with("CREATE SINK") {
-                    CatalogItemId::User(u64::MAX)
-                } else {
-                    item.id
-                }
-            })
+        // Partition items into groups s.t. each item in one group has a predefined order with all
+        // items in other groups. For example, all sinks are ordered greater than all tables.
+        let mut types = Vec::new();
+        // N.B. Functions can depend on system tables, but not user tables.
+        // TODO(udf): This will change when UDFs are supported.
+        let mut funcs = Vec::new();
+        let mut secrets = Vec::new();
+        let mut connections = Vec::new();
+        let mut sources = Vec::new();
+        let mut tables = Vec::new();
+        let mut derived_items = Vec::new();
+        let mut sinks = Vec::new();
+        let mut continual_tasks = Vec::new();
+
+        for update in item_updates {
+            match update.0.item_type() {
+                CatalogItemType::Type => types.push(update),
+                CatalogItemType::Func => funcs.push(update),
+                CatalogItemType::Secret => secrets.push(update),
+                CatalogItemType::Connection => connections.push(update),
+                CatalogItemType::Source => sources.push(update),
+                CatalogItemType::Table => tables.push(update),
+                CatalogItemType::View
+                | CatalogItemType::MaterializedView
+                | CatalogItemType::Index => derived_items.push(update),
+                CatalogItemType::Sink => sinks.push(update),
+                CatalogItemType::ContinualTask => continual_tasks.push(update),
+            }
+        }
+
+        // Within each group, sort by ID.
+        for group in [
+            &mut types,
+            &mut funcs,
+            &mut secrets,
+            &mut connections,
+            &mut sources,
+            &mut tables,
+            &mut derived_items,
+            &mut sinks,
+            &mut continual_tasks,
+        ] {
+            group.sort_by_key(|(item, _, _)| item.id);
+        }
+
+        iter::empty()
+            .chain(types)
+            .chain(funcs)
+            .chain(secrets)
+            .chain(connections)
+            .chain(sources)
+            .chain(tables)
+            .chain(derived_items)
+            .chain(sinks)
+            .chain(continual_tasks)
             .collect()
     }
+
     let item_retractions = sort_item_updates(item_retractions);
     let item_additions = sort_item_updates(item_additions);
 
-    /// Sort temporary item updates by GlobalId.
+    /// Sort temporary item updates by dependency.
+    ///
+    /// The logic of this function should match [`sort_item_updates`].
     fn sort_temp_item_updates(
         temp_item_updates: Vec<(TemporaryItem, Timestamp, StateDiff)>,
     ) -> VecDeque<(TemporaryItem, Timestamp, StateDiff)> {
-        temp_item_updates
-            .into_iter()
-            // HACK: due to `ALTER SINK`, sinks can appear before the objects they
-            // depend upon. Fortunately, because sinks can never have dependencies
-            // and can never depend upon one another, to fix the topological sort,
-            // we can just always move sinks to the end.
-            .sorted_by_key(|(item, _ts, _diff)| match item.item.typ() {
-                CatalogItemType::Sink => CatalogItemId::User(u64::MAX),
-                _ => item.id,
-            })
+        // Partition items into groups s.t. each item in one group has a predefined order with all
+        // items in other groups. For example, all sinks are ordered greater than all tables.
+        let mut types = Vec::new();
+        // N.B. Functions can depend on system tables, but not user tables.
+        let mut funcs = Vec::new();
+        let mut secrets = Vec::new();
+        let mut connections = Vec::new();
+        let mut sources = Vec::new();
+        let mut tables = Vec::new();
+        let mut derived_items = Vec::new();
+        let mut sinks = Vec::new();
+        let mut continual_tasks = Vec::new();
+
+        for update in temp_item_updates {
+            match update.0.item.typ() {
+                CatalogItemType::Type => types.push(update),
+                CatalogItemType::Func => funcs.push(update),
+                CatalogItemType::Secret => secrets.push(update),
+                CatalogItemType::Connection => connections.push(update),
+                CatalogItemType::Source => sources.push(update),
+                CatalogItemType::Table => tables.push(update),
+                CatalogItemType::View
+                | CatalogItemType::MaterializedView
+                | CatalogItemType::Index => derived_items.push(update),
+                CatalogItemType::Sink => sinks.push(update),
+                CatalogItemType::ContinualTask => continual_tasks.push(update),
+            }
+        }
+
+        // Within each group, sort by ID.
+        for group in [
+            &mut types,
+            &mut funcs,
+            &mut secrets,
+            &mut connections,
+            &mut sources,
+            &mut tables,
+            &mut derived_items,
+            &mut sinks,
+            &mut continual_tasks,
+        ] {
+            group.sort_by_key(|(item, _, _)| item.id);
+        }
+
+        iter::empty()
+            .chain(types)
+            .chain(funcs)
+            .chain(secrets)
+            .chain(connections)
+            .chain(sources)
+            .chain(tables)
+            .chain(derived_items)
+            .chain(sinks)
+            .chain(continual_tasks)
             .collect()
     }
     let temp_item_retractions = sort_temp_item_updates(temp_item_retractions);
diff --git a/src/adapter/src/catalog/migrate.rs b/src/adapter/src/catalog/migrate.rs
index 93d2decf6b264..1e5626f9e2274 100644
--- a/src/adapter/src/catalog/migrate.rs
+++ b/src/adapter/src/catalog/migrate.rs
@@ -12,13 +12,14 @@ use std::collections::BTreeMap;
 use maplit::btreeset;
 use mz_catalog::builtin::BuiltinTable;
 use mz_catalog::durable::Transaction;
-use mz_catalog::memory::objects::StateUpdate;
+use mz_catalog::memory::objects::{BootstrapStateUpdateKind, StateUpdate};
 use mz_ore::collections::CollectionExt;
 use mz_ore::now::NowFn;
 use mz_persist_types::ShardId;
 use mz_repr::{CatalogItemId, Timestamp};
 use mz_sql::ast::display::AstDisplay;
-use mz_sql_parser::ast::{Raw, Statement};
+use mz_sql::names::FullItemName;
+use mz_sql_parser::ast::{IdentError, Raw, Statement};
 use mz_storage_client::controller::StorageTxn;
 use semver::Version;
 use tracing::info;
@@ -79,6 +80,11 @@ where
     Ok(())
 }
 
+pub(crate) struct MigrateResult {
+    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
+    pub(crate) post_item_updates: Vec<(BootstrapStateUpdateKind, Timestamp, i64)>,
+}
+
 /// Migrates all user items and loads them into `state`.
 ///
 /// Returns the builtin updates corresponding to all user items.
@@ -87,9 +93,9 @@ pub(crate) async fn migrate(
     tx: &mut Transaction<'_>,
     local_expr_cache: &mut LocalExpressionCache,
     item_updates: Vec<StateUpdate>,
-    _now: NowFn,
+    now: NowFn,
     _boot_ts: Timestamp,
-) -> Result<Vec<BuiltinTableUpdate<&'static BuiltinTable>>, anyhow::Error> {
+) -> Result<MigrateResult, anyhow::Error> {
     let catalog_version = tx.get_catalog_content_version();
     let catalog_version = match catalog_version {
         Some(v) => Version::parse(&v)?,
@@ -101,6 +107,12 @@ pub(crate) async fn migrate(
         catalog_version
     );
 
+    // Special block for the `ast_rewrite_sources_to_tables` migration, since it is gated behind
+    // a feature flag and needs to update multiple AST items at once.
+    if state.system_config().force_source_table_syntax() {
+        ast_rewrite_sources_to_tables(tx, now)?;
+    }
+
     rewrite_ast_items(tx, |_tx, _id, _stmt| {
         // Add per-item AST migrations below.
         //
@@ -122,6 +134,18 @@ pub(crate) async fn migrate(
     let op_item_updates = into_consolidatable_updates_startup(op_item_updates, commit_ts);
     item_updates.extend(op_item_updates);
     differential_dataflow::consolidation::consolidate_updates(&mut item_updates);
+
+    // Some migrations might introduce non-item updates. We sequester those 'post-item' updates
+    // so they can be applied together with the other post-item updates after migrations, to
+    // avoid accumulating negative diffs.
+    let (post_item_updates, item_updates): (Vec<_>, Vec<_>) = item_updates
+        .into_iter()
+        // The only post-item update kind we currently generate is to
+        // update storage collection metadata.
+        .partition(|(kind, _, _)| {
+            matches!(kind, BootstrapStateUpdateKind::StorageCollectionMetadata(_))
+        });
+
     let item_updates = item_updates
         .into_iter()
         .map(|(kind, ts, diff)| StateUpdate {
@@ -173,7 +197,10 @@ pub(crate) async fn migrate(
         "migration from catalog version {:?} complete",
         catalog_version
     );
-    Ok(ast_builtin_table_updates)
+    Ok(MigrateResult {
+        builtin_table_updates: ast_builtin_table_updates,
+        post_item_updates,
+    })
 }
 
 // Add new migrations below their appropriate heading, and precede them with a
@@ -186,6 +213,575 @@ pub(crate) async fn migrate(
 // Please include the adapter team on any code reviews that add or edit
 // migrations.
 
+/// Migrates all sources to use the new sources as tables model
+///
+/// First we migrate existing `CREATE SUBSOURCE` statements, turning them into
+/// `CREATE TABLE .. FROM SOURCE` statements. This covers existing Postgres,
+/// MySQL, and multi-output (tpch, auction, marketing) load-generator subsources.
+///
+/// Second we migrate existing `CREATE SOURCE` statements for these multi-output
+/// sources to remove any subsource-specific options (e.g. TEXT COLUMNS).
+///
+/// Third we migrate existing single-output `CREATE SOURCE` statements.
+/// This includes existing Kafka and single-output load-generator
+/// sources. This will generate an additional `CREATE TABLE .. FROM SOURCE`
+/// statement that copies over all the export-specific options. This table will use
+/// the existing source statement's persist shard but a new GlobalId.
+/// The original source statement will be updated to remove the export-specific options,
+/// renamed to `<original_name>_source`, and use a new empty shard while keeping its
+/// same GlobalId.
+///
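+/// As an illustrative sketch (approximate syntax; the real statements also carry a
+/// serialized `DETAILS` option and the full set of original options), a legacy
+/// single-output source such as
+///
+/// ```text
+/// CREATE SOURCE t FROM KAFKA CONNECTION k (TOPIC 'orders')
+///   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr ENVELOPE UPSERT
+/// ```
+///
+/// is rewritten into a renamed source plus a new table that takes over the name, the
+/// format/envelope options, and the existing persist shard:
+///
+/// ```text
+/// CREATE SOURCE t_source FROM KAFKA CONNECTION k (TOPIC 'orders');
+/// CREATE TABLE t FROM SOURCE t_source (REFERENCE "orders")
+///   FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr ENVELOPE UPSERT;
+/// ```
+///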
+fn ast_rewrite_sources_to_tables(
+    tx: &mut Transaction<'_>,
+    now: NowFn,
+) -> Result<(), anyhow::Error> {
+    use maplit::btreemap;
+    use maplit::btreeset;
+    use mz_persist_types::ShardId;
+    use mz_proto::RustType;
+    use mz_sql::ast::{
+        CreateSourceConnection, CreateSourceStatement, CreateSubsourceOptionName,
+        CreateSubsourceStatement, CreateTableFromSourceStatement, Ident,
+        KafkaSourceConfigOptionName, LoadGenerator, MySqlConfigOptionName, PgConfigOptionName,
+        RawItemName, TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName,
+        UnresolvedItemName, Value, WithOptionValue,
+    };
+    use mz_storage_client::controller::StorageTxn;
+    use mz_storage_types::sources::load_generator::LoadGeneratorOutput;
+    use mz_storage_types::sources::SourceExportStatementDetails;
+    use prost::Message;
+
+    let items_with_statements = tx
+        .get_items()
+        .map(|item| {
+            let stmt = mz_sql::parse::parse(&item.create_sql)?.into_element().ast;
+            Ok((item, stmt))
+        })
+        .collect::<Result<Vec<_>, anyhow::Error>>()?;
+    let items_with_statements_copied = items_with_statements.clone();
+
+    let item_names_per_schema = items_with_statements_copied
+        .iter()
+        .map(|(item, _)| (item.schema_id.clone(), &item.name))
+        .fold(BTreeMap::new(), |mut acc, (schema_id, name)| {
+            acc.entry(schema_id)
+                .or_insert_with(|| btreeset! {})
+                .insert(name);
+            acc
+        });
+
+    // Any CatalogItemId that should be changed to a new CatalogItemId in any statements that
+    // reference it. This is necessary for ensuring downstream statements (e.g.
+    // mat views, indexes) that reference a single-output source (e.g. kafka)
+    // will now reference the corresponding new table, with the same data, instead.
+    let mut changed_ids = BTreeMap::new();
+
+    for (mut item, stmt) in items_with_statements {
+        match stmt {
+            // Migrate each `CREATE SUBSOURCE` statement to an equivalent
+            // `CREATE TABLE ... FROM SOURCE` statement.
+            Statement::CreateSubsource(CreateSubsourceStatement {
+                name,
+                columns,
+                constraints,
+                of_source,
+                if_not_exists,
+                mut with_options,
+            }) => {
+                let raw_source_name = match of_source {
+                    // If `of_source` is None then this is a `progress` subsource, which we do not
+                    // migrate since progress subsources are not currently relevant to the new table model.
+                    None => continue,
+                    Some(name) => name,
+                };
+                let source = match raw_source_name {
+                    // Some legacy subsources have named-only references to their `of_source`
+                    // so we ensure we always use an ID-based reference in the stored
+                    // `CREATE TABLE ... FROM SOURCE` statements.
+                    RawItemName::Name(name) => {
+                        // Convert the name reference to an ID reference.
+                        let (source_item, _) = items_with_statements_copied
+                            .iter()
+                            .find(|(_, statement)| match statement {
+                                Statement::CreateSource(stmt) => stmt.name == name,
+                                _ => false,
+                            })
+                            .expect("source must exist");
+                        RawItemName::Id(source_item.id.to_string(), name, None)
+                    }
+                    RawItemName::Id(..) => raw_source_name,
+                };
+
+                // The external reference is a `with_option` on subsource statements but is a
+                // separate field on table statements.
+                let external_reference = match with_options
+                    .iter()
+                    .position(|opt| opt.name == CreateSubsourceOptionName::ExternalReference)
+                {
+                    Some(i) => match with_options.remove(i).value {
+                        Some(WithOptionValue::UnresolvedItemName(name)) => name,
+                        _ => unreachable!("external reference must be an unresolved item name"),
+                    },
+                    None => panic!("subsource must have an external reference"),
+                };
+
+                let with_options = with_options
+                    .into_iter()
+                    .map(|option| {
+                        match option.name {
+                            CreateSubsourceOptionName::Details => TableFromSourceOption {
+                                name: TableFromSourceOptionName::Details,
+                                // The `details` option on both subsources and tables is identical, using the same
+                                // ProtoSourceExportStatementDetails serialized value.
+                                value: option.value,
+                            },
+                            CreateSubsourceOptionName::TextColumns => TableFromSourceOption {
+                                name: TableFromSourceOptionName::TextColumns,
+                                value: option.value,
+                            },
+                            CreateSubsourceOptionName::ExcludeColumns => TableFromSourceOption {
+                                name: TableFromSourceOptionName::ExcludeColumns,
+                                value: option.value,
+                            },
+                            CreateSubsourceOptionName::Progress => {
+                                panic!("progress option should not exist on this subsource")
+                            }
+                            CreateSubsourceOptionName::ExternalReference => {
+                                unreachable!("This option is handled separately above.")
+                            }
+                        }
+                    })
+                    .collect::<Vec<_>>();
+
+                let table = CreateTableFromSourceStatement {
+                    name,
+                    constraints,
+                    columns: mz_sql::ast::TableFromSourceColumns::Defined(columns),
+                    if_not_exists,
+                    source,
+                    external_reference: Some(external_reference.clone()),
+                    with_options,
+                    // Subsources don't have `envelope`, `include_metadata`, or `format` options.
+                    envelope: None,
+                    include_metadata: vec![],
+                    format: None,
+                };
+
+                info!(
+                    "migrate: converted subsource {} to table {}",
+                    item.create_sql, table
+                );
+                item.create_sql = Statement::CreateTableFromSource(table).to_ast_string_stable();
+                tx.update_item(item.id, item)?;
+            }
+
+            // Postgres sources are multi-output sources whose subsources are
+            // migrated above. All we need to do is remove the subsource-related
+            // options from this statement since they are no longer relevant.
+            Statement::CreateSource(CreateSourceStatement {
+                connection:
+                    mut conn @ (CreateSourceConnection::Postgres { .. }
+                    | CreateSourceConnection::Yugabyte { .. }),
+                name,
+                if_not_exists,
+                in_cluster,
+                include_metadata,
+                format,
+                envelope,
+                col_names,
+                with_options,
+                key_constraint,
+                external_references,
+                progress_subsource,
+            }) => {
+                let options = match &mut conn {
+                    CreateSourceConnection::Postgres { options, .. } => options,
+                    CreateSourceConnection::Yugabyte { options, .. } => options,
+                    _ => unreachable!("match determined above"),
+                };
+                // This option storing text columns on the primary source statement is redundant
+                // with the option on subsource statements, so it can just be removed. It was kept
+                // for round-tripping of `CREATE SOURCE` statements that automatically generated
+                // subsources, which is no longer necessary.
+                if options
+                    .iter()
+                    .any(|o| matches!(o.name, PgConfigOptionName::TextColumns))
+                {
+                    options.retain(|o| !matches!(o.name, PgConfigOptionName::TextColumns));
+                    let stmt = Statement::CreateSource(CreateSourceStatement {
+                        connection: conn,
+                        name,
+                        if_not_exists,
+                        in_cluster,
+                        include_metadata,
+                        format,
+                        envelope,
+                        col_names,
+                        with_options,
+                        key_constraint,
+                        external_references,
+                        progress_subsource,
+                    });
+                    item.create_sql = stmt.to_ast_string_stable();
+                    tx.update_item(item.id, item)?;
+                    info!("migrate: converted postgres source {stmt} to remove subsource options");
+                }
+            }
+            // MySQL sources are multi-output sources whose subsources are
+            // migrated above. All we need to do is remove the subsource-related
+            // options from this statement since they are no longer relevant.
+            Statement::CreateSource(CreateSourceStatement {
+                connection: mut conn @ CreateSourceConnection::MySql { .. },
+                name,
+                if_not_exists,
+                in_cluster,
+                include_metadata,
+                format,
+                envelope,
+                col_names,
+                with_options,
+                key_constraint,
+                external_references,
+                progress_subsource,
+                ..
+            }) => {
+                let options = match &mut conn {
+                    CreateSourceConnection::MySql { options, .. } => options,
+                    _ => unreachable!("match determined above"),
+                };
+                // These options storing text and exclude columns on the primary source statement
+                // are redundant with the options on subsource statements, so they can just be
+                // removed. They were kept for round-tripping of `CREATE SOURCE` statements that
+                // automatically generated subsources, which is no longer necessary.
+                if options.iter().any(|o| {
+                    matches!(
+                        o.name,
+                        MySqlConfigOptionName::TextColumns | MySqlConfigOptionName::ExcludeColumns
+                    )
+                }) {
+                    options.retain(|o| {
+                        !matches!(
+                            o.name,
+                            MySqlConfigOptionName::TextColumns
+                                | MySqlConfigOptionName::ExcludeColumns
+                        )
+                    });
+                    let stmt = Statement::CreateSource(CreateSourceStatement {
+                        connection: conn,
+                        name,
+                        if_not_exists,
+                        in_cluster,
+                        include_metadata,
+                        format,
+                        envelope,
+                        col_names,
+                        with_options,
+                        key_constraint,
+                        external_references,
+                        progress_subsource,
+                    });
+                    item.create_sql = stmt.to_ast_string_stable();
+                    tx.update_item(item.id, item)?;
+                    info!("migrate: converted mysql source {stmt} to remove subsource options");
+                }
+            }
+            // Multi-output load generator sources whose subsources are already
+            // migrated above. There is no need to remove any options from this
+            // statement since they are not export-specific.
+            Statement::CreateSource(CreateSourceStatement {
+                connection:
+                    CreateSourceConnection::LoadGenerator {
+                        generator:
+                            LoadGenerator::Auction | LoadGenerator::Marketing | LoadGenerator::Tpch,
+                        ..
+                    },
+                ..
+            }) => {}
+            // Single-output sources that need to be migrated to tables. These sources currently
+            // output data to the primary collection of the source statement. We will create a new
+            // table statement for them, move all export-specific options over from the source
+            // statement, rename the `CREATE SOURCE` statement, and move its shard to the new
+            // table statement.
+            Statement::CreateSource(CreateSourceStatement {
+                connection:
+                    conn @ (CreateSourceConnection::Kafka { .. }
+                    | CreateSourceConnection::LoadGenerator {
+                        generator:
+                            LoadGenerator::Clock
+                            | LoadGenerator::Datums
+                            | LoadGenerator::Counter
+                            | LoadGenerator::KeyValue,
+                        ..
+                    }),
+                name,
+                col_names,
+                include_metadata,
+                format,
+                envelope,
+                with_options,
+                if_not_exists,
+                in_cluster,
+                progress_subsource,
+                external_references,
+                key_constraint,
+            }) => {
+                // To check if this is a source that has already been migrated we use a basic
+                // heuristic: if there is at least one existing table for the source, and if
+                // the envelope/format/include_metadata options are empty, we assume it's
+                // already been migrated.
+                let tables_for_source =
+                    items_with_statements_copied
+                        .iter()
+                        .any(|(_, statement)| match statement {
+                            Statement::CreateTableFromSource(stmt) => {
+                                let source: CatalogItemId = match &stmt.source {
+                                    RawItemName::Name(_) => {
+                                        unreachable!("tables store source as ID")
+                                    }
+                                    RawItemName::Id(source_id, _, _) => {
+                                        source_id.parse().expect("valid id")
+                                    }
+                                };
+                                source == item.id
+                            }
+                            _ => false,
+                        });
+                if tables_for_source
+                    && envelope.is_none()
+                    && format.is_none()
+                    && include_metadata.is_empty()
+                {
+                    info!("migrate: skipping already migrated source: {}", name);
+                    continue;
+                }
+
+                // Use the current source name as the new table name, and rename the source to
+                // `<source_name>_source`. This is intended to let existing queries that reference
+                // the source name keep working, since the data previously read from the source now
+                // needs to be queried from the table instead.
+
+                assert_eq!(
+                    item.name,
+                    name.0.last().expect("at least one ident").to_string()
+                );
+                // First find an unused name within the same schema to avoid conflicts.
+                let is_valid = |new_source_ident: &Ident| {
+                    if item_names_per_schema
+                        .get(&item.schema_id)
+                        .expect("schema must exist")
+                        .contains(&new_source_ident.to_string())
+                    {
+                        Ok::<_, IdentError>(false)
+                    } else {
+                        Ok(true)
+                    }
+                };
+                let new_source_ident =
+                    Ident::try_generate_name(item.name.clone(), "_source", is_valid)?;
+
+                // We will use the original item name for the new table item.
+                let table_item_name = item.name.clone();
+
+                // Update the source item/statement to use the new name.
+                let mut new_source_name = name.clone();
+                *new_source_name.0.last_mut().expect("at least one ident") =
+                    new_source_ident.clone();
+                item.name = new_source_ident.to_string();
+
+                // A reference to the source that will be included in the table statement
+                let source_ref =
+                    RawItemName::Id(item.id.to_string(), new_source_name.clone(), None);
+
+                let columns = if col_names.is_empty() {
+                    TableFromSourceColumns::NotSpecified
+                } else {
+                    TableFromSourceColumns::Named(col_names)
+                };
+
+                // All source tables must have a `details` option, which is a serialized proto
+                // describing any source-specific details for this table statement.
+                let details = match &conn {
+                    // For kafka sources this proto is currently empty.
+                    CreateSourceConnection::Kafka { .. } => SourceExportStatementDetails::Kafka {},
+                    CreateSourceConnection::LoadGenerator { .. } => {
+                        // Since these load generators are single-output we use the default output.
+                        SourceExportStatementDetails::LoadGenerator {
+                            output: LoadGeneratorOutput::Default,
+                        }
+                    }
+                    _ => unreachable!("match determined above"),
+                };
+                let table_with_options = vec![TableFromSourceOption {
+                    name: TableFromSourceOptionName::Details,
+                    value: Some(WithOptionValue::Value(Value::String(hex::encode(
+                        details.into_proto().encode_to_vec(),
+                    )))),
+                }];
+
+                // Generate the same external-reference that would have been generated
+                // during purification for single-output sources.
+                let external_reference = match &conn {
+                    CreateSourceConnection::Kafka { options, .. } => {
+                        let topic_option = options
+                            .iter()
+                            .find(|o| matches!(o.name, KafkaSourceConfigOptionName::Topic))
+                            .expect("kafka sources must have a topic");
+                        let topic = match &topic_option.value {
+                            Some(WithOptionValue::Value(Value::String(topic))) => topic,
+                            _ => unreachable!("topic must be a string"),
+                        };
+
+                        Some(UnresolvedItemName::qualified(&[Ident::new(topic)?]))
+                    }
+                    CreateSourceConnection::LoadGenerator { generator, .. } => {
+                        // Since these load generators are single-output the external reference
+                        // uses the schema-name for both namespace and name.
+                        let name = FullItemName {
+                                database: mz_sql::names::RawDatabaseSpecifier::Name(
+                                    mz_storage_types::sources::load_generator::LOAD_GENERATOR_DATABASE_NAME
+                                        .to_owned(),
+                                ),
+                                schema: generator.schema_name().to_string(),
+                                item: generator.schema_name().to_string(),
+                            };
+                        Some(UnresolvedItemName::from(name))
+                    }
+                    _ => unreachable!("match determined above"),
+                };
+
+                // The new table statement, stealing the name and the export-specific fields from
+                // the create source statement.
+                let table = CreateTableFromSourceStatement {
+                    name,
+                    constraints: vec![],
+                    columns,
+                    if_not_exists: false,
+                    source: source_ref,
+                    external_reference,
+                    with_options: table_with_options,
+                    envelope,
+                    include_metadata,
+                    format,
+                };
+
+                // The source statement with a new name and many of its fields emptied
+                let source = CreateSourceStatement {
+                    connection: conn,
+                    name: new_source_name,
+                    if_not_exists,
+                    in_cluster,
+                    include_metadata: vec![],
+                    format: None,
+                    envelope: None,
+                    col_names: vec![],
+                    with_options,
+                    key_constraint,
+                    external_references,
+                    progress_subsource,
+                };
+
+                let source_id = item.id;
+                let source_global_id = item.global_id;
+                let schema_id = item.schema_id.clone();
+                let schema = tx.get_schema(&item.schema_id).expect("schema must exist");
+
+                let owner_id = item.owner_id.clone();
+                let privileges = item.privileges.clone();
+                let extra_versions = item.extra_versions.clone();
+
+                // Update the source statement in the catalog first, since the name will
+                // otherwise conflict with the new table statement.
+                info!("migrate: updated source {} to {source}", item.create_sql);
+                item.create_sql = Statement::CreateSource(source).to_ast_string_stable();
+                tx.update_item(item.id, item)?;
+
+                // Insert the new table statement into the catalog with a new id.
+                let ids = tx.allocate_user_item_ids(1)?;
+                let (new_table_id, new_table_global_id) = ids[0];
+                info!("migrate: added table {new_table_id}: {table}");
+                tx.insert_user_item(
+                    new_table_id,
+                    new_table_global_id,
+                    schema_id,
+                    &table_item_name,
+                    table.to_ast_string_stable(),
+                    owner_id,
+                    privileges,
+                    &Default::default(),
+                    extra_versions,
+                )?;
+                // We need to move the shard currently attached to the source statement to the
+                // table statement such that the existing data in the shard is preserved and can
+                // be queried on the new table statement. However, we need to keep the GlobalId of
+                // the source the same, to preserve existing references to that statement in
+                // external tools such as DBT and Terraform. We register a new shard id for the
+                // source statement; the corresponding shard will be created automatically after
+                // the migration is complete.
+                let new_source_shard = ShardId::new();
+                let (source_global_id, existing_source_shard) = tx
+                    .delete_collection_metadata(btreeset! {source_global_id})
+                    .pop()
+                    .expect("shard should exist");
+                tx.insert_collection_metadata(btreemap! {
+                    new_table_global_id => existing_source_shard,
+                    source_global_id => new_source_shard
+                })?;
+
+                add_to_audit_log(
+                    tx,
+                    mz_audit_log::EventType::Create,
+                    mz_audit_log::ObjectType::Table,
+                    mz_audit_log::EventDetails::IdFullNameV1(mz_audit_log::IdFullNameV1 {
+                        id: new_table_id.to_string(),
+                        name: mz_audit_log::FullNameV1 {
+                            database: schema
+                                .database_id
+                                .map(|d| d.to_string())
+                                .unwrap_or_default(),
+                            schema: schema.name,
+                            item: table_item_name,
+                        },
+                    }),
+                    now(),
+                )?;
+
+                // We also need to update any other statements that reference the source to use the new
+                // table id/name instead.
+                changed_ids.insert(source_id, new_table_id);
+            }
+
+            #[expect(unreachable_patterns)]
+            Statement::CreateSource(_) => {}
+            _ => (),
+        }
+    }
+
+    let mut updated_items = BTreeMap::new();
+    for (mut item, mut statement) in items_with_statements_copied {
+        match &statement {
+            // Don’t rewrite any of the statements we just migrated.
+            Statement::CreateSource(_) => {}
+            Statement::CreateSubsource(_) => {}
+            Statement::CreateTableFromSource(_) => {}
+            // We need to rewrite any statements that reference a source id to use the new
+            // table id instead, since any data previously contained in the source is now in the table.
+            // This assumes the table has stolen the source's name, which is the case
+            // for all sources that were migrated.
+            _ => {
+                if mz_sql::names::modify_dependency_item_ids(&mut statement, &changed_ids) {
+                    info!("migrate: updated dependency reference in statement {statement}");
+                    item.create_sql = statement.to_ast_string_stable();
+                    updated_items.insert(item.id, item);
+                }
+            }
+        }
+    }
+    if !updated_items.is_empty() {
+        tx.update_items(updated_items)?;
+    }
+
+    Ok(())
+}
+
 // Durable migrations
 
 /// Migrations that run only on the durable catalog before any data is loaded into memory.
@@ -237,3 +833,17 @@ pub(crate) fn durable_migrate(
 //
 // Please include the adapter team on any code reviews that add or edit
 // migrations.
+
+fn add_to_audit_log(
+    tx: &mut Transaction,
+    event_type: mz_audit_log::EventType,
+    object_type: mz_audit_log::ObjectType,
+    details: mz_audit_log::EventDetails,
+    occurred_at: mz_ore::now::EpochMillis,
+) -> Result<(), anyhow::Error> {
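+    // Allocate the next audit-log event id from the durable catalog's id allocator so the
+    // events added by this migration get unique, increasing ids.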
+    let id = tx.get_and_increment_id(mz_catalog::durable::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
+    let event =
+        mz_audit_log::VersionedEvent::new(id, event_type, object_type, details, None, occurred_at);
+    tx.insert_audit_log_event(event);
+    Ok(())
+}
diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs
index ac200eba0a54f..c390252dee33f 100644
--- a/src/adapter/src/catalog/open.rs
+++ b/src/adapter/src/catalog/open.rs
@@ -262,7 +262,6 @@ impl Catalog {
         let mut post_item_updates = Vec::new();
         let mut audit_log_updates = Vec::new();
         for (kind, ts, diff) in updates {
-            let diff = diff.try_into().expect("valid diff");
             match kind {
                 BootstrapStateUpdateKind::Role(_)
                 | BootstrapStateUpdateKind::Database(_)
@@ -276,7 +275,7 @@ impl Catalog {
                     pre_item_updates.push(StateUpdate {
                         kind: kind.into(),
                         ts,
-                        diff,
+                        diff: diff.try_into().expect("valid diff"),
                     })
                 }
                 BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
@@ -284,29 +283,25 @@ impl Catalog {
                     system_item_updates.push(StateUpdate {
                         kind: kind.into(),
                         ts,
-                        diff,
+                        diff: diff.try_into().expect("valid diff"),
                     })
                 }
                 BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                     kind: kind.into(),
                     ts,
-                    diff,
+                    diff: diff.try_into().expect("valid diff"),
                 }),
                 BootstrapStateUpdateKind::Comment(_)
                 | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                 | BootstrapStateUpdateKind::SourceReferences(_)
                 | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
-                    post_item_updates.push(StateUpdate {
-                        kind: kind.into(),
-                        ts,
-                        diff,
-                    })
+                    post_item_updates.push((kind, ts, diff));
                 }
                 BootstrapStateUpdateKind::AuditLog(_) => {
                     audit_log_updates.push(StateUpdate {
                         kind: kind.into(),
                         ts,
-                        diff,
+                        diff: diff.try_into().expect("valid diff"),
                     });
                 }
             }
@@ -384,7 +379,7 @@ impl Catalog {
 
         // Migrate item ASTs.
         let builtin_table_update = if !config.skip_migrations {
-            migrate::migrate(
+            let migrate_result = migrate::migrate(
                 &mut state,
                 &mut txn,
                 &mut local_expr_cache,
@@ -399,7 +394,21 @@ impl Catalog {
                     this_version: config.build_info.version,
                     cause: e.to_string(),
                 })
-            })?
+            })?;
+            if !migrate_result.post_item_updates.is_empty() {
+                // Include any post-item-updates generated by migrations, and then consolidate
+                // them to ensure diffs are all positive.
+                post_item_updates.extend(migrate_result.post_item_updates);
+                // Push everything to the same timestamp so it consolidates cleanly.
+                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
+                    for (_, ts, _) in &mut post_item_updates {
+                        *ts = max_ts;
+                    }
+                }
+                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
+            }
+
+            migrate_result.builtin_table_updates
         } else {
             state
                 .apply_updates_for_bootstrap(item_updates, &mut local_expr_cache)
@@ -407,6 +416,14 @@ impl Catalog {
         };
         builtin_table_updates.extend(builtin_table_update);
 
+        let post_item_updates = post_item_updates
+            .into_iter()
+            .map(|(kind, ts, diff)| StateUpdate {
+                kind: kind.into(),
+                ts,
+                diff: diff.try_into().expect("valid diff"),
+            })
+            .collect();
         let builtin_table_update = state
             .apply_updates_for_bootstrap(post_item_updates, &mut local_expr_cache)
             .await;
diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index a7d19ab15870f..fbda0a3246618 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -1943,29 +1943,6 @@ impl Coordinator {
         let mut privatelink_connections = BTreeMap::new();
 
         for entry in &entries {
-            // TODO(database-issues#7922): we should move this invariant into `CatalogEntry`.
-            mz_ore::soft_assert_or_log!(
-                // We only expect user objects to objects obey this invariant.
-                // System objects, for instance, can depend on other system
-                // objects that belong to a schema that is simply loaded first.
-                // To meaningfully resolve this, we could need more careful
-                // loading order or more complex IDs, neither of which seem very
-                // beneficial.
-                //
-                // HACK: sinks are permitted to depend on items with larger IDs,
-                // due to `ALTER SINK`.
-                !entry.id().is_user()
-                    || entry.is_sink()
-                    || entry
-                        .uses()
-                        .iter()
-                        .all(|dependency_id| *dependency_id <= entry.id),
-                "entries should only use to items with lesser `GlobalId`s, but \
-                {:?} uses {:?}",
-                entry.id,
-                entry.uses()
-            );
-
             debug!(
                 "coordinator init: installing {} {}",
                 entry.item().typ(),
diff --git a/src/catalog/src/durable/objects.rs b/src/catalog/src/durable/objects.rs
index e6bb869ce84fb..1d0002461d61f 100644
--- a/src/catalog/src/durable/objects.rs
+++ b/src/catalog/src/durable/objects.rs
@@ -509,6 +509,12 @@ pub struct Item {
     pub extra_versions: BTreeMap<RelationVersion, GlobalId>,
 }
 
+impl Item {
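+    /// Returns the [`CatalogItemType`] for this item, derived from its `create_sql` text.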
+    pub fn item_type(&self) -> CatalogItemType {
+        item_type(&self.create_sql)
+    }
+}
+
 impl DurableType for Item {
     type Key = ItemKey;
     type Value = ItemValue;
@@ -1289,32 +1295,36 @@ pub struct ItemValue {
 }
 
 impl ItemValue {
-    pub(crate) fn item_type(&self) -> CatalogItemType {
-        // NOTE(benesch): the implementation of this method is hideous, but is
-        // there a better alternative? Storing the object type alongside the
-        // `create_sql` would introduce the possibility of skew.
-        let mut tokens = self.create_sql.split_whitespace();
-        assert_eq!(tokens.next(), Some("CREATE"));
-        match tokens.next() {
-            Some("TABLE") => CatalogItemType::Table,
-            Some("SOURCE") | Some("SUBSOURCE") => CatalogItemType::Source,
-            Some("SINK") => CatalogItemType::Sink,
-            Some("VIEW") => CatalogItemType::View,
-            Some("MATERIALIZED") => {
-                assert_eq!(tokens.next(), Some("VIEW"));
-                CatalogItemType::MaterializedView
-            }
-            Some("CONTINUAL") => {
-                assert_eq!(tokens.next(), Some("TASK"));
-                CatalogItemType::ContinualTask
-            }
-            Some("INDEX") => CatalogItemType::Index,
-            Some("TYPE") => CatalogItemType::Type,
-            Some("FUNCTION") => CatalogItemType::Func,
-            Some("SECRET") => CatalogItemType::Secret,
-            Some("CONNECTION") => CatalogItemType::Connection,
-            _ => panic!("unexpected create sql: {}", self.create_sql),
+    pub fn item_type(&self) -> CatalogItemType {
+        item_type(&self.create_sql)
+    }
+}
+
+fn item_type(create_sql: &str) -> CatalogItemType {
+    // NOTE(benesch): the implementation of this method is hideous, but is
+    // there a better alternative? Storing the object type alongside the
+    // `create_sql` would introduce the possibility of skew.
+    let mut tokens = create_sql.split_whitespace();
+    assert_eq!(tokens.next(), Some("CREATE"));
+    match tokens.next() {
+        Some("TABLE") => CatalogItemType::Table,
+        Some("SOURCE") | Some("SUBSOURCE") => CatalogItemType::Source,
+        Some("SINK") => CatalogItemType::Sink,
+        Some("VIEW") => CatalogItemType::View,
+        Some("MATERIALIZED") => {
+            assert_eq!(tokens.next(), Some("VIEW"));
+            CatalogItemType::MaterializedView
+        }
+        Some("CONTINUAL") => {
+            assert_eq!(tokens.next(), Some("TASK"));
+            CatalogItemType::ContinualTask
         }
+        Some("INDEX") => CatalogItemType::Index,
+        Some("TYPE") => CatalogItemType::Type,
+        Some("FUNCTION") => CatalogItemType::Func,
+        Some("SECRET") => CatalogItemType::Secret,
+        Some("CONNECTION") => CatalogItemType::Connection,
+        _ => panic!("unexpected create sql: {}", create_sql),
     }
 }
 
diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs
index 47b4f465af9f1..38ec930d54a9f 100644
--- a/src/catalog/src/durable/transaction.rs
+++ b/src/catalog/src/durable/transaction.rs
@@ -2061,6 +2061,13 @@ impl<'a> Transaction<'a> {
             .map(|(k, v)| DurableType::from_key_value(k, v))
     }
 
+    pub fn get_schema(&self, id: &SchemaId) -> Option<Schema> {
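+        // Look up the schema by key within this transaction and convert the stored value back
+        // into a durable `Schema`.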
+        let key = SchemaKey { id: *id };
+        self.schemas
+            .get(&key)
+            .map(|v| DurableType::from_key_value(key, v.clone()))
+    }
+
     pub fn get_introspection_source_indexes(
         &self,
         cluster_id: ClusterId,
diff --git a/src/sql-parser/src/ast/defs/ddl.rs b/src/sql-parser/src/ast/defs/ddl.rs
index 7bffdcc990681..573aa56f4dfce 100644
--- a/src/sql-parser/src/ast/defs/ddl.rs
+++ b/src/sql-parser/src/ast/defs/ddl.rs
@@ -1294,6 +1294,24 @@ impl AstDisplay for LoadGenerator {
 }
 impl_display!(LoadGenerator);
 
+impl LoadGenerator {
+    /// Corresponds with the same mapping on the `LoadGenerator` enum defined in
+    /// src/storage-types/src/sources/load_generator.rs, but re-defined here for
+    /// cases where we only have the AST representation. This can be removed once
+    /// the `ast_rewrite_sources_to_tables` migration is removed.
+    pub fn schema_name(&self) -> &'static str {
+        match self {
+            LoadGenerator::Counter => "counter",
+            LoadGenerator::Clock => "clock",
+            LoadGenerator::Marketing => "marketing",
+            LoadGenerator::Auction => "auction",
+            LoadGenerator::Datums => "datums",
+            LoadGenerator::Tpch => "tpch",
+            LoadGenerator::KeyValue => "key_value",
+        }
+    }
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
 pub enum LoadGeneratorOptionName {
     ScaleFactor,
diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs
index 7af22528fce85..1401f83872040 100644
--- a/src/sql/src/names.rs
+++ b/src/sql/src/names.rs
@@ -24,6 +24,7 @@ use mz_repr::network_policy_id::NetworkPolicyId;
 use mz_repr::role_id::RoleId;
 use mz_repr::{CatalogItemId, GlobalId, RelationVersion};
 use mz_repr::{ColumnName, RelationVersionSelector};
+use mz_sql_parser::ast::visit_mut::VisitMutNode;
 use mz_sql_parser::ast::{CreateContinualTaskStatement, Expr, RawNetworkPolicyName, Version};
 use mz_sql_parser::ident;
 use proptest_derive::Arbitrary;
@@ -2386,6 +2387,44 @@ where
     ResolvedIds::new(visitor.ids)
 }
 
+#[derive(Debug)]
+pub struct ItemDependencyModifier<'a> {
+    pub modified: bool,
+    pub id_map: &'a BTreeMap<CatalogItemId, CatalogItemId>,
+}
+
+impl<'ast, 'a> VisitMut<'ast, Raw> for ItemDependencyModifier<'a> {
+    fn visit_item_name_mut(&mut self, item_name: &mut RawItemName) {
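+        // Only `RawItemName::Id` references embed a catalog item id that may need rewriting;
+        // name-based references are left untouched, since each migrated table takes over its
+        // source's name.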
+        if let RawItemName::Id(id, _, _) = item_name {
+            let parsed_id = id.parse::<CatalogItemId>().unwrap();
+            if let Some(new_id) = self.id_map.get(&parsed_id) {
+                *id = new_id.to_string();
+                self.modified = true;
+            }
+        }
+    }
+}
+
+/// Updates any item-id references in the provided AST node that appear as keys in `id_map`,
+/// rewriting each matching id to its corresponding value in `id_map`.
+/// This assumes the names of the references are unmodified (i.e. each pair of ids refers
+/// to an item of the same name, whose id has changed).
+pub fn modify_dependency_item_ids<'ast, N>(
+    node: &'ast mut N,
+    id_map: &BTreeMap<CatalogItemId, CatalogItemId>,
+) -> bool
+where
+    N: VisitMutNode<'ast, Raw>,
+{
+    let mut modifier = ItemDependencyModifier {
+        id_map,
+        modified: false,
+    };
+    node.visit_mut(&mut modifier);
+
+    modifier.modified
+}
+
 // Used when displaying a view's source for human creation. If the name
 // specified is the same as the name in the catalog, we don't use the ID format.
 #[derive(Debug)]
diff --git a/src/storage-types/src/sources/load_generator.rs b/src/storage-types/src/sources/load_generator.rs
index 3d9dd471cd71a..0d6ea646992f1 100644
--- a/src/storage-types/src/sources/load_generator.rs
+++ b/src/storage-types/src/sources/load_generator.rs
@@ -196,6 +196,8 @@ pub enum LoadGenerator {
 pub const LOAD_GENERATOR_DATABASE_NAME: &str = "mz_load_generators";
 
 impl LoadGenerator {
+    /// Must be kept in sync with the same mapping on the `LoadGenerator` enum defined in
+    /// src/sql-parser/src/ast/defs/ddl.rs.
     pub fn schema_name(&self) -> &'static str {
         match self {
             LoadGenerator::Counter { .. } => "counter",
diff --git a/src/storage/src/source/mysql/replication/events.rs b/src/storage/src/source/mysql/replication/events.rs
index 14a628e8db25a..56f0016a3faa4 100644
--- a/src/storage/src/source/mysql/replication/events.rs
+++ b/src/storage/src/source/mysql/replication/events.rs
@@ -197,6 +197,11 @@ pub(super) async fn handle_query_event(
         (Some("commit"), None) => {
             is_complete_event = true;
         }
+        // Detect `CREATE TABLE <tbl>` statements which don't affect existing tables but do
+        // signify a complete event (e.g. for the purposes of advancing the GTID)
+        (Some("create"), Some("table")) => {
+            is_complete_event = true;
+        }
         _ => {}
     }
 
diff --git a/test/legacy-upgrade/check-from-v0.27.0-compile-proto-sources.td b/test/legacy-upgrade/check-from-v0.27.0-compile-proto-sources.td
index 4240d3a2229de..fcc08c2529d2d 100644
--- a/test/legacy-upgrade/check-from-v0.27.0-compile-proto-sources.td
+++ b/test/legacy-upgrade/check-from-v0.27.0-compile-proto-sources.td
@@ -24,5 +24,3 @@ c
 h
 a
 e
-
-> DROP SOURCE kafka_proto_source CASCADE;
diff --git a/test/legacy-upgrade/check-from-v0.45.0-kafka-progress.td b/test/legacy-upgrade/check-from-v0.45.0-kafka-progress.td
index 69af351dbbe33..9592f9055b7d5 100644
--- a/test/legacy-upgrade/check-from-v0.45.0-kafka-progress.td
+++ b/test/legacy-upgrade/check-from-v0.45.0-kafka-progress.td
@@ -18,5 +18,3 @@ $ set-regex match=\d+ replacement=<NUMBER>
 $ set-regex match=testdrive-upgrade-kafka-source-.*?' replacement=<TOPIC>'
 >[version<8100] SHOW CREATE SOURCE kafka_source
 materialize.public.kafka_source "CREATE SOURCE \"materialize\".\"public\".\"kafka_source\" FROM KAFKA CONNECTION \"materialize\".\"public\".\"kafka_conn\" (TOPIC = '<TOPIC>') FORMAT AVRO USING SCHEMA '{   \"type\": \"record\",   \"name\": \"cpx\",   \"fields\": [     {\"name\": \"a\", \"type\": \"long\"},     {\"name\": \"b\", \"type\": \"long\"}   ] }' ENVELOPE NONE EXPOSE PROGRESS AS \"materialize\".\"public\".\"kafka_source_progress\""
-
-> DROP SOURCE kafka_source;
diff --git a/test/legacy-upgrade/mzcompose.py b/test/legacy-upgrade/mzcompose.py
index 60c9830353e55..9b4eee9404ca4 100644
--- a/test/legacy-upgrade/mzcompose.py
+++ b/test/legacy-upgrade/mzcompose.py
@@ -118,20 +118,69 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         else:
             if parallelism_count == 1 or parallelism_index == 0:
                 test_upgrade_from_version(
-                    c, f"{version}", priors, filter=args.filter, zero_downtime=True
+                    c,
+                    f"{version}",
+                    priors,
+                    filter=args.filter,
+                    zero_downtime=True,
+                    force_source_table_syntax=False,
                 )
             if parallelism_count == 1 or parallelism_index == 1:
                 test_upgrade_from_version(
-                    c, f"{version}", priors, filter=args.filter, zero_downtime=False
+                    c,
+                    f"{version}",
+                    priors,
+                    filter=args.filter,
+                    zero_downtime=False,
+                    force_source_table_syntax=False,
+                )
+                test_upgrade_from_version(
+                    c,
+                    f"{version}",
+                    priors,
+                    filter=args.filter,
+                    zero_downtime=False,
+                    force_source_table_syntax=True,
                 )
 
     if parallelism_count == 1 or parallelism_index == 0:
         test_upgrade_from_version(
-            c, "current_source", priors=[], filter=args.filter, zero_downtime=True
+            c,
+            "current_source",
+            priors=[],
+            filter=args.filter,
+            zero_downtime=True,
+            force_source_table_syntax=False,
         )
+        if args.lts_upgrade:
+            # Direct upgrade from the latest LTS version without any in-between versions
+            version = LTS_VERSIONS[-1]
+            priors = [v for v in all_versions if v <= version]
+            test_upgrade_from_version(
+                c,
+                f"{version}",
+                priors,
+                filter=args.filter,
+                zero_downtime=False,
+                force_source_table_syntax=True,
+                lts_upgrade=True,
+            )
     if parallelism_count == 1 or parallelism_index == 1:
         test_upgrade_from_version(
-            c, "current_source", priors=[], filter=args.filter, zero_downtime=False
+            c,
+            "current_source",
+            priors=[],
+            filter=args.filter,
+            zero_downtime=False,
+            force_source_table_syntax=False,
+        )
+        test_upgrade_from_version(
+            c,
+            "current_source",
+            priors=[],
+            filter=args.filter,
+            zero_downtime=False,
+            force_source_table_syntax=True,
         )
         if args.lts_upgrade:
             # Direct upgrade from latest LTS version without any inbetween versions
@@ -143,6 +192,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
                 priors,
                 filter=args.filter,
                 zero_downtime=False,
+                force_source_table_syntax=False,
                 lts_upgrade=True,
             )
 
@@ -166,6 +216,7 @@ def test_upgrade_from_version(
     priors: list[MzVersion],
     filter: str,
     zero_downtime: bool,
+    force_source_table_syntax: bool,
     lts_upgrade: bool = False,
 ) -> None:
     print(
@@ -173,7 +224,7 @@ def test_upgrade_from_version(
     )
 
     system_parameter_defaults = get_default_system_parameters(
-        zero_downtime=zero_downtime
+        zero_downtime=zero_downtime,
     )
     deploy_generation = 0
 
@@ -303,6 +354,13 @@ def test_upgrade_from_version(
                     c.rm(mz_service)
 
     print(f"{'0dt-' if zero_downtime else ''}Upgrading to final version")
+    system_parameter_defaults = get_default_system_parameters(
+        zero_downtime=zero_downtime,
+        # We force the new syntax only on the final version: this lets the migration that
+        # converts sources to the new model be applied, while the older version can still
+        # create sources using the old syntax.
+        force_source_table_syntax=force_source_table_syntax,
+    )
     mz_to = Materialized(
         name=mz_service,
         options=list(mz_options.values()),
diff --git a/test/mysql-cdc-old-syntax/mzcompose.py b/test/mysql-cdc-old-syntax/mzcompose.py
index 32d78456c45c2..322998e556fdd 100644
--- a/test/mysql-cdc-old-syntax/mzcompose.py
+++ b/test/mysql-cdc-old-syntax/mzcompose.py
@@ -22,9 +22,19 @@
 )
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
 from materialize.mzcompose.services.materialized import Materialized
+from materialize.mzcompose.services.minio import Minio
 from materialize.mzcompose.services.mysql import MySql
+from materialize.mzcompose.services.postgres import (
+    METADATA_STORE,
+    CockroachOrPostgresMetadata,
+)
 from materialize.mzcompose.services.test_certs import TestCerts
 from materialize.mzcompose.services.testdrive import Testdrive
+from materialize.source_table_migration import (
+    get_new_image_for_source_table_migration_test,
+    get_old_image_for_source_table_migration_test,
+    verify_sources_after_source_table_migration,
+)
 
 
 def create_mysql(mysql_version: str) -> MySql:
@@ -46,6 +56,7 @@ def create_mysql_replica(mysql_version: str) -> MySql:
 
 SERVICES = [
     Materialized(
+        external_blob_store=True,
         additional_system_parameter_defaults={
             "log_filter": "mz_storage::source::mysql=trace,info"
         },
@@ -53,6 +64,8 @@ def create_mysql_replica(mysql_version: str) -> MySql:
     create_mysql(MySql.DEFAULT_VERSION),
     create_mysql_replica(MySql.DEFAULT_VERSION),
     TestCerts(),
+    CockroachOrPostgresMetadata(),
+    Minio(setup_materialize=True),
     Testdrive(default_timeout="60s"),
 ]
 
@@ -79,7 +92,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
         f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
     )
     for name in sharded_workflows:
-        if name == "default":
+        if name in ("default", "migration"):
             continue
 
         with c.test_case(name):
@@ -273,3 +286,88 @@ def do_inserts(c: Composition):
             """
         ),
     )
+
+
+def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
+    parser.add_argument(
+        "filter",
+        nargs="*",
+        default=["*.td"],
+        help="limit to only the files matching filter",
+    )
+    args = parser.parse_args()
+
+    matching_files = []
+    for filter in args.filter:
+        matching_files.extend(glob.glob(filter, root_dir="test/mysql-cdc-old-syntax"))
+
+    sharded_files: list[str] = sorted(
+        buildkite.shard_list(matching_files, lambda file: file)
+    )
+    print(f"Files: {sharded_files}")
+
+    mysql_version = get_targeted_mysql_version(parser)
+
+    for file in sharded_files:
+        mz_old = Materialized(
+            name="materialized",
+            image=get_old_image_for_source_table_migration_test(),
+            external_metadata_store=True,
+            external_blob_store=True,
+            additional_system_parameter_defaults={
+                "log_filter": "mz_storage::source::mysql=trace,info"
+            },
+        )
+
+        mz_new = Materialized(
+            name="materialized",
+            image=get_new_image_for_source_table_migration_test(),
+            external_metadata_store=True,
+            external_blob_store=True,
+            additional_system_parameter_defaults={
+                "log_filter": "mz_storage::source::mysql=trace,info",
+                "force_source_table_syntax": "true",
+            },
+        )
+
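+        # Run the testdrive file against the old Materialize image first, then restart on the
+        # new image (with `force_source_table_syntax` enabled) and verify the migrated sources.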
+        with c.override(mz_old, create_mysql(mysql_version)):
+            c.up("materialized", "mysql")
+
+            print(f"Running {file} with mz_old")
+
+            valid_ssl_context = retrieve_ssl_context_for_mysql(c)
+            wrong_ssl_context = retrieve_invalid_ssl_context_for_mysql(c)
+
+            c.sources_and_sinks_ignored_from_validation.add("drop_table")
+
+            c.run_testdrive_files(
+                f"--var=ssl-ca={valid_ssl_context.ca}",
+                f"--var=ssl-client-cert={valid_ssl_context.client_cert}",
+                f"--var=ssl-client-key={valid_ssl_context.client_key}",
+                f"--var=ssl-wrong-ca={wrong_ssl_context.ca}",
+                f"--var=ssl-wrong-client-cert={wrong_ssl_context.client_cert}",
+                f"--var=ssl-wrong-client-key={wrong_ssl_context.client_key}",
+                f"--var=mysql-root-password={MySql.DEFAULT_ROOT_PASSWORD}",
+                "--var=mysql-user-password=us3rp4ssw0rd",
+                f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
+                f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
+                "--no-reset",
+                file,
+            )
+
+            c.kill("materialized", wait=True)
+
+            with c.override(mz_new):
+                c.up("materialized")
+
+                print("Running mz_new")
+                verify_sources_after_source_table_migration(c, file)
+
+                c.kill("materialized", wait=True)
+                c.kill("mysql", wait=True)
+                c.kill(METADATA_STORE, wait=True)
+                c.rm("materialized")
+                c.rm(METADATA_STORE)
+                c.rm("mysql")
+                c.rm_volumes("mzdata")
diff --git a/test/mysql-cdc-old-syntax/subsource-resolution-duplicates.td b/test/mysql-cdc-old-syntax/subsource-resolution-duplicates.td
index 1b5662cc78956..c762d583849e5 100644
--- a/test/mysql-cdc-old-syntax/subsource-resolution-duplicates.td
+++ b/test/mysql-cdc-old-syntax/subsource-resolution-duplicates.td
@@ -53,6 +53,3 @@ detail: subsources referencing table: x, y
  mz_source          mysql     quickstart  ""
  mz_source_progress progress  <null>      ""
  t                  subsource quickstart  ""
-
-$ mysql-execute name=mysql
-DROP DATABASE other;
diff --git a/test/mysql-cdc-old-syntax/table-in-mysql-schema.td b/test/mysql-cdc-old-syntax/table-in-mysql-schema.td
index 185858ccffe36..e7e992a9d4824 100644
--- a/test/mysql-cdc-old-syntax/table-in-mysql-schema.td
+++ b/test/mysql-cdc-old-syntax/table-in-mysql-schema.td
@@ -15,6 +15,8 @@ $ set-max-tries max-tries=20
 # Test that tables in the mysql schema are not replicated
 #
 
+> DROP SOURCE IF EXISTS mz_source;
+
 > CREATE SECRET mysqlpass AS '${arg.mysql-root-password}'
 > CREATE CONNECTION mysql_conn TO MYSQL (
     HOST mysql,
@@ -27,13 +29,12 @@ $ mysql-connect name=mysql url=mysql://root@mysql password=${arg.mysql-root-pass
 $ mysql-execute name=mysql
 DROP DATABASE IF EXISTS public;
 DROP DATABASE IF EXISTS another_schema;
+DROP DATABASE IF EXISTS other;
+CREATE DATABASE IF NOT EXISTS mysql;
 USE mysql;
-
 # Insert data pre-snapshot
-CREATE TABLE t_in_mysql (f1 INT);
-INSERT INTO t_in_mysql VALUES (1), (2);
-
-> DROP SOURCE IF EXISTS mz_source;
+CREATE TABLE mysql.t_in_mysql (f1 INT);
+INSERT INTO mysql.t_in_mysql VALUES (1), (2);
 
 ! CREATE SOURCE mz_source
   FROM MYSQL CONNECTION mysql_conn
diff --git a/test/mysql-cdc/table-in-mysql-schema.td b/test/mysql-cdc/table-in-mysql-schema.td
index e27212d1aaabc..ebd30988bfd86 100644
--- a/test/mysql-cdc/table-in-mysql-schema.td
+++ b/test/mysql-cdc/table-in-mysql-schema.td
@@ -15,6 +15,8 @@ $ set-max-tries max-tries=20
 # Test that tables in the mysql schema are not replicated
 #
 
+> DROP SOURCE IF EXISTS mz_source;
+
 > CREATE SECRET mysqlpass AS '${arg.mysql-root-password}'
 > CREATE CONNECTION mysql_conn TO MYSQL (
     HOST mysql,
@@ -34,8 +36,6 @@ CREATE DATABASE public;
 USE public;
 CREATE TABLE time_zone (f1 INT);
 
-> DROP SOURCE IF EXISTS mz_source;
-
 > CREATE SOURCE mz_source FROM MYSQL CONNECTION mysql_conn;
 
 ! CREATE TABLE timezone FROM SOURCE mz_source (REFERENCE public.timezone);
diff --git a/test/pg-cdc-old-syntax/mzcompose.py b/test/pg-cdc-old-syntax/mzcompose.py
index f44e64c31615b..43a8ea2933ccc 100644
--- a/test/pg-cdc-old-syntax/mzcompose.py
+++ b/test/pg-cdc-old-syntax/mzcompose.py
@@ -21,10 +21,20 @@
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
 from materialize.mzcompose.service import Service, ServiceConfig
 from materialize.mzcompose.services.materialized import Materialized
-from materialize.mzcompose.services.postgres import Postgres
+from materialize.mzcompose.services.minio import Minio
+from materialize.mzcompose.services.postgres import (
+    METADATA_STORE,
+    CockroachOrPostgresMetadata,
+    Postgres,
+)
 from materialize.mzcompose.services.test_certs import TestCerts
 from materialize.mzcompose.services.testdrive import Testdrive
 from materialize.mzcompose.services.toxiproxy import Toxiproxy
+from materialize.source_table_migration import (
+    get_new_image_for_source_table_migration_test,
+    get_old_image_for_source_table_migration_test,
+    verify_sources_after_source_table_migration,
+)
 
 # Set the max slot WAL keep size to 10MB
 DEFAULT_PG_EXTRA_COMMAND = ["-c", "max_slot_wal_keep_size=10"]
@@ -88,8 +98,11 @@ def create_postgres(
         additional_system_parameter_defaults={
             "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error"
         },
+        external_blob_store=True,
     ),
     Testdrive(),
+    CockroachOrPostgresMetadata(),
+    Minio(setup_materialize=True),
     TestCerts(),
     Toxiproxy(),
     create_postgres(pg_version=None),
@@ -323,14 +336,18 @@ def workflow_cdc(c: Composition, parser: WorkflowArgumentParser) -> None:
 def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
     workflows_with_internal_sharding = ["cdc"]
     sharded_workflows = workflows_with_internal_sharding + buildkite.shard_list(
-        [w for w in c.workflows if w not in workflows_with_internal_sharding],
+        [
+            w
+            for w in c.workflows
+            if w not in workflows_with_internal_sharding and w != "migration"
+        ],
         lambda w: w,
     )
     print(
         f"Workflows in shard with index {buildkite.get_parallelism_index()}: {sharded_workflows}"
     )
     for name in sharded_workflows:
-        if name == "default":
+        if name in ("default", "migration"):
             continue
 
         # TODO: Flaky, reenable when database-issues#7611 is fixed
@@ -348,3 +365,88 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
 
         with c.test_case(name):
             c.workflow(name, *parser.args)
+
+
+def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
+    parser.add_argument(
+        "filter",
+        nargs="*",
+        default=["*.td"],
+        help="limit to only the files matching filter",
+    )
+    args = parser.parse_args()
+
+    matching_files = []
+    for filter in args.filter:
+        matching_files.extend(glob.glob(filter, root_dir="test/pg-cdc-old-syntax"))
+    sharded_files: list[str] = sorted(
+        buildkite.shard_list(matching_files, lambda file: file)
+    )
+    print(f"Files: {sharded_files}")
+
+    ssl_ca = c.run("test-certs", "cat", "/secrets/ca.crt", capture=True).stdout
+    ssl_cert = c.run("test-certs", "cat", "/secrets/certuser.crt", capture=True).stdout
+    ssl_key = c.run("test-certs", "cat", "/secrets/certuser.key", capture=True).stdout
+    ssl_wrong_cert = c.run(
+        "test-certs", "cat", "/secrets/postgres.crt", capture=True
+    ).stdout
+    ssl_wrong_key = c.run(
+        "test-certs", "cat", "/secrets/postgres.key", capture=True
+    ).stdout
+
+    pg_version = get_targeted_pg_version(parser)
+
+    for file in sharded_files:
+        mz_old = Materialized(
+            name="materialized",
+            image=get_old_image_for_source_table_migration_test(),
+            volumes_extra=["secrets:/share/secrets"],
+            external_metadata_store=True,
+            external_blob_store=True,
+            additional_system_parameter_defaults={
+                "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error"
+            },
+        )
+
+        mz_new = Materialized(
+            name="materialized",
+            image=get_new_image_for_source_table_migration_test(),
+            volumes_extra=["secrets:/share/secrets"],
+            external_metadata_store=True,
+            external_blob_store=True,
+            additional_system_parameter_defaults={
+                "log_filter": "mz_storage::source::postgres=trace,debug,info,warn,error",
+                "force_source_table_syntax": "true",
+            },
+        )
+        with c.override(mz_old, create_postgres(pg_version=pg_version)):
+            c.up("materialized", "test-certs", "postgres")
+
+            print(f"Running {file} with mz_old")
+
+            c.run_testdrive_files(
+                f"--var=ssl-ca={ssl_ca}",
+                f"--var=ssl-cert={ssl_cert}",
+                f"--var=ssl-key={ssl_key}",
+                f"--var=ssl-wrong-cert={ssl_wrong_cert}",
+                f"--var=ssl-wrong-key={ssl_wrong_key}",
+                f"--var=default-replica-size={Materialized.Size.DEFAULT_SIZE}-{Materialized.Size.DEFAULT_SIZE}",
+                f"--var=default-storage-size={Materialized.Size.DEFAULT_SIZE}-1",
+                "--no-reset",
+                file,
+            )
+            c.kill("materialized", wait=True)
+
+            with c.override(mz_new):
+                c.up("materialized")
+
+                print("Running mz_new")
+                verify_sources_after_source_table_migration(c, file)
+
+                c.kill("materialized", wait=True)
+                c.kill("postgres", wait=True)
+                c.kill(METADATA_STORE, wait=True)
+                c.rm("materialized")
+                c.rm(METADATA_STORE)
+                c.rm("postgres")
+                c.rm_volumes("mzdata")
diff --git a/test/pg-cdc-old-syntax/subsource-resolution-duplicates.td b/test/pg-cdc-old-syntax/subsource-resolution-duplicates.td
index 3f510be4088b8..8f053fd552679 100644
--- a/test/pg-cdc-old-syntax/subsource-resolution-duplicates.td
+++ b/test/pg-cdc-old-syntax/subsource-resolution-duplicates.td
@@ -57,6 +57,3 @@ detail: subsources referencing table: x, y
  mz_source          postgres  quickstart  ""
  mz_source_progress progress  <null>      ""
  t                  subsource quickstart  ""
-
-$ postgres-execute connection=postgres://postgres:postgres@postgres
-DROP SCHEMA other CASCADE;
diff --git a/test/testdrive-old-kafka-src-syntax/avro-resolution-no-publish-writer.td b/test/testdrive-old-kafka-src-syntax/avro-resolution-no-publish-writer.td
index e0bb6d626c03c..19ce5236fcf66 100644
--- a/test/testdrive-old-kafka-src-syntax/avro-resolution-no-publish-writer.td
+++ b/test/testdrive-old-kafka-src-syntax/avro-resolution-no-publish-writer.td
@@ -43,5 +43,6 @@ GRANT CREATE, USAGE ON SCHEMA public TO materialize
 $ kafka-ingest format=bytes topic=resolution-no-publish-writer timestamp=1
 \\x00\x00\x00\x00\x01\xf6\x01
 
-! SELECT * FROM resolution_no_publish_writer;
-contains:to resolve
+# TODO: Reenable when https://github.com/MaterializeInc/database-issues/issues/8933 is fixed
+# ! SELECT * FROM resolution_no_publish_writer;
+# contains:to resolve
diff --git a/test/testdrive-old-kafka-src-syntax/indexes.td b/test/testdrive-old-kafka-src-syntax/indexes.td
index afe535df1b2a9..5d2c2aa87ec64 100644
--- a/test/testdrive-old-kafka-src-syntax/indexes.td
+++ b/test/testdrive-old-kafka-src-syntax/indexes.td
@@ -7,6 +7,9 @@
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0.
 
+$ set-sql-timeout duration=1s
+$ set-max-tries max-tries=5
+
 $ set-arg-default single-replica-cluster=quickstart
 
 $ set-regex match=cluster1|quickstart replacement=<VARIABLE_OUTPUT>
@@ -279,7 +282,109 @@ bar_ind bar <VARIABLE_OUTPUT> {a}  ""
 $ postgres-execute connection=postgres://mz_system@${testdrive.materialize-internal-sql-addr}/materialize
 ALTER SYSTEM SET enable_rbac_checks TO true
 
-> SHOW INDEXES IN CLUSTER mz_catalog_server
+>[version<=13100] SHOW INDEXES IN CLUSTER mz_catalog_server
+mz_active_peeks_per_worker_s2_primary_idx                   mz_active_peeks_per_worker                   mz_catalog_server    {id,worker_id}                              ""
+mz_arrangement_batches_raw_s2_primary_idx                   mz_arrangement_batches_raw                   mz_catalog_server    {operator_id,worker_id}                     ""
+mz_arrangement_records_raw_s2_primary_idx                   mz_arrangement_records_raw                   mz_catalog_server    {operator_id,worker_id}                     ""
+mz_arrangement_sharing_raw_s2_primary_idx                   mz_arrangement_sharing_raw                   mz_catalog_server    {operator_id,worker_id}                     ""
+mz_arrangement_heap_capacity_raw_s2_primary_idx             mz_arrangement_heap_capacity_raw             mz_catalog_server    {operator_id,worker_id}                     ""
+mz_arrangement_heap_allocations_raw_s2_primary_idx          mz_arrangement_heap_allocations_raw          mz_catalog_server    {operator_id,worker_id}                     ""
+mz_arrangement_heap_size_raw_s2_primary_idx                 mz_arrangement_heap_size_raw                 mz_catalog_server    {operator_id,worker_id}                     ""
+mz_arrangement_batcher_allocations_raw_s2_primary_idx       mz_arrangement_batcher_allocations_raw       mz_catalog_server    {operator_id,worker_id}                     ""
+mz_arrangement_batcher_capacity_raw_s2_primary_idx          mz_arrangement_batcher_capacity_raw          mz_catalog_server    {operator_id,worker_id}                     ""
+mz_arrangement_batcher_records_raw_s2_primary_idx           mz_arrangement_batcher_records_raw           mz_catalog_server    {operator_id,worker_id}                     ""
+mz_arrangement_batcher_size_raw_s2_primary_idx              mz_arrangement_batcher_size_raw              mz_catalog_server    {operator_id,worker_id}                     ""
+mz_cluster_deployment_lineage_ind                           mz_cluster_deployment_lineage                mz_catalog_server    {cluster_id}                                ""
+mz_cluster_replica_history_ind                              mz_cluster_replica_history                   mz_catalog_server    {dropped_at}                                ""
+mz_cluster_replica_name_history_ind                         mz_cluster_replica_name_history              mz_catalog_server    {id}                                        ""
+mz_cluster_replica_metrics_ind                              mz_cluster_replica_metrics                   mz_catalog_server    {replica_id}                                ""
+mz_cluster_replica_metrics_history_ind                      mz_cluster_replica_metrics_history           mz_catalog_server    {replica_id}                                ""
+mz_cluster_replica_sizes_ind                                mz_cluster_replica_sizes                     mz_catalog_server    {size}                                      ""
+mz_cluster_replica_statuses_ind                             mz_cluster_replica_statuses                  mz_catalog_server    {replica_id}                                ""
+mz_cluster_replica_status_history_ind                       mz_cluster_replica_status_history            mz_catalog_server    {replica_id}                                ""
+mz_cluster_replicas_ind                                     mz_cluster_replicas                          mz_catalog_server    {id}                                        ""
+mz_clusters_ind                                             mz_clusters                                  mz_catalog_server    {id}                                        ""
+mz_columns_ind                                              mz_columns                                   mz_catalog_server    {name}                                      ""
+mz_comments_ind                                             mz_comments                                  mz_catalog_server    {id}                                        ""
+mz_compute_dependencies_ind                                 mz_compute_dependencies                      mz_catalog_server    {dependency_id}                             ""
+mz_compute_dataflow_global_ids_per_worker_s2_primary_idx   mz_compute_dataflow_global_ids_per_worker     mz_catalog_server    {id,worker_id}                             ""
+mz_compute_error_counts_raw_s2_primary_idx                  mz_compute_error_counts_raw                  mz_catalog_server    {export_id,worker_id}                       ""
+mz_compute_exports_per_worker_s2_primary_idx                mz_compute_exports_per_worker                mz_catalog_server    {export_id,worker_id}                       ""
+mz_compute_frontiers_per_worker_s2_primary_idx              mz_compute_frontiers_per_worker              mz_catalog_server    {export_id,worker_id}                       ""
+mz_compute_hydration_times_per_worker_s2_primary_idx        mz_compute_hydration_times_per_worker        mz_catalog_server    {export_id,worker_id}                       ""
+mz_compute_import_frontiers_per_worker_s2_primary_idx       mz_compute_import_frontiers_per_worker       mz_catalog_server    {export_id,import_id,worker_id}             ""
+mz_compute_lir_mapping_per_worker_s2_primary_idx            mz_compute_lir_mapping_per_worker            mz_catalog_server    {global_id,lir_id,worker_id}                ""
+mz_compute_operator_durations_histogram_raw_s2_primary_idx  mz_compute_operator_durations_histogram_raw  mz_catalog_server    {id,worker_id,duration_ns}                  ""
+mz_connections_ind                                          mz_connections                               mz_catalog_server    {schema_id}                                 ""
+mz_console_cluster_utilization_overview_ind                 mz_console_cluster_utilization_overview      mz_catalog_server    {cluster_id}                                ""
+mz_continual_tasks_ind                                      mz_continual_tasks                           mz_catalog_server    {id}                                        ""
+mz_databases_ind                                            mz_databases                                 mz_catalog_server    {name}                                      ""
+mz_dataflow_addresses_per_worker_s2_primary_idx             mz_dataflow_addresses_per_worker             mz_catalog_server    {id,worker_id}                              ""
+mz_dataflow_channels_per_worker_s2_primary_idx              mz_dataflow_channels_per_worker              mz_catalog_server    {id,worker_id}                              ""
+mz_dataflow_operator_reachability_raw_s2_primary_idx        mz_dataflow_operator_reachability_raw        mz_catalog_server    {address,port,worker_id,update_type,time}   ""
+mz_dataflow_operators_per_worker_s2_primary_idx             mz_dataflow_operators_per_worker             mz_catalog_server    {id,worker_id}                              ""
+mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx mz_dataflow_shutdown_durations_histogram_raw mz_catalog_server    {worker_id,duration_ns}                     ""
+mz_frontiers_ind                                            mz_frontiers                                 mz_catalog_server    {object_id}                                 ""
+mz_indexes_ind                                              mz_indexes                                   mz_catalog_server    {id}                                        ""
+mz_kafka_sources_ind                                        mz_kafka_sources                             mz_catalog_server    {id}                                        ""
+mz_materialized_views_ind                                   mz_materialized_views                        mz_catalog_server    {id}                                        ""
+mz_message_batch_counts_received_raw_s2_primary_idx         mz_message_batch_counts_received_raw         mz_catalog_server    {channel_id,from_worker_id,to_worker_id}    ""
+mz_message_batch_counts_sent_raw_s2_primary_idx             mz_message_batch_counts_sent_raw             mz_catalog_server    {channel_id,from_worker_id,to_worker_id}    ""
+mz_message_counts_received_raw_s2_primary_idx               mz_message_counts_received_raw               mz_catalog_server    {channel_id,from_worker_id,to_worker_id}    ""
+mz_message_counts_sent_raw_s2_primary_idx                   mz_message_counts_sent_raw                   mz_catalog_server    {channel_id,from_worker_id,to_worker_id}    ""
+mz_object_dependencies_ind                                  mz_object_dependencies                       mz_catalog_server    {object_id}                                 ""
+mz_object_history_ind                                       mz_object_history                            mz_catalog_server    {id}                                        ""
+mz_object_lifetimes_ind                                     mz_object_lifetimes                          mz_catalog_server    {id}                                        ""
+mz_object_transitive_dependencies_ind                       mz_object_transitive_dependencies            mz_catalog_server    {object_id}                                 ""
+mz_objects_ind                                              mz_objects                                   mz_catalog_server    {schema_id}                                 ""
+mz_notices_ind                                              mz_notices                                   mz_catalog_server    {id}                                        ""
+mz_peek_durations_histogram_raw_s2_primary_idx              mz_peek_durations_histogram_raw              mz_catalog_server    {worker_id,type,duration_ns}                ""
+mz_recent_activity_log_thinned_ind                          mz_recent_activity_log_thinned               mz_catalog_server    {sql_hash}                                  ""
+mz_recent_storage_usage_ind                                 mz_recent_storage_usage                      mz_catalog_server    {object_id}                                 ""
+mz_roles_ind                                                mz_roles                                     mz_catalog_server    {id}                                        ""
+mz_scheduling_elapsed_raw_s2_primary_idx                    mz_scheduling_elapsed_raw                    mz_catalog_server    {id,worker_id}                              ""
+mz_scheduling_parks_histogram_raw_s2_primary_idx            mz_scheduling_parks_histogram_raw            mz_catalog_server    {worker_id,slept_for_ns,requested_ns}       ""
+mz_schemas_ind                                              mz_schemas                                   mz_catalog_server    {database_id}                               ""
+mz_secrets_ind                                              mz_secrets                                   mz_catalog_server    {name}                                      ""
+mz_show_all_objects_ind                                     mz_show_all_objects                          mz_catalog_server    {schema_id}                                 ""
+mz_show_cluster_replicas_ind                                mz_show_cluster_replicas                     mz_catalog_server    {cluster}                                   ""
+mz_show_clusters_ind                                        mz_show_clusters                             mz_catalog_server    {name}                                      ""
+mz_show_columns_ind                                         mz_show_columns                              mz_catalog_server    {id}                                        ""
+mz_show_connections_ind                                     mz_show_connections                          mz_catalog_server    {schema_id}                                 ""
+mz_show_databases_ind                                       mz_show_databases                            mz_catalog_server    {name}                                      ""
+mz_show_indexes_ind                                         mz_show_indexes                              mz_catalog_server    {schema_id}                                 ""
+mz_show_materialized_views_ind                              mz_show_materialized_views                   mz_catalog_server    {schema_id}                                 ""
+mz_show_roles_ind                                           mz_show_roles                                mz_catalog_server    {name}                                      ""
+mz_show_schemas_ind                                         mz_show_schemas                              mz_catalog_server    {database_id}                               ""
+mz_show_secrets_ind                                         mz_show_secrets                              mz_catalog_server    {schema_id}                                 ""
+mz_show_sinks_ind                                           mz_show_sinks                                mz_catalog_server    {schema_id}                                 ""
+mz_show_sources_ind                                         mz_show_sources                              mz_catalog_server    {schema_id}                                 ""
+mz_show_tables_ind                                          mz_show_tables                               mz_catalog_server    {schema_id}                                 ""
+mz_show_types_ind                                           mz_show_types                                mz_catalog_server    {schema_id}                                 ""
+mz_show_views_ind                                           mz_show_views                                mz_catalog_server    {schema_id}                                 ""
+mz_sink_statistics_ind                                      mz_sink_statistics                           mz_catalog_server    {id}                                        ""
+mz_sink_status_history_ind                                  mz_sink_status_history                       mz_catalog_server    {sink_id}                                   ""
+mz_source_statistics_with_history_ind                       mz_source_statistics_with_history            mz_catalog_server    {id}                                        ""
+mz_sink_statuses_ind                                        mz_sink_statuses                             mz_catalog_server    {id}                                        ""
+mz_sinks_ind                                                mz_sinks                                     mz_catalog_server    {id}                                        ""
+mz_source_statistics_ind                                    mz_source_statistics                         mz_catalog_server    {id}                                        ""
+mz_source_status_history_ind                                mz_source_status_history                     mz_catalog_server    {source_id}                                 ""
+mz_source_statuses_ind                                      mz_source_statuses                           mz_catalog_server    {id}                                        ""
+mz_sources_ind                                              mz_sources                                   mz_catalog_server    {id}                                        ""
+mz_tables_ind                                               mz_tables                                    mz_catalog_server    {schema_id}                                 ""
+mz_types_ind                                                mz_types                                     mz_catalog_server    {schema_id}                                 ""
+mz_recent_sql_text_ind                                      mz_recent_sql_text                           mz_catalog_server    {sql_hash}                                  ""
+mz_views_ind                                                mz_views                                     mz_catalog_server    {schema_id}                                 ""
+mz_wallclock_global_lag_recent_history_ind                  mz_wallclock_global_lag_recent_history       mz_catalog_server    {object_id}                                 ""
+mz_webhook_sources_ind                                      mz_webhook_sources                           mz_catalog_server    {id}                                        ""
+pg_attrdef_all_databases_ind                                pg_attrdef_all_databases                     mz_catalog_server    {oid,adrelid,adnum,adbin,adsrc}             ""
+pg_attribute_all_databases_ind                              pg_attribute_all_databases                   mz_catalog_server    {attrelid,attname,atttypid,attlen,attnum,atttypmod,attnotnull,atthasdef,attidentity,attgenerated,attisdropped,attcollation,database_name,pg_type_database_name}    ""
+pg_class_all_databases_ind                                  pg_class_all_databases                       mz_catalog_server    {relname}                                   ""
+pg_description_all_databases_ind                            pg_description_all_databases                 mz_catalog_server    {objoid,classoid,objsubid,description,oid_database_name,class_database_name}    ""
+pg_namespace_all_databases_ind                              pg_namespace_all_databases                   mz_catalog_server    {nspname}                                   ""
+pg_type_all_databases_ind                                   pg_type_all_databases                        mz_catalog_server    {oid}                                       ""
+
+>[version>13100] SHOW INDEXES IN CLUSTER mz_catalog_server
 mz_active_peeks_per_worker_s2_primary_idx                   mz_active_peeks_per_worker                   mz_catalog_server    {id,worker_id}                              ""
 mz_arrangement_batches_raw_s2_primary_idx                   mz_arrangement_batches_raw                   mz_catalog_server    {operator_id,worker_id}                     ""
 mz_arrangement_records_raw_s2_primary_idx                   mz_arrangement_records_raw                   mz_catalog_server    {operator_id,worker_id}                     ""
diff --git a/test/testdrive-old-kafka-src-syntax/mzcompose.py b/test/testdrive-old-kafka-src-syntax/mzcompose.py
index 77fd29709bf9d..3491e6b711218 100644
--- a/test/testdrive-old-kafka-src-syntax/mzcompose.py
+++ b/test/testdrive-old-kafka-src-syntax/mzcompose.py
@@ -12,10 +12,10 @@
 the expected-result/actual-result (aka golden testing) paradigm. A query is
 retried until it produces the desired result.
 """
-
+import glob
 from pathlib import Path
 
-from materialize import ci_util
+from materialize import ci_util, spawn
 from materialize.mzcompose import get_default_system_parameters
 from materialize.mzcompose.composition import Composition, WorkflowArgumentParser
 from materialize.mzcompose.services.azure import Azurite
@@ -24,11 +24,20 @@
 from materialize.mzcompose.services.materialized import Materialized
 from materialize.mzcompose.services.minio import Minio
 from materialize.mzcompose.services.mysql import MySql
-from materialize.mzcompose.services.postgres import Postgres
+from materialize.mzcompose.services.postgres import (
+    METADATA_STORE,
+    CockroachOrPostgresMetadata,
+    Postgres,
+)
 from materialize.mzcompose.services.redpanda import Redpanda
 from materialize.mzcompose.services.schema_registry import SchemaRegistry
 from materialize.mzcompose.services.testdrive import Testdrive
 from materialize.mzcompose.services.zookeeper import Zookeeper
+from materialize.source_table_migration import (
+    get_new_image_for_source_table_migration_test,
+    get_old_image_for_source_table_migration_test,
+    verify_sources_after_source_table_migration,
+)
 
 SERVICES = [
     Zookeeper(),
@@ -40,6 +49,7 @@
     Minio(setup_materialize=True, additional_directories=["copytos3"]),
     Azurite(),
     Materialized(external_blob_store=True),
+    CockroachOrPostgresMetadata(),
     FivetranDestination(volumes_extra=["tmp:/share/tmp"]),
     Testdrive(external_blob_store=True),
 ]
@@ -237,3 +247,233 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
             ci_util.upload_junit_report(
                 "testdrive", Path(__file__).parent / junit_report
             )
+
+
+def workflow_migration(c: Composition, parser: WorkflowArgumentParser) -> None:
+    """Run testdrive."""
+    parser.add_argument(
+        "--redpanda",
+        action="store_true",
+        help="run against Redpanda instead of the Confluent Platform",
+    )
+    parser.add_argument(
+        "--aws-region",
+        help="run against the specified AWS region instead of localstack",
+    )
+    parser.add_argument(
+        "--kafka-default-partitions",
+        type=int,
+        metavar="N",
+        help="set the default number of kafka partitions per topic",
+    )
+    parser.add_argument(
+        "--system-param",
+        type=str,
+        action="append",
+        nargs="*",
+        help="System parameters to set in Materialize, i.e. what you would set with `ALTER SYSTEM SET`",
+    )
+
+    parser.add_argument("--replicas", type=int, default=1, help="use multiple replicas")
+
+    parser.add_argument(
+        "--default-timeout",
+        type=str,
+        help="set the default timeout for Testdrive",
+    )
+
+    parser.add_argument(
+        "--rewrite-results",
+        action="store_true",
+        help="Rewrite results, disables junit reports",
+    )
+
+    parser.add_argument(
+        "--azurite", action="store_true", help="Use Azurite as blob store instead of S3"
+    )
+
+    parser.add_argument(
+        "files",
+        nargs="*",
+        default=["*.td"],
+        help="run against the specified files",
+    )
+
+    (args, _) = parser.parse_known_args()
+
+    matching_files = []
+    for file_pattern in args.files:
+        matching_files.extend(
+            glob.glob(file_pattern, root_dir="test/testdrive-old-kafka-src-syntax")
+        )
+    # session.td is excluded from the migration workflow.
+    matching_files = [file for file in matching_files if file != "session.td"]
+
+    dependencies = [
+        "fivetran-destination",
+        "minio",
+        "materialized",
+        "postgres",
+        "mysql",
+    ]
+
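+    # Choose the Kafka stack: Redpanda alone, or ZooKeeper + Kafka + the schema registry.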
+    if args.redpanda:
+        kafka_deps = ["redpanda"]
+    else:
+        kafka_deps = ["zookeeper", "kafka", "schema-registry"]
+
+    dependencies += kafka_deps
+
+    additional_system_parameter_defaults = {}
+    for param in args.system_param or []:
+        key_value = param[0].split("=", maxsplit=1)
+        assert (
+            len(key_value) == 2
+        ), f"--system-param '{param}' should be in the format <key>=<val>"
+        additional_system_parameter_defaults[key_value[0]] = key_value[1]
+
+    # Tombstones are left behind unless continual feedback upsert is enabled; the
+    # result is passed to testdrive as the `leaves-tombstones` variable below.
+    continual_feedback_upsert = additional_system_parameter_defaults.get(
+        "storage_use_continual_feedback_upsert",
+        get_default_system_parameters()["storage_use_continual_feedback_upsert"],
+    )
+    leaves_tombstones = "true" if continual_feedback_upsert == "false" else "false"
+
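+    # mz_old runs the pre-migration image and ingests data using the old source syntax.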
+    mz_old = Materialized(
+        default_size=Materialized.Size.DEFAULT_SIZE,
+        image=get_old_image_for_source_table_migration_test(),
+        external_blob_store=True,
+        blob_store_is_azure=args.azurite,
+        additional_system_parameter_defaults=dict(additional_system_parameter_defaults),
+    )
+
+    testdrive = Testdrive(
+        forward_buildkite_shard=True,
+        kafka_default_partitions=args.kafka_default_partitions,
+        aws_region=args.aws_region,
+        default_timeout=args.default_timeout,
+        volumes_extra=["mzdata:/mzdata"],
+        external_blob_store=True,
+        blob_store_is_azure=args.azurite,
+        fivetran_destination=True,
+        fivetran_destination_files_path="/share/tmp",
+        entrypoint_extra=[
+            f"--var=uses-redpanda={args.redpanda}",
+            f"--var=leaves-tombstones={leaves_tombstones}",
+        ],
+    )
+
+    # Force the new source table syntax on the new image so that the source table
+    # migration is exercised.
+    additional_system_parameter_defaults["force_source_table_syntax"] = "true"
+
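+    # mz_new runs the new image that the catalog is restarted into below.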
+    mz_new = Materialized(
+        default_size=Materialized.Size.DEFAULT_SIZE,
+        image=get_new_image_for_source_table_migration_test(),
+        external_blob_store=True,
+        blob_store_is_azure=args.azurite,
+        additional_system_parameter_defaults=additional_system_parameter_defaults,
+    )
+
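+    # For every testdrive file: run it against mz_old, restart into mz_new, verify the
+    # migrated sources, and then tear all services down so the next file starts fresh.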
+    for file in matching_files:
+        with c.override(testdrive, mz_old):
+            c.up(*dependencies)
+
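+            # Raise the cluster limit; some testdrive files create several clusters.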
+            c.sql(
+                "ALTER SYSTEM SET max_clusters = 50;",
+                port=6877,
+                user="mz_system",
+            )
+
+            non_default_testdrive_vars = []
+
+            if args.replicas > 1:
+                c.sql("DROP CLUSTER quickstart CASCADE", user="mz_system", port=6877)
+                # Make sure a replica named 'r1' always exists
+                replica_names = [
+                    "r1" if replica_id == 0 else f"replica{replica_id}"
+                    for replica_id in range(0, args.replicas)
+                ]
+                replica_string = ",".join(
+                    f"{replica_name} (SIZE '{mz_old.default_replica_size}')"
+                    for replica_name in replica_names
+                )
+                c.sql(
+                    f"CREATE CLUSTER quickstart REPLICAS ({replica_string})",
+                    user="mz_system",
+                    port=6877,
+                )
+
+                # Note that any command that outputs SHOW CLUSTERS will have output
+                # that depends on the number of replicas testdrive has. This means
+                # it might be easier to skip certain tests if the number of replicas
+                # is > 1.
+                c.sql(
+                    f"""
+                    CREATE CLUSTER testdrive_single_replica_cluster SIZE = '{mz_old.default_replica_size}';
+                    GRANT ALL PRIVILEGES ON CLUSTER testdrive_single_replica_cluster TO materialize;
+                    """,
+                    user="mz_system",
+                    port=6877,
+                )
+
+                non_default_testdrive_vars.append(f"--var=replicas={args.replicas}")
+                non_default_testdrive_vars.append(
+                    "--var=single-replica-cluster=testdrive_single_replica_cluster"
+                )
+
+            non_default_testdrive_vars.append(
+                f"--var=default-replica-size={mz_old.default_replica_size}"
+            )
+            non_default_testdrive_vars.append(
+                f"--var=default-storage-size={mz_old.default_storage_size}"
+            )
+
+            print(f"Running {file} with mz_old")
+
+            c.run_testdrive_files(
+                *non_default_testdrive_vars,
+                "--no-reset",
+                file,
+            )
+
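+            # Stop the old version before starting the new image on the same metadata
+            # and blob store state.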
+            c.kill("materialized", wait=True)
+
+            with c.override(mz_new):
+                c.up("materialized")
+
+                print("Running mz_new")
+                verify_sources_after_source_table_migration(c, file)
+
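+                # Tear down all services and volumes so the next file starts from a clean state.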
+                c.kill("materialized", wait=True)
+                c.kill("postgres", wait=True)
+                c.kill("mysql", wait=True)
+                c.kill(METADATA_STORE, wait=True)
+
+                for dep in kafka_deps:
+                    c.kill(dep, wait=True)
+
+                for dep in kafka_deps:
+                    c.rm(dep)
+
+                c.rm("materialized")
+                c.rm(METADATA_STORE)
+                c.rm("postgres")
+                c.rm("mysql")
+
+                # remove the testdrive container which uses the mzdata volume
+                testdrive_container_id = spawn.capture(
+                    ["docker", "ps", "-a", "--filter", f"volume={c.name}_mzdata", "-q"]
+                ).strip()
+                spawn.runv(["docker", "rm", testdrive_container_id])
+
+                c.rm_volumes("mzdata", force=True)