Tony Woode edited this page Apr 8, 2015 · 1 revision

RabbitMQ AMQP Connection Setup -- how to be an RPC client

Cheatsheet

An AMQP Message has:

  1. An Exchange Name
  2. A Queue Name
  3. Metadata Headers - ReplyTo and CorrelationID
  4. A body - a String

RabbitMQ will require you to set up these:

  • A connection to the message switch
  • A binding for that connection to an exchange
  • A declaration for a return message queue. Make this "exclusive", as only your program will be using it. If you don't give it a name, it will get a globally unique one, which you can ask it for.

Our Setup

  • You don't need to send a message to a particular queue -- that's handled for you automatically
  • You send to the DEFAULT exchange
  • The Routing Key is the name of the Method you want to call
  • The body is a JSON serialisation of an array of parameter names and values
  • You also need to get these into the call:
    1. A 'CorrelationID', such as a UUID, for correlating requests and replies in case these occur out of order, and also to allow stale responses to be discarded
    2. A 'ReplyTo' queue name and a 'reply to' routing tag. That is, you need your own queue, and its name must be in the message so the brain knows where to reply back to. To wit: make a queue, get the name of your queue, and put it into the replyTo header of the AMQP message, e.g.

```
routing tag = "test.echo"
body = "42"
replyTo = "The_name_of_the_receive_queue_you_just_setup"
correlationId = "some_globally_unique_string_such_as_a_UUID"
```

and then you get back something that will look like:

```
body = "42"
correlationId = "the_same_string_you_sent_in_the_request"
```
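
The request and reply fields described above can be assembled with a small helper. This is a minimal sketch, not the project's own code: `buildRequest` and `isReplyFor` are hypothetical names, the empty string is used to stand for the default exchange, and `crypto.randomBytes` is just one way of getting a unique correlation ID.

```javascript
const crypto = require('crypto');

// Hypothetical helper: collect everything one RPC call needs.
function buildRequest(method, args, replyQueueName) {
  return {
    exchange: '',                // the default exchange
    routingKey: method,          // name of the method to call
    body: JSON.stringify(args),  // JSON serialisation of the arguments
    replyTo: replyQueueName,     // name of your own exclusive reply queue
    correlationId: crypto.randomBytes(16).toString('hex')
  };
}

// Hypothetical helper: a reply belongs to a request only when the
// correlation IDs match; anything else is stale and can be discarded.
function isReplyFor(request, reply) {
  return reply.correlationId === request.correlationId;
}

const request = buildRequest('test.echo', 42, 'my_reply_queue');
console.log(request.routingKey, request.body, request.replyTo);
// prints: test.echo 42 my_reply_queue
```

Actually publishing the message is left to the AMQP library in use; the working example in the Code section shows one way to do it.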


#### Calling conventions

* Communication method needs to be REQ/REP -- http://stackoverflow.com/questions/10524613/how-to-create-rep-req-on-rabbit-js

For both arguments and results:

* If the API does not define an object to pass, a serialization of a JSON "null" object should be passed.

* If the API defines only a single object to pass, the serialization of that single object should be passed.

* If the API defines multiple arguments or results to pass, these should be put into an associative array, with the names of the fields in the array being the names of the fields in the API definition, and that should be serialized.

* A special case: if there is an "out of context" error, such as the RPC server throwing an exception, then instead of a valid JSON object the reply body should be a string starting with the characters "Error: ", followed by the rest of the message. From a software viewpoint this should be treated as an undefined exception; in practice the text will generally be a debugging message such as an exception message or stack dump.

* For a single-threaded synchronous application, one reply queue is all you need. For a multi-threaded synchronous application, you will need one reply queue for each thread. For an event-driven asynchronous application, one reply queue suffices, and you will need some sort of dispatcher data structure to link up the received correlationIds with the code that gets called to handle those return events.
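
The argument and result conventions above can be sketched as a pair of helpers. This is an illustration under assumptions, not part of the API: `encodePayload` and `decodePayload` are hypothetical names, and the `{ error, value }` return shape is only one possible way to surface the "Error: " special case.

```javascript
// Hypothetical helper: serialise arguments (or results).
//   nothing to pass -> JSON null
//   a single object -> that object
//   several fields  -> one object keyed by the API's field names
function encodePayload(payload) {
  return JSON.stringify(payload === undefined ? null : payload);
}

// Hypothetical helper: decode a reply body (string or Buffer).
// A body starting with "Error: " is the out-of-context error case
// and is not valid JSON, so it must be checked before parsing.
function decodePayload(body) {
  const text = body.toString();
  const prefix = 'Error: ';
  if (text.startsWith(prefix)) {
    return { error: new Error(text.slice(prefix.length)), value: undefined };
  }
  return { error: null, value: JSON.parse(text) };
}

console.log(encodePayload());               // null
console.log(encodePayload({ x: 1, y: 2 })); // {"x":1,"y":2}
console.log(decodePayload('Error: something broke').error.message); // something broke
```

A JSON string that happens to contain the text "Error: " is not misread, because its serialised form starts with a double quote.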

## Making contact

There are a number of test methods to try and set off. They are called:
* test.no_op -- argument null, returns null
* test.echo -- argument any JSON object, returns the same object
* test.hello -- argument none, returns "hello"
* test.factorial -- argument an integer, returns its factorial
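
For checking a client without a broker, the behaviour listed above can be imitated in-process. The sketch below is a hypothetical stand-in, not the real server: `testMethods` and `fakeCall` are made-up names, and replying with an "Error: " string for an unknown method is an assumption based on the calling conventions.

```javascript
// Hypothetical local stand-ins for the four test methods.
const testMethods = {
  'test.no_op': function () { return null; },
  'test.echo': function (arg) { return arg; },
  'test.hello': function () { return 'hello'; },
  'test.factorial': function (n) {
    let result = 1;
    for (let i = 2; i <= n; i += 1) {
      result *= i;
    }
    return result;
  }
};

// Take a method name and a JSON request body, return the JSON reply body.
function fakeCall(method, body) {
  if (!Object.prototype.hasOwnProperty.call(testMethods, method)) {
    return 'Error: unknown method ' + method; // assumed error format
  }
  return JSON.stringify(testMethods[method](JSON.parse(body)));
}

console.log(fakeCall('test.factorial', '5')); // 120
console.log(fakeCall('test.hello', 'null'));  // "hello"
```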

## Code

#### Source
_The setup is identical to THIS EXAMPLE on the RabbitMQ tutorials_

http://www.rabbitmq.com/tutorials/tutorial-six-python.html

_The Java equivalent can be found here:_

http://www.rabbitmq.com/tutorials/tutorial-six-java.html

### Keith's **Working** Example Javascript Code
**amqpclient.js**
````javascript
// Example of how to use amqprpc
var amqp = require('amqp');
var connection = amqp.createConnection({host:'192.168.1.27'});

var rpc = new (require('./amqprpc'))(connection);

connection.on("ready", function(){
  console.log("ready");
  var outstanding=0; //counter of outstanding requests

  //do a number of requests
  for(var i=1; i<=10 ;i+=1){
    //we are about to make a request, increase counter
    outstanding += 1;
    rpc.makeRequest('test.echo', {x: i, foo:'bar', index:outstanding}, function response(err, response){
      if(err)
        console.error(err);
      else
        console.log("response", response.data.toString());
      //reduce for each timeout or response
      outstanding-=1;
      isAllDone();
    });
  }

  function isAllDone() {
    //if no more outstanding then close connection
    if(outstanding === 0){
      connection.end();
    }
  }

});
````

**amqprpc.js**
````javascript
var amqp = require('amqp')
  , crypto = require('crypto')

var TIMEOUT=2000; //time to wait for response in ms
var CONTENT_TYPE='application/json';

exports = module.exports = AmqpRpc;

function AmqpRpc(connection){
  var self = this;
  this.connection = typeof(connection) != 'undefined' ? connection : amqp.createConnection();
  this.requests = {}; //hash to store request in wait for response
  this.response_queue = false; //placeholder for the future queue
}

AmqpRpc.prototype.makeRequest = function(queue_name, content, callback){
  var self = this;
  //generate a unique correlation id for this call
  var correlationId = crypto.randomBytes(16).toString('hex');

  //create a timeout for what should happen if we don't get a response
  var tId = setTimeout(function(corr_id){
    //if this ever gets called we didn't get a response in a 
    //timely fashion
    callback(new Error("timeout " + corr_id));
    //delete the entry from hash
    delete self.requests[corr_id];
  }, TIMEOUT, correlationId);

  //create a request entry to store in a hash
  var entry = {
    callback:callback,
    timeout: tId //the id for the timeout so we can clear it
  };

  //put the entry in the hash so we can match the response later
  self.requests[correlationId]=entry;

  //make sure we have a response queue
  self.setupResponseQueue(function(){
    //put the request on a queue
    self.connection.publish(queue_name, content, {
      correlationId:correlationId,
      contentType:CONTENT_TYPE,
      replyTo:self.response_queue});
  });
}


AmqpRpc.prototype.setupResponseQueue = function(next){
  //don't mess around if we have a queue
  if(this.response_queue) return next();

  var self = this;
  //create the queue
  self.connection.queue('', {exclusive:true}, function(q){  
    //store the name
    self.response_queue = q.name;
    //subscribe to messages
    q.subscribe(function(message, headers, deliveryInfo, m){
      //get the correlationId
      var correlationId = m.correlationId;
      //is it a response to a pending request
      if(correlationId in self.requests){
        //retrieve the request entry
        var entry = self.requests[correlationId];
        //make sure we don't timeout by clearing it
        clearTimeout(entry.timeout);
        //delete the entry from hash
        delete self.requests[correlationId];
        //callback, no err
        entry.callback(null, message);
      }
    });
    return next();    
  });
}
````