
storage: allow kafka/loadgen sources on multi-replica clusters, gated by dyncfg #31227


Merged: 9 commits into MaterializeInc:main, Feb 17, 2025

Conversation

@aljoscha (Contributor) opened this pull request:

Version of #30003 with a dyncfg for enabling/disabling multi-replica sources

Motivation

Tips for reviewer

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@aljoscha aljoscha requested review from a team as code owners January 29, 2025 14:08
@aljoscha aljoscha requested a review from ParkMyCar January 29, 2025 14:08
@aljoscha aljoscha force-pushed the active-replication-with-cfg branch 2 times, most recently from f8c49f7 to 3e761d9 on February 3, 2025 16:06
@ParkMyCar (Contributor) left a comment:

Woohoo!

@def- (Contributor) left a comment:

Test changes lgtm, I triggered a nightly run: https://buildkite.com/materialize/nightly/builds/11070

@def- (Contributor) commented Feb 6, 2025:

I'm a bit confused by the data ingest error: https://buildkite.com/materialize/nightly/builds/11070#0194dc61-63a3-4e71-b78a-7dad5aa1bac1

psycopg.errors.InternalError_: cannot create source in cluster with more than one replica

I thought that shouldn't happen anymore since we enable multi-cluster replication now?

@aljoscha (Contributor, Author) commented:

@def- This only enables multi-replica sources for Kafka sources; the other source types are still an open TODO after this. And yeah, sorry, the issue title is confusing.

Fwiw, here's the overall EPIC: https://github.com/MaterializeInc/database-issues/issues/5051

@aljoscha aljoscha changed the title storage: enable source replication, make it configurable storage: allow kafka/loadgen sources on multi-replica clusters, gated by dyncfg Feb 10, 2025
@aljoscha aljoscha force-pushed the active-replication-with-cfg branch from 3e761d9 to c1cf70e on February 10, 2025 11:30
@def- (Contributor) left a comment:

Another nightly run: https://buildkite.com/materialize/nightly/builds/11102. Edit: I'm fixing up data-ingest. Edit 2: New run with just data-ingest: https://buildkite.com/materialize/nightly/builds/11104

I also want to write some more tests, but that can happen after this is merged, too.

@def- def- force-pushed the active-replication-with-cfg branch from 8265d81 to eae3e49 on February 10, 2025 18:03
@def- (Contributor) commented Feb 10, 2025:

Something interesting seems to have happened in the data ingest test: https://buildkite.com/materialize/nightly/builds/11104#_

data-ingest-materialized-1     | 2025-02-10T18:41:59.892749Z  thread 'main' panicked at src/adapter/src/catalog/apply.rs:1024:33: PlanError(Unstructured("cannot create source in cluster with more than one replica")): invalid persisted SQL: CREATE SOURCE "materialize"."public"."kafka_table0" IN CLUSTER [u2] FROM KAFKA CONNECTION [u1 AS "materialize"."public"."kafka_conn"] (TOPIC = 'data-ingest-0') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION [u2 AS "materialize"."public"."csr_conn"] SEED KEY SCHEMA '{"type":"record","name":"key","fields":[{"name":"key0","type":"long"},{"name":"key1","type":"int"}]}' VALUE SCHEMA '{"type":"record","name":"value","fields":[{"name":"value0","type":"int"},{"name":"value1","type":"double"},{"name":"value2","type":"double"},{"name":"value3","type":"string"},{"name":"value4","type":"double"},{"name":"value5","type":"int"},{"name":"value6","type":"double"},{"name":"value7","type":"int"},{"name":"value8","type":"double"}]}' ENVELOPE UPSERT EXPOSE PROGRESS AS [u17 AS "materialize"."public"."kafka_table0_progress"]

@teskje (Contributor) left a comment:

Looks mostly good, but I'm suspicious that the active_copies tracking might not always be correct. At least I don't understand why we can unconditionally initialize the number of active copies to 1 when a collection is created, and I think we can leak CollectionState when a replica is dropped at the wrong moment.

Comment on (removed) lines 1918 to 1963:

    } else {
        soft_panic_or_log!(
            "DroppedId for ID {id} but we have neither ingestion nor export \
            under that ID"
        );

I think that soft panic should stay? Since we're doing the active_copies tracking there isn't a reason we should ever get here, right?

    for id in instance.active_ingestions() {
        self.collections
            .get_mut(id)
            .expect("instance contains unknown ingestion")

Can we make this a soft panic instead? I'm wary of panics that could bring down envd, if it's not immediately obvious that we can't hit them. And I don't think that's obvious here.
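For illustration, a minimal sketch of the suggested alternative, reusing the soft_panic_or_log! macro from the hunk above; the surrounding loop shape is assumed from the diff, not the actual controller code:

```rust
// Sketch only: a soft panic logs in release builds instead of aborting,
// so a missing entry cannot bring down envd.
for id in instance.active_ingestions() {
    let Some(collection) = self.collections.get_mut(id) else {
        soft_panic_or_log!("instance contains unknown ingestion {id}");
        continue;
    };
    // ... continue with `collection` as before ...
}
```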

@@ -948,6 +963,7 @@ where
     data_source,
     collection_metadata: metadata,
     extra_state,
+    active_copies: 1,

How do we know that there is one active copy when a collection is created? Don't we need to look at the number of replicas for that?


If this ends up being an ingestion, then run_ingestions sets the active_copies to the number of instance replicas. Otherwise it is set to one for tables.


I see. Would be great to have that context in a comment too!
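To make that context concrete, here is a minimal, self-contained sketch of the flow described in this thread; everything except active_copies, run_ingestions, and CollectionState is a simplified stand-in, not the actual controller code:

```rust
use std::collections::BTreeMap;

type GlobalId = u64; // stand-in for the real GlobalId type

struct CollectionState {
    // Number of replicas currently running a copy of this collection.
    active_copies: usize,
}

struct Controller {
    collections: BTreeMap<GlobalId, CollectionState>,
}

impl Controller {
    fn create_collection(&mut self, id: GlobalId) {
        // Tables only ever have one copy, so 1 is already final for them.
        self.collections
            .insert(id, CollectionState { active_copies: 1 });
    }

    fn run_ingestions(&mut self, id: GlobalId, num_replicas: usize) {
        // Ingestions run on every replica of their cluster, so the initial
        // value of 1 is overwritten with the replica count here.
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
            .active_copies = num_replicas;
    }
}
```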

    self.collections
        .get_mut(id)
        .expect("instance contains unknown ingestion")
        .active_copies -= 1;

We might have to remove collection state if this value drops to zero. For example, consider the case where a source is dropped but before we receive the DroppedId response from the replica we drop the replica. With the current code I think we would just leak the CollectionState for the dropped collection.

This is tricky because we can't always drop a collection when this counter goes to zero. All replicas in a cluster can be dropped without the sources on the same cluster being dropped as well.
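A minimal sketch of the decrement-and-reap logic this comment asks for; the dropped flag and function shape are hypothetical, only active_copies and CollectionState come from the diff:

```rust
use std::collections::BTreeMap;

type GlobalId = u64; // stand-in for the real GlobalId type

struct CollectionState {
    active_copies: usize,
    // Hypothetical flag: the collection itself has been dropped and we are
    // only waiting for replicas to acknowledge with DroppedId.
    dropped: bool,
}

// Called when a replica stops running a copy (DroppedId or replica drop).
fn on_copy_gone(collections: &mut BTreeMap<GlobalId, CollectionState>, id: GlobalId) {
    let collection = collections
        .get_mut(&id)
        .expect("instance contains unknown ingestion");
    collection.active_copies -= 1;
    // Reaching zero is not sufficient on its own: all replicas of a cluster
    // can be dropped while its sources live on. Only reap the state when the
    // collection itself was dropped too, avoiding the leak described above.
    let reap = collection.active_copies == 0 && collection.dropped;
    if reap {
        collections.remove(&id);
    }
}
```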

@aljoscha (Contributor, Author) commented:

> Looks mostly good, but I'm suspicious that the active_copies tracking might not always be correct. At least I don't understand why we can unconditionally initialize the number of active copies to 1 when a collection is created, and I think we can leak CollectionState when a replica is dropped at the wrong moment.

@petrosagg Could you please look at the questions about active_copies and the changes to reclock (basically all the questions above 😅)?

@aljoscha (Contributor, Author) commented:

@teskje and @petrosagg For some of the trickier questions around collection state, active_copies, concurrency, assertions, and leaking state, I think you already know my general stance 😅

@aljoscha (Contributor, Author) commented:

@def- the nightly is failing with:

data-ingest-materialized-1     | 2025-02-10T18:41:59.892749Z  thread 'main' panicked at src/adapter/src/catalog/apply.rs:1024:33: PlanError(Unstructured("cannot create source in cluster with more than one replica")): invalid persisted SQL: CREATE SOURCE "materialize"."public"."kafka_table0" IN CLUSTER [u2] FROM KAFKA CONNECTION [u1 AS "materialize"."public"."kafka_conn"] (TOPIC = 'data-ingest-0') FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION [u2 AS "materialize"."public"."csr_conn"] SEED KEY SCHEMA '{"type":"record","name":"key","fields":[{"name":"key0","type":"long"},{"name":"key1","type":"int"}]}' VALUE SCHEMA '{"type":"record","name":"value","fields":[{"name":"value0","type":"int"},{"name":"value1","type":"double"},{"name":"value2","type":"double"},{"name":"value3","type":"string"},{"name":"value4","type":"double"},{"name":"value5","type":"int"},{"name":"value6","type":"double"},{"name":"value7","type":"int"},{"name":"value8","type":"double"}]}' ENVELOPE UPSERT EXPOSE PROGRESS AS [u17 AS "materialize"."public"."kafka_table0_progress"]

This happens when enable_multi_replica_sources is disabled but was enabled before, and multi-replica clusters with sources already exist. We currently can't go back, because we would have to delete either replicas or sources to get back to a valid state with the flag disabled.

@aljoscha aljoscha force-pushed the active-replication-with-cfg branch from eae3e49 to 7f1aabf on February 14, 2025 10:15
@aljoscha (Contributor, Author) commented:

Rebased and fixed. @def- we still need to get to the bottom of ☝️: are the tests flipping the dyncfg on and off?

@def- (Contributor) commented Feb 14, 2025:

No, they are not. The dyncfg should always be on in the Data Ingest test.

@def- (Contributor) commented Feb 14, 2025:

What might be happening is that during bootstrapping we parse the SQL without respecting the dyncfg setting?

@aljoscha aljoscha force-pushed the active-replication-with-cfg branch from 7f1aabf to 0882f9c on February 17, 2025 12:42
@aljoscha (Contributor, Author) commented:

> What might be happening is that during bootstrapping we parse the SQL without respecting the dyncfg setting?

Good catch! I pushed a commit that should fix this.

petrosagg and others added 4 commits February 17, 2025 14:00
We want the mint operation to be a no-op if either new_from_upper or binding_ts is not beyond the current source_upper and upper, respectively.

Before, we only checked the former, which meant that if the requested binding_ts was in the past we would attempt to write updates to the remap shard that are not beyond the upper.

Co-authored-by: Petros Angelatos <[email protected]>
Signed-off-by: Petros Angelatos <[email protected]>
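A minimal sketch of the guard this commit message describes; the integer timestamps and struct shape are simplifications, not the real reclock types, and "beyond" is reduced to ordinary comparisons:

```rust
// Sketch of the mint no-op guard. The real code works with frontiers, not
// plain integers.
type Timestamp = u64;

struct RemapState {
    source_upper: Timestamp, // upper of the source (FromTime) frontier
    upper: Timestamp,        // upper of the remap shard (IntoTime) frontier
}

impl RemapState {
    fn mint(&mut self, binding_ts: Timestamp, new_from_upper: Timestamp) {
        // The fix: check BOTH conditions. Previously only the first was
        // checked, so a binding_ts in the past could produce remap-shard
        // updates that are not beyond the shard's upper.
        if new_from_upper <= self.source_upper || binding_ts < self.upper {
            return; // no-op
        }
        self.source_upper = new_from_upper;
        self.upper = binding_ts + 1; // shard upper advances past the binding
    }
}
```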
aljoscha and others added 4 commits February 17, 2025 14:00
This will lead to panics when bootstrapping from a catalog that has these allowed, largely, it seems, because we don't yet have the proper dyncfg value that would allow this.

We still have checks that disallow sources on multi-replica clusters at the sequencer level, so this should be fine.
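A minimal sketch of the gating this commit and the thread describe; the function and its parameters are hypothetical, only the dyncfg name and the error message come from the discussion:

```rust
#[derive(Debug)]
struct PlanError(String);

// Sketch: enforce the restriction when sequencing new CREATE SOURCE
// statements, but not when replaying already-persisted SQL at bootstrap,
// where rejecting a statement would panic envd (see the nightly failure).
fn check_source_allowed(
    replica_count: usize,
    enable_multi_replica_sources: bool, // the dyncfg introduced in this PR
    bootstrapping: bool,                // replaying catalog SQL at startup?
) -> Result<(), PlanError> {
    if bootstrapping {
        // The sequencer-level check already vetted this statement when it
        // was first created; the dyncfg may not even be loaded yet here.
        return Ok(());
    }
    if replica_count > 1 && !enable_multi_replica_sources {
        return Err(PlanError(
            "cannot create source in cluster with more than one replica".into(),
        ));
    }
    Ok(())
}
```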
@aljoscha aljoscha force-pushed the active-replication-with-cfg branch from 0882f9c to 7ed362c on February 17, 2025 13:01
@aljoscha (Contributor, Author) commented:

@def- what about that remaining cloudtest failure here: https://buildkite.com/materialize/nightly/builds/11180?

@def- (Contributor) commented Feb 17, 2025:

I have pushed a fix: 2e0132c. @jubrad does this make sense?

@aljoscha (Contributor, Author) commented:

> I have pushed a fix: 2e0132c. @jubrad does this make sense?

doh! tyty! 🙇‍♂️

@aljoscha aljoscha force-pushed the active-replication-with-cfg branch from 2e0132c to a2fe527 on February 17, 2025 16:04
@aljoscha (Contributor, Author) commented:

@def- I pushed a2fe527 because this was yielding:

error: expected error containing "creating cluster replica would violate max_replicas_per_cluster limit", got "unknown cluster replica size 2000"

@def- def- force-pushed the active-replication-with-cfg branch from a2fe527 to 08db2f5 on February 17, 2025 16:06
@aljoscha (Contributor, Author) commented:

@def- I think you pushed a commit that changes the expected error message, on top of mine, which changes size to replication factor 😅

@def- (Contributor) commented Feb 17, 2025:

Oops, didn't see your comment and just pushed something similar, sorry!

@def- def- force-pushed the active-replication-with-cfg branch from 08db2f5 to a2fe527 on February 17, 2025 16:26
@aljoscha (Contributor, Author) commented:

> Oops, didn't see your comment and just pushed something similar, sorry!

No worries at all! ☺️

@aljoscha aljoscha merged commit 20fabdd into MaterializeInc:main Feb 17, 2025
237 of 241 checks passed
@aljoscha aljoscha deleted the active-replication-with-cfg branch February 17, 2025 20:46
def- added a commit to def-/materialize that referenced this pull request Feb 19, 2025