-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathkafka-producer.js
34 lines (25 loc) · 909 Bytes
/
kafka-producer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Broker address: taken from the KAFKA_URL environment variable, falling back
// to a local broker when the variable is unset or empty.
const kafkaurl = process.env.KAFKA_URL || 'localhost:9092';
console.log(`Configuring kafka URL as ${kafkaurl}`);

const kafka = require('kafka-node');
const Producer = kafka.Producer;

// One client/producer pair is created at module load and shared by every
// function exported from this module.
const client = new kafka.KafkaClient({ kafkaHost: kafkaurl });
const producer = new Producer(client);

// Keep a listener registered for 'error' events, but report the error instead
// of silently discarding it so broker/connection problems are visible in logs.
producer.on('error', (err) => {
  console.error('Kafka producer error:', err);
});
module.exports = {
send(topic, msg){
producer.send([{ topic: topic, messages: msg, partition: 0 },], function (err, data) {
data? console.log(data): false
})
},
createTopic(topic){
let topicsToCreate = [{
topic: topic,
partitions: 1,
replicationFactor: 1
}]
client.createTopics(topicsToCreate, (error, result) => {
console.log(result)
// result is an array of any errors if a given topic could not be created
});
}
}