Skip to content

Commit 1a16317

Browse files
committed
Basic tutorial of how to use rabbitmq with nodejs
0 parents  commit 1a16317

12 files changed

+402
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
node_modules

how-to-run.md

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# How To Run The Examples
2+
3+
### Dependencies
4+
* RabbitMQ Host.
5+
* NodeJS.
6+
7+
#### How to install RabbitMQ using docker
8+
Install docker run this command:
9+
```bash
10+
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
11+
```
12+
13+
After that, you will be able to access to the management on `0.0.0.0:15672` or `localhost:15672` and have access to the services on the port `5672`.
14+
15+
### Tutorial 1
16+
17+
1. Open a terminal to run the consumer:
18+
```bash
19+
# this will show: [*] Waiting for messages. To exit press CTRL+C
20+
$ node t1-basic-consumer.js
21+
```
22+
23+
2. Open other terminal to run the producer:
24+
```bash
25+
# this will show: [x] Sent 'hello world'
26+
$ node t1-basic.js 'hello world.'
27+
```
28+
29+
3. Wait 3 seconds and then you will see `[x] Received 'hello world'` on the consumers terminal.
30+
31+
### Tutorial 2
32+
1. Open the terminal to run the consumer:
33+
```bash
34+
# this will show: [*] Waiting for logs. To exit press CTRL+C.
35+
$ node t2-direct-consumer.js SEVERITY
36+
```
37+
38+
2. Open other terminal to run the producer:
39+
```bash
40+
# this will show: [x] Sent SEVERITY:'hellow world'
41+
$ node t2-direct.js SEVERITY 'hello world'
42+
```
43+
44+
### Tutorial 3
45+
1. Open the terminal to run the consumer:
46+
```bash
47+
# This command will consume all the messages with a key that match with `logs.#`: `logs.info`, `logs.error`, `logs.warning`
48+
$ node t3-topics-consumer.js 'logs.#'
49+
```
50+
51+
2. Open other terminal to run the producer:
52+
```bash
53+
$ node t3-topics.js 'logs.info' 'information message'
54+
$ node t3-topics.js 'logs.error' 'error message'
55+
$ node t3-topics.js 'logs.warning' 'warning message'
56+
```
57+
### Tutotial 4
58+
1. Open the terminal to run the consumer:
59+
```bash
60+
# this consumer will process events only that match with `logs.info`. Also
61+
# this consumer will only accept the messages equal to `hello` and reject the others.
62+
# All the messages rejected will be stored on `queue.dl`.
63+
$ node t4-deadletter-consumer.js 'logs.info'
64+
```
65+
66+
2. Open other terminal to runt the producer:
67+
```bash
68+
# this message is accepted
69+
$ node t4-deadletter.js 'logs.info' 'hello'
70+
# and this is rejected.
71+
$ node t4-deadletter.js 'logs.info' 'hellos'
72+
```

package.json

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"name": "amqp-tutorial",
3+
"version": "0.0.1",
4+
"description": "amqp tutorial using rabbitmq",
5+
"main": "index.js",
6+
"scripts": {
7+
"test": "echo \"Error: no test specified\" && exit 1"
8+
},
9+
"keywords": [
10+
"rabbitmq",
11+
"amqp"
12+
],
13+
"author": "learnercys <[email protected]>",
14+
"license": "ISC",
15+
"dependencies": {
16+
"amqplib": "^0.4.2",
17+
"when": "^3.7.7"
18+
}
19+
}

readme.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# AMQP -> RabbitMQ
2+
3+
Advanced Message Queuing Protocol
4+
5+
***
6+
7+
### Producer
8+
Manage the messages to be published so they can be processed.
9+
10+
### Consumer
11+
Receive the messages to process.
12+
13+
***
14+
15+
### Exchange
16+
Entity that receives the messages from the produces and enrute to its linked queues.
17+
18+
##### Exchanges Types
19+
20+
* Direct: Send messages to the linked queues based on the _routing key(Unicast)._
21+
* Fanout: Send messages to all the queues linked to the exchange.
22+
* Topic: Send messages to the linked queues based on the _routing key(Multicast)._
23+
* Headers: Designed to enrute multiples attributes(headers).
24+
25+
***
26+
27+
### Queue
28+
Store the messages consumed by the application.
29+
30+
### Binding
31+
Rules used for the exchange to send messages to its linked queues.
32+
33+
***
34+
35+
### How Run the Examples
36+
See the document [how-to-run.md](how-to-run.md)
37+
38+
### Reference:
39+
1. https://www.rabbitmq.com/tutorials/amqp-concepts.html
40+
2. http://www.squaremobius.net/amqp.node/channel_api.html
41+
3. https://github.com/squaremo/amqp.node
42+
4. https://www.rabbitmq.com/getstarted.html
43+
5. https://www.cloudamqp.com/blog/2015-05-18-part1-rabbitmq-for-beginners-what-is-rabbitmq.html

t1-basic-consumer.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
var amqp = require('amqplib');
2+
3+
amqp.connect('amqp://localhost').then(function (conn) {
4+
return conn.createChannel().then(function (ch) {
5+
// queue to consume
6+
var q = 'queuet1';
7+
8+
// we need to make sure that the queue exists
9+
var ok = ch.assertQueue(q, { durable: false});
10+
11+
ok = ok.then(function () {
12+
// initialize the consume event
13+
return ch.consume(q, function (msg) {
14+
setTimeout(function () {
15+
console.log(" [x] Received '%s'", msg.content.toString());
16+
}, 3000);
17+
}, { noAck: true });
18+
});
19+
20+
return ok.then(function (_consumeOk) {
21+
console.log(' [*] Waiting for messages. To exit press CTRL+C');
22+
});
23+
});
24+
}).then(null, console.warn);

t1-basic.js

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// This is a basic example of how we should publish messages
2+
// into rabbitmq, here we will not use directly an exchange but
3+
// rabbitmq always use an exchange to process data, so the default
4+
// exchange will be `direct` -> `amqp.direct`.=
5+
6+
// RabbitMQ client
7+
var amqp = require('amqplib');
8+
var when = require('when');
9+
10+
var args = process.argv.slice(2);
11+
12+
// and this will be the message to send.
13+
var msg = args[0] || 'hello world';
14+
15+
amqp.connect('amqp://localhost').then(function(conn) {
16+
// here we use `when` to ensure that the connection is closed
17+
// after the `sendToQueue` event
18+
return when(conn.createChannel().then(function(ch) {
19+
// this will be the name of our new queue.
20+
var q = 'queuet1';
21+
22+
// we need to be sure that the queue `queuet1` exists
23+
// if not, the `assertQueue` method create it.
24+
var ok = ch.assertQueue(q, {durable: false});
25+
26+
return ok.then(function(_qok) {
27+
// after create | validate the existence of the queue
28+
// we are ready to publish messages to it.
29+
ch.sendToQueue(q, new Buffer(msg));
30+
console.log(" [x] Sent '%s'", msg);
31+
32+
// we shouldn't need the channel anymore.
33+
return ch.close();
34+
});
35+
})).ensure(function () { conn.close(); });
36+
}).then(null, console.warn);
37+

t2-direct-consumer.js

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
2+
var amqp = require('amqplib');
3+
4+
var args = process.argv.slice(2);
5+
var severity = args[0] || 'info';
6+
7+
amqp.connect('amqp://localhost').then(function(conn) {
8+
process.once('SIGINT', function() { conn.close(); });
9+
10+
return conn.createChannel().then(function(ch) {
11+
var ex = 'direct_logs';
12+
13+
var ok = ch.assertExchange(ex, 'direct', {durable: false});
14+
15+
ok = ok.then(function() {
16+
return ch.assertQueue('', {exclusive: true});
17+
});
18+
19+
ok = ok.then(function(qok) {
20+
var queue = qok.queue;
21+
22+
return ch.bindQueue(queue, ex, severity).then(function() { return queue; });
23+
});
24+
25+
ok = ok.then(function(queue) {
26+
return ch.consume(queue, logMessage, {noAck: true});
27+
});
28+
29+
return ok.then(function() {
30+
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
31+
});
32+
33+
function logMessage(msg) {
34+
console.log(" [x] %s:'%s'",
35+
msg.fields.routingKey,
36+
msg.content.toString());
37+
}
38+
});
39+
}).then(null, console.warn);

t2-direct.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
2+
var amqp = require('amqplib');
3+
var when = require('when');
4+
5+
var args = process.argv.slice(2);
6+
7+
// severity is the way that the consumer filter an event
8+
var severity = args[0] || 'info';
9+
var message = args[1] || 'Hello World!';
10+
11+
amqp.connect('amqp://localhost').then(function(conn) {
12+
return when(conn.createChannel().then(function(ch) {
13+
// the exchange name used to receive all the messages
14+
var ex = 'direct_logs';
15+
16+
var ok = ch.assertExchange(ex, 'direct', {durable: false});
17+
18+
return ok.then(function() {
19+
ch.publish(ex, severity, new Buffer(message));
20+
console.log(" [x] Sent %s:'%s'", severity, message);
21+
return ch.close();
22+
});
23+
})).ensure(function() { conn.close(); });
24+
}).then(null, console.warn);

t3-topics-consumer.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
2+
var amqp = require('amqplib');
3+
4+
var key = process.argv[2] || 'log.info';
5+
6+
amqp.connect('amqp://localhost').then(function (conn) {
7+
process.once('SIGINT', function() { conn.close(); });
8+
9+
return conn.createChannel().then(function(ch) {
10+
var ex = 'topic_logs';
11+
var ok = ch.assertExchange(ex, 'topic', {durable: false});
12+
13+
ok = ok.then(function() {
14+
return ch.assertQueue('', {exclusive: true});
15+
});
16+
17+
ok = ok.then(function(qok) {
18+
var queue = qok.queue;
19+
return ch.bindQueue(queue, ex, key).then(function() { return queue; });
20+
});
21+
22+
ok = ok.then(function(queue) {
23+
return ch.consume(queue, logMessage, {noAck: true});
24+
});
25+
26+
return ok.then(function() {
27+
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
28+
});
29+
30+
function logMessage(msg) {
31+
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
32+
}
33+
});
34+
}).then(null, console.warn);

t3-topics.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
2+
var amqp = require('amqplib');
3+
var when = require('when');
4+
5+
var args = process.argv.slice(2);
6+
var key = args[0] || 'log.info';
7+
var message = args[1] || 'Hello World!';
8+
9+
amqp.connect('amqp://localhost').then(function(conn) {
10+
return when(conn.createChannel().then(function(ch) {
11+
var ex = 'topic_logs';
12+
var ok = ch.assertExchange(ex, 'topic', {durable: false});
13+
14+
ok = ok.then(function() {
15+
return ch.publish(ex, key, new Buffer(message));
16+
});
17+
18+
return ok.then(function() {
19+
console.log(" [x] Sent %s:'%s'", key, message);
20+
return ch.close();
21+
});
22+
})).ensure(function() { conn.close(); })
23+
}).then(null, console.log);

t4-deadletter-consumer.js

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
2+
var amqp = require('amqplib');
3+
var basename = require('path').basename;
4+
5+
var key = process.argv[2] || 'log.info';
6+
7+
amqp.connect('amqp://localhost').then(function (conn) {
8+
process.once('SIGINT', function() { conn.close(); });
9+
10+
return conn.createChannel().then(function (ch) {
11+
var ex = 'topic_exchange';
12+
var queue = 'queue';
13+
var exdl = 'topic_exchange.dl';
14+
var queuedl = 'queue.dl';
15+
16+
// dead letter exchange configs
17+
var ok = ch.assertExchange(exdl, 'topic', {durable: true});
18+
19+
ok = ok.then(function() {
20+
return ch.assertQueue(queuedl, {durable: true});
21+
});
22+
23+
ok = ok.then(function() {
24+
return ch.bindQueue(queuedl, exdl, key);
25+
});
26+
27+
// exchange configs
28+
ok = ok.then(function() {
29+
return ch.assertExchange(ex, 'topic', {durable: true});
30+
});
31+
32+
ok = ok.then(function () {
33+
return ch.assertQueue(queue, {
34+
durable: true,
35+
arguments: {
36+
'x-dead-letter-exchange': exdl
37+
}
38+
});
39+
});
40+
41+
ok = ok.then(function () {
42+
return ch.bindQueue(queue, ex, key).then(function() { return queue; });
43+
});
44+
45+
ok = ok.then(function(queue) {
46+
return ch.consume(queue, logMessage, {noAck: false});
47+
});
48+
49+
return ok.then(function() {
50+
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
51+
});
52+
53+
function logMessage(msg) {
54+
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
55+
56+
if(msg.content.toString() !== 'hello') {
57+
ch.reject(msg, false);
58+
} else {
59+
ch.ack(msg);
60+
}
61+
}
62+
});
63+
}).then(null, console.warn);

0 commit comments

Comments
 (0)