-
Notifications
You must be signed in to change notification settings - Fork 3.3k
[internal/kafka] Validate franz-go configuration options early in Config.Validate #46024
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Extract securityOpts helper from commonOpts and include TLS/SASL/Kerberos options in kgo.ValidateOpts calls, using context.Background(). Add test certificate files to kafkareceiver testdata. Co-authored-by: Cursor <[email protected]>
Co-authored-by: Cursor <[email protected]>
internal/kafka/franz_client.go
Outdated
| // converting config values to kgo options and calling kgo.ValidateOpts. | ||
| func ValidateClientConfigOpts(cfg configkafka.ClientConfig) error { | ||
| opts := clientConfigOpts(cfg) | ||
| secOpts, err := securityOpts(context.Background(), cfg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[to reviewer] had to use context.Background because tlsConfig.LoadTLSConfig takes a ctx but it isn't available in config validate code path.
| CAFile: "ca.pem", | ||
| CertFile: "cert.pem", | ||
| KeyFile: "key.pem", | ||
| CAFile: "testdata/ca.pem", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[to reviewer] it was referencing non-existent tls files. This is now fixed by copying files into testdata and referencing them here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's ok for me 👍
Thanks for taking it. I was wondering if it does not make sense to dynamically generate certs at the test start, but for config test, maybe it does not take the effort 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer these to be dynamically generated within tests if possible, since if someone was to blindly scan the project and see static keys embedded they may panic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds early validation of franz-go (franz-go/kgo) configuration by performing option validation during Config.Validate() for Kafka components, improving feedback on misconfiguration before runtime.
Changes:
- Introduces
internal/kafka.Validate{Client,Producer,Consumer}ConfigOptshelpers that translate configs tokgo.Optand validate them. - Hooks the new validation into
Config.Validate()forexporter/kafka,receiver/kafka, andreceiver/kafkametrics. - Updates
kafkareceivertestdata/config and adds TLS fixture PEM files to support TLS validation in tests.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
internal/kafka/franz_client.go |
Refactors franz-go option construction and adds exported validation helpers using kgo.ValidateOpts. |
exporter/kafkaexporter/config.go |
Validates producer/client franz-go options during exporter config validation. |
receiver/kafkareceiver/config.go |
Validates consumer/client franz-go options per configured signal during receiver config validation. |
receiver/kafkametricsreceiver/config.go |
Adds config validation that checks franz-go client options early. |
receiver/kafkareceiver/config_test.go |
Adjusts expectations and test cases to account for TLS file paths and default client/consumer configs. |
receiver/kafkareceiver/testdata/config.yaml |
Updates TLS paths to point at new testdata PEM files. |
receiver/kafkareceiver/testdata/ca.pem |
Adds CA certificate fixture for TLS tests. |
receiver/kafkareceiver/testdata/cert.pem |
Adds client certificate fixture for TLS tests. |
receiver/kafkareceiver/testdata/key.pem |
Adds private key fixture for TLS tests. |
.chloggen/kafka-validate-opts-*.yaml |
Adds changelog entries for the affected Kafka components. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
receiver/kafkareceiver/config.go
Outdated
| // Validate franz-go options for each configured signal. Deprecated | ||
| // Topic/ExcludeTopic fields are already migrated by Unmarshal. | ||
| for _, sig := range []*TopicEncodingConfig{&c.Logs, &c.Metrics, &c.Traces, &c.Profiles} { | ||
| if len(sig.Topics) == 0 { | ||
| continue | ||
| } | ||
| if err := kafka.ValidateConsumerConfigOpts(c.ClientConfig, c.ConsumerConfig, sig.Topics, sig.ExcludeTopics); err != nil { |
Copilot
AI
Feb 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Config.Validate currently calls ValidateConsumerConfigOpts once per configured signal; that helper loads/parses TLS material via LoadTLSConfig each time. With the default config this can happen up to 4 times and repeats identical work. Consider validating the client/consumer/security options once (or caching the computed security opts) and only varying the topic-related opts per signal to avoid repeated file I/O and cert parsing during startup validation.
| // Validate franz-go options for each configured signal. Deprecated | |
| // Topic/ExcludeTopic fields are already migrated by Unmarshal. | |
| for _, sig := range []*TopicEncodingConfig{&c.Logs, &c.Metrics, &c.Traces, &c.Profiles} { | |
| if len(sig.Topics) == 0 { | |
| continue | |
| } | |
| if err := kafka.ValidateConsumerConfigOpts(c.ClientConfig, c.ConsumerConfig, sig.Topics, sig.ExcludeTopics); err != nil { | |
| // Validate franz-go options. Deprecated Topic/ExcludeTopic fields are already migrated by Unmarshal. | |
| var allTopics []string | |
| var allExcludeTopics []string | |
| for _, sig := range []*TopicEncodingConfig{&c.Logs, &c.Metrics, &c.Traces, &c.Profiles} { | |
| if len(sig.Topics) == 0 { | |
| continue | |
| } | |
| allTopics = append(allTopics, sig.Topics...) | |
| allExcludeTopics = append(allExcludeTopics, sig.ExcludeTopics...) | |
| } | |
| if len(allTopics) > 0 { | |
| if err := kafka.ValidateConsumerConfigOpts(c.ClientConfig, c.ConsumerConfig, allTopics, allExcludeTopics); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't think we can avoid this nicely. When the component starts up and invokes NewClient, it will parse TLS files per signal anyway.
internal/kafka/franz_client.go
Outdated
| // ValidateClientConfigOpts validates the configkafka.ClientConfig by | ||
| // converting config values to kgo options and calling kgo.ValidateOpts. | ||
| func ValidateClientConfigOpts(cfg configkafka.ClientConfig) error { | ||
| opts := clientConfigOpts(cfg) | ||
| secOpts, err := securityOpts(context.Background(), cfg) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| opts = append(opts, secOpts...) | ||
| return kgo.ValidateOpts(opts...) | ||
| } | ||
|
|
||
| // ValidateProducerConfigOpts validates the configkafka.ClientConfig and | ||
| // configkafka.ProducerConfig by converting config values to kgo options | ||
| // and calling kgo.ValidateOpts. | ||
| func ValidateProducerConfigOpts(clientCfg configkafka.ClientConfig, producerCfg configkafka.ProducerConfig, timeout time.Duration) error { | ||
| opts := clientConfigOpts(clientCfg) | ||
| secOpts, err := securityOpts(context.Background(), clientCfg) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| opts = append(opts, secOpts...) | ||
| opts = append(opts, producerConfigOpts(producerCfg)...) | ||
| opts = append(opts, kgo.ProduceRequestTimeout(timeout)) | ||
| return kgo.ValidateOpts(opts...) |
Copilot
AI
Feb 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new Validate*ConfigOpts helpers return raw errors from securityOpts / kgo.ValidateOpts. Those errors can be fairly low-level and may not mention which config section they came from. Consider wrapping returned errors with a short prefix (e.g., "invalid client config options" / "invalid producer config options" / "invalid consumer config options") to make Config.Validate() failures clearer for users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consumer / producer config vs client config isn't a user-facing concept, don't think we need to tell between them in error message.
Co-authored-by: Copilot <[email protected]>
Add testdata entries and test cases to each component's TestLoadConfig to verify that kgo.ValidateOpts catches invalid configurations early. Co-authored-by: Cursor <[email protected]>
PTAL into the CI error above 🙏 |
Description
Validate franz-go configuration options early in Config.Validate
Affects exporter/kafka, receiver/kafka, receiver/kafkametrics
Link to tracking issue
Testing
Documentation