Skip to content

Commit

Permalink
[feat][client] PIP-253 Expose ProducerStats for DeadLetter and RetryL…
Browse files Browse the repository at this point in the history
…etter producers in ConsumerStats (apache#20239)
  • Loading branch information
klevy-toast authored Oct 29, 2023
1 parent e55de39 commit bd86e4e
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -560,4 +560,136 @@ public void testMsgRateExpired() throws Exception {

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testRetryLetterAndDeadLetterStats() throws PulsarClientException, InterruptedException {
final String topicName = "persistent://my-property/my-ns/testRetryLetterAndDeadLetterStats";

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(3)
.retryLetterTopic("persistent://my-property/my-ns/retry-topic")
.deadLetterTopic("persistent://my-property/my-ns/dlq-topic")
.build())
.subscriptionName("sub")
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

final int messages = 1;
for (int i = 0; i < messages; i++) {
producer.send(("message-" + i).getBytes());
}

for (int i = 0; i < messages * 4; i++) {
// nack and reconsumeLater
Message msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
consumer.reconsumeLater(msg, 100, TimeUnit.MILLISECONDS);
}
}

Awaitility.await().untilAsserted(() -> {
ConsumerStats stats = consumer.getStats();
ProducerStats retryStats = stats.getRetryLetterProducerStats();
ProducerStats deadLetterStats = stats.getDeadLetterProducerStats();
assertNotNull(retryStats);
assertNotNull(deadLetterStats);
assertEquals(retryStats.getTotalMsgsSent(), 3);
assertEquals(deadLetterStats.getTotalMsgsSent(), 1);
});
}
@Test
public void testDeadLetterStats() throws PulsarClientException, InterruptedException {
final String topicName = "persistent://my-property/my-ns/testDeadLetterStats";

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(1)
.deadLetterTopic("persistent://my-property/my-ns/dlq-topic")
.build())
.subscriptionName("sub")
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

final int messages = 1;
for (int i = 0; i < messages; i++) {
producer.send(("message-" + i).getBytes());
}

for (int i = 0; i < messages * 2; i++) {
// nack and reconsumeLater
Message msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
consumer.negativeAcknowledge(msg);
}
}

Awaitility.await().untilAsserted(() -> {
ConsumerStats stats = consumer.getStats();
ProducerStats dlqStats = stats.getDeadLetterProducerStats();
assertNotNull(dlqStats);
assertEquals(dlqStats.getTotalMsgsSent(), 1);
});
}

@Test
public void testPartitionedRetryLetterAndDeadLetterStats()
throws PulsarClientException, InterruptedException, PulsarAdminException {
final String topicName = "persistent://my-property/my-ns/testPartitionedRetryLetterAndDeadLetterStats";

admin.topics().createPartitionedTopic(topicName, 10);
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(3)
.retryLetterTopic("persistent://my-property/my-ns/retry-topic")
.deadLetterTopic("persistent://my-property/my-ns/dlq-topic")
.build())
.subscriptionName("sub")
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();

final int messages = 30;
for (int i = 0; i < messages; i++) {
producer.send(("message-" + i).getBytes());
}

for (int i = 0; i < messages * 4; i++) {
// nack and reconsumeLater
Message msg = consumer.receive(1, TimeUnit.SECONDS);
if (msg != null) {
consumer.reconsumeLater(msg, 100, TimeUnit.MILLISECONDS);
}
}

