diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java index 40e401d28665c..b74e396cea526 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java @@ -560,4 +560,136 @@ public void testMsgRateExpired() throws Exception { log.info("-- Exiting {} test --", methodName); } + + @Test + public void testRetryLetterAndDeadLetterStats() throws PulsarClientException, InterruptedException { + final String topicName = "persistent://my-property/my-ns/testRetryLetterAndDeadLetterStats"; + + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(3) + .retryLetterTopic("persistent://my-property/my-ns/retry-topic") + .deadLetterTopic("persistent://my-property/my-ns/dlq-topic") + .build()) + .subscriptionName("sub") + .subscribe(); + + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + + final int messages = 1; + for (int i = 0; i < messages; i++) { + producer.send(("message-" + i).getBytes()); + } + + for (int i = 0; i < messages * 4; i++) { + // nack and reconsumeLater + Message msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + consumer.reconsumeLater(msg, 100, TimeUnit.MILLISECONDS); + } + } + + Awaitility.await().untilAsserted(() -> { + ConsumerStats stats = consumer.getStats(); + ProducerStats retryStats = stats.getRetryLetterProducerStats(); + ProducerStats deadLetterStats = stats.getDeadLetterProducerStats(); + assertNotNull(retryStats); + assertNotNull(deadLetterStats); + assertEquals(retryStats.getTotalMsgsSent(), 3); + assertEquals(deadLetterStats.getTotalMsgsSent(), 1); + }); + } + @Test + public void testDeadLetterStats() throws PulsarClientException, InterruptedException { + final String topicName = "persistent://my-property/my-ns/testDeadLetterStats"; + + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(1) + .deadLetterTopic("persistent://my-property/my-ns/dlq-topic") + .build()) + .subscriptionName("sub") + .subscribe(); + + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .create(); + + final int messages = 1; + for (int i = 0; i < messages; i++) { + producer.send(("message-" + i).getBytes()); + } + + for (int i = 0; i < messages * 2; i++) { + // nack and reconsumeLater + Message msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + consumer.negativeAcknowledge(msg); + } + } + + Awaitility.await().untilAsserted(() -> { + ConsumerStats stats = consumer.getStats(); + ProducerStats dlqStats = stats.getDeadLetterProducerStats(); + assertNotNull(dlqStats); + assertEquals(dlqStats.getTotalMsgsSent(), 1); + }); + } + + @Test + public void testPartitionedRetryLetterAndDeadLetterStats() + throws PulsarClientException, InterruptedException, PulsarAdminException { + final String topicName = "persistent://my-property/my-ns/testPartitionedRetryLetterAndDeadLetterStats"; + + admin.topics().createPartitionedTopic(topicName, 10); + Consumer consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(3) + .retryLetterTopic("persistent://my-property/my-ns/retry-topic") + .deadLetterTopic("persistent://my-property/my-ns/dlq-topic") + .build()) + .subscriptionName("sub") + .subscribe(); + + Producer producer = pulsarClient.newProducer() + .topic(topicName) + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition) + .create(); + + final int messages = 30; + for (int i = 0; i < messages; i++) { + producer.send(("message-" + i).getBytes()); + } + + for (int i = 0; i < messages * 4; i++) { + // nack and reconsumeLater + Message msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg != null) { + consumer.reconsumeLater(msg, 100, TimeUnit.MILLISECONDS); + } + } + + Awaitility.await().untilAsserted(() -> { + ConsumerStats stats = consumer.getStats(); + ProducerStats retryStats = stats.getRetryLetterProducerStats(); + ProducerStats deadLetterStats = stats.getDeadLetterProducerStats(); + assertNotNull(retryStats); + assertNotNull(deadLetterStats); + assertEquals(retryStats.getTotalMsgsSent(), 3 * messages); + assertEquals(deadLetterStats.getTotalMsgsSent(), messages); + }); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java index 529101ecde39c..7935e05d55b66 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerStats.java @@ -122,4 +122,14 @@ public interface ConsumerStats extends Serializable { default Map getPartitionStats() { return Collections.emptyMap(); } + + /** + * @return producer stats for deadLetterProducer if available + */ + ProducerStats getDeadLetterProducerStats(); + + /** + * @return producer stats for retryLetterProducer if available + */ + ProducerStats getRetryLetterProducerStats(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index f390b80a7f01c..15cfeb267411d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -617,6 +617,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a .enableChunking(true) .blockIfQueueFull(false) .create(); + stats.setRetryLetterProducerStats(retryLetterProducer.getStats()); } } catch (Exception e) { log.error("Create retry letter producer exception with topic: {}", @@ -2168,6 +2169,9 @@ private void initDeadLetterProducerIfNeeded() { .enableBatching(false) .enableChunking(true) .createAsync(); + deadLetterProducer.thenAccept(dlqProducer -> { + stats.setDeadLetterProducerStats(dlqProducer.getStats()); + }); } } finally { createProducerLock.writeLock().unlock(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java index e8719753befd1..374b88ab3eb7d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java @@ -23,6 +23,7 @@ import java.util.Optional; import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerStats; public class ConsumerStatsDisabled implements ConsumerStatsRecorder { private static final long serialVersionUID = 1L; @@ -124,6 +125,16 @@ public Map getMsgNumInSubReceiverQueue() { return null; } + @Override + public ProducerStats getDeadLetterProducerStats() { + return null; + } + + @Override + public ProducerStats getRetryLetterProducerStats() { + return null; + } + @Override public double getRateMsgsReceived() { return 0; @@ -148,4 +159,14 @@ public void reset() { public void updateCumulativeStats(ConsumerStats stats) { // do nothing } + + @Override + public void setDeadLetterProducerStats(ProducerStats producerStats) { + // do nothing + } + + @Override + public void setRetryLetterProducerStats(ProducerStats producerStats) { + // do nothing + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java index 1a7de725f31b6..1d0d9e734b38d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorder.java @@ -22,6 +22,7 @@ import java.util.Optional; import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerStats; public interface ConsumerStatsRecorder extends ConsumerStats { void updateNumMsgsReceived(Message message); @@ -39,4 +40,8 @@ public interface ConsumerStatsRecorder extends ConsumerStats { void reset(); void updateCumulativeStats(ConsumerStats stats); + + void setDeadLetterProducerStats(ProducerStats producerStats); + + void setRetryLetterProducerStats(ProducerStats producerStats); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java index 3a47ddc5d4b08..8dfc0af8e1d93 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerStats; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.slf4j.Logger; @@ -63,6 +64,10 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder { private volatile double receivedMsgsRate; private volatile double receivedBytesRate; + volatile ProducerStats deadLetterProducerStats; + + volatile ProducerStats retryLetterProducerStats; + private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00"); public ConsumerStatsRecorderImpl() { @@ -259,6 +264,26 @@ public Map getMsgNumInSubReceiverQueue() { return null; } + @Override + public ProducerStats getDeadLetterProducerStats() { + return deadLetterProducerStats; + } + + @Override + public ProducerStats getRetryLetterProducerStats() { + return retryLetterProducerStats; + } + + @Override + public void setDeadLetterProducerStats(ProducerStats producerStats) { + this.deadLetterProducerStats = producerStats; + } + + @Override + public void setRetryLetterProducerStats(ProducerStats producerStats) { + this.retryLetterProducerStats = producerStats; + } + @Override public long getNumMsgsReceived() { return numMsgsReceived.longValue(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java index 17018be02befc..eb4a339e20b2b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicConsumerStatsRecorderImpl.java @@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerStats; import org.apache.pulsar.client.api.MultiTopicConsumerStats; +import org.apache.pulsar.client.api.ProducerStats; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,10 @@ public class MultiTopicConsumerStatsRecorderImpl extends ConsumerStatsRecorderIm private static final long serialVersionUID = 1L; private Map partitionStats = new ConcurrentHashMap<>(); + private PartitionedTopicProducerStatsRecorderImpl deadLetterStats = new PartitionedTopicProducerStatsRecorderImpl(); + private PartitionedTopicProducerStatsRecorderImpl retryLetterStats = + new PartitionedTopicProducerStatsRecorderImpl(); + public MultiTopicConsumerStatsRecorderImpl() { super(); } @@ -55,5 +60,21 @@ public Map getPartitionStats() { return partitionStats; } + @Override + public ProducerStats getDeadLetterProducerStats() { + deadLetterStats.reset(); + partitionStats.forEach((partition, consumerStats) -> deadLetterStats.updateCumulativeStats(partition, + consumerStats.getDeadLetterProducerStats())); + return deadLetterStats; + } + + @Override + public ProducerStats getRetryLetterProducerStats() { + retryLetterStats.reset(); + partitionStats.forEach((partition, consumerStats) -> retryLetterStats.updateCumulativeStats(partition, + consumerStats.getRetryLetterProducerStats())); + return retryLetterStats; + } + private static final Logger log = LoggerFactory.getLogger(MultiTopicConsumerStatsRecorderImpl.class); } diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml index f47f9b4a31a09..0e05d20cb9bb4 100644 --- a/pulsar-client/src/main/resources/findbugsExclude.xml +++ b/pulsar-client/src/main/resources/findbugsExclude.xml @@ -1033,4 +1033,14 @@ + + + + + + + + + +