ranges = result.get(firstConsumer);
+ if (lastKey != Integer.MAX_VALUE - 1) {
+ ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1));
}
} finally {
rwLock.readLock().unlock();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java
new file mode 100644
index 0000000000000..2aae1d9b0622e
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+/**
+ * A wrapper class for a Consumer instance that provides custom implementations
+ * of equals and hashCode methods. The equals method returns true if and only if
+ * the compared instance is the same instance.
+ *
+ * The reason for this class is the custom implementation of {@link Consumer#equals(Object)}.
+ * Using this wrapper class will be useful in use cases where it's necessary to match a key
+ * in a map by instance or a value in a set by instance.
+ */
+class ConsumerIdentityWrapper {
+ final Consumer consumer;
+
+ public ConsumerIdentityWrapper(Consumer consumer) {
+ this.consumer = consumer;
+ }
+
+ /**
+ * Compares this wrapper to the specified object. The result is true if and only if
+ * the argument is not null and is a ConsumerIdentityWrapper object that wraps
+ * the same Consumer instance.
+ *
+ * @param obj the object to compare this ConsumerIdentityWrapper against
+ * @return true if the given object represents a ConsumerIdentityWrapper
+ * equivalent to this wrapper, false otherwise
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof ConsumerIdentityWrapper) {
+ ConsumerIdentityWrapper other = (ConsumerIdentityWrapper) obj;
+ return consumer == other.consumer;
+ }
+ return false;
+ }
+
+ /**
+ * Returns a hash code for this wrapper. The hash code is computed based on
+ * the wrapped Consumer instance.
+ *
+ * @return a hash code value for this object
+ */
+ @Override
+ public int hashCode() {
+ return consumer.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return consumer.toString();
+ }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java
new file mode 100644
index 0000000000000..1f93313ab1b71
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsumerNameIndexTracker.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.roaringbitmap.RoaringBitmap;
+
+/**
+ * Tracks the used consumer name indexes for each consumer name.
+ * This is used by {@link ConsistentHashingStickyKeyConsumerSelector} to get a unique "consumer name index"
+ * for each consumer name. It is useful when there are multiple consumers with the same name, but they are
+ * different consumers. The purpose of the index is to prevent collisions in the hash ring.
+ *
+ * The consumer name index serves as an additional key for the hash ring assignment. The logic keeps track of
+ * used "index slots" for each consumer name and assigns the first unused index when a new consumer is added.
+ * This approach minimizes hash collisions due to using the same consumer name.
+ *
+ * An added benefit of this tracking approach is that a consumer that leaves and then rejoins immediately will get the
+ * same index and therefore the same assignments in the hash ring. This improves stability since the hash assignment
+ * changes are minimized over time, although a better solution would be to avoid reusing the same consumer name
+ * in the first place.
+ *
+ * When a consumer is removed, the index is deallocated. RoaringBitmap is used to keep track of the used indexes.
+ * The data structure to track a consumer name is removed when the reference count of the consumer name is zero.
+ *
+ * This class is not thread-safe and should be used in a synchronized context in the caller.
+ */
+@NotThreadSafe
+class ConsumerNameIndexTracker {
+ // tracks the used index slots for each consumer name
+ private final Map consumerNameIndexSlotsMap = new HashMap<>();
+ // tracks the active consumer entries
+ private final Map consumerEntries = new HashMap<>();
+
+ // Represents a consumer entry in the tracker, including the consumer name, index, and reference count.
+ record ConsumerEntry(String consumerName, int nameIndex, MutableInt refCount) {
+ }
+
+ /*
+ * Tracks the used indexes for a consumer name using a RoaringBitmap.
+ * A specific index slot is used when the bit is set.
+ * When all bits are cleared, the customer name can be removed from tracking.
+ */
+ static class ConsumerNameIndexSlots {
+ private RoaringBitmap indexSlots = new RoaringBitmap();
+
+ public int allocateIndexSlot() {
+ // find the first index that is not set, if there is no such index, add a new one
+ int index = (int) indexSlots.nextAbsentValue(0);
+ if (index == -1) {
+ index = indexSlots.getCardinality();
+ }
+ indexSlots.add(index);
+ return index;
+ }
+
+ public boolean deallocateIndexSlot(int index) {
+ indexSlots.remove(index);
+ return indexSlots.isEmpty();
+ }
+ }
+
+ /*
+ * Adds a reference to the consumer and returns the index assigned to this consumer.
+ */
+ public int increaseConsumerRefCountAndReturnIndex(ConsumerIdentityWrapper wrapper) {
+ ConsumerEntry entry = consumerEntries.computeIfAbsent(wrapper, k -> {
+ String consumerName = wrapper.consumer.consumerName();
+ return new ConsumerEntry(consumerName, allocateConsumerNameIndex(consumerName), new MutableInt(0));
+ });
+ entry.refCount.increment();
+ return entry.nameIndex;
+ }
+
+ private int allocateConsumerNameIndex(String consumerName) {
+ return getConsumerNameIndexBitmap(consumerName).allocateIndexSlot();
+ }
+
+ private ConsumerNameIndexSlots getConsumerNameIndexBitmap(String consumerName) {
+ return consumerNameIndexSlotsMap.computeIfAbsent(consumerName, k -> new ConsumerNameIndexSlots());
+ }
+
+ /*
+ * Decreases the reference count of the consumer and removes the consumer name from tracking if the ref count is
+ * zero.
+ */
+ public void decreaseConsumerRefCount(ConsumerIdentityWrapper removed) {
+ ConsumerEntry consumerEntry = consumerEntries.get(removed);
+ int refCount = consumerEntry.refCount.decrementAndGet();
+ if (refCount == 0) {
+ deallocateConsumerNameIndex(consumerEntry.consumerName, consumerEntry.nameIndex);
+ consumerEntries.remove(removed, consumerEntry);
+ }
+ }
+
+ private void deallocateConsumerNameIndex(String consumerName, int index) {
+ if (getConsumerNameIndexBitmap(consumerName).deallocateIndexSlot(index)) {
+ consumerNameIndexSlotsMap.remove(consumerName);
+ }
+ }
+
+ /*
+ * Returns the currently tracked index for the consumer.
+ */
+ public int getTrackedIndex(ConsumerIdentityWrapper wrapper) {
+ ConsumerEntry consumerEntry = consumerEntries.get(wrapper);
+ return consumerEntry != null ? consumerEntry.nameIndex : -1;
+ }
+
+ int getTrackedConsumerNamesCount() {
+ return consumerNameIndexSlotsMap.size();
+ }
+
+ int getTrackedConsumersCount() {
+ return consumerEntries.size();
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
index 48311c57338b5..04aafc49b47e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -18,19 +18,27 @@
*/
package org.apache.pulsar.broker.service;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.client.api.Range;
+import org.assertj.core.data.Offset;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -40,7 +48,7 @@ public class ConsistentHashingStickyKeyConsumerSelectorTest {
@Test
public void testConsumerSelect() throws ConsumerAssignException {
- ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+ ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200);
String key1 = "anyKey";
Assert.assertNull(selector.select(key1.getBytes()));
@@ -146,31 +154,115 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(3);
List consumerName = Arrays.asList("consumer1", "consumer2", "consumer3");
List consumers = new ArrayList<>();
+ long id=0;
for (String s : consumerName) {
- Consumer consumer = mock(Consumer.class);
- when(consumer.consumerName()).thenReturn(s);
+ Consumer consumer = createMockConsumer(s, s, id++);
selector.addConsumer(consumer);
consumers.add(consumer);
}
+
+ // check that results are the same when called multiple times
+ assertThat(selector.getConsumerKeyHashRanges())
+ .containsExactlyEntriesOf(selector.getConsumerKeyHashRanges());
+
Map> expectedResult = new HashMap<>();
+ assertThat(consumers.get(0).consumerName()).isEqualTo("consumer1");
expectedResult.put(consumers.get(0), Arrays.asList(
- Range.of(119056335, 242013991),
- Range.of(722195657, 1656011842),
- Range.of(1707482098, 1914695766)));
+ Range.of(95615213, 440020355),
+ Range.of(440020356, 455987436),
+ Range.of(1189794593, 1264144431)));
+ assertThat(consumers.get(1).consumerName()).isEqualTo("consumer2");
expectedResult.put(consumers.get(1), Arrays.asList(
- Range.of(0, 90164503),
- Range.of(90164504, 119056334),
- Range.of(382436668, 722195656)));
+ Range.of(939655188, 1189794592),
+ Range.of(1314727625, 1977451233),
+ Range.of(1977451234, 2016237253)));
+ assertThat(consumers.get(2).consumerName()).isEqualTo("consumer3");
expectedResult.put(consumers.get(2), Arrays.asList(
- Range.of(242013992, 242377547),
- Range.of(242377548, 382436667),
- Range.of(1656011843, 1707482097)));
- for (Map.Entry> entry : selector.getConsumerKeyHashRanges().entrySet()) {
- System.out.println(entry.getValue());
- Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
- expectedResult.remove(entry.getKey());
+ Range.of(0, 95615212),
+ Range.of(455987437, 939655187),
+ Range.of(1264144432, 1314727624),
+ Range.of(2016237254, 2147483646)));
+ Map> consumerKeyHashRanges = selector.getConsumerKeyHashRanges();
+ assertThat(consumerKeyHashRanges).containsExactlyInAnyOrderEntriesOf(expectedResult);
+
+ // check that ranges are continuous and cover the whole range
+ List allRanges =
+ consumerKeyHashRanges.values().stream().flatMap(List::stream).sorted().collect(Collectors.toList());
+ Range previousRange = null;
+ for (Range range : allRanges) {
+ if (previousRange != null) {
+ assertThat(range.getStart()).isEqualTo(previousRange.getEnd() + 1);
+ }
+ previousRange = range;
+ }
+ assertThat(allRanges.stream().mapToInt(r -> r.getEnd() - r.getStart() + 1).sum()).isEqualTo(Integer.MAX_VALUE);
+ }
+
+ @Test
+ public void testConsumersGetSufficientlyAccuratelyEvenlyMapped()
+ throws BrokerServiceException.ConsumerAssignException {
+ ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(200);
+ List consumers = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ // use the same name for all consumers, use toString to distinguish them
+ Consumer consumer = createMockConsumer("consumer", String.format("index %02d", i), i);
+ selector.addConsumer(consumer);
+ consumers.add(consumer);
}
- Assert.assertEquals(expectedResult.size(), 0);
+ printConsumerRangesStats(selector);
+
+ int totalSelections = 10000;
+
+ Map consumerSelectionCount = new HashMap<>();
+ for (int i = 0; i < totalSelections; i++) {
+ Consumer selectedConsumer = selector.select(("key " + i).getBytes(StandardCharsets.UTF_8));
+ consumerSelectionCount.computeIfAbsent(selectedConsumer, c -> new MutableInt()).increment();
+ }
+
+ printSelectionCountStats(consumerSelectionCount);
+
+ int averageCount = totalSelections / consumers.size();
+ int allowedVariance = (int) (0.2d * averageCount);
+ System.out.println("averageCount: " + averageCount + " allowedVariance: " + allowedVariance);
+
+ for (Map.Entry entry : consumerSelectionCount.entrySet()) {
+ assertThat(entry.getValue().intValue()).describedAs("consumer: %s", entry.getKey())
+ .isCloseTo(averageCount, Offset.offset(allowedVariance));
+ }
+
+ consumers.forEach(selector::removeConsumer);
+ assertThat(selector.getConsumerKeyHashRanges()).isEmpty();
+ }
+
+ private static void printSelectionCountStats(Map consumerSelectionCount) {
+ int totalSelections = consumerSelectionCount.values().stream().mapToInt(MutableInt::intValue).sum();
+ consumerSelectionCount.entrySet().stream()
+ .sorted(Map.Entry.comparingByKey(Comparator.comparing(Consumer::toString)))
+ .forEach(entry -> System.out.println(
+ String.format("consumer: %s got selected %d times. ratio: %.2f%%", entry.getKey(),
+ entry.getValue().intValue(),
+ ((double) entry.getValue().intValue() / totalSelections) * 100.0d)));
+ }
+
+ private static void printConsumerRangesStats(ConsistentHashingStickyKeyConsumerSelector selector) {
+ selector.getConsumerKeyHashRanges().entrySet().stream()
+ .map(entry -> Map.entry(entry.getKey(),
+ entry.getValue().stream().mapToInt(r -> r.getEnd() - r.getStart() + 1).sum()))
+ .sorted(Map.Entry.comparingByKey(Comparator.comparing(Consumer::toString)))
+ .forEach(entry -> System.out.println(
+ String.format("consumer: %s total ranges size: %d ratio: %.2f%%", entry.getKey(),
+ entry.getValue(),
+ ((double) entry.getValue() / (Integer.MAX_VALUE - 1)) * 100.0d)));
+ }
+
+ private static Consumer createMockConsumer(String consumerName, String toString, long id) {
+ // without stubOnly, the mock will record method invocations and run into OOME
+ Consumer consumer = mock(Consumer.class, Mockito.withSettings().stubOnly());
+ when(consumer.consumerName()).thenReturn(consumerName);
+ when(consumer.getPriorityLevel()).thenReturn(0);
+ when(consumer.toString()).thenReturn(toString);
+ when(consumer.consumerId()).thenReturn(id);
+ return consumer;
}
// reproduces https://github.com/apache/pulsar/issues/22050
@@ -215,5 +307,243 @@ public void shouldRemoveConsumersFromConsumerKeyHashRanges() {
consumers.forEach(selector::removeConsumer);
// then there should be no mapping remaining
Assert.assertEquals(selector.getConsumerKeyHashRanges().size(), 0);
+ // when consumers are removed again, should not fail
+ consumers.forEach(selector::removeConsumer);
+ }
+
+ @Test
+ public void testShouldNotChangeSelectedConsumerWhenConsumerIsRemoved() {
+ final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+ final String consumerName = "consumer";
+ final int numOfInitialConsumers = 100;
+ List consumers = new ArrayList<>();
+ for (int i = 0; i < numOfInitialConsumers; i++) {
+ final Consumer consumer = createMockConsumer(consumerName, "index " + i, i);
+ consumers.add(consumer);
+ selector.addConsumer(consumer);
+ }
+
+ int hashRangeSize = Integer.MAX_VALUE;
+ int validationPointCount = 200;
+ int increment = hashRangeSize / (validationPointCount + 1);
+ List selectedConsumerBeforeRemoval = new ArrayList<>();
+
+ for (int i = 0; i < validationPointCount; i++) {
+ selectedConsumerBeforeRemoval.add(selector.select(i * increment));
+ }
+
+ for (int i = 0; i < validationPointCount; i++) {
+ Consumer selected = selector.select(i * increment);
+ Consumer expected = selectedConsumerBeforeRemoval.get(i);
+ assertThat(selected.consumerId()).as("validationPoint %d", i).isEqualTo(expected.consumerId());
+ }
+
+ Set removedConsumers = new HashSet<>();
+ for (Consumer removedConsumer : consumers) {
+ selector.removeConsumer(removedConsumer);
+ removedConsumers.add(removedConsumer);
+ for (int i = 0; i < validationPointCount; i++) {
+ int hash = i * increment;
+ Consumer selected = selector.select(hash);
+ Consumer expected = selectedConsumerBeforeRemoval.get(i);
+ if (!removedConsumers.contains(expected)) {
+ assertThat(selected.consumerId()).as("validationPoint %d, removed %s, hash %d ranges %s", i,
+ removedConsumer.toString(), hash, selector.getConsumerKeyHashRanges()).isEqualTo(expected.consumerId());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testShouldNotChangeSelectedConsumerWhenConsumerIsRemovedCheckHashRanges() {
+ final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+ final String consumerName = "consumer";
+ final int numOfInitialConsumers = 25;
+ List consumers = new ArrayList<>();
+ for (int i = 0; i < numOfInitialConsumers; i++) {
+ final Consumer consumer = createMockConsumer(consumerName, "index " + i, i);
+ consumers.add(consumer);
+ selector.addConsumer(consumer);
+ }
+
+ Map> expected = selector.getConsumerKeyHashRanges();
+ assertThat(selector.getConsumerKeyHashRanges()).as("sanity check").containsExactlyInAnyOrderEntriesOf(expected);
+ System.out.println(expected);
+
+ for (Consumer removedConsumer : consumers) {
+ selector.removeConsumer(removedConsumer);
+ for (Map.Entry> entry : expected.entrySet()) {
+ if (entry.getKey() == removedConsumer) {
+ continue;
+ }
+ for (Range range : entry.getValue()) {
+ Consumer rangeStartConsumer = selector.select(range.getStart());
+ assertThat(rangeStartConsumer).as("removed %s, range %s", removedConsumer, range)
+ .isEqualTo(entry.getKey());
+ Consumer rangeEndConsumer = selector.select(range.getEnd());
+ assertThat(rangeEndConsumer).as("removed %s, range %s", removedConsumer, range)
+ .isEqualTo(entry.getKey());
+ assertThat(rangeStartConsumer).isSameAs(rangeEndConsumer);
+ }
+ }
+ expected = selector.getConsumerKeyHashRanges();
+ }
+ }
+
+ @Test
+ public void testShouldNotChangeSelectedConsumerUnnecessarilyWhenConsumerIsAddedCheckHashRanges() {
+ final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+ final String consumerName = "consumer";
+ final int numOfInitialConsumers = 25;
+ List consumers = new ArrayList<>();
+ for (int i = 0; i < numOfInitialConsumers; i++) {
+ final Consumer consumer = createMockConsumer(consumerName, "index " + i, i);
+ consumers.add(consumer);
+ selector.addConsumer(consumer);
+ }
+
+ Map> expected = selector.getConsumerKeyHashRanges();
+ assertThat(selector.getConsumerKeyHashRanges()).as("sanity check").containsExactlyInAnyOrderEntriesOf(expected);
+
+ for (int i = numOfInitialConsumers; i < numOfInitialConsumers * 2; i++) {
+ final Consumer addedConsumer = createMockConsumer(consumerName, "index " + i, i);
+ selector.addConsumer(addedConsumer);
+ for (Map.Entry> entry : expected.entrySet()) {
+ if (entry.getKey() == addedConsumer) {
+ continue;
+ }
+ for (Range range : entry.getValue()) {
+ Consumer rangeStartConsumer = selector.select(range.getStart());
+ if (rangeStartConsumer != addedConsumer) {
+ assertThat(rangeStartConsumer).as("added %s, range start %s", addedConsumer, range)
+ .isEqualTo(entry.getKey());
+ }
+ Consumer rangeEndConsumer = selector.select(range.getStart());
+ if (rangeEndConsumer != addedConsumer) {
+ assertThat(rangeEndConsumer).as("added %s, range end %s", addedConsumer, range)
+ .isEqualTo(entry.getKey());
+ }
+ }
+ }
+ expected = selector.getConsumerKeyHashRanges();
+ }
+ }
+
+ @Test
+ public void testShouldNotChangeSelectedConsumerWhenConsumerIsAdded() {
+ final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+ final String consumerName = "consumer";
+ final int numOfInitialConsumers = 50;
+ List consumers = new ArrayList<>();
+ for (int i = 0; i < numOfInitialConsumers; i++) {
+ final Consumer consumer = createMockConsumer(consumerName, "index " + i, i);
+ consumers.add(consumer);
+ selector.addConsumer(consumer);
+ }
+
+ int hashRangeSize = Integer.MAX_VALUE;
+ int validationPointCount = 200;
+ int increment = hashRangeSize / (validationPointCount + 1);
+ List selectedConsumerBeforeRemoval = new ArrayList<>();
+
+ for (int i = 0; i < validationPointCount; i++) {
+ selectedConsumerBeforeRemoval.add(selector.select(i * increment));
+ }
+
+ for (int i = 0; i < validationPointCount; i++) {
+ Consumer selected = selector.select(i * increment);
+ Consumer expected = selectedConsumerBeforeRemoval.get(i);
+ assertThat(selected.consumerId()).as("validationPoint %d", i).isEqualTo(expected.consumerId());
+ }
+
+ Set addedConsumers = new HashSet<>();
+ for (int i = numOfInitialConsumers; i < numOfInitialConsumers * 2; i++) {
+ final Consumer addedConsumer = createMockConsumer(consumerName, "index " + i, i);
+ selector.addConsumer(addedConsumer);
+ addedConsumers.add(addedConsumer);
+ for (int j = 0; j < validationPointCount; j++) {
+ int hash = j * increment;
+ Consumer selected = selector.select(hash);
+ Consumer expected = selectedConsumerBeforeRemoval.get(j);
+ if (!addedConsumers.contains(addedConsumer)) {
+ assertThat(selected.consumerId()).as("validationPoint %d, hash %d", j, hash).isEqualTo(expected.consumerId());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testShouldNotChangeMappingWhenConsumerLeavesAndRejoins() {
+ final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+ final String consumerName = "consumer";
+ final int numOfInitialConsumers = 25;
+ List consumers = new ArrayList<>();
+ for (int i = 0; i < numOfInitialConsumers; i++) {
+ final Consumer consumer = createMockConsumer(consumerName, "index " + i, i);
+ consumers.add(consumer);
+ selector.addConsumer(consumer);
+ }
+
+ Map> expected = selector.getConsumerKeyHashRanges();
+ assertThat(selector.getConsumerKeyHashRanges()).as("sanity check").containsExactlyInAnyOrderEntriesOf(expected);
+
+ selector.removeConsumer(consumers.get(0));
+ selector.removeConsumer(consumers.get(numOfInitialConsumers / 2));
+ selector.addConsumer(consumers.get(0));
+ selector.addConsumer(consumers.get(numOfInitialConsumers / 2));
+
+ assertThat(selector.getConsumerKeyHashRanges()).as("ranges shouldn't change").containsExactlyInAnyOrderEntriesOf(expected);
+ }
+
+ @Test
+ public void testConsumersReconnect() {
+ final ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+ final String consumerName = "consumer";
+ final int numOfInitialConsumers = 50;
+ final int validationPointCount = 200;
+ final List pointsToTest = pointsToTest(validationPointCount);
+ List consumers = new ArrayList<>();
+ for (int i = 0; i < numOfInitialConsumers; i++) {
+ final Consumer consumer = createMockConsumer(consumerName, "index " + i, i);
+ consumers.add(consumer);
+ selector.addConsumer(consumer);
+ }
+
+ // Mark original results.
+ List selectedConsumersBeforeRemove = new ArrayList<>();
+ for (int i = 0; i < validationPointCount; i++) {
+ int point = pointsToTest.get(i);
+ selectedConsumersBeforeRemove.add(selector.select(point));
+ }
+
+ // All consumers leave (in any order)
+ List randomOrderConsumers = new ArrayList<>(consumers);
+ Collections.shuffle(randomOrderConsumers);
+ for (Consumer c : randomOrderConsumers) {
+ selector.removeConsumer(c);
+ }
+
+ // All consumers reconnect in the same order as originally
+ for (Consumer c : consumers) {
+ selector.addConsumer(c);
+ }
+
+ // Check that the same consumers are selected as before
+ for (int j = 0; j < validationPointCount; j++) {
+ int point = pointsToTest.get(j);
+ Consumer selected = selector.select(point);
+ Consumer expected = selectedConsumersBeforeRemove.get(j);
+ assertThat(selected.consumerId()).as("validationPoint %d, hash %d", j, point).isEqualTo(expected.consumerId());
+ }
+ }
+
+ private List pointsToTest(int validationPointCount) {
+ List res = new ArrayList<>();
+ int hashRangeSize = Integer.MAX_VALUE;
+ final int increment = hashRangeSize / (validationPointCount + 1);
+ for (int i = 0; i < validationPointCount; i++) {
+ res.add(i * increment);
+ }
+ return res;
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java
new file mode 100644
index 0000000000000..75c8e6db5d2a0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerIdentityWrapperTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ConsumerIdentityWrapperTest {
+ private static Consumer mockConsumer() {
+ return mockConsumer("consumer");
+ }
+
+ private static Consumer mockConsumer(String consumerName) {
+ Consumer consumer = mock(Consumer.class);
+ when(consumer.consumerName()).thenReturn(consumerName);
+ return consumer;
+ }
+
+ @Test
+ public void testEquals() {
+ Consumer consumer = mockConsumer();
+ assertEquals(new ConsumerIdentityWrapper(consumer), new ConsumerIdentityWrapper(consumer));
+ }
+
+ @Test
+ public void testHashCode() {
+ Consumer consumer = mockConsumer();
+ assertEquals(new ConsumerIdentityWrapper(consumer).hashCode(),
+ new ConsumerIdentityWrapper(consumer).hashCode());
+ }
+
+ @Test
+ public void testEqualsAndHashCode() {
+ Consumer consumer1 = mockConsumer();
+ Consumer consumer2 = mockConsumer();
+ ConsumerIdentityWrapper wrapper1 = new ConsumerIdentityWrapper(consumer1);
+ ConsumerIdentityWrapper wrapper2 = new ConsumerIdentityWrapper(consumer1);
+ ConsumerIdentityWrapper wrapper3 = new ConsumerIdentityWrapper(consumer2);
+
+ // Test equality
+ assertEquals(wrapper1, wrapper2);
+ assertNotEquals(wrapper1, wrapper3);
+
+ // Test hash code
+ assertEquals(wrapper1.hashCode(), wrapper2.hashCode());
+ assertNotEquals(wrapper1.hashCode(), wrapper3.hashCode());
+ }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java
new file mode 100644
index 0000000000000..0f18ecce2ffb4
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerNameIndexTrackerTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class ConsumerNameIndexTrackerTest {
+ private ConsumerNameIndexTracker tracker;
+
+ @BeforeMethod
+ public void setUp() {
+ tracker = new ConsumerNameIndexTracker();
+ }
+
+ private static Consumer mockConsumer() {
+ return mockConsumer("consumer");
+ }
+
+
+ private static Consumer mockConsumer(String consumerName) {
+ Consumer consumer = mock(Consumer.class);
+ when(consumer.consumerName()).thenReturn(consumerName);
+ return consumer;
+ }
+
+ @Test
+ public void testIncreaseConsumerRefCountAndReturnIndex() {
+ Consumer consumer1 = mockConsumer();
+ Consumer consumer2 = mockConsumer();
+ ConsumerIdentityWrapper wrapper1 = new ConsumerIdentityWrapper(consumer1);
+ ConsumerIdentityWrapper wrapper2 = new ConsumerIdentityWrapper(consumer2);
+ int index1 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper1);
+ int index2 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper2);
+ assertNotEquals(index1, index2);
+ assertEquals(index1, tracker.getTrackedIndex(wrapper1));
+ assertEquals(index2, tracker.getTrackedIndex(wrapper2));
+ }
+
+ @Test
+ public void testTrackingReturnsStableIndexWhenRemovedAndAddedInSameOrder() {
+ List consumerIdentityWrappers =
+ IntStream.range(0, 100).mapToObj(i -> mockConsumer()).map(ConsumerIdentityWrapper::new).toList();
+ Map trackedIndexes =
+ consumerIdentityWrappers.stream().collect(Collectors.toMap(
+ wrapper -> wrapper, wrapper -> tracker.increaseConsumerRefCountAndReturnIndex(wrapper)));
+ // stop tracking every other consumer
+ for (int i = 0; i < consumerIdentityWrappers.size(); i++) {
+ if (i % 2 == 0) {
+ tracker.decreaseConsumerRefCount(consumerIdentityWrappers.get(i));
+ }
+ }
+ // check that others are tracked
+ for (int i = 0; i < consumerIdentityWrappers.size(); i++) {
+ ConsumerIdentityWrapper wrapper = consumerIdentityWrappers.get(i);
+ int trackedIndex = tracker.getTrackedIndex(wrapper);
+ assertEquals(trackedIndex, i % 2 == 0 ? -1 : trackedIndexes.get(wrapper));
+ }
+ // check that new consumers are tracked with the same index
+ for (int i = 0; i < consumerIdentityWrappers.size(); i++) {
+ ConsumerIdentityWrapper wrapper = consumerIdentityWrappers.get(i);
+ if (i % 2 == 0) {
+ int trackedIndex = tracker.increaseConsumerRefCountAndReturnIndex(wrapper);
+ assertEquals(trackedIndex, trackedIndexes.get(wrapper));
+ }
+ }
+ // check that all consumers are tracked with the original indexes
+ for (int i = 0; i < consumerIdentityWrappers.size(); i++) {
+ ConsumerIdentityWrapper wrapper = consumerIdentityWrappers.get(i);
+ int trackedIndex = tracker.getTrackedIndex(wrapper);
+ assertEquals(trackedIndex, trackedIndexes.get(wrapper));
+ }
+ }
+
+ @Test
+ public void testTrackingMultipleTimes() {
+ List consumerIdentityWrappers =
+ IntStream.range(0, 100).mapToObj(i -> mockConsumer()).map(ConsumerIdentityWrapper::new).toList();
+ Map trackedIndexes =
+ consumerIdentityWrappers.stream().collect(Collectors.toMap(
+ wrapper -> wrapper, wrapper -> tracker.increaseConsumerRefCountAndReturnIndex(wrapper)));
+ Map trackedIndexes2 =
+ consumerIdentityWrappers.stream().collect(Collectors.toMap(
+ wrapper -> wrapper, wrapper -> tracker.increaseConsumerRefCountAndReturnIndex(wrapper)));
+ assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(1);
+ assertThat(trackedIndexes).containsExactlyInAnyOrderEntriesOf(trackedIndexes2);
+ consumerIdentityWrappers.forEach(wrapper -> tracker.decreaseConsumerRefCount(wrapper));
+ for (ConsumerIdentityWrapper wrapper : consumerIdentityWrappers) {
+ int trackedIndex = tracker.getTrackedIndex(wrapper);
+ assertEquals(trackedIndex, trackedIndexes.get(wrapper));
+ }
+ consumerIdentityWrappers.forEach(wrapper -> tracker.decreaseConsumerRefCount(wrapper));
+ assertThat(tracker.getTrackedConsumersCount()).isEqualTo(0);
+ assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(0);
+ }
+
+ @Test
+ public void testDecreaseConsumerRefCount() {
+ Consumer consumer1 = mockConsumer();
+ ConsumerIdentityWrapper wrapper1 = new ConsumerIdentityWrapper(consumer1);
+ int index1 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper1);
+ assertNotEquals(index1, -1);
+ tracker.decreaseConsumerRefCount(wrapper1);
+ assertEquals(tracker.getTrackedIndex(wrapper1), -1);
+ }
+
+ @Test
+ public void testGetTrackedIndex() {
+ Consumer consumer1 = mockConsumer();
+ Consumer consumer2 = mockConsumer();
+ ConsumerIdentityWrapper wrapper1 = new ConsumerIdentityWrapper(consumer1);
+ ConsumerIdentityWrapper wrapper2 = new ConsumerIdentityWrapper(consumer2);
+ int index1 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper1);
+ int index2 = tracker.increaseConsumerRefCountAndReturnIndex(wrapper2);
+ assertEquals(index1, tracker.getTrackedIndex(wrapper1));
+ assertEquals(index2, tracker.getTrackedIndex(wrapper2));
+ }
+
+ @Test
+ public void testTrackingMultipleNames() {
+ List consumerIdentityWrappers =
+ IntStream.range(0, 100).mapToObj(i -> mockConsumer("consumer" + i)).map(ConsumerIdentityWrapper::new)
+ .toList();
+ consumerIdentityWrappers.forEach(wrapper -> tracker.increaseConsumerRefCountAndReturnIndex(wrapper));
+ assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(100);
+ assertThat(tracker.getTrackedConsumersCount()).isEqualTo(100);
+ consumerIdentityWrappers.forEach(wrapper -> tracker.decreaseConsumerRefCount(wrapper));
+ assertThat(tracker.getTrackedConsumersCount()).isEqualTo(0);
+ assertThat(tracker.getTrackedConsumerNamesCount()).isEqualTo(0);
+ }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index dcd852f409dbb..a0054f7e71425 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -20,6 +20,7 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
@@ -326,7 +327,7 @@ public void testSkipRedeliverTemporally() {
redeliverEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key1")));
final List readEntries = new ArrayList<>();
readEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
- readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key22")));
+ readEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key2")));
try {
Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class.getDeclaredField("totalAvailablePermits");
@@ -417,7 +418,7 @@ public void testMessageRedelivery() throws Exception {
// Messages with key1 are routed to consumer1 and messages with key2 are routed to consumer2
final List allEntries = new ArrayList<>();
- allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key22")));
+ allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key2")));
allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1")));
allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key1")));
allEntries.forEach(entry -> ((EntryImpl) entry).retain());
@@ -518,8 +519,8 @@ public void testMessageRedelivery() throws Exception {
persistentDispatcher.readMoreEntries();
}
- assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1);
- assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2);
+ assertThat(actualEntriesToConsumer1).containsExactlyElementsOf(expectedEntriesToConsumer1);
+ assertThat(actualEntriesToConsumer2).containsExactlyElementsOf(expectedEntriesToConsumer2);
allEntries.forEach(entry -> entry.release());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index 1e8754a2d675c..5dfdfaa9802f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -726,7 +726,7 @@ public void testAddEntryOperationTimeout() throws Exception {
class MockLedgerHandle extends PulsarMockLedgerHandle {
public MockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd)
throws GeneralSecurityException {
- super(bk, id, digest, passwd);
+ super(bk, id, digest, passwd, null);
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
index 29b06f68b64eb..e2895b1d01e9f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java
@@ -18,11 +18,16 @@
*/
package org.apache.pulsar.client.impl;
-import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -37,6 +42,8 @@
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -49,8 +56,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import com.google.common.collect.Sets;
-import io.netty.util.concurrent.DefaultThreadFactory;
@Test(groups = "broker-impl")
public class MessageRedeliveryTest extends ProducerConsumerBase {
@@ -539,4 +544,57 @@ public void testMultiConsumerBatchRedeliveryAddEpoch(boolean enableBatch) throws
// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);
}
+
+ /**
+ * This test validates that client lib correctly increases permits of individual consumer to retrieve data in case
+ * of incorrect epoch for partition-topic multi-consumer.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRedeliveryWithMultiConsumerAndListenerAddEpoch() throws Exception {
+ final String topic = "testRedeliveryWithMultiConsumerAndListenerAddEpoch";
+ final String subName = "my-sub";
+ int totalMessages = 100;
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ Map ids = new ConcurrentHashMap<>();
+ CountDownLatch latch = new CountDownLatch(totalMessages);
+ MessageListener msgListener = (Consumer consumer, Message msg) -> {
+ String id = msg.getMessageId().toString();
+ consumer.acknowledgeCumulativeAsync(msg);
+ if (ids.put(msg.getMessageId(), id) == null) {
+ latch.countDown();
+ }
+ };
+ @Cleanup
+ Consumer newConsumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(subName)
+ .messageListener(msgListener).subscriptionType(SubscriptionType.Failover)
+ .receiverQueueSize(totalMessages / 10).subscribe();
+
+ MultiTopicsConsumerImpl consumer = (MultiTopicsConsumerImpl) newConsumer;
+ long epoch = consumer.getConsumerEpoch() + 1;
+ consumer.setConsumerEpoch(epoch);
+ @Cleanup
+ Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false)
+ .create();
+
+ for (int i = 0; i < totalMessages; i++) {
+ producer.sendAsync("test" + i);
+ }
+ producer.flush();
+
+ // make sure listener has not received any messages until
+ // we call redelivery with correct epoch
+ for (int i = 0; i < 2; i++) {
+ assertTrue(ids.isEmpty());
+ Thread.sleep(1000);
+ }
+ // make epoch valid to consume redelivery message again
+ consumer.setConsumerEpoch(epoch - 1);
+ consumer.redeliverUnacknowledgedMessages();
+
+ latch.await(10, TimeUnit.SECONDS);
+ assertEquals(ids.size(), totalMessages);
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
index 4437ffc4ac6a2..488083f484b76 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Range.java
@@ -27,7 +27,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class Range {
+public class Range implements Comparable {
private final int start;
private final int end;
@@ -84,4 +84,13 @@ public int hashCode() {
public String toString() {
return "[" + start + ", " + end + "]";
}
+
+ @Override
+ public int compareTo(Range o) {
+ int result = Integer.compare(start, o.start);
+ if (result == 0) {
+ result = Integer.compare(end, o.end);
+ }
+ return result;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 03ccbae01c276..b7010a1ddc7b4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1842,6 +1842,9 @@ protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Sending permit-cmd to broker with available permits = {}", topic, available);
+ }
sendFlowPermitsToBroker(currentCnx, available);
break;
} else {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 513c0101ac6ac..ff293af230838 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -270,8 +270,13 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer, boolean batchR
// Process the message, add to the queue and trigger listener or async callback
messages.forEach(msg -> {
final boolean skipDueToSeek = duringSeek;
- if (isValidConsumerEpoch((MessageImpl) msg) && !skipDueToSeek) {
+ MessageImpl msgImpl = (MessageImpl) msg;
+ ClientCnx cnx = msgImpl.getCnx();
+ boolean isValidEpoch = isValidConsumerEpoch(msgImpl);
+ if (isValidEpoch && !skipDueToSeek) {
messageReceived(consumer, msg);
+ } else if (!isValidEpoch) {
+ consumer.increaseAvailablePermits(cnx);
} else if (skipDueToSeek) {
log.info("[{}] [{}] Skip processing message {} received during seek", topic, subscription,
msg.getMessageId());
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
index 4516cfea01f05..f1a9694b3d165 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java
@@ -124,7 +124,7 @@ public void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSiz
long id = sequence.getAndIncrement();
log.info("Creating ledger {}", id);
PulsarMockLedgerHandle lh =
- new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd);
+ new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd, properties);
ledgers.put(id, lh);
return FutureUtils.value(lh);
} catch (Throwable t) {
@@ -147,7 +147,7 @@ public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorum
try {
long id = sequence.getAndIncrement();
log.info("Creating ledger {}", id);
- PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(this, id, digestType, passwd);
+ PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(this, id, digestType, passwd, null);
ledgers.put(id, lh);
return lh;
} catch (Throwable t) {
diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
index aa61e541d0d6b..400065e41b7f0 100644
--- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
+++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockLedgerHandle.java
@@ -26,8 +26,10 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
@@ -65,8 +67,10 @@ public class PulsarMockLedgerHandle extends LedgerHandle {
boolean fenced = false;
public PulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id,
- DigestType digest, byte[] passwd) throws GeneralSecurityException {
- super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, digest, passwd), new LongVersion(0L)),
+ DigestType digest, byte[] passwd,
+ Map properties) throws GeneralSecurityException {
+ super(bk.getClientCtx(), id, new Versioned<>(createMetadata(id, digest, passwd, properties),
+ new LongVersion(0L)),
digest, passwd, WriteFlag.NONE);
this.bk = bk;
this.id = id;
@@ -267,13 +271,15 @@ public CompletableFuture readLastAddConfirmedAndEntryAsyn
return readHandle.readLastAddConfirmedAndEntryAsync(entryId, timeOutInMillis, parallel);
}
- private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd) {
+ private static LedgerMetadata createMetadata(long id, DigestType digest, byte[] passwd,
+ Map properties) {
List ensemble = new ArrayList<>(PulsarMockBookKeeper.getMockEnsemble());
return LedgerMetadataBuilder.create()
.withDigestType(digest.toApiDigestType())
.withPassword(passwd)
.withId(id)
.newEnsembleEntry(0L, ensemble)
+ .withCustomMetadata(properties != null ? properties : Collections.emptyMap())
.build();
}