Awaitility.await().untilAsserted(() -> {
ConsumerStats stats = consumer.getStats();
ProducerStats retryStats = stats.getRetryLetterProducerStats();
ProducerStats deadLetterStats = stats.getDeadLetterProducerStats();
assertNotNull(retryStats);
assertNotNull(deadLetterStats);
assertEquals(retryStats.getTotalMsgsSent(), 3 * messages);
assertEquals(deadLetterStats.getTotalMsgsSent(), messages);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,14 @@ public interface ConsumerStats extends Serializable {
default Map<String, ConsumerStats> getPartitionStats() {
return Collections.emptyMap();
}

/**
* @return producer stats for deadLetterProducer if available
*/
ProducerStats getDeadLetterProducerStats();

/**
* @return producer stats for retryLetterProducer if available
*/
ProducerStats getRetryLetterProducerStats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
.enableChunking(true)
.blockIfQueueFull(false)
.create();
stats.setRetryLetterProducerStats(retryLetterProducer.getStats());
}
} catch (Exception e) {
log.error("Create retry letter producer exception with topic: {}",
Expand Down Expand Up @@ -2168,6 +2169,9 @@ private void initDeadLetterProducerIfNeeded() {
.enableBatching(false)
.enableChunking(true)
.createAsync();
deadLetterProducer.thenAccept(dlqProducer -> {
stats.setDeadLetterProducerStats(dlqProducer.getStats());
});
}
} finally {
createProducerLock.writeLock().unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerStats;

public class ConsumerStatsDisabled implements ConsumerStatsRecorder {
private static final long serialVersionUID = 1L;
Expand Down Expand Up @@ -124,6 +125,16 @@ public Map<Long, Integer> getMsgNumInSubReceiverQueue() {
return null;
}

@Override
public ProducerStats getDeadLetterProducerStats() {
return null;
}

@Override
public ProducerStats getRetryLetterProducerStats() {
return null;
}

@Override
public double getRateMsgsReceived() {
return 0;
Expand All @@ -148,4 +159,14 @@ public void reset() {
public void updateCumulativeStats(ConsumerStats stats) {
// do nothing
}

@Override
public void setDeadLetterProducerStats(ProducerStats producerStats) {
// do nothing
}

@Override
public void setRetryLetterProducerStats(ProducerStats producerStats) {
// do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerStats;

public interface ConsumerStatsRecorder extends ConsumerStats {
void updateNumMsgsReceived(Message<?> message);
Expand All @@ -39,4 +40,8 @@ public interface ConsumerStatsRecorder extends ConsumerStats {
void reset();

void updateCumulativeStats(ConsumerStats stats);

void setDeadLetterProducerStats(ProducerStats producerStats);

void setRetryLetterProducerStats(ProducerStats producerStats);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,6 +64,10 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
private volatile double receivedMsgsRate;
private volatile double receivedBytesRate;

volatile ProducerStats deadLetterProducerStats;

volatile ProducerStats retryLetterProducerStats;

private static final DecimalFormat THROUGHPUT_FORMAT = new DecimalFormat("0.00");

public ConsumerStatsRecorderImpl() {
Expand Down Expand Up @@ -259,6 +264,26 @@ public Map<Long, Integer> getMsgNumInSubReceiverQueue() {
return null;
}

@Override
public ProducerStats getDeadLetterProducerStats() {
return deadLetterProducerStats;
}

@Override
public ProducerStats getRetryLetterProducerStats() {
return retryLetterProducerStats;
}

@Override
public void setDeadLetterProducerStats(ProducerStats producerStats) {
this.deadLetterProducerStats = producerStats;
}

@Override
public void setRetryLetterProducerStats(ProducerStats producerStats) {
this.retryLetterProducerStats = producerStats;
}

@Override
public long getNumMsgsReceived() {
return numMsgsReceived.longValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.MultiTopicConsumerStats;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,6 +33,10 @@ public class MultiTopicConsumerStatsRecorderImpl extends ConsumerStatsRecorderIm
private static final long serialVersionUID = 1L;
private Map<String, ConsumerStats> partitionStats = new ConcurrentHashMap<>();

private PartitionedTopicProducerStatsRecorderImpl deadLetterStats = new PartitionedTopicProducerStatsRecorderImpl();
private PartitionedTopicProducerStatsRecorderImpl retryLetterStats =
new PartitionedTopicProducerStatsRecorderImpl();

public MultiTopicConsumerStatsRecorderImpl() {
super();
}
Expand All @@ -55,5 +60,21 @@ public Map<String, ConsumerStats> getPartitionStats() {
return partitionStats;
}

@Override
public ProducerStats getDeadLetterProducerStats() {
deadLetterStats.reset();
partitionStats.forEach((partition, consumerStats) -> deadLetterStats.updateCumulativeStats(partition,
consumerStats.getDeadLetterProducerStats()));
return deadLetterStats;
}

@Override
public ProducerStats getRetryLetterProducerStats() {
retryLetterStats.reset();
partitionStats.forEach((partition, consumerStats) -> retryLetterStats.updateCumulativeStats(partition,
consumerStats.getRetryLetterProducerStats()));
return retryLetterStats;
}

private static final Logger log = LoggerFactory.getLogger(MultiTopicConsumerStatsRecorderImpl.class);
}
10 changes: 10 additions & 0 deletions pulsar-client/src/main/resources/findbugsExclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1033,4 +1033,14 @@
<Method name="getAckSet"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.apache.pulsar.client.impl.ConsumerImpl"/>
<Method name="getStats"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
<Class name="org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"/>
<Method name="getStats"/>
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
</FindBugsFilter>

0 comments on commit bd86e4e

Please sign in to comment.