forked from mdmuhtasimfuadfahim/kafka-pub-sub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProduceEvent.js
53 lines (46 loc) · 1.71 KB
/
ProduceEvent.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
const { Partitioners } = require('kafkajs');
const kafka = require('./config/kafka');
const validateTopic = require('./validation/validateTopic');
const validateEvent = require('./validation/validateEvent');
const validateData = require('./validation/validateData');
const validateHeaders = require('./validation/validateHeaders');
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner
});
/**
* Asynchronously produces an event for a given topic with optional data.
*
* @param {string} topic - the topic to produce the event to.
* @param {string} event - the name of the event being produced.
* @param {Object} [data={}] - optional data to include with the event.
* @param {Object} [headers={}] - optional headers to include with the message.
* @return {Promise} a promise that resolves when the message is sent successfully.
* @throws {string} an error message if message sending fails.
*/
const ProduceEvent = async (topic, event, data = {}, headers = {}) => {
// basic validations
validateTopic(topic);
validateEvent(event);
validateData(data);
validateHeaders(headers);
await producer.connect();
return new Promise(async (resolve, reject) => {
try {
const message = [{
key: `key-${event}`,
value: JSON.stringify({ data }),
headers,
}];
const response = await producer.send({
topic,
messages: message,
});
await producer.disconnect();
resolve(response);
} catch (error) {
await producer.disconnect();
reject(error.message);
}
});
};
module.exports = ProduceEvent;