From bd4802aebcd6aea22a14316d2664706b00478b3f Mon Sep 17 00:00:00 2001 From: autumnqfeng Date: Wed, 6 Jul 2022 11:28:40 +0800 Subject: [PATCH] bugfix: Default echange consume failed, when broker restart. --- .../pulsar/handlers/amqp/AbstractAmqpExchange.java | 11 +++++++++++ .../pulsar/handlers/amqp/AmqpChannel.java | 3 ++- .../pulsar/handlers/amqp/AmqpExchange.java | 7 +++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AbstractAmqpExchange.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AbstractAmqpExchange.java index 41a2ed96..da5c70bc 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AbstractAmqpExchange.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AbstractAmqpExchange.java @@ -42,6 +42,17 @@ public void addQueue(AmqpQueue queue) { queues.add(queue); } + @Override + public AmqpQueue getQueue(String queueName) { + AmqpQueue queue = null; + for (AmqpQueue q : queues) { + if (q.getName().equals(queueName)) { + queue = q; + } + } + return queue; + } + @Override public void removeQueue(AmqpQueue queue) { queues.remove(queue); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java index 2d506c1c..fedb6b1d 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java @@ -334,7 +334,8 @@ public void receiveBasicPublish(AMQShortString exchange, AMQShortString routingK return; } else { // bind to default exchange. - if (amqpQueue.getRouter(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE) == null) { + if (amqpQueue.getRouter(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE) == null + || amqpExchange.getQueue(queueName) == null) { amqpQueue.bindExchange(amqpExchange, AbstractAmqpMessageRouter.generateRouter(AmqpExchange.Type.Direct), routingKey.toString(), null); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchange.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchange.java index 42c0c344..c618b3ce 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchange.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchange.java @@ -133,6 +133,13 @@ public static Type value(String type) { */ void addQueue(AmqpQueue queue); + /** + * Get a queue {@link AmqpQueue} with queue name. + * @param queueName AMQP queue name. + * @return AMQP queue. + */ + AmqpQueue getQueue(String queueName); + /** * Remove a queue {@link AmqpQueue} from the exchange. * @param queue AMQP queue.