Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement : Delay-Queue, Message-TTL, Queue-TTL, Dead-Letter-Queue, Multi-Tenant in AoP(Implementation based on PR#748) #851

Closed
wants to merge 44 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
a36680f
new proxy init
gaoran10 Sep 23, 2022
355bb66
[fea] New proxy to use multi bundles
gaoran10 Nov 14, 2022
e5bb613
fix
gaoran10 Nov 14, 2022
ede4f6e
fix check style
gaoran10 Nov 14, 2022
8f65669
fix code analysis
gaoran10 Nov 14, 2022
21580b5
fix check style
gaoran10 Nov 14, 2022
02bfa5b
fix check style
gaoran10 Nov 16, 2022
d1b146b
fix
gaoran10 Nov 16, 2022
368ddce
fix
gaoran10 Nov 18, 2022
adf995f
new proxy based on consumer
gaoran10 Dec 3, 2022
6f9a022
fix
gaoran10 Dec 7, 2022
ea99126
fix
gaoran10 Dec 12, 2022
f70219d
support using multi bundle
gaoran10 Dec 13, 2022
2cf9e91
improve consume and message ack
gaoran10 Dec 20, 2022
0a0b79b
fix
gaoran10 Dec 20, 2022
514896c
fix
gaoran10 Dec 20, 2022
d7f8e74
fix
gaoran10 Dec 20, 2022
f8528de
remove useless code
gaoran10 Dec 20, 2022
46c1d3d
add new config for amqp pulsar consumer queue size
gaoran10 Dec 21, 2022
3abc402
fix
gaoran10 Dec 27, 2022
1026bf4
fix
gaoran10 Dec 27, 2022
709b539
address comments
gaoran10 Feb 28, 2023
dc07c27
fix springboot Health check.
Mar 8, 2023
3d1649f
1.Support default exchange.
Mar 8, 2023
479126d
fix
gaoran10 Mar 9, 2023
0be7f28
fix
gaoran10 Mar 9, 2023
ce5c27a
fix
Mar 10, 2023
d709c2f
fix
Mar 10, 2023
9834caf
1.Support dead letter queue.
Mar 14, 2023
eee5391
fix
Mar 14, 2023
2dae7c4
Re-implement the expired message detection code.
Mar 15, 2023
a332f0b
Implement rabbitmq delay queue.
Mar 15, 2023
d491700
fix
Mar 16, 2023
6865ba3
fix
Mar 16, 2023
c1179a6
rewind cursor
Mar 16, 2023
08606c5
fix
Mar 17, 2023
fe7b716
support queue arguments modify.
Mar 18, 2023
f1cf01f
fix
Mar 18, 2023
1d2276b
1.add admin api
Mar 23, 2023
44bb791
fix
Mar 23, 2023
a2064b5
fix
Mar 23, 2023
b117794
fix
Mar 23, 2023
9851819
1. Complete the admin api and monitoring
Mar 30, 2023
5768d3d
1. Adjust the delete api
Apr 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion amqp-client-auth/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-amqp-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>3.0.0.1-SNAPSHOT</version>
<version>2.11.0-SNAPSHOT</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
27 changes: 22 additions & 5 deletions amqp-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-amqp-parent</artifactId>
<version>3.0.0.1-SNAPSHOT</version>
<version>2.11.0-SNAPSHOT</version>
</parent>

<artifactId>pulsar-protocol-handler-amqp</artifactId>
Expand All @@ -31,12 +31,29 @@

<!-- include the dependencies -->
<dependencies>

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
<scope>test</scope>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
<version>${qpid-protocol-plugin.version}</version>
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>com.fasterxml.jackson.core</groupId>-->
<!-- <artifactId>jackson-core</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <groupId>com.fasterxml.jackson.core</groupId>-->
<!-- <artifactId>jackson-databind</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
</dependency>

<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
<version>${qpid-protocol-plugin.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;

/**
* Base class of AMQP exchange.
*/
public abstract class AbstractAmqpExchange implements AmqpExchange {

@Getter
protected final Map<String, String> properties;
protected final String exchangeName;
protected final AmqpExchange.Type exchangeType;
protected Set<AmqpQueue> queues;
Expand All @@ -37,7 +40,8 @@ public abstract class AbstractAmqpExchange implements AmqpExchange {

protected AbstractAmqpExchange(String exchangeName, AmqpExchange.Type exchangeType,
Set<AmqpQueue> queues, boolean durable, boolean autoDelete, boolean internal,
Map<String, Object> arguments) {
Map<String, Object> arguments, Map<String, String> properties) {
this.properties = properties;
this.exchangeName = exchangeName;
this.exchangeType = exchangeType;
this.queues = queues;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package io.streamnative.pulsar.handlers.amqp;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Getter;

/**
* Base class for AMQP queue.
Expand All @@ -31,6 +33,9 @@ public abstract class AbstractAmqpQueue implements AmqpQueue {
protected boolean exclusive;
protected boolean autoDelete;
protected final Map<String, AmqpMessageRouter> routers = new ConcurrentHashMap<>();
protected final Map<String, Object> arguments = new HashMap<>();
@Getter
protected Map<String, String> properties;

protected AbstractAmqpQueue(String queueName, boolean durable, long connectionId) {
this.queueName = queueName;
Expand All @@ -42,12 +47,13 @@ protected AbstractAmqpQueue(String queueName, boolean durable, long connectionId

protected AbstractAmqpQueue(String queueName,
boolean durable, long connectionId,
boolean exclusive, boolean autoDelete) {
boolean exclusive, boolean autoDelete, Map<String, String> properties) {
this.queueName = queueName;
this.durable = durable;
this.connectionId = connectionId;
this.exclusive = exclusive;
this.autoDelete = autoDelete;
this.properties = properties;
}

@Override
Expand All @@ -60,6 +66,20 @@ public boolean getDurable() {
return durable;
}

@Override
public boolean getExclusive() {
return exclusive;
}
@Override
public boolean getAutoDelete() {
return autoDelete;
}

@Override
public Map<String, Object> getArguments() {
return arguments;
}

@Override
public AmqpMessageRouter getRouter(String exchangeName) {
return routers.get(exchangeName);
Expand Down Expand Up @@ -133,4 +153,8 @@ public boolean isAutoDelete() {
return autoDelete;
}

@Override
public void close() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package io.streamnative.pulsar.handlers.amqp;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.Executor;
import io.streamnative.pulsar.handlers.amqp.admin.AmqpAdmin;
import io.streamnative.pulsar.handlers.amqp.admin.prometheus.PrometheusAdmin;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
Expand All @@ -39,19 +41,26 @@ public class AmqpBrokerService {
private ConnectionContainer connectionContainer;
@Getter
private PulsarService pulsarService;
@Getter
private AmqpAdmin amqpAdmin;
@Getter
PrometheusAdmin prometheusAdmin;

public AmqpBrokerService(PulsarService pulsarService, AmqpServiceConfiguration config) {
String clusterName = pulsarService.getBrokerService().getPulsar().getConfiguration().getClusterName();
this.amqpAdmin = new AmqpAdmin(config.getAdvertisedAddress(), config.getAmqpAdminPort());
this.prometheusAdmin = new PrometheusAdmin(config.getAmqpPrometheusUrl(), clusterName);
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, initRouteExecutor(config),
config.getAmqpExchangeRouteQueueSize());
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService,
initRouteExecutor(config), config, amqpAdmin);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, config);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer, amqpTopicManager);
this.connectionContainer = new ConnectionContainer(pulsarService, exchangeContainer, queueContainer);
}

private Executor initRouteExecutor(AmqpServiceConfiguration config) {
private ExecutorService initRouteExecutor(AmqpServiceConfiguration config) {
return Executors.newFixedThreadPool(config.getAmqpExchangeRouteExecutorThreads(),
new DefaultThreadFactory("exchange-route"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,56 +83,56 @@
@Log4j2
public class AmqpChannel implements ServerChannelMethodProcessor {

private final int channelId;
private final AmqpConnection connection;
protected final int channelId;
protected final AmqpConnection connection;
private final AtomicBoolean blocking = new AtomicBoolean(false);
private final AtomicBoolean closing = new AtomicBoolean(false);
protected final AtomicBoolean closing = new AtomicBoolean(false);
private final java.util.Queue<AsyncCommand> unfinishedCommandsQueue = new ConcurrentLinkedQueue<>();
private long confirmedMessageCounter;
protected long confirmedMessageCounter;
private volatile ServerTransaction transaction;
private boolean confirmOnPublish;
protected boolean confirmOnPublish;
/** A channel has a default queue (the last declared) that is used when no queue name is explicitly set. */
private volatile AmqpQueue defaultQueue;
protected volatile AmqpQueue defaultQueue;

private final UnacknowledgedMessageMap unacknowledgedMessageMap;
protected final UnacknowledgedMessageMap unacknowledgedMessageMap;

/** Maps from consumer tag to consumers instance. */
private final Map<String, Consumer> tag2ConsumersMap = new ConcurrentHashMap<>();
protected final Map<String, Consumer> tag2ConsumersMap = new ConcurrentHashMap<>();

private final Map<String, AmqpConsumer> fetchConsumerMap = new ConcurrentHashMap<>();
protected final Map<String, AmqpConsumer> fetchConsumerMap = new ConcurrentHashMap<>();

/**
* The current message - which may be partial in the sense that not all frames have been received yet - which has
* been received by this channel. As the frames are received the message gets updated and once all frames have been
* received the message can then be routed.
*/
private IncomingMessage currentMessage;
protected IncomingMessage currentMessage;

private final String defaultSubscription = "defaultSubscription";
protected final String defaultSubscription = "defaultSubscription";
public static final AMQShortString EMPTY_STRING = AMQShortString.createAMQShortString((String) null);
/**
* ConsumerTag prefix, the tag is unique per subscription to a queue.
* The server returns this in response to a basic.consume request.
*/
private static final String CONSUMER_TAG_PREFIX = "aop.ctag-";
protected static final String CONSUMER_TAG_PREFIX = "aop.ctag-";

/**
* The consumer ID.
*/
private static final AtomicLong CONSUMER_ID = new AtomicLong(0);
protected static final AtomicLong CONSUMER_ID = new AtomicLong(0);

/**
* The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
* value of this represents the <b>last</b> tag sent out.
*/
private volatile long deliveryTag = 0;
private final AmqpFlowCreditManager creditManager;
private final AtomicBoolean blockedOnCredit = new AtomicBoolean(false);
protected volatile long deliveryTag = 0;
protected final AmqpFlowCreditManager creditManager;
protected final AtomicBoolean blockedOnCredit = new AtomicBoolean(false);
public static final int DEFAULT_CONSUMER_PERMIT = 1000;
private ExchangeService exchangeService;
private QueueService queueService;
private ExchangeContainer exchangeContainer;
private QueueContainer queueContainer;
protected ExchangeService exchangeService;
protected QueueService queueService;
protected ExchangeContainer exchangeContainer;
protected QueueContainer queueContainer;

public AmqpChannel(int channelId, AmqpConnection connection, AmqpBrokerService amqpBrokerService) {
this.channelId = channelId;
Expand Down Expand Up @@ -253,7 +253,8 @@ public void receiveQueueDeclare(AMQShortString queue, boolean passive, boolean d
channelId, queue, passive, durable, exclusive, autoDelete, nowait, arguments);
}
queueService.queueDeclare(connection.getNamespaceName(), queue.toString(), passive, durable, exclusive,
autoDelete, nowait, arguments, connection.getConnectionId()).thenAccept(amqpQueue -> {
autoDelete, nowait, FieldTable.convertToMap(arguments), connection.getConnectionId())
.thenAccept(amqpQueue -> {
setDefaultQueue(amqpQueue);
MethodRegistry methodRegistry = connection.getMethodRegistry();
QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(
Expand Down Expand Up @@ -401,7 +402,7 @@ public void receiveBasicConsume(AMQShortString queue, AMQShortString consumerTag
});
}

private String getConsumerTag(AMQShortString consumerTag) {
protected String getConsumerTag(AMQShortString consumerTag) {
if (consumerTag == null) {
return CONSUMER_TAG_PREFIX + connection.remoteAddress + "-" + UUID.randomUUID();
} else {
Expand All @@ -411,7 +412,7 @@ private String getConsumerTag(AMQShortString consumerTag) {
}

private synchronized void subscribe(String consumerTag, String queueName, Topic topic,
boolean ack, boolean exclusive, boolean nowait){
boolean ack, boolean exclusive, boolean nowait) {

CompletableFuture<Void> future = new CompletableFuture<>();
future.whenComplete((ignored, e) -> {
Expand Down Expand Up @@ -685,7 +686,7 @@ private void publishContentHeader(ContentHeaderBody contentHeaderBody) {
deliverCurrentMessageIfComplete();
}

private void deliverCurrentMessageIfComplete() {
protected void deliverCurrentMessageIfComplete() {
if (currentMessage.allContentReceived()) {
MessagePublishInfo info = currentMessage.getMessagePublishInfo();
String routingKey = AMQShortString.toString(info.getRoutingKey());
Expand Down Expand Up @@ -750,6 +751,8 @@ public void messageNAck(long deliveryTag, boolean multiple, boolean requeue) {
if (!ackedMessages.isEmpty()) {
if (requeue) {
requeue(ackedMessages);
} else {
discardMessage(ackedMessages);
}
} else {
closeChannel(ErrorCodes.IN_USE, "deliveryTag not found");
Expand All @@ -759,6 +762,17 @@ public void messageNAck(long deliveryTag, boolean multiple, boolean requeue) {
}
}

private void discardMessage(Collection<UnacknowledgedMessageMap.MessageConsumerAssociation> messages) {
Map<UnacknowledgedMessageMap.MessageProcessor, List<PositionImpl>> positionMap = new HashMap<>();
messages.forEach(association -> {
UnacknowledgedMessageMap.MessageProcessor consumer = association.getConsumer();
List<PositionImpl> positions = positionMap.computeIfAbsent(consumer,
list -> new ArrayList<>());
positions.add((PositionImpl) association.getPosition());
});
positionMap.forEach(UnacknowledgedMessageMap.MessageProcessor::discardMessage);
}

@Override
public void receiveBasicRecover(boolean requeue, boolean sync) {
Collection<UnacknowledgedMessageMap.MessageConsumerAssociation> ackedMessages =
Expand All @@ -777,15 +791,15 @@ public void receiveBasicRecover(boolean requeue, boolean sync) {
}

private void requeue(Collection<UnacknowledgedMessageMap.MessageConsumerAssociation> messages) {
Map<AmqpConsumer, List<PositionImpl>> positionMap = new HashMap<>();
Map<UnacknowledgedMessageMap.MessageProcessor, List<PositionImpl>> positionMap = new HashMap<>();
messages.stream().forEach(association -> {
AmqpConsumer consumer = association.getConsumer();
UnacknowledgedMessageMap.MessageProcessor consumer = association.getConsumer();
List<PositionImpl> positions = positionMap.computeIfAbsent(consumer,
list -> new ArrayList<>());
positions.add((PositionImpl) association.getPosition());
});
positionMap.entrySet().stream().forEach(entry -> {
entry.getKey().redeliverAmqpMessages(entry.getValue());
entry.getKey().requeue(entry.getValue());
});
}

Expand All @@ -799,10 +813,10 @@ public void receiveBasicAck(long deliveryTag, boolean multiple) {

private void messageAck(long deliveryTag, boolean multiple) {
Collection<UnacknowledgedMessageMap.MessageConsumerAssociation> ackedMessages =
unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
unacknowledgedMessageMap.acknowledge(deliveryTag, multiple);
if (!ackedMessages.isEmpty()) {
ackedMessages.stream().forEach(entry -> {
entry.getConsumer().messagesAck(entry.getPosition());
entry.getConsumer().messageAck(entry.getPosition());
});
} else {
closeChannel(ErrorCodes.IN_USE, "deliveryTag not found");
Expand Down Expand Up @@ -1012,7 +1026,7 @@ public AmqpFlowCreditManager getCreditManager() {
return creditManager;
}

private void handleAoPException(Throwable t) {
protected void handleAoPException(Throwable t) {
Throwable cause = FutureUtil.unwrapCompletionException(t);
if (!(cause instanceof AoPException)) {
connection.sendConnectionClose(INTERNAL_ERROR, t.getMessage(), channelId);
Expand Down
Loading