diff --git a/Common/config/default.json b/Common/config/default.json index 8728c5df..3af81f80 100644 --- a/Common/config/default.json +++ b/Common/config/default.json @@ -114,42 +114,63 @@ "rabbitmq": { "url": "amqp://guest:guest@localhost:5672", "socketOptions": {}, - "exchangepubsub": "ds.pubsub", - "queuepubsubOptions": { - "autoDelete": true, - "exclusive": true, - "arguments": { - "x-queue-type": "classic" + "exchangepubsub": { + "name": "ds.pubsub", + "options": { + "durable": true } }, - "queueconverttask": "ds.converttask6", - "queueconverttaskOptions": { - "durable": true, - "maxPriority": 6, - "arguments": { - "x-queue-type": "classic" + "queuepubsub": { + "name": "", + "options": { + "autoDelete": true, + "exclusive": true, + "arguments": { + "x-queue-type": "classic" + } } }, - "queueconvertresponse": "ds.convertresponse", - "queueconvertresponseOptions": { - "durable": true, - "arguments": { - "x-queue-type": "classic" + "queueconverttask": { + "name": "ds.converttask6", + "options": { + "durable": true, + "maxPriority": 6, + "arguments": { + "x-queue-type": "classic" + } } }, - "exchangeconvertdead": "ds.exchangeconvertdead", - "queueconvertdead": "ds.convertdead", - "queueconvertdeadOptions": { - "durable": true, - "arguments": { - "x-queue-type": "classic" + "queueconvertresponse": { + "name": "ds.convertresponse", + "options": { + "durable": true, + "arguments": { + "x-queue-type": "classic" + } } }, - "queuedelayed": "ds.delayed", - "queuedelayedOptions": { - "durable": true, - "arguments": { - "x-queue-type": "classic" + "exchangeconvertdead": { + "name": "ds.exchangeconvertdead", + "options": { + "durable": true + } + }, + "queueconvertdead": { + "name": "ds.convertdead", + "options": { + "durable": true, + "arguments": { + "x-queue-type": "classic" + } + } + }, + "queuedelayed": { + "name": "ds.delayed", + "options": { + "durable": true, + "arguments": { + "x-queue-type": "classic" + } } } }, diff --git a/Common/sources/taskqueueRabbitMQ.js b/Common/sources/taskqueueRabbitMQ.js index 45d6db13..23808df9 100644 --- a/Common/sources/taskqueueRabbitMQ.js +++ b/Common/sources/taskqueueRabbitMQ.js @@ -39,7 +39,6 @@ var utils = require('./utils'); var constants = require('./constants'); var rabbitMQCore = require('./rabbitMQCore'); var activeMQCore = require('./activeMQCore'); -const logger = require('./logger'); const commonDefines = require('./commondefines'); const operationContext = require('./operationContext'); @@ -48,20 +47,15 @@ const cfgQueueType = config.get('queue.type'); var cfgVisibilityTimeout = config.get('queue.visibilityTimeout'); var cfgQueueRetentionPeriod = config.get('queue.retentionPeriod'); var cfgRabbitQueueConvertTask = config.get('rabbitmq.queueconverttask'); -var cfgRabbitQueueConvertTaskOptions = config.get('rabbitmq.queueconverttaskOptions'); var cfgRabbitQueueConvertResponse = config.get('rabbitmq.queueconvertresponse'); -var cfgRabbitQueueConvertResponseOptions = config.get('rabbitmq.queueconvertresponseOptions'); var cfgRabbitExchangeConvertDead = config.get('rabbitmq.exchangeconvertdead'); var cfgRabbitQueueConvertDead = config.get('rabbitmq.queueconvertdead'); -var cfgRabbitQueueConvertDeadOptions = config.get('rabbitmq.queueconvertdeadOptions'); var cfgRabbitQueueDelayed = config.get('rabbitmq.queuedelayed'); -var cfgRabbitQueueDelayedOptions = config.get('rabbitmq.queuedelayedOptions'); var cfgActiveQueueConvertTask = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconverttask'); var cfgActiveQueueConvertResponse = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconvertresponse'); var cfgActiveQueueConvertDead = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queueconvertdead'); var cfgActiveQueueDelayed = constants.ACTIVEMQ_QUEUE_PREFIX + config.get('activemq.queuedelayed'); -const optionsExchnangeDead = {durable: true}; function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAddResponseReceive, isEmitDead, isAddDelayed, callback) { return co(function* () { var e = null; @@ -78,20 +72,20 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd var bAssertTaskQueue = false; let optionsTaskQueueDefault = { messageTtl: cfgQueueRetentionPeriod * 1000, - deadLetterExchange: cfgRabbitExchangeConvertDead + deadLetterExchange: cfgRabbitExchangeConvertDead.name }; - let optionsTaskQueue = {...optionsTaskQueueDefault, ...cfgRabbitQueueConvertTaskOptions}; + let optionsTaskQueue = {...optionsTaskQueueDefault, ...cfgRabbitQueueConvertTask.options}; if (isAddTask) { taskqueue.channelConvertTask = yield rabbitMQCore.createConfirmChannelPromise(conn); - yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTask, cfgRabbitQueueConvertTask, + yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTask, cfgRabbitQueueConvertTask.name, optionsTaskQueue); bAssertTaskQueue = true; } var bAssertResponseQueue = false; if (isAddResponse) { taskqueue.channelConvertResponse = yield rabbitMQCore.createConfirmChannelPromise(conn); - yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponse, cfgRabbitQueueConvertResponse, - cfgRabbitQueueConvertResponseOptions); + yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponse, cfgRabbitQueueConvertResponse.name, + cfgRabbitQueueConvertResponse.options); bAssertResponseQueue = true; } var optionsReceive = {noAck: false}; @@ -99,10 +93,10 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd taskqueue.channelConvertTaskReceive = yield rabbitMQCore.createChannelPromise(conn); taskqueue.channelConvertTaskReceive.prefetch(1); if (!bAssertTaskQueue) { - yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask, + yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask.name, optionsTaskQueue); } - yield rabbitMQCore.consumePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask, + yield rabbitMQCore.consumePromise(taskqueue.channelConvertTaskReceive, cfgRabbitQueueConvertTask.name, function (message) { co(function* () { let ack = function() { @@ -120,10 +114,10 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd if (isAddResponseReceive) { taskqueue.channelConvertResponseReceive = yield rabbitMQCore.createChannelPromise(conn); if (!bAssertResponseQueue) { - yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse, - optionsResponseQueue); + yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse.name, + cfgRabbitQueueConvertResponse.options); } - yield rabbitMQCore.consumePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse, + yield rabbitMQCore.consumePromise(taskqueue.channelConvertResponseReceive, cfgRabbitQueueConvertResponse.name, function (message) { if (message) { taskqueue.emit('response', message.content.toString(), function() { @@ -134,19 +128,19 @@ function initRabbit(taskqueue, isAddTask, isAddResponse, isAddTaskReceive, isAdd } if (isAddDelayed) { let optionsDelayedQueueDefault = { - deadLetterExchange: cfgRabbitExchangeConvertDead + deadLetterExchange: cfgRabbitExchangeConvertDead.name }; - let optionsDelayedQueue = {...optionsDelayedQueueDefault, ...cfgRabbitQueueDelayedOptions}; + let optionsDelayedQueue = {...optionsDelayedQueueDefault, ...cfgRabbitQueueDelayed.options}; taskqueue.channelDelayed = yield rabbitMQCore.createConfirmChannelPromise(conn); - yield rabbitMQCore.assertQueuePromise(taskqueue.channelDelayed, cfgRabbitQueueDelayed, optionsDelayedQueue); + yield rabbitMQCore.assertQueuePromise(taskqueue.channelDelayed, cfgRabbitQueueDelayed.name, optionsDelayedQueue); } if (isEmitDead) { taskqueue.channelConvertDead = yield rabbitMQCore.createChannelPromise(conn); - yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead, 'fanout', - optionsExchnangeDead); - var queue = yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertDead, cfgRabbitQueueConvertDead, cfgRabbitQueueConvertDeadOptions); + yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead.name, 'fanout', + cfgRabbitExchangeConvertDead.options); + var queue = yield rabbitMQCore.assertQueuePromise(taskqueue.channelConvertDead, cfgRabbitQueueConvertDead.name, cfgRabbitQueueConvertDead.options); - taskqueue.channelConvertDead.bindQueue(queue, cfgRabbitExchangeConvertDead, ''); + taskqueue.channelConvertDead.bindQueue(queue, cfgRabbitExchangeConvertDead.name, ''); yield rabbitMQCore.consumePromise(taskqueue.channelConvertDead, queue, function(message) { if (null != taskqueue.channelConvertDead) { if (message) { @@ -370,7 +364,7 @@ function addTaskRabbit(taskqueue, content, priority, callback, opt_expiration, o if (undefined !== opt_headers) { options.headers = opt_headers; } - taskqueue.channelConvertTask.sendToQueue(cfgRabbitQueueConvertTask, content, options, callback); + taskqueue.channelConvertTask.sendToQueue(cfgRabbitQueueConvertTask.name, content, options, callback); } function addTaskActive(taskqueue, content, priority, callback, opt_expiration, opt_headers) { var msg = {durable: true, priority: priority, body: content, ttl: cfgQueueRetentionPeriod * 1000}; @@ -402,7 +396,7 @@ function addTaskString(taskqueue, task, priority, opt_expiration, opt_headers) { } function addResponseRabbit(taskqueue, content, callback) { var options = {persistent: true}; - taskqueue.channelConvertResponse.sendToQueue(cfgRabbitQueueConvertResponse, content, options, callback); + taskqueue.channelConvertResponse.sendToQueue(cfgRabbitQueueConvertResponse.name, content, options, callback); } function addResponseActive(taskqueue, content, callback) { var msg = {durable: true, body: content}; @@ -419,7 +413,7 @@ function closeActive(conn) { } function addDelayedRabbit(taskqueue, content, ttl, callback) { var options = {persistent: true, expiration: ttl.toString()}; - taskqueue.channelDelayed.sendToQueue(cfgRabbitQueueDelayed, content, options, callback); + taskqueue.channelDelayed.sendToQueue(cfgRabbitQueueDelayed.name, content, options, callback); } function addDelayedActive(taskqueue, content, ttl, callback) { var msg = {durable: true, body: content, ttl: ttl}; @@ -434,8 +428,8 @@ function healthCheckRabbit(taskqueue) { if (!taskqueue.channelConvertDead) { return false; } - const exchange = yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead, - 'fanout', optionsExchnangeDead); + const exchange = yield rabbitMQCore.assertExchangePromise(taskqueue.channelConvertDead, cfgRabbitExchangeConvertDead.name, + 'fanout', cfgRabbitExchangeConvertDead.options); return !!exchange; }); } diff --git a/DocService/sources/pubsubRabbitMQ.js b/DocService/sources/pubsubRabbitMQ.js index 2f9c3211..dcb33c03 100644 --- a/DocService/sources/pubsubRabbitMQ.js +++ b/DocService/sources/pubsubRabbitMQ.js @@ -42,11 +42,10 @@ var rabbitMQCore = require('./../../Common/sources/rabbitMQCore'); var activeMQCore = require('./../../Common/sources/activeMQCore'); const cfgQueueType = config.get('queue.type'); -var cfgRabbitExchangePubSub = config.get('rabbitmq.exchangepubsub'); -const cfgRabbitQueuePubsubOptions = config.get('rabbitmq.queuepubsubOptions'); +const cfgRabbitExchangePubSub = config.get('rabbitmq.exchangepubsub'); +const cfgRabbitQueuePubsub = config.get('rabbitmq.queuepubsub'); var cfgActiveTopicPubSub = constants.ACTIVEMQ_TOPIC_PREFIX + config.get('activemq.topicpubsub'); -const optionsExchange = {durable: true}; function initRabbit(pubsub, callback) { return co(function* () { var e = null; @@ -61,12 +60,12 @@ function initRabbit(pubsub, callback) { }); pubsub.connection = conn; pubsub.channelPublish = yield rabbitMQCore.createChannelPromise(conn); - pubsub.exchangePublish = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub, - 'fanout', {durable: true}); + pubsub.exchangePublish = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub.name, + 'fanout', cfgRabbitExchangePubSub.options); pubsub.channelReceive = yield rabbitMQCore.createChannelPromise(conn); - var queue = yield rabbitMQCore.assertQueuePromise(pubsub.channelReceive, '', cfgRabbitQueuePubsubOptions); - pubsub.channelReceive.bindQueue(queue, cfgRabbitExchangePubSub, ''); + var queue = yield rabbitMQCore.assertQueuePromise(pubsub.channelReceive, cfgRabbitQueuePubsub.name, cfgRabbitQueuePubsub.options); + pubsub.channelReceive.bindQueue(queue, cfgRabbitExchangePubSub.name, ''); yield rabbitMQCore.consumePromise(pubsub.channelReceive, queue, function (message) { if(null != pubsub.channelReceive){ if (message) { @@ -190,8 +189,8 @@ function healthCheckRabbit(pubsub) { if (!pubsub.channelPublish) { return false; } - const exchange = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub, - 'fanout', optionsExchange); + const exchange = yield rabbitMQCore.assertExchangePromise(pubsub.channelPublish, cfgRabbitExchangePubSub.name, + 'fanout', cfgRabbitExchangePubSub.options); return !!exchange; }); }