Skip to content

Commit 0d4220f

Browse files
committed
Pin executor to dispatcher instance to cache the string hashing
1 parent b579bb8 commit 0d4220f

File tree

4 files changed

+35
-20
lines changed

4 files changed

+35
-20
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Objects;
2727
import java.util.Optional;
2828
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.Executor;
2930
import java.util.concurrent.ScheduledFuture;
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,6 +62,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
6162

6263
private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
6364
protected final PersistentTopic topic;
65+
protected final Executor topicExecutor;
66+
protected final Executor dispatcherExecutor;
6467
protected final String name;
6568
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
6669

@@ -77,8 +80,10 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
7780
super(subscriptionType, partitionIndex, topic.getName(), subscription,
7881
topic.getBrokerService().pulsar().getConfiguration(), cursor);
7982
this.topic = topic;
83+
this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topicName);
8084
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
8185
: ""/* NonDurableCursor doesn't have name */);
86+
this.dispatcherExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(name);
8287
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
8388
this.readFailureBackoff = new Backoff(serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(),
8489
TimeUnit.MILLISECONDS, serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(),
@@ -145,7 +150,7 @@ protected void cancelPendingRead() {
145150

146151
@Override
147152
public void readEntriesComplete(final List<Entry> entries, Object obj) {
148-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
153+
topicExecutor.execute(SafeRun.safeRun(() -> {
149154
internalReadEntriesComplete(entries, obj);
150155
}));
151156
}
@@ -225,8 +230,7 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
225230
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
226231

227232
// Schedule a new read batch operation only after the previous batch has been written to the socket.
228-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
229-
SafeRun.safeRun(() -> {
233+
topicExecutor.execute(SafeRun.safeRun(() -> {
230234
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
231235
Consumer newConsumer = getActiveConsumer();
232236
readMoreEntries(newConsumer);
@@ -238,7 +242,7 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
238242

239243
@Override
240244
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
241-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
245+
topicExecutor.execute(SafeRun.safeRun(() -> {
242246
internalConsumerFlow(consumer);
243247
}));
244248
}
@@ -269,7 +273,7 @@ private synchronized void internalConsumerFlow(Consumer consumer) {
269273

270274
@Override
271275
public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
272-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
276+
topicExecutor.execute(SafeRun.safeRun(() -> {
273277
internalRedeliverUnacknowledgedMessages(consumer, consumerEpoch);
274278
}));
275279
}
@@ -463,7 +467,7 @@ protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
463467

464468
@Override
465469
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
466-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
470+
topicExecutor.execute(SafeRun.safeRun(() -> {
467471
internalReadEntriesFailed(exception, ctx);
468472
}));
469473
}
@@ -513,7 +517,7 @@ private synchronized void internalReadEntriesFailed(ManagedLedgerException excep
513517
topic.getBrokerService().executor().schedule(() -> {
514518

515519
// Jump again into dispatcher dedicated thread
516-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
520+
topicExecutor.execute(SafeRun.safeRun(() -> {
517521
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
518522
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
519523
// we should retry the read if we have an active consumer and there is no pending read

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
2222
import com.google.common.collect.Lists;
2323
import java.util.Set;
24+
import java.util.concurrent.Executor;
2425
import java.util.concurrent.TimeUnit;
2526
import lombok.extern.slf4j.Slf4j;
2627
import org.apache.bookkeeper.mledger.Entry;
@@ -47,10 +48,12 @@ public class PersistentStreamingDispatcherMultipleConsumers extends PersistentDi
4748
private int sendingTaskCounter = 0;
4849
private final StreamingEntryReader streamingEntryReader = new StreamingEntryReader((ManagedCursorImpl) cursor,
4950
this, topic);
51+
private final Executor topicExecutor;
5052

5153
public PersistentStreamingDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
5254
Subscription subscription) {
5355
super(topic, cursor, subscription);
56+
this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
5457
}
5558

5659
/**
@@ -126,7 +129,7 @@ public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest
126129
public void canReadMoreEntries(boolean withBackoff) {
127130
havePendingRead = false;
128131
topic.getBrokerService().executor().schedule(() -> {
129-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
132+
topicExecutor.execute(SafeRun.safeRun(() -> {
130133
synchronized (PersistentStreamingDispatcherMultipleConsumers.this) {
131134
if (!havePendingRead) {
132135
log.info("[{}] Scheduling read operation", name);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public PersistentStreamingDispatcherSingleActiveConsumer(ManagedCursor cursor, S
6464
public void canReadMoreEntries(boolean withBackoff) {
6565
havePendingRead = false;
6666
topic.getBrokerService().executor().schedule(() -> {
67-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
67+
topicExecutor.execute(SafeRun.safeRun(() -> {
6868
synchronized (PersistentStreamingDispatcherSingleActiveConsumer.this) {
6969
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
7070
if (currentConsumer != null && !havePendingRead) {
@@ -111,7 +111,7 @@ public String getName() {
111111
*/
112112
@Override
113113
public void readEntryComplete(Entry entry, PendingReadEntryRequest ctx) {
114-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(name, safeRun(() -> {
114+
dispatcherExecutor.execute(safeRun(() -> {
115115
internalReadEntryComplete(entry, ctx);
116116
}));
117117
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java

+18-10
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import java.util.List;
2323
import java.util.Queue;
2424
import java.util.concurrent.ConcurrentLinkedQueue;
25+
import java.util.concurrent.Executor;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
28-
import lombok.RequiredArgsConstructor;
2929
import lombok.extern.slf4j.Slf4j;
3030
import org.apache.bookkeeper.mledger.AsyncCallbacks;
3131
import org.apache.bookkeeper.mledger.Entry;
@@ -44,7 +44,6 @@
4444
* Entry reader that fulfill read request by streamline the read instead of reading with micro batch.
4545
*/
4646
@Slf4j
47-
@RequiredArgsConstructor
4847
public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, WaitingEntryCallBack {
4948

5049
private final int maxRetry = 3;
@@ -61,6 +60,10 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
6160

6261
private final PersistentTopic topic;
6362

63+
private final Executor topicExecutor;
64+
65+
private final Executor dispatcherExecutor;
66+
6467
private AtomicInteger currentReadSizeByte = new AtomicInteger(0);
6568

6669
private volatile State state;
@@ -73,6 +76,14 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
7376
private final Backoff readFailureBackoff = new Backoff(10, TimeUnit.MILLISECONDS,
7477
1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS);
7578

79+
public StreamingEntryReader(ManagedCursorImpl cursor, StreamingDispatcher dispatcher, PersistentTopic topic) {
80+
this.cursor = cursor;
81+
this.dispatcher = dispatcher;
82+
this.topic = topic;
83+
this.topicExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(topic.getName());
84+
this.dispatcherExecutor = topic.getBrokerService().getTopicOrderedExecutor().chooseThread(dispatcher.getName());
85+
}
86+
7687
/**
7788
* Read entries in streaming way, that said instead of reading with micro batch and send entries to consumer after
7889
* all entries in the batch are read from ledger, this method will fire numEntriesToRead requests to managedLedger
@@ -146,7 +157,7 @@ public synchronized void asyncReadEntries(int numEntriesToRead, long maxReadSize
146157
@Override
147158
public void readEntryComplete(Entry entry, Object ctx) {
148159
// Don't block caller thread, complete read entry with dispatcher dedicated thread.
149-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
160+
dispatcherExecutor.execute(SafeRun.safeRun(() -> {
150161
internalReadEntryComplete(entry, ctx);
151162
}));
152163
}
@@ -187,7 +198,7 @@ private void internalReadEntryComplete(Entry entry, Object ctx) {
187198
@Override
188199
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
189200
// Don't block caller thread, complete read entry fail with dispatcher dedicated thread.
190-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
201+
dispatcherExecutor.execute(SafeRun.safeRun(() -> {
191202
internalReadEntryFailed(exception, ctx);
192203
}));
193204
}
@@ -246,7 +257,7 @@ private void internalCancelReadRequests() {
246257
public boolean cancelReadRequests() {
247258
if (STATE_UPDATER.compareAndSet(this, State.Issued, State.Canceling)) {
248259
// Don't block caller thread, complete cancel read with dispatcher dedicated thread.
249-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topic.getName(), SafeRun.safeRun(() -> {
260+
topicExecutor.execute(SafeRun.safeRun(() -> {
250261
synchronized (StreamingEntryReader.this) {
251262
if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) {
252263
internalCancelReadRequests();
@@ -271,8 +282,7 @@ private void cleanQueue(Queue<PendingReadEntryRequest> queue) {
271282
private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, long delay) {
272283
topic.getBrokerService().executor().schedule(() -> {
273284
// Jump again into dispatcher dedicated thread
274-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(),
275-
SafeRun.safeRun(() -> {
285+
dispatcherExecutor.execute(SafeRun.safeRun(() -> {
276286
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) cursor.getManagedLedger();
277287
managedLedger.asyncReadEntry(pendingReadEntryRequest.position, this, pendingReadEntryRequest);
278288
}));
@@ -281,9 +291,7 @@ private void retryReadRequest(PendingReadEntryRequest pendingReadEntryRequest, l
281291

282292
@Override
283293
public void entriesAvailable() {
284-
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(dispatcher.getName(), SafeRun.safeRun(() -> {
285-
internalEntriesAvailable();
286-
}));
294+
dispatcherExecutor.execute(SafeRun.safeRun(this::internalEntriesAvailable));
287295
}
288296

289297
private synchronized void internalEntriesAvailable() {

0 commit comments

Comments
 (0)