Skip to content
50 changes: 33 additions & 17 deletions src/Motor.Extensions.Hosting.Kafka/KafkaMessageConsumer.cs
Comment thread
cavus700 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public sealed class KafkaMessageConsumer<TData> : IMessageConsumer<TData>, IDisp
private readonly IMetricFamily<ISummary>? _consumerLagSummary;
private readonly ILogger<KafkaMessageConsumer<TData>> _logger;
private readonly IHostApplicationLifetime _applicationLifetime;
private readonly AsyncPolicy<ProcessedMessageStatus> _retryPolicy;
private readonly List<IRawMessagePublisher<TData>> _deadLetterPublisher;
private IConsumer<string?, byte[]>? _consumer;
private readonly CancellationTokenSource _internalCts = new();
Expand All @@ -48,10 +49,14 @@ public KafkaMessageConsumer(
IEnumerable<IRawMessagePublisher<TData>>? deadLetterPublishers = null
)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
ArgumentNullException.ThrowIfNull(logger);
ArgumentNullException.ThrowIfNull(config.Value);
ArgumentNullException.ThrowIfNull(applicationNameService);
ArgumentNullException.ThrowIfNull(cloudEventFormatter);

_logger = logger;
_applicationLifetime = applicationLifetime;
_applicationNameService =
applicationNameService ?? throw new ArgumentNullException(nameof(applicationNameService));
_applicationNameService = applicationNameService;
_cloudEventFormatter = cloudEventFormatter;
_deadLetterPublisher = deadLetterPublishers?.ToList() ?? new List<IRawMessagePublisher<TData>>();
_options = config.Value ?? throw new ArgumentNullException(nameof(config));
Expand All @@ -72,6 +77,13 @@ public KafkaMessageConsumer(
_options.MaxConcurrentMessages
);
_timer = new Timer(HandleCommitTimer);

_retryPolicy = Policy
.HandleResult<ProcessedMessageStatus>(status => status == ProcessedMessageStatus.TemporaryFailure)
.WaitAndRetryAsync(
_options.RetriesOnTemporaryFailure,
retryAttempt => _options.RetryBasePeriod * Math.Pow(2, retryAttempt)
);
}

public Func<
Expand Down Expand Up @@ -163,36 +175,41 @@ private void WriteLog(LogMessage logMessage)
case SyslogLevel.Alert:
case SyslogLevel.Critical:
_logger.LogCritical(
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
"{Message} -(Facility: {Facility}, Name: {Name})",
logMessage.Message,
logMessage.Facility,
logMessage.Name
);
break;
case SyslogLevel.Error:
_logger.LogError(
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
"{Message} -(Facility: {Facility}, Name: {Name})",
logMessage.Message,
logMessage.Facility,
logMessage.Name
);
break;
case SyslogLevel.Warning:
_logger.LogWarning(
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
"{Message} -(Facility: {Facility}, Name: {Name})",
logMessage.Message,
logMessage.Facility,
logMessage.Name
);
break;
case SyslogLevel.Notice:
case SyslogLevel.Info:
_logger.LogInformation(
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
"{Message} -(Facility: {Facility}, Name: {Name})",
logMessage.Message,
logMessage.Facility,
logMessage.Name
);
break;
case SyslogLevel.Debug:
_logger.LogDebug(
$"{logMessage.Message} -(Facility: {{facility}}, Name: {{name}})",
"{Message} -(Facility: {Facility}, Name: {Name})",
logMessage.Message,
logMessage.Facility,
logMessage.Name
);
Expand Down Expand Up @@ -244,13 +261,7 @@ CancellationToken token
);
var cloudEvent = KafkaMessageToCloudEvent(msg.Message);

