Skip to content

Conversation

@def-
Copy link
Contributor

@def- def- commented Nov 3, 2025

Using MaterializeInc/rust-rdkafka#35 which in turn is based on confluentinc/librdkafka#5230 (master with a minor build fix since we don't have libcurl available)

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@def- def- requested review from bkirwi and teskje November 3, 2025 22:47
@def-
Copy link
Contributor Author

def- commented Nov 3, 2025

Some build failures, have to see if we can get them fixed, but including curl doesn't seem that great: https://buildkite.com/materialize/test/builds/111230#019a4be8-5906-46c0-a7fe-6eb43453b110

@def-
Copy link
Contributor Author

def- commented Nov 4, 2025

There are unfortunately some test failures now: https://buildkite.com/materialize/test/builds/111238#019a4db7-ac91-4357-86d3-9c3c7d4dfcea I'll fix the Kafka tests ones. The Source/Sink Error Reporting ones look serious, I'm not sure if it's a bug in librdkafka or we need to adapt our code to changes:

> SELECT bool_or(error ~* 'topic testdrive-sink-topic-\d+ does not exist'), bool_or(details->'namespaced'->>'kafka' ~* 'topic testdrive-sink-topic-\d+ does not exist')
FROM mz_internal.mz_sink_status_history
JOIN mz_sinks ON mz_sinks.id = sink_id
WHERE name = 'sink1' and status = 'stalled'
2025-11-04T07:24:39.582327Z  WARN mz_testdrive::action::consistency: No Catalog state on disk, skipping consistency check
6:1: error: non-matching rows: expected:
[["true", "true"]]
got:
[["false", "false"]]
Poor diff:
+ false false
- true true

@def-
Copy link
Contributor Author

def- commented Nov 4, 2025

I looked into the source/sink errors a bit (bin/mzcompose --find source-sink-errors down && bin/mzcompose --find source-sink-errors run default) and it turns out we still get the correct result, it just takes librdkafka longer now to converge. I can adapt the test if we are ok with that. Edit: adapted it.

@def- def- requested review from DAlperin and martykulma November 4, 2025 09:35
@def- def- changed the title build(deps): Switch to upstream librdkafka build(deps): Switch to (mostly) upstream librdkafka Nov 4, 2025
@def- def- requested a review from petrosagg November 4, 2025 10:00
@def-
Copy link
Contributor Author

def- commented Nov 5, 2025

I bisected the Source/Sink error failures in librdkafka:

49f54254eca9e138d804fa06ac8b23e54b36128c is the first bad commit
commit 49f54254eca9e138d804fa06ac8b23e54b36128c
Author: Emanuele Sabellico <[email protected]>
Date:   Thu Feb 20 17:31:58 2025 +0100

    Use same strategy for updating cache partition metadata
    as for the `rd_kafka_toppar_t` that is also
    the same strategy as in Java client, considering:

    - when topic id changes partitions metadata is taken
      entirely from the new one.

    - when leader epoch is greater or equal to the
      one in the cache, or null (-1),
      partition metadata is taken from the new one.

    - when leader epoch is less than the one in the
      cache, partition metadata remains the same.

    Also when full metadata is necessary, the cache
    is used for storing it and for matching topics
    in the regex, removing the need to store
    the full metadata result, that is about the same
    size, but the cache is updated more accurately.

 CHANGELOG.md                 |   7 +
 src/rdkafka.c                |   3 +-
 src/rdkafka_cgrp.c           |   7 +-
 src/rdkafka_int.h            |   6 +-
 src/rdkafka_metadata.c       | 100 ++++++--------
 src/rdkafka_metadata.h       |  14 +-
 src/rdkafka_metadata_cache.c | 317 ++++++++++++++++++++++++++++++++++---------
 src/rdkafka_topic.c          |  80 +++++++++--
 src/rdkafka_topic.h          |   2 +
 9 files changed, 385 insertions(+), 151 deletions(-)

I think the old behavior in librdkafka was just buggy, the new one seems to be as documented: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
topic.metadata.propagation.max.ms defaults to 30s, and metadata.max.age.ms is "topic.metadata.refresh.interval.ms * 3", so you can have information being out of date by up to 90s.
Setting the config value manually indeed works:

diff --git a/src/kafka-util/src/client.rs b/src/kafka-util/src/client.rs
index 0181af23ae..e6cdc9facf 100644
--- a/src/kafka-util/src/client.rs
+++ b/src/kafka-util/src/client.rs
@@ -955,6 +955,7 @@ pub fn create_new_client_config(
             .as_millis()
             .to_string(),
     );
+    config.set("topic.metadata.propagation.max.ms", "10000");

     config
 }

Would we prefer that compared to changing the test?

@martykulma
Copy link
Contributor

Would we prefer that compared to changing the test?

My understanding is it should be fine to lower this value.

We do handle topic propagation delay in ensure_topic(), this change should just make that detection faster.

I'm not sure if we want to set it for all kafka clients or just sinks. Changes in topic metadata aren't likely to happen often. I'd vote for just setting it for the admin client we use to create the topic in sink.rs.

@def-
Copy link
Contributor Author

def- commented Nov 5, 2025

I'm ok with merging this as is. The 60s value in the test was just randomly chosen, it's not like we ever deliberately tested or specified that that's the time for a sink error to propagate.

I tried the sink-only change but that still causes the same test failures: #34034

Copy link
Contributor

@martykulma martykulma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @def-!

@def- def- merged commit 72c42b3 into MaterializeInc:main Nov 6, 2025
314 of 316 checks passed
@def- def- deleted the pr-rdkafka branch November 6, 2025 13:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants