Skip to content

Commit 74d7265

Browse files
authored
bugfix: Default exchange consume failed, when broker restart. (#597)
1 parent cc78180 commit 74d7265

File tree

3 files changed

+20
-1
lines changed

3 files changed

+20
-1
lines changed

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AbstractAmqpExchange.java

+11
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,17 @@ public CompletableFuture<Void> addQueue(AmqpQueue queue) {
5050
return CompletableFuture.completedFuture(null);
5151
}
5252

53+
@Override
54+
public AmqpQueue getQueue(String queueName) {
55+
AmqpQueue queue = null;
56+
for (AmqpQueue q : queues) {
57+
if (q.getName().equals(queueName)) {
58+
queue = q;
59+
}
60+
}
61+
return queue;
62+
}
63+
5364
@Override
5465
public void removeQueue(AmqpQueue queue) {
5566
queues.remove(queue);

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,8 @@ public void receiveBasicPublish(AMQShortString exchange, AMQShortString routingK
502502
return;
503503
} else {
504504
// bind to default exchange.
505-
if (amqpQueue.getRouter(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE) == null) {
505+
if (amqpQueue.getRouter(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE) == null
506+
|| amqpExchange.getQueue(queueName) == null) {
506507
amqpQueue.bindExchange(amqpExchange,
507508
AbstractAmqpMessageRouter.generateRouter(AmqpExchange.Type.Direct),
508509
routingKey.toString(), null);

amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchange.java

+7
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,13 @@ public static Type value(String type) {
138138
*/
139139
CompletableFuture<Void> addQueue(AmqpQueue queue);
140140

141+
/**
142+
* Get a queue {@link AmqpQueue} with queue name.
143+
* @param queueName AMQP queue name.
144+
* @return AMQP queue.
145+
*/
146+
AmqpQueue getQueue(String queueName);
147+
141148
/**
142149
* Remove a queue {@link AmqpQueue} from the exchange.
143150
* @param queue AMQP queue.

0 commit comments

Comments
 (0)