var retryPolicy = Policy
.HandleResult<ProcessedMessageStatus>(status => status == ProcessedMessageStatus.TemporaryFailure)
.WaitAndRetryAsync(
_options.RetriesOnTemporaryFailure,
retryAttempt => _options.RetryBasePeriod * Math.Pow(2, retryAttempt)
);
var status = await retryPolicy.ExecuteAsync(
var status = await _retryPolicy.ExecuteAsync(
(cancellationToken) => ConsumeCallbackAsync!.Invoke(cloudEvent, cancellationToken),
token
);
Expand All @@ -275,6 +286,7 @@ CancellationToken token
private readonly Timer _timer;
private readonly object _commitLock = new();
private bool _pendingCommit;
private int _messagesSinceLastCommit;

private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -313,9 +325,12 @@ private async Task ExecuteCommitLoopAsync(CancellationToken cancellationToken)
{
_consumer?.StoreOffset(result.ConsumeResult);
_pendingCommit = true;
_messagesSinceLastCommit++;
}

if ((result.ConsumeResult.Offset.Value + 1) % _options.CommitPeriod == 0)
// Use message count since last commit instead of offset-based check.
// This works correctly across multiple partitions with non-contiguous offsets.
if (_messagesSinceLastCommit >= _options.CommitPeriod)
{
Commit();
RestartCommitTimer();
Expand Down Expand Up @@ -354,6 +369,7 @@ private void Commit()
}

_pendingCommit = false;
_messagesSinceLastCommit = 0;
try
{
_consumer?.Commit();
Expand Down Expand Up @@ -455,7 +471,7 @@ private bool IsIrrecoverableFailure(ProcessedMessageStatus status)
default:
_logger.LogCritical(
LogEvents.UnknownProcessedMessageStatus,
"Unknown processed message status {status}",
"Unknown processed message status {Status}",
status
);
return true;
Expand Down
50 changes: 47 additions & 3 deletions src/Motor.Extensions.Hosting.Kafka/KafkaMessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using CloudNative.CloudEvents.Extensions;
using CloudNative.CloudEvents.Kafka;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Motor.Extensions.Hosting.Abstractions;
using Motor.Extensions.Hosting.CloudEvents;
Expand All @@ -19,24 +20,59 @@ public class KafkaMessagePublisher<TOutput> : IRawMessagePublisher<TOutput>, IDi
private readonly IProducer<string?, byte[]> _producer;
private readonly KafkaPublisherOptions<TOutput> _options;
private readonly PublisherOptions _publisherOptions;
private readonly ILogger<KafkaMessagePublisher<TOutput>> _logger;

public KafkaMessagePublisher(
IOptions<KafkaPublisherOptions<TOutput>> options,
CloudEventFormatter cloudEventFormatter,
IOptions<PublisherOptions> publisherOptions
IOptions<PublisherOptions> publisherOptions,
ILogger<KafkaMessagePublisher<TOutput>> logger
)
{
_cloudEventFormatter = cloudEventFormatter;
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
_publisherOptions = publisherOptions.Value ?? throw new ArgumentNullException(nameof(publisherOptions));
_logger = logger;
_producer = new ProducerBuilder<string?, byte[]>(_options).Build();
}

public async Task PublishMessageAsync(MotorCloudEvent<byte[]> motorCloudEvent, CancellationToken token = default)
public Task PublishMessageAsync(MotorCloudEvent<byte[]> motorCloudEvent, CancellationToken token = default)
{
var topic = motorCloudEvent.GetKafkaTopic() ?? _options.Topic;
var message = CloudEventToKafkaMessage(motorCloudEvent);
await _producer.ProduceAsync(topic, message, token);

// Use Produce with a TaskCompletionSource for pipelining instead of
// awaiting ProduceAsync per message. This allows librdkafka to batch
// multiple messages into a single broker request, significantly
// improving throughput.
var tcs = new TaskCompletionSource<DeliveryResult<string?, byte[]>>(
TaskCreationOptions.RunContinuationsAsynchronously
);

try
{
_producer.Produce(
topic,
message,
deliveryReport =>
{
if (deliveryReport.Error.IsError)
{
tcs.SetException(new ProduceException<string?, byte[]>(deliveryReport.Error, deliveryReport));
}
else
{
tcs.SetResult(deliveryReport);
}
}
);
}
catch (ProduceException<string?, byte[]> ex)
{
tcs.SetException(ex);
}

return tcs.Task;
}

public Message<string?, byte[]> CloudEventToKafkaMessage(MotorCloudEvent<byte[]> motorCloudEvent)
Expand All @@ -57,6 +93,14 @@ public async Task PublishMessageAsync(MotorCloudEvent<byte[]> motorCloudEvent, C

public void Dispose()
{
try
{
_producer.Flush(TimeSpan.FromSeconds(10));
}
catch (Exception e)
{
_logger.LogWarning(e, "Error flushing producer during dispose");
}
_producer.Dispose();
}
}
2 changes: 1 addition & 1 deletion src/Motor.Extensions.Hosting.Kafka/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public static class LogEvents
public static readonly EventId UnknownProcessedMessageStatus = new(7, nameof(UnknownProcessedMessageStatus));

public static readonly EventId MessageHandlingUnexpectedException = new(
7,
8,
nameof(MessageHandlingUnexpectedException)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,14 @@ namespace Motor.Extensions.Hosting.Kafka.Options;

public class KafkaPublisherOptions<T> : ProducerConfig
{
public KafkaPublisherOptions()
{
// Allow librdkafka to batch messages for better throughput.
// LingerMs controls how long to wait for additional messages before sending a batch.
LingerMs ??= 5;
// Enable Snappy compression for better network throughput with low CPU overhead.
CompressionType ??= Confluent.Kafka.CompressionType.Snappy;
}

public string? Topic { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using CloudNative.CloudEvents.SystemTextJson;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Moq;
using Motor.Extensions.Hosting.Abstractions;
Expand Down Expand Up @@ -516,7 +517,12 @@ private KafkaMessagePublisher<T> GetPublisher<T>(string topic)
{
var options = Options.Create(GetPublisherConfig<T>(topic));
var publisherOptions = Options.Create(new PublisherOptions());
return new KafkaMessagePublisher<T>(options, new JsonEventFormatter(), publisherOptions);
return new KafkaMessagePublisher<T>(
options,
new JsonEventFormatter(),
publisherOptions,
Mock.Of<ILogger<KafkaMessagePublisher<T>>>()
);
}

private KafkaPublisherOptions<T> GetPublisherConfig<T>(string topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ private static KafkaMessagePublisher<TData> GetKafkaPublisher<TData>(
return new KafkaMessagePublisher<TData>(
Options.Create(options),
new JsonEventFormatter(),
Options.Create(new PublisherOptions { CloudEventFormat = format })
Options.Create(new PublisherOptions { CloudEventFormat = format }),
Mock.Of<ILogger<KafkaMessagePublisher<TData>>>()
);
}

Expand Down
Loading