
enable source replication for kafka sources #30003


Closed
petrosagg wants to merge 2 commits

Conversation

petrosagg (Contributor)

Motivation

Tips for reviewer

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@petrosagg force-pushed the active-replication branch 2 times, most recently from 12f07ff to 818631d on November 6, 2024 19:18
@petrosagg force-pushed the active-replication branch 2 times, most recently from ce9937a to 606c2ce on November 14, 2024 18:02
@petrosagg changed the title from "WIP: enable source replication for non-pg sources" to "enable source replication for kafka sources" on Nov 14, 2024
@petrosagg marked this pull request as ready for review on November 14, 2024 19:19
@petrosagg requested review from a team as code owners on November 14, 2024 19:19
@petrosagg requested a review from jkosh44 on November 14, 2024 19:19
@jkosh44 (Contributor) left a comment:

Adapter parts LGTM

@teskje (Contributor) left a comment:

I looked at the controller and storage changes and they look good, as far as I understand them (and they are surprisingly small!). The only thing that seems suspicious to me is the handling of DroppedIds.

```rust
assert!(
    !objects_installed || self.replicas.is_empty(),
    "replication not supported for storage objects",
);
```
Reviewer comment (Contributor):

Can you also remove the stale # Panics docs above?

```rust
assert!(
    !command.installs_objects() || self.replicas.len() <= 1,
    "replication not supported for storage objects"
);
```
Reviewer comment (Contributor):

Can you also remove the stale # Panics docs above?

```rust
soft_panic_or_log!(
    "DroppedIds for ID {id} but we have neither ingestion nor export \
     under that ID"
);
```
@teskje (Contributor) commented on Nov 15, 2024:

Would be good to have a comment here explaining when this can happen and why it's fine to ignore it.

I assume in a multi-replica cluster all replicas send a DroppedIds and after the first one we have removed the self.collections entry so we end up here? I don't quite understand what the intention behind DroppedIds is though: I thought it was so the controller would know to keep around resources that are still used by replicas, but with this change it would do so only for the fastest replica.

@petrosagg (Author) replied:

After what we talked about today, I added some code with the simplest approach I could think of for handling multiple DroppedIds responses.
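The scheme discussed above, accepting a DroppedIds response from each replica, can be sketched as simple reference counting. This is an illustrative sketch only, not the controller code from this PR; the `DroppedIdTracker` type and the plain `u64` IDs are assumptions.

```rust
use std::collections::BTreeMap;

/// Illustrative sketch: count outstanding `DroppedIds` responses per dropped
/// collection, and release resources only after the last replica has
/// acknowledged. (`DroppedIdTracker` and `u64` IDs are hypothetical; the real
/// controller has its own ID and collection types.)
struct DroppedIdTracker {
    /// Dropped collection ID -> number of replicas that have not yet
    /// reported the drop.
    pending: BTreeMap<u64, usize>,
}

impl DroppedIdTracker {
    fn new() -> Self {
        Self { pending: BTreeMap::new() }
    }

    /// Record that `id` was dropped on a cluster with `replica_count` replicas.
    fn register_drop(&mut self, id: u64, replica_count: usize) {
        self.pending.insert(id, replica_count);
    }

    /// Handle one `DroppedIds` response for `id`. Returns `true` once every
    /// replica has acknowledged, i.e. when it is safe to release resources.
    /// An unknown `id` is ignored, which corresponds to the
    /// `soft_panic_or_log!` branch quoted above.
    fn acknowledge(&mut self, id: u64) -> bool {
        match self.pending.get_mut(&id) {
            Some(count) => {
                *count -= 1;
                if *count == 0 {
                    self.pending.remove(&id);
                    true
                } else {
                    false
                }
            }
            None => false,
        }
    }
}

fn main() {
    let mut tracker = DroppedIdTracker::new();
    tracker.register_drop(42, 2);
    // First replica acknowledges: resources must be kept for the second one.
    assert!(!tracker.acknowledge(42));
    // Second (last) replica acknowledges: cleanup is now safe.
    assert!(tracker.acknowledge(42));
    println!("all drops acknowledged");
}
```

With this shape, the "fastest replica" problem teskje raised goes away: resources outlive every replica's in-flight work, not just the first responder's.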

```rust
    _ => REPLICATION_SERVER_ID_OFFSET,
};
let mut rng = rand::thread_rng();
let server_id: u32 = rng.gen();
```
Reviewer comment (Contributor):

The comment above says:

> The value does not actually matter since it's irrelevant for GTID-based replication and won't cause errors if it happens to be the same as another replica in the mysql cluster

Based on this I'd think it should be fine if all replicas running a source used the same server ID. Is that not the case?

@petrosagg (Author) replied:

I thought I had reverted this change. It does matter in the end: if two clients use the same server_id at the same time, one of them will get an error. Choosing a random server_id was my initial remedy, but I eventually found it hacky enough that I removed MySQL from replication in this PR. I will treat it like Postgres, i.e. a source that gets scheduled only on the most recent replica of a cluster.
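For context, a deterministic alternative to the random server_id could look like the sketch below: derive the ID from a per-source offset plus the replica index, so two replicas of the same source can never collide. This is not code from the PR; the constant's value and the `server_id_for_replica` function are assumptions for illustration.

```rust
// Hypothetical sketch (not from this PR): avoid server_id collisions between
// replicas by deriving a deterministic ID instead of a random one. The offset
// value and function name are illustrative.
const REPLICATION_SERVER_ID_OFFSET: u32 = 2000;

/// Derive a unique server_id for replica `replica_index` of the source with
/// ordinal `source_ordinal`, assuming at most `max_replicas` replicas.
fn server_id_for_replica(source_ordinal: u32, replica_index: u32, max_replicas: u32) -> u32 {
    assert!(replica_index < max_replicas, "replica index out of range");
    REPLICATION_SERVER_ID_OFFSET + source_ordinal * max_replicas + replica_index
}

fn main() {
    // Two replicas of the same source always get distinct IDs.
    let a = server_id_for_replica(0, 0, 4); // 2000
    let b = server_id_for_replica(0, 1, 4); // 2001
    assert_ne!(a, b);
    println!("{a} {b}");
}
```

The trade-off is that the controller would need to plumb a stable replica index into the source rendering, which is part of why deferring MySQL to single-replica scheduling is the simpler choice here.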

@aljoscha (Contributor) left a comment:

Should this change be gated by the new feedback UPSERT source being enabled?

@petrosagg (Author):
This PR is currently blocked until we resolve https://github.com/MaterializeInc/database-issues/issues/8798, since running a replicated Kafka source with the old upsert implementation can lead to negative accumulations.

@petrosagg force-pushed the active-replication branch 3 times, most recently from 72f8657 to a47e9a1 on January 16, 2025 19:23
@aljoscha (Contributor) left a comment:

The changes look good! I want to add a system config flag for enabling multi-replica sources (I can do it). And I don't know about the test changes; it would probably be good to get @def- to have a look.

```diff
@@ -2102,11 +2114,7 @@ fn source_sink_cluster_config(
         Some(in_cluster) => scx.catalog.get_cluster(in_cluster.id),
     };

     if cluster.replica_ids().len() > 1 {
```
Reviewer comment (Contributor):

excellent! feels weird that this was in here...

```diff
@@ -130,7 +130,8 @@ where
     while *self.upper == [IntoTime::minimum()]
         || (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper)
-            && PartialOrder::less_than(&self.upper, &new_into_upper))
+            && PartialOrder::less_than(&self.upper, &new_into_upper)
+            && self.upper.less_equal(&binding_ts))
```
Reviewer comment (Contributor):

why was this change needed?

```diff
@@ -9,6 +9,7 @@
 $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
 ALTER SYSTEM SET unsafe_enable_unorchestrated_cluster_replicas = true
+ALTER SYSTEM SET enable_create_table_from_source = true
```
Reviewer comment (Contributor):

why is this change needed?

@def- (Contributor) left a comment:

Test changes LGTM assuming nightly is green. Triggered a run: https://buildkite.com/materialize/nightly/builds/11000

Edit: needs to be rebased for the upgrade tests to work. Data ingest fails because of this PR:

```
materialize.data_ingest.query_error.QueryError: ('cannot create source in cluster with more than one replica', 'CREATE SOURCE "materialize"."public"."mysql_source0"\n                    IN CLUSTER "quickstart"\n                    FROM MYSQL CONNECTION mysql0\n                    ')
```

```diff
@@ -75,7 +76,7 @@ def run(
     rng = random.Random(random.randrange(SEED_RANGE))

     print(
-        f"+++ Running with: --seed={seed} --threads={num_threads} --runtime={runtime} --complexity={complexity.value} --scenario={scenario.value} {'--naughty-identifiers ' if naughty_identifiers else ''} (--host={host})"
+        f"+++ Running with: --seed={seed} --threads={num_threads} --runtime={runtime} --complexity={complexity.value} --scenario={scenario.value} {'--naughty-identifiers ' if naughty_identifiers else ''} --replicas={replicas} (--host={host})"
```
Reviewer comment (Contributor):

Should we also add a new Parallel Workload run with --replicas=2/4 in ci/nightly/pipeline.template.yml?
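Such a step might be sketched roughly as below. This is a hypothetical fragment, not taken from the repository: the step keys and plugin invocation are assumptions modeled on typical Buildkite pipelines, and the exact form would need to match the existing entries in ci/nightly/pipeline.template.yml.

```yaml
# Hypothetical extra nightly step (keys illustrative, not verified against
# the actual pipeline template):
- id: parallel-workload-multi-replica
  label: "Parallel Workload with --replicas=2"
  timeout_in_minutes: 90
  plugins:
    - ./ci/plugins/mzcompose:
        composition: parallel-workload
        args: ["--replicas=2"]
```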

@morsapaes (Contributor):

Closing, as this has been subsumed by #31227. Thanks for laying out the foundation, @petrosagg! 🫶

@morsapaes morsapaes closed this Feb 18, 2025
6 participants