From 4a9292d992be156c0a1854d30222d452367a8039 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=B6hling=2C=20Maximilian?= Date: Tue, 9 Jun 2026 16:18:28 +0200 Subject: [PATCH] fix(KafkaClientExtensions): Fix uppercase characters not being allowed in CloudEvents --- .../KafkaClientExtensions.cs | 5 ++++- .../KafkaExtensionTests.cs | 7 ++++--- .../KafkaMessageTests.cs | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaClientExtensions.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaClientExtensions.cs index 4c829607..eadff887 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaClientExtensions.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaClientExtensions.cs @@ -81,7 +81,10 @@ MotorCloudEvent motorCloudEvent { try { - motorCloudEvent.SetAttributeFromString(header.Key, Encoding.UTF8.GetString(headerValue)); + motorCloudEvent.SetAttributeFromString( + header.Key.ToLowerInvariant(), + Encoding.UTF8.GetString(headerValue) + ); } catch (ArgumentException) { diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs index 076c6b43..bc754f4d 100644 --- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs +++ b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs @@ -80,19 +80,20 @@ public async Task Consume_RawPublishIntoKafkaAndConsumeCreateCloudEvent_Consumed public async Task Consume_RawPublishWithKafkaHeader_HeaderValueIsAvailableAsCloudEventAttribute() { var topic = NewTopic(); - const string headerKey = "customheader"; + const string headerKeyOriginal = "CustomHeaderNamE"; + const string headerKeyConverted = "customheadername"; const string headerValue = "customHeaderValue"; await PublishMessage( topic, "someKey", Message, - new Headers { { headerKey, Encoding.UTF8.GetBytes(headerValue) } } + new Headers { { headerKeyOriginal, Encoding.UTF8.GetBytes(headerValue) } } ); using var consumer = GetConsumer(topic); var consumedHeaderValueCompletionSource = new TaskCompletionSource(); consumer.ConsumeCallbackAsync = (dataEvent, _) => { - consumedHeaderValueCompletionSource.TrySetResult(dataEvent[headerKey] as string); + consumedHeaderValueCompletionSource.TrySetResult(dataEvent[headerKeyConverted] as string); return Task.FromResult(ProcessedMessageStatus.Success); }; diff --git a/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs b/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs index 2c7dec75..6b4a51c0 100644 --- a/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs +++ b/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs @@ -90,7 +90,7 @@ public void KafkaMessageToCloudEvent_WithHeaders_HeadersExtractedToCloudEvent() var kafkaMessage = publisher.CloudEventToKafkaMessage(inputCloudEvent); kafkaMessage.Headers ??= new Headers(); kafkaMessage.Headers.Add("firstcustomheader", "customvalue"u8.ToArray()); - kafkaMessage.Headers.Add("secondcustomheader", "customvalue_two"u8.ToArray()); + kafkaMessage.Headers.Add("SECONDCUSTOMHEADER", "customvalue_two"u8.ToArray()); var outputCloudEvent = consumer.KafkaMessageToCloudEvent(kafkaMessage);