-
Notifications
You must be signed in to change notification settings - Fork 26
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Derick Bailey
committed
Aug 30, 2016
1 parent
eb70f39
commit 2d20de1
Showing
7 changed files
with
647 additions
and
540 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
# Error Handling | ||
|
||
Rabbus uses a [middleware structure](middleware.md) to both produce and consume | ||
messages, similar to that of Express.js. As such, errors are generally | ||
pushed through the `next(err)` call, and handled through an error handling | ||
middleware function. | ||
|
||
```js | ||
myProducer.use(function(err, message, properties, actions, next){ | ||
|
||
// handle the error, here | ||
console.log(err.stack); | ||
|
||
}); | ||
``` | ||
|
||
## Non-Middleware Errors | ||
|
||
Each of the objects in Rabbus will also emit an "error" | ||
message when an error occurs outside of the middleware stack. | ||
You can use standard NodeJS EventEmitter functions to | ||
subscribe / unsubscribe the error events. | ||
|
||
```js | ||
var sub = new Subscriber(...); | ||
sub.on("error", function(err){ | ||
// do something with the err object, here | ||
}); | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
# Extending Rabbus w/ Middleware | ||
|
||
Rabbus message Producers and Consumers use a middleware system that allows you | ||
to extend the capabilities of the bus. To use it, call the `.use` method of any | ||
given message Consumer object (Receiver, Responder, Subscriber) or Producer | ||
(Sender, Requester, Publisher). | ||
|
||
The `use` method takes a callback function with a signature that varies depending | ||
on whether you're using a producer or consumer. | ||
|
||
## Consumer Middleware | ||
|
||
The `use` method on consumers takes a callback with this signature: | ||
|
||
```js | ||
consumer.use(function(message, properties, actions, next){ | ||
|
||
}); | ||
``` | ||
|
||
The parameters are as follows: | ||
|
||
* **message**: the message body | ||
* **properties**: the properties of the message, including headers, etc. | ||
* **actions**: an object containing various methods for interaction with the RabbitMQ message, and to continue the middleware chain | ||
* **ack()**: the message is completely processed. acknowledge to the server. prevents any additional middleware from running | ||
* **nack()**: the message cannot be processed, and should be re-queued for later. prevents any additional middleware from running | ||
* **reject()**: the message cannot be processed and should not be re-queued. be sure you have a dead-letter queue before using this. prevents any additional middleware from running | ||
* **reply(msg)**: send a reply back to the requester, during a request/response scenario. prevents any additional middleware from running | ||
* **next()**: this middleware is done, and the next one can be called; call `next(err)` to forward an error to error handling middleware functions | ||
|
||
### Consumer Middleware Examples | ||
|
||
As an example, you could log every message that gets sent through your consumer: | ||
|
||
```js | ||
var mySubscriber = new MySubscriber(); | ||
|
||
mySubscriber.use(function(message, properties, actions, next){ | ||
|
||
console.log("Got a message. Doing stuff with middleware."); | ||
console.log(message); | ||
|
||
// allow the middleware chain to continue | ||
next(); | ||
}); | ||
``` | ||
|
||
In another scenario, you may want the middleware to `nack` the message because | ||
some condition is not yet met. | ||
|
||
```js | ||
var rec = new SomeReceiver(); | ||
|
||
rec.use(function(message, properties, actions, next){ | ||
|
||
// check some conditions | ||
if (message.someData && someOtherSystem.stuffNotReady()){ | ||
|
||
// conditions not met. nack the message and try again later | ||
actions.nack(); | ||
|
||
} else { | ||
|
||
// everything is good to go, allow the next middleware to run | ||
next(); | ||
|
||
} | ||
}); | ||
``` | ||
|
||
**WARNING:** If you forget to call `next()` or one of the other actions, | ||
your message will be stuck in limbo, unacknowledged. | ||
|
||
## Producer Middleware | ||
|
||
The `use` method on consumers takes a callback with this signature: | ||
|
||
```js | ||
producer.use(function(message, headers, next){ | ||
|
||
}); | ||
``` | ||
|
||
The parameters are as follows: | ||
|
||
* **message**: the message body, which you can transform as needed | ||
* **headers**: the headers of the message, which can be altered in any way you need | ||
* **next()**: this middleware is done, and the next one can be called; call `next(err)` to forward an error to the error handlers | ||
|
||
### Producer Middleware Examples | ||
|
||
You can easily add / change headers or the actual message content in your | ||
producer middleware. Any change you make to the `message` or `headers` objects | ||
will make their way to the next middleware, and ultimately to RabbitMQ as part | ||
of the message. | ||
|
||
```js | ||
var myPub = new MyPublisher(); | ||
|
||
myPub.use(function(message, headers, actions, next){ | ||
|
||
var hasFoo = !!(message.foo); | ||
|
||
if (hasFoo){ | ||
// add data to the message body | ||
message.bar = "foo is there"; | ||
message.baz = true; | ||
} | ||
|
||
// add a header to the message properties | ||
headers.hasFoo = hasFoo; | ||
|
||
// allow the middleware chain to continue | ||
next(); | ||
}); | ||
``` | ||
|
||
**WARNING:** If you forget to call `next()` in your middleware, | ||
the message will never be published. While this is generally dangerous, it can | ||
be used to stop messages that should not be sent. | ||
|
||
## Order Of Middleware Processing | ||
|
||
Whether you are using a Producer or Consumer, middleware is processed in the | ||
order in which it was added: first in, first out. | ||
|
||
For example, if you have a consumer that handles a message and then adds | ||
some middleware, you will have the middleware processed first. | ||
|
||
```js | ||
responder.handle("message.type", function(msg, properties, actions, next){ | ||
console.log("handler fires last"); | ||
actions.ack(); | ||
}); | ||
|
||
responder.use(function(msg, props, actions, next){ | ||
console.log("first middleware"); | ||
next(); | ||
}); | ||
|
||
responder.use(function(msg, props, actions, next){ | ||
console.log("second middleware"); | ||
next(); | ||
}); | ||
|
||
responder.use(function(msg, props, actions, next){ | ||
console.log("third middleware"); | ||
next(); | ||
}); | ||
``` | ||
|
||
When this subscriber receives a message to handle, you will see the following: | ||
|
||
``` | ||
first middleware | ||
second middleware | ||
third middleware | ||
handler fires last | ||
``` | ||
|
||
It is recommended you add the middleware before adding the `handle` | ||
call. Adding middleware after calling `handle` could allow messages to be | ||
handled before the middleware is in place. | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
# Publish / Subscribe | ||
|
||
**Note**: Currently, a `messageType` is required for Pub/Sub. | ||
|
||
The Publish / Subscribe object pair uses a fanout exchange inside of RabbitMQ, | ||
allowing you to have as many subscribers as you need. Think of pub/sub as an | ||
event that gets broadcast to anyone that cares, or no one at all if no one is | ||
listening. | ||
|
||
## Set Up A Publisher | ||
|
||
```js | ||
// define a publisher | ||
// ------------------ | ||
|
||
var util = require("util"); | ||
var Rabbus = require("rabbus"); | ||
var rabbot = require("rabbot"); | ||
|
||
function SomePublisher(){ | ||
Rabbus.Publisher.call(this, rabbot, { | ||
exchange: "pub-sub.exchange", | ||
routingKey: "pub-sub.key" | ||
}); | ||
} | ||
|
||
util.inherits(SomePublisher, Rabbus.Publisher); | ||
|
||
// publish a message | ||
// ----------------- | ||
|
||
var publisher = new SomePublisher(); | ||
|
||
var message = { | ||
place: "world" | ||
}; | ||
|
||
publisher.publish(message, function(){ | ||
console.log("published a message"); | ||
}); | ||
``` | ||
|
||
### Publisher Options | ||
|
||
The following options are available when configuring a publisher: | ||
|
||
* **exchange** (string): name of the exchange to create and publish to | ||
* **exchange** (object): object literal with options for the exchange | ||
* **name** (string): name of the exchange to create and publish to | ||
* **type** (string): type of exchange to use. default is `fanout`. | ||
* **autoDelete** (boolean): delete this exchange when there are no more connections using it. default is `false`. | ||
* **durable** (boolean): this exchange will survive a shut down / restart of RabbitMQ. default is `true`. | ||
* **persistent** (boolean): messages published through this exchange will be saved to disk / survive restart of RabbitMQ. default is `true`. | ||
* **messageType** (string): *optional* the type of message being published. ([See below.](#the-messagetype-attribute)) | ||
* **routingKey** (string): the routing key to use for the published message | ||
|
||
## Set Up A Subscriber | ||
|
||
```js | ||
// define a subscriber | ||
// ------------------- | ||
|
||
var util = require("util"); | ||
var Rabbus = require("rabbus"); | ||
var rabbot = require("rabbot"); | ||
|
||
function SomeSubscriber(){ | ||
Rabbus.Subscriber.call(this, rabbot, { | ||
exchange: "pub-sub.exchange", | ||
queue: "pub-sub.queue", | ||
routingKey: "pub-sub.key" | ||
}); | ||
} | ||
|
||
util.inherits(SomeSubscriber, Rabbus.Subscriber); | ||
|
||
// subscribe to a message | ||
// ---------------------- | ||
|
||
var sub1 = new SomeSubscriber(); | ||
sub1.subscribe(function(message, properties, actions, next){ | ||
console.log("1: hello", message.place); | ||
actions.ack(); | ||
}); | ||
|
||
var sub2 = new SomeSubscriber(); | ||
sub2.subscribe(function(message, properties, actions, next){ | ||
console.log("2: hello", message.place); | ||
actions.ack(); | ||
}); | ||
|
||
var sub3 = new SomeSubscriber(); | ||
sub3.subscribe(function(message, properties, actions, next){ | ||
console.log("3: hello", message.place); | ||
actions.ack(); | ||
}); | ||
``` | ||
|
||
### Subscriber Options | ||
|
||
See Publisher options for Exchange definition. The exchange | ||
and queue that you specify in these options will be used to | ||
create the binding between the exchange and queue. | ||
|
||
* **exchange**: (see Publisher for options) | ||
* **queue** (string): name of the queue to create and subscribe to | ||
* **queue** (object): object literal with options for the queue | ||
* **name** (string): name of the queue to create and subscriber to | ||
* **autoDelete** (boolean): delete this queue when there are no more connections using it. default is `false`. | ||
* **durable** (boolean): this queue will survive a shut down / restart of RabbitMQ. default is `true`. | ||
* **messageType** (string): *optional* the type of message to handle for this subscriber instance. ([See below.](#the-messagetype-attribute)) | ||
* **routingKey** (string): the routing key to use for binding the exchange and queue | ||
* **routingKey** ([string]): an array of string for the routing key to use for binding the exchange and queue |
Oops, something went wrong.