-
Notifications
You must be signed in to change notification settings - Fork 233
/
Copy pathJsonProducer.cs
50 lines (42 loc) · 1.54 KB
/
JsonProducer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using KafkaNet.Configuration;
using KafkaNet.Protocol;
using Newtonsoft.Json;
namespace KafkaNet.Client
{
public class JsonProducer : IDisposable
{
private readonly Producer _producer;
public JsonProducer(IBrokerRouter brokerRouter, IKafkaOptions options)
{
_producer = new Producer(brokerRouter, options);
}
public Task<List<ProduceResponse>> Publish<T>(string topic, IEnumerable<T> messages, Int16 acks = 1, int timeoutMS = 1000) where T : class
{
return _producer.SendMessageAsync(topic, ConvertToKafkaMessage(messages), acks, timeoutMS);
}
private static IEnumerable<Message> ConvertToKafkaMessage<T>(IEnumerable<T> messages) where T : class
{
var hasKey = typeof(T).GetProperty("Key", typeof(string)) != null;
return messages.Select(m => new Message
{
Key = hasKey ? GetKeyPropertyValue(m) : null,
Value = JsonConvert.SerializeObject(m)
});
}
private static string GetKeyPropertyValue<T>(T message) where T : class
{
if (message == null) return null;
var info = message.GetType().GetProperty("Key", typeof(string));
if (info == null) return null;
return (string)info.GetValue(message);
}
public void Dispose()
{
using (_producer) { }
}
}
}