diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs index f1be5aec..ac50e12a 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs @@ -34,6 +34,7 @@ public sealed class KafkaMessageConsumer : IMessageConsumer, IDisp private readonly IMetricFamily? _consumerLagSummary; private readonly ILogger> _logger; private readonly IHostApplicationLifetime _applicationLifetime; + private readonly AsyncPolicy _retryPolicy; private readonly List> _deadLetterPublisher; private IConsumer? _consumer; private readonly CancellationTokenSource _internalCts = new(); @@ -48,10 +49,14 @@ public KafkaMessageConsumer( IEnumerable>? deadLetterPublishers = null ) { - _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + ArgumentNullException.ThrowIfNull(logger); + ArgumentNullException.ThrowIfNull(config.Value); + ArgumentNullException.ThrowIfNull(applicationNameService); + ArgumentNullException.ThrowIfNull(cloudEventFormatter); + + _logger = logger; _applicationLifetime = applicationLifetime; - _applicationNameService = - applicationNameService ?? throw new ArgumentNullException(nameof(applicationNameService)); + _applicationNameService = applicationNameService; _cloudEventFormatter = cloudEventFormatter; _deadLetterPublisher = deadLetterPublishers?.ToList() ?? new List>(); _options = config.Value ?? throw new ArgumentNullException(nameof(config)); @@ -72,6 +77,13 @@ public KafkaMessageConsumer( _options.MaxConcurrentMessages ); _timer = new Timer(HandleCommitTimer); + + _retryPolicy = Policy + .HandleResult(status => status == ProcessedMessageStatus.TemporaryFailure) + .WaitAndRetryAsync( + _options.RetriesOnTemporaryFailure, + retryAttempt => _options.RetryBasePeriod * Math.Pow(2, retryAttempt) + ); } public Func< @@ -163,21 +175,24 @@ private void WriteLog(LogMessage logMessage) case SyslogLevel.Alert: case SyslogLevel.Critical: _logger.LogCritical( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); break; case SyslogLevel.Error: _logger.LogError( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); break; case SyslogLevel.Warning: _logger.LogWarning( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); @@ -185,14 +200,16 @@ private void WriteLog(LogMessage logMessage) case SyslogLevel.Notice: case SyslogLevel.Info: _logger.LogInformation( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); break; case SyslogLevel.Debug: _logger.LogDebug( - $"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})", + "{Message} -(Facility: {Facility}, Name: {Name})", + logMessage.Message, logMessage.Facility, logMessage.Name ); @@ -244,13 +261,7 @@ CancellationToken token ); var cloudEvent = KafkaMessageToCloudEvent(msg.Message); - var retryPolicy = Policy - .HandleResult(status => status == ProcessedMessageStatus.TemporaryFailure) - .WaitAndRetryAsync( - _options.RetriesOnTemporaryFailure, - retryAttempt => _options.RetryBasePeriod * Math.Pow(2, retryAttempt) - ); - var status = await retryPolicy.ExecuteAsync( + var status = await _retryPolicy.ExecuteAsync( (cancellationToken) => ConsumeCallbackAsync!.Invoke(cloudEvent, cancellationToken), token ); @@ -275,6 +286,7 @@ CancellationToken token private readonly Timer _timer; private readonly object _commitLock = new(); private bool _pendingCommit; + private int _messagesSinceLastCommit; private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken) { @@ -313,9 +325,12 @@ private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken) { _consumer?.StoreOffset(result.ConsumeResult); _pendingCommit = true; + _messagesSinceLastCommit++; } - if ((result.ConsumeResult.Offset.Value + 1) % _options.CommitPeriod == 0) + // Use message count since last commit instead of offset-based check. + // This works correctly across multiple partitions with non-contiguous offsets. + if (_messagesSinceLastCommit >= _options.CommitPeriod) { Commit(); RestartCommitTimer(); @@ -354,6 +369,7 @@ private void Commit() } _pendingCommit = false; + _messagesSinceLastCommit = 0; try { _consumer?.Commit(); @@ -455,7 +471,7 @@ private bool IsIrrecoverableFailure(ProcessedMessageStatus status) default: _logger.LogCritical( LogEvents.UnknownProcessedMessageStatus, - "Unknown processed message status {status}", + "Unknown processed message status {Status}", status ); return true; diff --git a/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs b/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs index b38c5b54..c9e2c4aa 100644 --- a/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs +++ b/src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs @@ -5,6 +5,7 @@ using CloudNative.CloudEvents.Extensions; using CloudNative.CloudEvents.Kafka; using Confluent.Kafka; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Motor.Extensions.Hosting.Abstractions; using Motor.Extensions.Hosting.CloudEvents; @@ -19,24 +20,59 @@ public class KafkaMessagePublisher : IRawMessagePublisher, IDi private readonly IProducer _producer; private readonly KafkaPublisherOptions _options; private readonly PublisherOptions _publisherOptions; + private readonly ILogger> _logger; public KafkaMessagePublisher( IOptions> options, CloudEventFormatter cloudEventFormatter, - IOptions publisherOptions + IOptions publisherOptions, + ILogger> logger ) { _cloudEventFormatter = cloudEventFormatter; _options = options.Value ?? throw new ArgumentNullException(nameof(options)); _publisherOptions = publisherOptions.Value ?? throw new ArgumentNullException(nameof(publisherOptions)); + _logger = logger; _producer = new ProducerBuilder(_options).Build(); } - public async Task PublishMessageAsync(MotorCloudEvent motorCloudEvent, CancellationToken token = default) + public Task PublishMessageAsync(MotorCloudEvent motorCloudEvent, CancellationToken token = default) { var topic = motorCloudEvent.GetKafkaTopic() ?? _options.Topic; var message = CloudEventToKafkaMessage(motorCloudEvent); - await _producer.ProduceAsync(topic, message, token); + + // Use Produce with a TaskCompletionSource for pipelining instead of + // awaiting ProduceAsync per message. This allows librdkafka to batch + // multiple messages into a single broker request, significantly + // improving throughput. + var tcs = new TaskCompletionSource>( + TaskCreationOptions.RunContinuationsAsynchronously + ); + + try + { + _producer.Produce( + topic, + message, + deliveryReport => + { + if (deliveryReport.Error.IsError) + { + tcs.SetException(new ProduceException(deliveryReport.Error, deliveryReport)); + } + else + { + tcs.SetResult(deliveryReport); + } + } + ); + } + catch (ProduceException ex) + { + tcs.SetException(ex); + } + + return tcs.Task; } public Message CloudEventToKafkaMessage(MotorCloudEvent motorCloudEvent) @@ -57,6 +93,14 @@ public async Task PublishMessageAsync(MotorCloudEvent motorCloudEvent, C public void Dispose() { + try + { + _producer.Flush(TimeSpan.FromSeconds(10)); + } + catch (Exception e) + { + _logger.LogWarning(e, "Error flushing producer during dispose"); + } _producer.Dispose(); } } diff --git a/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs b/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs index a5b86f9d..cd90e9ad 100644 --- a/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs +++ b/src/Motor.Extensions.Hosting.Kafka/LogEvents.cs @@ -15,7 +15,7 @@ public static class LogEvents public static readonly EventId UnknownProcessedMessageStatus = new(7, nameof(UnknownProcessedMessageStatus)); public static readonly EventId MessageHandlingUnexpectedException = new( - 7, + 8, nameof(MessageHandlingUnexpectedException) ); diff --git a/src/Motor.Extensions.Hosting.Kafka/Options/KafkaPublisherOptions.cs b/src/Motor.Extensions.Hosting.Kafka/Options/KafkaPublisherOptions.cs index 8ec676d7..62c582af 100644 --- a/src/Motor.Extensions.Hosting.Kafka/Options/KafkaPublisherOptions.cs +++ b/src/Motor.Extensions.Hosting.Kafka/Options/KafkaPublisherOptions.cs @@ -4,5 +4,14 @@ namespace Motor.Extensions.Hosting.Kafka.Options; public class KafkaPublisherOptions : ProducerConfig { + public KafkaPublisherOptions() + { + // Allow librdkafka to batch messages for better throughput. + // LingerMs controls how long to wait for additional messages before sending a batch. + LingerMs ??= 5; + // Enable Snappy compression for better network throughput with low CPU overhead. + CompressionType ??= Confluent.Kafka.CompressionType.Snappy; + } + public string? Topic { get; set; } } diff --git a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs index 934ec825..6277edfb 100644 --- a/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs +++ b/test/Motor.Extensions.Hosting.Kafka_IntegrationTest/KafkaExtensionTests.cs @@ -3,6 +3,7 @@ using CloudNative.CloudEvents.SystemTextJson; using Confluent.Kafka; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Moq; using Motor.Extensions.Hosting.Abstractions; @@ -516,7 +517,12 @@ private KafkaMessagePublisher GetPublisher(string topic) { var options = Options.Create(GetPublisherConfig(topic)); var publisherOptions = Options.Create(new PublisherOptions()); - return new KafkaMessagePublisher(options, new JsonEventFormatter(), publisherOptions); + return new KafkaMessagePublisher( + options, + new JsonEventFormatter(), + publisherOptions, + Mock.Of>>() + ); } private KafkaPublisherOptions GetPublisherConfig(string topic) diff --git a/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs b/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs index 855b51ea..58681ac8 100644 --- a/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs +++ b/test/Motor.Extensions.Hosting.Kafka_UnitTest/KafkaMessageTests.cs @@ -94,7 +94,8 @@ private static KafkaMessagePublisher GetKafkaPublisher( return new KafkaMessagePublisher( Options.Create(options), new JsonEventFormatter(), - Options.Create(new PublisherOptions { CloudEventFormat = format }) + Options.Create(new PublisherOptions { CloudEventFormat = format }), + Mock.Of>>() ); }