From 864755839e73a1c8e4e6afcc8a4c680ff9e0ec31 Mon Sep 17 00:00:00 2001 From: "robin.brehauer" Date: Tue, 14 Apr 2026 11:56:04 +0200 Subject: [PATCH 1/8] Eliminated per-message retry policy allocation --- .../KafkaMessageConsumer.cs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs index 56c6587e0..3a54de264 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs @@ -34,6 +34,7 @@ public sealed class KafkaMessageConsumer : IMessageConsumer, IDisp private readonly IMetricFamily? _consumerLagSummary; private readonly ILogger> _logger; private readonly IHostApplicationLifetime _applicationLifetime; + private readonly AsyncPolicy _retryPolicy; private IConsumer? _consumer; private readonly CancellationTokenSource _internalCts = new(); @@ -69,6 +70,13 @@ CloudEventFormatter cloudEventFormatter _options.MaxConcurrentMessages ); _timer = new Timer(HandleCommitTimer); + + _retryPolicy = Policy + .HandleResult(status => status == ProcessedMessageStatus.TemporaryFailure) + .WaitAndRetryAsync( + _options.RetriesOnTemporaryFailure, + retryAttempt => _options.RetryBasePeriod * Math.Pow(2, retryAttempt) + ); } public Func< @@ -241,13 +249,7 @@ CancellationToken token ); var cloudEvent = KafkaMessageToCloudEvent(msg.Message); - var retryPolicy = Policy - .HandleResult(status => status == ProcessedMessageStatus.TemporaryFailure) - .WaitAndRetryAsync( - _options.RetriesOnTemporaryFailure, - retryAttempt => _options.RetryBasePeriod * Math.Pow(2, retryAttempt) - ); - var status = await retryPolicy.ExecuteAsync( + var status = await _retryPolicy.ExecuteAsync( (cancellationToken) => ConsumeCallbackAsync!.Invoke(cloudEvent, cancellationToken), token ); From 28f8305fdd3f5235d18b2730aedb74d9662352eb Mon Sep 17 00:00:00 2001 From: "robin.brehauer" Date: Tue, 14 Apr 2026 11:59:03 +0200 Subject: [PATCH 2/8] Fixed unreliable offset-based commit period --- src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs index 3a54de264..0e02d79ab 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs @@ -274,6 +274,7 @@ CancellationToken token private readonly Timer _timer; private readonly object _commitLock = new(); private bool _pendingCommit; + private int _messagesSinceLastCommit; private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken) { @@ -299,9 +300,12 @@ private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken) { _consumer?.StoreOffset(result.ConsumeResult); _pendingCommit = true; + _messagesSinceLastCommit++; } - if ((result.ConsumeResult.Offset.Value + 1) % _options.CommitPeriod == 0) + // Use message count since last commit instead of offset-based check. + // This works correctly across multiple partitions with non-contiguous offsets. + if (_messagesSinceLastCommit >= _options.CommitPeriod) { Commit(); RestartCommitTimer(); @@ -340,6 +344,7 @@ private void Commit() } _pendingCommit = false; + _messagesSinceLastCommit = 0; try { _consumer?.Commit(); From 51b25d75a84a7c0ee7a9054dc5a0e6e1d37454ec Mon Sep 17 00:00:00 2001 From: "robin.brehauer" Date: Tue, 14 Apr 2026 11:59:42 +0200 Subject: [PATCH 3/8] Fixed string interpolation in log templates --- .../KafkaMessageConsumer.cs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs index 0e02d79ab..753b6e8ba 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs @@ -168,21 +168,24 @@ private void WriteLog(LogMessage logMessage) case SyslogLevel.Alert: case SyslogLevel.Critical: _logger.LogCritical( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); break; case SyslogLevel.Error: _logger.LogError( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); break; case SyslogLevel.Warning: _logger.LogWarning( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); @@ -190,14 +193,16 @@ private void WriteLog(LogMessage logMessage) case SyslogLevel.Notice: case SyslogLevel.Info: _logger.LogInformation( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); break; case SyslogLevel.Debug: _logger.LogDebug( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); @@ -393,7 +398,7 @@ private bool IsIrrecoverableFailure(ProcessedMessageStatus status) default: _logger.LogCritical( LogEvents.UnknownProcessedMessageStatus, - "Unknown processed message status {status}", + "Unknown processed message status {Status}", status ); return true; From 45e40e05011fc7e9532eeeeb335351dfe8ee0c50 Mon Sep 17 00:00:00 2001 From: "robin.brehauer" Date: Tue, 14 Apr 2026 12:11:40 +0200 Subject: [PATCH 4/8] Flush Kafka publisher on dispose --- .../KafkaMessagePublisher.cs | 14 +++++++++++++- .../KafkaExtensionTests.cs | 8 +++++++- .../KafkaMessageTests.cs | 3 ++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs index b38c5b543..30e3c2cc6 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs @@ -5,6 +5,7 @@ using CloudNative.CloudEvents.Extensions; using CloudNative.CloudEvents.Kafka; using Confluent.Kafka; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; @@ -19,16 +20,19 @@ public class KafkaMessagePublisher : IRawMessagePublisher, IDi private readonly IProducer _producer; private readonly KafkaPublisherOptions _options; private readonly PublisherOptions _publisherOptions; + private readonly ILogger> _logger; public KafkaMessagePublisher( IOptions> options, CloudEventFormatter cloudEventFormatter, - IOptions publisherOptions + IOptions publisherOptions, + ILogger> logger ) { _cloudEventFormatter = cloudEventFormatter; _options = options.Value ?? throw new ArgumentNullException(nameof(options)); _publisherOptions = publisherOptions.Value ?? throw new ArgumentNullException(nameof(publisherOptions)); + _logger = logger; _producer = new ProducerBuilder(_options).Build(); } @@ -57,6 +61,14 @@ public async Task PublishMessageAsync(MotorCloudEvent motorCloudEvent, C public void Dispose() { + try + { + _producer.Flush(TimeSpan.FromSeconds(10)); + } + catch (Exception e) + { + _logger.LogWarning(e, "Error flushing producer during dispose"); + } _producer.Dispose(); } } diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs index c8031596e..20a4a8e26 100644 --- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs +++ b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs @@ -8,6 +8,7 @@ using CloudNative.CloudEvents.SystemTextJson; using Confluent.Kafka; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Moq; using Motor.Extensions.Hosting.Abstractions; @@ -519,7 +520,12 @@ private KafkaMessagePublisher GetPublisher(string topic) { var options = Options.Create(GetPublisherConfig(topic)); var publisherOptions = Options.Create(new PublisherOptions()); - return new KafkaMessagePublisher(options, new JsonEventFormatter(), publisherOptions); + return new KafkaMessagePublisher( + options, + new JsonEventFormatter(), + publisherOptions, + Mock.Of>>() + ); } private KafkaPublisherOptions GetPublisherConfig(string topic) diff --git a/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs b/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs index e6112ecc1..d3fda8d13 100644 --- a/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs +++ b/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs @@ -96,7 +96,8 @@ private static KafkaMessagePublisher GetKafkaPublisher( return new KafkaMessagePublisher( Options.Create(options), new JsonEventFormatter(), - Options.Create(new PublisherOptions { CloudEventFormat = format }) + Options.Create(new PublisherOptions { CloudEventFormat = format }), + Mock.Of>>() ); } From 128e148f4a3a17413398397037290a1d51b02f06 Mon Sep 17 00:00:00 2001 From: "robin.brehauer" Date: Tue, 14 Apr 2026 12:12:14 +0200 Subject: [PATCH 5/8] Set batching and compression defaults for Kafka publisher --- .../Options/KafkaPublisherOptions.cs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/Motor.Extensions.Hosting.Kafka/Options/KafkaPublisherOptions.cs b/src/Motor.Extensions.Hosting.Kafka/Options/KafkaPublisherOptions.cs index 8ec676d7c..62c582afe 100644 --- a/src/Motor.Extensions.Hosting.Kafka/Options/KafkaPublisherOptions.cs +++ b/src/Motor.Extensions.Hosting.Kafka/Options/KafkaPublisherOptions.cs @@ -4,5 +4,14 @@ namespace Motor.Extensions.Hosting.Kafka.Options; public class KafkaPublisherOptions : ProducerConfig { + public KafkaPublisherOptions() + { + // Allow librdkafka to batch messages for better throughput. + // LingerMs controls how long to wait for additional messages before sending a batch. + LingerMs ??= 5; + // Enable Snappy compression for better network throughput with low CPU overhead. + CompressionType ??= Confluent.Kafka.CompressionType.Snappy; + } + public string? Topic { get; set; } } From 7c2bcdcadcd5d2a5b2eeb25af262e626fcd75337 Mon Sep 17 00:00:00 2001 From: "robin.brehauer" Date: Tue, 14 Apr 2026 12:05:16 +0200 Subject: [PATCH 6/8] Fix duplicate Kafka EventId --- src/Motor.Extensions.Hosting.Kafka/LogEvents.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs b/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs index c135f3979..afbff61c6 100644 --- a/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs +++ b/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs @@ -14,7 +14,7 @@ public static class LogEvents public static readonly EventId UnknownProcessedMessageStatus = new(7, nameof(UnknownProcessedMessageStatus)); public static readonly EventId MessageHandlingUnexpectedException = new( - 7, + 8, nameof(MessageHandlingUnexpectedException) ); } From 844bf057689aa91e773c6c4aee74ef67bc1ece72 Mon Sep 17 00:00:00 2001 From: "robin.brehauer" Date: Tue, 14 Apr 2026 12:06:09 +0200 Subject: [PATCH 7/8] Imporve Kafka publisher throughput --- .../KafkaMessagePublisher.cs | 36 +++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs index 30e3c2cc6..c9e2c4aae 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs @@ -36,11 +36,43 @@ ILogger> logger _producer = new ProducerBuilder(_options).Build(); } - public async Task PublishMessageAsync(MotorCloudEvent motorCloudEvent, CancellationToken token = default) + public Task PublishMessageAsync(MotorCloudEvent motorCloudEvent, CancellationToken token = default) { var topic = motorCloudEvent.GetKafkaTopic() ?? _options.Topic; var message = CloudEventToKafkaMessage(motorCloudEvent); - await _producer.ProduceAsync(topic, message, token); + + // Use Produce with a TaskCompletionSource for pipelining instead of + // awaiting ProduceAsync per message. This allows librdkafka to batch + // multiple messages into a single broker request, significantly + // improving throughput. + var tcs = new TaskCompletionSource>( + TaskCreationOptions.RunContinuationsAsynchronously + ); + + try + { + _producer.Produce( + topic, + message, + deliveryReport => + { + if (deliveryReport.Error.IsError) + { + tcs.SetException(new ProduceException(deliveryReport.Error, deliveryReport)); + } + else + { + tcs.SetResult(deliveryReport); + } + } + ); + } + catch (ProduceException ex) + { + tcs.SetException(ex); + } + + return tcs.Task; } public Message CloudEventToKafkaMessage(MotorCloudEvent motorCloudEvent) From 4dc0ef8084e0df32afb7aefc466b8fa070d64a31 Mon Sep 17 00:00:00 2001 From: "robin.brehauer" Date: Tue, 14 Apr 2026 12:10:01 +0200 Subject: [PATCH 8/8] Make consturctor of Kafka publisher more readable --- .../KafkaMessageConsumer.cs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs index 753b6e8ba..67c5e5211 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs @@ -47,12 +47,17 @@ public KafkaMessageConsumer( CloudEventFormatter cloudEventFormatter ) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + ArgumentNullException.ThrowIfNull(logger); + ArgumentNullException.ThrowIfNull(config.Value); + ArgumentNullException.ThrowIfNull(applicationNameService); + ArgumentNullException.ThrowIfNull(cloudEventFormatter); + + _logger = logger; _applicationLifetime = applicationLifetime; - _applicationNameService = - applicationNameService ?? throw new ArgumentNullException(nameof(applicationNameService)); + _applicationNameService = applicationNameService; _cloudEventFormatter = cloudEventFormatter; - _options = config.Value ?? throw new ArgumentNullException(nameof(config)); + _options = config.Value; + _consumerLagSummary = metricsFactory?.CreateSummary( "consumer_lag_distribution", "Contains a summary of current consumer lag of each partition",