Skip to content

Document excludeMetaHeaderRegex kafka pubsub configuration #4724

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: v1.16
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ spec:
| sessionTimeout | N | The timeout used to detect client failures when using Kafka’s group management facility. If the broker fails to receive any heartbeats from the consumer before the expiration of this session timeout, then the consumer is removed and initiates a rebalance. Defaults to "10s". | `"20s"` |
| consumerGroupRebalanceStrategy | N | The strategy to use for consumer group rebalancing. Supported values: `range`, `sticky`, `roundrobin`. Default is `range` | `"sticky"` |
| escapeHeaders | N | Enables URL escaping of the message header values received by the consumer. Allows receiving content with special characters that are usually not allowed in HTTP headers. Default is `false`. | `true` |
| excludeHeaderMetaRegex | N | A regular expression to exclude keys from being converted from headers to metadata when consuming messages and from metadata to headers when publishing messages. This capability avoids unwanted downstream side effects for topic consumers. | '"^valueSchemaType$"`

The `secretKeyRef` above is referencing a [kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the tls information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component.

Expand Down Expand Up @@ -680,7 +681,34 @@ app.include_router(router)

{{% /codetab %}}

{{< /tabs >}}
{{< /tabs >}}

### Avoiding downstream side effects when publishing messages requiring custom metadata
Dapr allows customizing the publishing behavior by setting custom publish metadata.

For instance, to publish in avro format, it is required to set the `valueSchemaType=Avro` metadata.

However, by default these metadata items get converted to Kafka headers and published along with the message. This default behavior is very helpful for instance to forward tracing headers across a chain of publishers/consumers.

In certain scenario, however, it has unwanted side effects.
Let's assume you consume an Avro message using Dapr with the headers above.If this message cannot be consumed successfully and configured to be sent to a dead letter topic, `valueSchemaType=Avro` will be automatically carried forward when publishing to the dead letter topic, requiring the set up of a schema associated with this topic. In many scenarios, it is preferable to publish dead letter messages in JSON only, as complying to a determined schema is not possible.

To avoid this behavior, the kafka-pubsub component can be configured to exclude certain metadata keys from being converted to/from headers.
```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub-exclude-metadata
type: pubsub.kafka
version: v1
metadata:
- name: brokers # Required. Kafka broker connection setting
value: "dapr-kafka.myapp.svc.cluster.local:9092"
- name: authType # Required.
value: "none"
- name: excludeMetaHeaderRegex
value: "^valueSchemaType$" # Optional. Excludes `valueSchemaType` header from being published to headers and converted to metadata
```

### Overriding default consumer group rebalancing
In Kafka, rebalancing strategies determine how partitions are assigned to consumers within a consumer group. The default strategy is "range", but "roundrobin" and "sticky" are also available.
Expand Down
Loading