forked from mdmuhtasimfuadfahim/rmq-pub-sub
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRMQSubStream.js
51 lines (43 loc) · 1.72 KB
/
RMQSubStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
const amqp = require('amqplib');
const validateTopic = require('./validations/validateTopic');
const RabbitMQLogger = require('./config/RabbitMQLogger');
const config = require('./config/config');
const validateData = require('./validations/validateData');
/**
 * Closes an amqplib channel or connection, logging (rather than propagating)
 * a close failure so that it cannot mask the outcome of the consume operation.
 *
 * @param {{close: function(): Promise<void>}} resource - Channel or connection to close
 * @return {Promise<void>} Resolves once the close attempt has finished
 */
const closeQuietly = async (resource) => {
  try {
    await resource.close();
  } catch (error) {
    RabbitMQLogger.debug('Error to close', error);
  }
};

/**
 * Registers a consumer on `topic` and settles with the first delivery.
 * Only that first delivery is ever acknowledged; anything delivered afterwards
 * is left unacknowledged so the broker requeues it when the channel closes.
 *
 * @param {object} channel - An open amqplib channel
 * @param {string} topic - The queue to consume from
 * @return {Promise<object>} Resolves with the parsed, validated message body;
 *   rejects with the Error raised while registering, parsing or validating
 */
const consumeFirstMessage = (channel, topic) =>
  new Promise((resolve, reject) => {
    let settled = false;

    const onMessage = (data) => {
      // The promise can only settle once; acking later deliveries would
      // silently drop them, so they are ignored and stay in the queue.
      if (settled) {
        return;
      }
      settled = true;

      try {
        // amqplib invokes the callback with null when the broker cancels the consumer.
        if (data === null) {
          throw new Error(`Consumer for topic ${topic} was cancelled by the broker`);
        }
        const receivedData = JSON.parse(data.content.toString());
        validateData(receivedData);
        RabbitMQLogger.setLogData(receivedData);
        RabbitMQLogger.info(topic, receivedData, `Received topic ${topic}`);
        channel.ack(data);
        resolve(receivedData);
      } catch (error) {
        // A message that fails parsing/validation is not acked, so it is
        // requeued when the channel closes instead of being lost.
        reject(error);
      }
    };

    channel.consume(topic, onMessage).catch(reject);
  });

/**
 * Consumes a single message from the given RabbitMQ topic (queue) and returns
 * its parsed JSON body. Validates the topic before connecting and the payload
 * before acknowledging it, and logs the consume/receive steps.
 *
 * Each call opens its own connection and channel, and both are closed before
 * the returned promise settles, so no consumer is left behind.
 *
 * @param {string} topic - The topic to consume messages from
 * @return {Promise<object>} A Promise that resolves with the received data.
 *   It rejects with an error message (string) when asserting the queue,
 *   consuming, parsing or validating the message fails. Errors raised by
 *   validateTopic, amqp.connect or createChannel propagate unchanged.
 */
const RMQSubStream = async (topic) => {
  // basic validations
  validateTopic(topic);
  const connection = await amqp.connect(config.AMQP_URL);
  try {
    const channel = await connection.createChannel();
    try {
      // setting up loggers
      RabbitMQLogger.settopic(topic);
      RabbitMQLogger.info(topic, {}, `Consuming topic ${topic}`);
      await channel.assertQueue(topic);
      return await consumeFirstMessage(channel, topic);
    } catch (error) {
      RabbitMQLogger.debug('Error to consume', error);
      RabbitMQLogger.error(topic, 'Error: ', error.message);
      // Callers have always received the message string here; keep that contract.
      // eslint-disable-next-line no-throw-literal
      throw error.message;
    } finally {
      await closeQuietly(channel);
    }
  } finally {
    await closeQuietly(connection);
  }
};
// CommonJS export: the module's value is the RMQSubStream function itself.
module.exports = RMQSubStream;