diff --git a/spec/01transport.coffee b/spec/01transport.coffee index 7d41823..36515a6 100644 --- a/spec/01transport.coffee +++ b/spec/01transport.coffee @@ -119,6 +119,30 @@ transportTests = (type) -> clientA.connect (err) -> done err + describe 'sending participant registration message', -> + client = null + beforeEach (done) -> + client = transport.getClient address + return client.connect done + afterEach (done) -> + return client.disconnect done + + it 'should be received by subscribed broker', (done) -> + definition = + id: '123' + role: 'role' + component: 'lib/Component' + inports: [] + outports: [] + onDiscover = (message) -> + got = message.data.payload + chai.expect(got).to.eql definition + return done() + broker.subscribeParticipantChange onDiscover, (err) -> + return done err if err + client.registerParticipant definition, (err) -> + return done err if err + describe 'outqueue without subscribers', -> it 'sending should not error', (done) -> payload = { foo: 'bar91' } diff --git a/src/amqp.coffee b/src/amqp.coffee index 2efc41c..b4185ca 100644 --- a/src/amqp.coffee +++ b/src/amqp.coffee @@ -2,6 +2,7 @@ debug = require('debug')('msgflo:amqp') async = require 'async' interfaces = require './interfaces' +uuid = require 'uuid' try amqp = require 'amqplib/callback_api' @@ -144,10 +145,12 @@ class Client extends interfaces.MessagingClient protocol: 'discovery' command: 'participant' payload: part - @channel.assertQueue 'fbp' - data = new Buffer JSON.stringify msg - @channel.sendToQueue 'fbp', data - return callback null + exchangeName = 'fbp' + @channel.assertExchange exchangeName, 'fanout', {}, (err) => + return callback err if err + data = new Buffer JSON.stringify msg + @channel.publish exchangeName, '', data + return callback null class MessageBroker extends Client constructor: (address, options) -> @@ -203,7 +206,12 @@ class MessageBroker extends Client return callback null # Participant registration - subscribeParticipantChange: (handler) -> + subscribeParticipantChange: (handler, callback) -> + defaultCallback = (err) -> + if err + console.err "Error in msgflo.amqp.subscribeParticipantChange, and no callback added", err + callback = defaultCallback if not callback + deserialize = (message) => debug 'receive on fbp', message.fields.deliveryTag data = null @@ -216,8 +224,17 @@ class MessageBroker extends Client data: data return handler out - @channel.assertQueue 'fbp' - @channel.consume 'fbp', deserialize + exchangeName = 'fbp' + @channel.assertExchange exchangeName, 'fanout', {}, (err) => + return callback err if err + subscribeQueue = '.fbp-subscribe-' + uuid.v4() + @channel.assertQueue subscribeQueue, { persistent: false }, (err) => + return callback err if err + @channel.bindQueue subscribeQueue, exchangeName, '', {}, (err) => + return callback err if err + @channel.consume subscribeQueue, deserialize + debug 'subscribed to', subscribeQueue, exchangeName + return callback null exports.Client = Client exports.MessageBroker = MessageBroker diff --git a/src/direct.coffee b/src/direct.coffee index 6aa85c0..b4e2c6b 100644 --- a/src/direct.coffee +++ b/src/direct.coffee @@ -121,9 +121,16 @@ class MessageBroker extends interfaces.MessageBroker nackMessage: (message) -> return - subscribeParticipantChange: (handler) -> + subscribeParticipantChange: (handler, callback) -> + defaultCallback = (err) -> + if err + console.err "Error in msgflo.direct.subscribeParticipantChange, and no callback added", err + callback = defaultCallback if not callback + @createQueue '', 'fbp', (err) => - @subscribeToQueue 'fbp', handler, () -> + return callback err if err + @subscribeToQueue 'fbp', handler, (err) -> + return callback err exports.MessageBroker = MessageBroker exports.Client = Client diff --git a/src/interfaces.coffee b/src/interfaces.coffee index 4a8f68d..afd5c8b 100644 --- a/src/interfaces.coffee +++ b/src/interfaces.coffee @@ -73,7 +73,7 @@ class MessageBroker extends MessagingSystem throw new Error 'Not Implemented' # Participant registration - subscribeParticipantChange: (handler) -> + subscribeParticipantChange: (handler, callback) -> throw new Error 'Not Implemented' exports.MessageBroker = MessageBroker diff --git a/src/mqtt.coffee b/src/mqtt.coffee index a6f3a8d..380c72a 100644 --- a/src/mqtt.coffee +++ b/src/mqtt.coffee @@ -125,9 +125,16 @@ class MessageBroker extends Client routing.binderMixin this # Participant registration - subscribeParticipantChange: (handler) -> + subscribeParticipantChange: (handler, callback) -> + defaultCallback = (err) -> + if err + console.err "Error in msgflo.mqtt.subscribeParticipantChange, and no callback added", err + callback = defaultCallback if not callback + @createQueue '', 'fbp', (err) => - @subscribeToQueue 'fbp', handler, () -> + return callback err if err + @subscribeToQueue 'fbp', handler, (err) -> + return callback err exports.Client = Client exports.MessageBroker = MessageBroker