diff --git a/dubbo-common/pom.xml b/dubbo-common/pom.xml
index 041a674c1944..ddfb1474e1b1 100644
--- a/dubbo-common/pom.xml
+++ b/dubbo-common/pom.xml
@@ -99,6 +99,11 @@
javax.annotationjavax.annotation-api
+
+
+ org.jctools
+ jctools-core
+ cglib
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java
index 1fc08f97caeb..a58fe4d6b59f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/timer/HashedWheelTimer.java
@@ -5,7 +5,7 @@
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
@@ -18,17 +18,39 @@
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.ClassUtils;
-import org.apache.dubbo.common.utils.SystemPropertyConfigUtils;
+import org.jctools.queues.MpscChunkedArrayQueue;
+import org.jctools.queues.MpscUnboundedArrayQueue;
+import org.jctools.queues.atomic.MpscChunkedAtomicArrayQueue;
+import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
+import org.jctools.util.Pow2;
+import org.jctools.util.UnsafeAccess;
+import sun.misc.Unsafe;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.AccessibleObject;
+import java.lang.reflect.Array;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.HashSet;
import java.util.Locale;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -37,16 +59,19 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
+import static java.lang.invoke.MethodType.methodType;
import static org.apache.dubbo.common.constants.CommonConstants.SystemProperty.SYSTEM_OS_NAME;
import static org.apache.dubbo.common.constants.CommonConstants.OS_WIN_PREFIX;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_ERROR_RUN_THREAD_TASK;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_ERROR_TOO_MANY_INSTANCES;
+import static org.apache.dubbo.common.constants.LoggerCodeConstants.COMMON_UNEXPECTED_EXCEPTION;
+import static org.apache.dubbo.common.utils.ClassUtils.simpleClassName;
/**
* A {@link Timer} optimized for approximated I/O timeout scheduling.
*
*
Tick Duration
- *
+ *
* As described with 'approximated', this timer does not execute the scheduled
* {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
* check if there are any {@link TimerTask}s behind the schedule and execute
@@ -55,33 +80,33 @@
* You can increase or decrease the accuracy of the execution timing by
* specifying smaller or larger tick duration in the constructor. In most
* network applications, I/O timeout does not need to be accurate. Therefore,
- * the default tick duration is 100 milliseconds, and you will not need to try
+ * the default tick duration is 100 milliseconds and you will not need to try
* different configurations in most cases.
*
*
Ticks per Wheel (Wheel Size)
- *
+ *
* {@link HashedWheelTimer} maintains a data structure called 'wheel'.
* To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
- * function is 'deadline of the task'. The default number of ticks per wheel
+ * function is 'dead line of the task'. The default number of ticks per wheel
* (i.e. the size of the wheel) is 512. You could specify a larger value
* if you are going to schedule a lot of timeouts.
*
*
Do not create many instances.
- *
+ *
* {@link HashedWheelTimer} creates a new thread whenever it is instantiated and
* started. Therefore, you should make sure to create only one instance and
* share it across your application. One of the common mistakes, that makes
* your application unresponsive, is to create a new instance for every connection.
*
*
Implementation Details
- *
+ *
* {@link HashedWheelTimer} is based on
- * George Varghese and
+ * George Varghese and
* Tony Lauck's paper,
- * 'Hashed
+ * 'Hashed
* and Hierarchical Timing Wheels: data structures to efficiently implement a
* timer facility'. More comprehensive slides are located
- * here.
+ * here.
*/
public class HashedWheelTimer implements Timer {
@@ -95,15 +120,17 @@ public class HashedWheelTimer implements Timer {
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();
private static final int INSTANCE_COUNT_LIMIT = 64;
+ private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);
+
private static final AtomicIntegerFieldUpdater WORKER_STATE_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
+ AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
private final Worker worker = new Worker();
private final Thread workerThread;
- private static final int WORKER_STATE_INIT = 0;
- private static final int WORKER_STATE_STARTED = 1;
- private static final int WORKER_STATE_SHUTDOWN = 2;
+ public static final int WORKER_STATE_INIT = 0;
+ public static final int WORKER_STATE_STARTED = 1;
+ public static final int WORKER_STATE_SHUTDOWN = 2;
/**
* 0 - init, 1 - started, 2 - shut down
@@ -115,10 +142,11 @@ public class HashedWheelTimer implements Timer {
private final HashedWheelBucket[] wheel;
private final int mask;
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
- private final Queue timeouts = new LinkedBlockingQueue<>();
- private final Queue cancelledTimeouts = new LinkedBlockingQueue<>();
+ private final Queue timeouts = newMpscQueue();
+ private final Queue cancelledTimeouts = newMpscQueue();
private final AtomicLong pendingTimeouts = new AtomicLong(0);
private final long maxPendingTimeouts;
+ private final Executor taskExecutor;
private volatile long startTime;
@@ -184,7 +212,7 @@ public HashedWheelTimer(ThreadFactory threadFactory) {
* @throws IllegalArgumentException if {@code tickDuration} is <= 0
*/
public HashedWheelTimer(
- ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
+ ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(threadFactory, tickDuration, unit, 512);
}
@@ -201,32 +229,60 @@ public HashedWheelTimer(
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
*/
public HashedWheelTimer(
- ThreadFactory threadFactory,
- long tickDuration, TimeUnit unit, int ticksPerWheel) {
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, -1);
}
/**
* Creates a new timer.
*
- * @param threadFactory a {@link ThreadFactory} that creates a
- * background {@link Thread} which is dedicated to
- * {@link TimerTask} execution.
- * @param tickDuration the duration between tick
- * @param unit the time unit of the {@code tickDuration}
- * @param ticksPerWheel the size of the wheel
- * @param maxPendingTimeouts The maximum number of pending timeouts after which call to
- * {@code newTimeout} will result in
- * {@link java.util.concurrent.RejectedExecutionException}
- * being thrown. No maximum pending timeouts limit is assumed if
- * this value is 0 or negative.
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @param maxPendingTimeouts The maximum number of pending timeouts after which call to
+ * {@code newTimeout} will result in
+ * {@link java.util.concurrent.RejectedExecutionException}
+ * being thrown. No maximum pending timeouts limit is assumed if
+ * this value is 0 or negative.
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
*/
public HashedWheelTimer(
- ThreadFactory threadFactory,
- long tickDuration, TimeUnit unit, int ticksPerWheel,
- long maxPendingTimeouts) {
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel,
+ long maxPendingTimeouts) {
+ this(threadFactory, tickDuration, unit, ticksPerWheel,
+ maxPendingTimeouts, ImmediateExecutor.INSTANCE);
+ }
+
+ /**
+ * Creates a new timer.
+ *
+ * @param threadFactory a {@link ThreadFactory} that creates a
+ * background {@link Thread} which is dedicated to
+ * {@link TimerTask} execution.
+ * @param tickDuration the duration between tick
+ * @param unit the time unit of the {@code tickDuration}
+ * @param ticksPerWheel the size of the wheel
+ * @param maxPendingTimeouts The maximum number of pending timeouts after which call to
+ * {@code newTimeout} will result in
+ * {@link java.util.concurrent.RejectedExecutionException}
+ * being thrown. No maximum pending timeouts limit is assumed if
+ * this value is 0 or negative.
+ * @param taskExecutor The {@link Executor} that is used to execute the submitted {@link TimerTask}s.
+ * The caller is responsible to shutdown the {@link Executor} once it is not needed
+ * anymore.
+ * @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
+ * @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is <= 0
+ */
+ public HashedWheelTimer(
+ ThreadFactory threadFactory,
+ long tickDuration, TimeUnit unit, int ticksPerWheel,
+ long maxPendingTimeouts, Executor taskExecutor) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
@@ -240,26 +296,38 @@ public HashedWheelTimer(
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
+ if (taskExecutor == null) {
+ throw new NullPointerException("taskExecutor");
+ }
+ this.taskExecutor = taskExecutor;
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
// Convert tickDuration to nanos.
- this.tickDuration = unit.toNanos(tickDuration);
+ long duration = unit.toNanos(tickDuration);
// Prevent overflow.
- if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
+ if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
- "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
- tickDuration, Long.MAX_VALUE / wheel.length));
+ "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
+ tickDuration, Long.MAX_VALUE / wheel.length));
+ }
+
+ if (duration < MILLISECOND_NANOS) {
+ logger.warn(COMMON_UNEXPECTED_EXCEPTION, "", "", "Configured tickDuration " + tickDuration + " smaller than " + MILLISECOND_NANOS + ", using 1ms.");
+ this.tickDuration = MILLISECOND_NANOS;
+ } else {
+ this.tickDuration = duration;
}
+
workerThread = threadFactory.newThread(worker);
this.maxPendingTimeouts = maxPendingTimeouts;
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
- WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
+ WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
@@ -278,42 +346,25 @@ protected void finalize() throws Throwable {
}
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
- if (ticksPerWheel <= 0) {
- throw new IllegalArgumentException(
- "ticksPerWheel must be greater than 0: " + ticksPerWheel);
- }
- if (ticksPerWheel > 1073741824) {
- throw new IllegalArgumentException(
- "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
- }
+ ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
- ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
- for (int i = 0; i < wheel.length; i++) {
+ for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
- private static int normalizeTicksPerWheel(int ticksPerWheel) {
- int normalizedTicksPerWheel = ticksPerWheel - 1;
- normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 1;
- normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 2;
- normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 4;
- normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 8;
- normalizedTicksPerWheel |= normalizedTicksPerWheel >>> 16;
- return normalizedTicksPerWheel + 1;
- }
-
/**
- * Starts the background thread explicitly. The background thread will
+ * Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
- * {@linkplain #stop() stopped} already
+ * {@linkplain #stop() stopped} already
*/
public void start() {
- switch (WORKER_STATE_UPDATER.get(this)) {
+ int state = WORKER_STATE_UPDATER.get(this);
+ switch (state) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
@@ -324,7 +375,7 @@ public void start() {
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
- throw new Error("Invalid WorkerState");
+ throw new Error("Invalid WorkerState: " + state);
}
// Wait until the startTime is initialized by the worker.
@@ -341,9 +392,9 @@ public void start() {
public Set stop() {
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
- HashedWheelTimer.class.getSimpleName() +
- ".stop() cannot be called from " +
- TimerTask.class.getSimpleName());
+ HashedWheelTimer.class.getSimpleName() +
+ ".stop() cannot be called from " +
+ TimerTask.class.getSimpleName());
}
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
@@ -372,7 +423,14 @@ public Set stop() {
} finally {
INSTANCE_COUNTER.decrementAndGet();
}
- return worker.unprocessedTimeouts();
+ Set unprocessed = worker.unprocessedTimeouts();
+ Set cancelled = new HashSet(unprocessed.size());
+ for (Timeout timeout : unprocessed) {
+ if (timeout.cancel()) {
+ cancelled.add(timeout);
+ }
+ }
+ return cancelled;
}
@Override
@@ -394,8 +452,8 @@ public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
- + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
- + "timeouts (" + maxPendingTimeouts + ")");
+ + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ + "timeouts (" + maxPendingTimeouts + ")");
}
start();
@@ -421,14 +479,16 @@ public long pendingTimeouts() {
}
private static void reportTooManyInstances() {
- String resourceType = ClassUtils.simpleClassName(HashedWheelTimer.class);
- logger.error(COMMON_ERROR_TOO_MANY_INSTANCES, "", "", "You are creating too many " + resourceType + " instances. " +
- resourceType + " is a shared resource that must be reused across the JVM, " +
- "so that only a few instances are created.");
+ if (logger.isErrorEnabled()) {
+ String resourceType = simpleClassName(HashedWheelTimer.class);
+ logger.error(COMMON_ERROR_TOO_MANY_INSTANCES, "", "", "You are creating too many " + resourceType + " instances. " +
+ resourceType + " is a shared resource that must be reused across the JVM, " +
+ "so that only a few instances are created.");
+ }
}
private final class Worker implements Runnable {
- private final Set unprocessedTimeouts = new HashSet<>();
+ private final Set unprocessedTimeouts = new HashSet();
private long tick;
@@ -450,7 +510,7 @@ public void run() {
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket =
- wheel[idx];
+ wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
@@ -458,10 +518,10 @@ public void run() {
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
- for (HashedWheelBucket bucket : wheel) {
+ for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
- for (; ; ) {
+ for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
@@ -507,7 +567,7 @@ private void processCancelledTasks() {
break;
}
try {
- timeout.remove();
+ timeout.removeAfterCancellation();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(COMMON_ERROR_RUN_THREAD_TASK, "", "", "An exception was thrown while process a cancellation task", t);
@@ -537,8 +597,17 @@ private long waitForNextTick() {
return currentTime;
}
}
+
+ // Check if we run on windows, as if thats the case we will need
+ // to round the sleepTime as workaround for a bug that only affect
+ // the JVM if it runs on windows.
+ //
+ // See https://github.com/netty/netty/issues/356
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
+ if (sleepTimeMs == 0) {
+ sleepTimeMs = 1;
+ }
}
try {
@@ -551,42 +620,141 @@ private long waitForNextTick() {
}
}
- Set unprocessedTimeouts() {
+ public Set unprocessedTimeouts() {
return Collections.unmodifiableSet(unprocessedTimeouts);
}
}
- private static final class HashedWheelTimeout implements Timeout {
+ private static final class ImmediateExecutor implements Executor {
+ public static final ImmediateExecutor INSTANCE = new ImmediateExecutor();
+
+ private ImmediateExecutor() {
+ // use static instance
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ if (command == null) {
+ throw new NullPointerException("command");
+ }
+ command.run();
+ }
+ }
+
+ private static final class MathUtil {
+
+ private MathUtil() {
+ }
+
+ /**
+ * Fast method of finding the next power of 2 greater than or equal to the supplied value.
+ *
+ *
If the value is {@code <= 0} then 1 will be returned.
+ * This method is not suitable for {@link Integer#MIN_VALUE} or numbers greater than 2^30.
+ *
+ * @param value from which to search for next power of 2
+ * @return The next power of 2 or the value itself if it is a power of 2
+ */
+ public static int findNextPositivePowerOfTwo(final int value) {
+ if (value <= Integer.MIN_VALUE) {
+ throw new IllegalArgumentException(
+ "ticksPerWheel must be greater than " + Integer.MIN_VALUE + ": " + value);
+ }
+ if (value >= 0x40000000) {
+ throw new IllegalArgumentException(
+ "ticksPerWheel must be less than or equal to 0x40000000: " + value);
+ }
+ return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
+ }
+
+ /**
+ * Fast method of finding the next power of 2 greater than or equal to the supplied value.
+ *
This method will do runtime bounds checking and call {@link #findNextPositivePowerOfTwo(int)} if within a
+ * valid range.
+ * @param value from which to search for next power of 2
+ * @return The next power of 2 or the value itself if it is a power of 2.
+ *
Special cases for return values are as follows:
+ *
+ *
{@code <= 0} -> 1
+ *
{@code >= 2^30} -> 2^30
+ *
+ */
+ public static int safeFindNextPositivePowerOfTwo(final int value) {
+ return value <= 0 ? 1 : value >= 0x40000000 ? 0x40000000 : findNextPositivePowerOfTwo(value);
+ }
+
+ /**
+ * Determine if the requested {@code index} and {@code length} will fit within {@code capacity}.
+ * @param index The starting index.
+ * @param length The length which will be utilized (starting from {@code index}).
+ * @param capacity The capacity that {@code index + length} is allowed to be within.
+ * @return {@code false} if the requested {@code index} and {@code length} will fit within {@code capacity}.
+ * {@code true} if this would result in an index out of bounds exception.
+ */
+ public static boolean isOutOfBounds(int index, int length, int capacity) {
+ return (index | length | capacity | index + length) < 0 || index + length > capacity;
+ }
+
+ /**
+ * @deprecated not used anymore. User Integer.compare() instead. For removal.
+ * Compares two {@code int} values.
+ *
+ * @param x the first {@code int} to compare
+ * @param y the second {@code int} to compare
+ * @return the value {@code 0} if {@code x == y};
+ * {@code -1} if {@code x < y}; and
+ * {@code 1} if {@code x > y}
+ */
+ @Deprecated
+ public static int compare(final int x, final int y) {
+ // do not subtract for comparison, it could overflow
+ return Integer.compare(x, y);
+ }
+
+ /**
+ * @deprecated not used anymore. User Long.compare() instead. For removal.
+ * Compare two {@code long} values.
+ * @param x the first {@code long} to compare.
+ * @param y the second {@code long} to compare.
+ * @return
+ *
+ *
0 if {@code x == y}
+ *
{@code > 0} if {@code x > y}
+ *
{@code < 0} if {@code x < y}
+ *
+ */
+ @Deprecated
+ public static int compare(long x, long y) {
+ return Long.compare(x, y);
+ }
+
+ }
+
+ private static final class HashedWheelTimeout implements Timeout, Runnable {
private static final int ST_INIT = 0;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
private static final AtomicIntegerFieldUpdater STATE_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
+ AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
private final HashedWheelTimer timer;
private final TimerTask task;
private final long deadline;
- @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
+ @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
private volatile int state = ST_INIT;
- /**
- * RemainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
- * HashedWheelTimeout will be added to the correct HashedWheelBucket.
- */
+ // remainingRounds will be calculated and set by Worker.transferTimeoutsToBuckets() before the
+ // HashedWheelTimeout will be added to the correct HashedWheelBucket.
long remainingRounds;
- /**
- * This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
- * As only the workerThread will act on it there is no need for synchronization / volatile.
- */
+ // This will be used to chain timeouts in HashedWheelTimerBucket via a double-linked-list.
+ // As only the workerThread will act on it there is no need for synchronization / volatile.
HashedWheelTimeout next;
HashedWheelTimeout prev;
- /**
- * The bucket to which the timeout was added
- */
+ // The bucket to which the timeout was added
HashedWheelBucket bucket;
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
@@ -612,19 +780,21 @@ public boolean cancel() {
return false;
}
// If a task should be canceled we put this to another queue which will be processed on each tick.
- // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way we
- // can make again use of our LinkedBlockingQueue and so minimize the locking / overhead as much as possible.
+ // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way
+ // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible.
timer.cancelledTimeouts.add(this);
return true;
}
- void remove() {
+ private void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
- } else {
- timer.pendingTimeouts.decrementAndGet();
}
+ timer.pendingTimeouts.decrementAndGet();
+ }
+ void removeAfterCancellation() {
+ remove();
}
public boolean compareAndSetState(int expected, int state) {
@@ -650,6 +820,19 @@ public void expire() {
return;
}
+ try {
+ remove();
+ timer.taskExecutor.execute(this);
+ } catch (Throwable t) {
+ if (logger.isWarnEnabled()) {
+ logger.warn(COMMON_ERROR_RUN_THREAD_TASK, "", "", "An exception was thrown while submit " + TimerTask.class.getSimpleName()
+ + " for execution.", t);
+ }
+ }
+ }
+
+ @Override
+ public void run() {
try {
task.run(this);
} catch (Throwable t) {
@@ -663,18 +846,17 @@ public void expire() {
public String toString() {
final long currentTime = System.nanoTime();
long remaining = deadline - currentTime + timer.startTime;
- String simpleClassName = ClassUtils.simpleClassName(this.getClass());
StringBuilder buf = new StringBuilder(192)
- .append(simpleClassName)
- .append('(')
- .append("deadline: ");
+ .append(simpleClassName(this.getClass()))
+ .append('(')
+ .append("deadline: ");
if (remaining > 0) {
buf.append(remaining)
- .append(" ns later");
+ .append(" ns later");
} else if (remaining < 0) {
buf.append(-remaining)
- .append(" ns ago");
+ .append(" ns ago");
} else {
buf.append("now");
}
@@ -684,9 +866,9 @@ public String toString() {
}
return buf.append(", task: ")
- .append(task())
- .append(')')
- .toString();
+ .append(task())
+ .append(')')
+ .toString();
}
}
@@ -706,7 +888,7 @@ private static final class HashedWheelBucket {
/**
* Add {@link HashedWheelTimeout} to this bucket.
*/
- void addTimeout(HashedWheelTimeout timeout) {
+ public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;
timeout.bucket = this;
if (head == null) {
@@ -721,65 +903,57 @@ void addTimeout(HashedWheelTimeout timeout) {
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
- void expireTimeouts(long deadline) {
+ public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
- next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
- "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
+ "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
- } else if (timeout.isCancelled()) {
- next = remove(timeout);
- } else {
- timeout.remainingRounds--;
+ } else if (!timeout.isCancelled()) {
+ timeout.remainingRounds --;
}
timeout = next;
}
}
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
+ HashedWheelTimeout prev = timeout.prev;
HashedWheelTimeout next = timeout.next;
+
// remove timeout that was either processed or cancelled by updating the linked-list
- if (timeout.prev != null) {
- timeout.prev.next = next;
+ if (prev != null) {
+ prev.next = next;
}
- if (timeout.next != null) {
- timeout.next.prev = timeout.prev;
+ if (next != null) {
+ next.prev = prev;
}
if (timeout == head) {
- // if timeout is also the tail we need to adjust the entry too
- if (timeout == tail) {
- tail = null;
- head = null;
- } else {
- head = next;
- }
- } else if (timeout == tail) {
- // if the timeout is the tail modify the tail to be the prev node.
- tail = timeout.prev;
+ head = next;
+ }
+ if (timeout == tail) {
+ tail = prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;
- timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}
/**
* Clear this bucket and return all not expired / cancelled {@link Timeout}s.
*/
- void clearTimeouts(Set set) {
- for (; ; ) {
+ public void clearTimeouts(Set set) {
+ for (;;) {
HashedWheelTimeout timeout = pollTimeout();
if (timeout == null) {
return;
@@ -798,7 +972,7 @@ private HashedWheelTimeout pollTimeout() {
}
HashedWheelTimeout next = head.next;
if (next == null) {
- tail = this.head = null;
+ tail = this.head = null;
} else {
this.head = next;
next.prev = null;
@@ -812,9 +986,1335 @@ private HashedWheelTimeout pollTimeout() {
}
}
- private static final boolean IS_OS_WINDOWS = SystemPropertyConfigUtils.getSystemProperty(SYSTEM_OS_NAME, "").toLowerCase(Locale.US).contains(OS_WIN_PREFIX);
+ private static final boolean IS_OS_WINDOWS = System.getProperty(
+ SYSTEM_OS_NAME, "").toLowerCase(Locale.US).contains(OS_WIN_PREFIX);
private boolean isWindows() {
return IS_OS_WINDOWS;
}
+
+ private static final int MAX_ALLOWED_MPSC_CAPACITY = Pow2.MAX_POW2;
+
+ private static final int MPSC_CHUNK_SIZE = 1024;
+
+ private static final int MIN_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * 2;
+
+ private static Queue newMpscQueue() {
+ return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue(MPSC_CHUNK_SIZE)
+ : new MpscUnboundedAtomicArrayQueue(MPSC_CHUNK_SIZE);
+ }
+
+ private static final boolean USE_MPSC_CHUNKED_ARRAY_QUEUE;
+
+ static {
+ Object unsafe = null;
+ if (hasUnsafe()) {
+ // jctools goes through its own process of initializing unsafe; of
+ // course, this requires permissions which might not be granted to calling code, so we
+ // must mark this block as privileged too
+ unsafe = AccessController.doPrivileged(new PrivilegedAction
+
+
+ org.jctools
+ jctools-core
+ ${jctools_version}
+
+
org.junit.jupiter
diff --git a/dubbo-distribution/dubbo-all-shaded/pom.xml b/dubbo-distribution/dubbo-all-shaded/pom.xml
index b5c1d40dd884..1d4a7bd81afa 100644
--- a/dubbo-distribution/dubbo-all-shaded/pom.xml
+++ b/dubbo-distribution/dubbo-all-shaded/pom.xml
@@ -465,6 +465,10 @@
com.google.protobufprotobuf-java
+
+ org.jctools
+ jctools-core
+
diff --git a/dubbo-distribution/dubbo-all/pom.xml b/dubbo-distribution/dubbo-all/pom.xml
index a766ee055896..05e28abaa5d7 100644
--- a/dubbo-distribution/dubbo-all/pom.xml
+++ b/dubbo-distribution/dubbo-all/pom.xml
@@ -465,6 +465,10 @@
com.google.protobufprotobuf-java
+
+ org.jctools
+ jctools-core
+
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/buffer/ChannelBuffersTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/buffer/ChannelBuffersTest.java
index 17effade2a37..3fcac074ed8a 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/buffer/ChannelBuffersTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/buffer/ChannelBuffersTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.dubbo.remoting.buffer;
+import java.nio.Buffer;
import java.nio.ByteBuffer;
import org.junit.jupiter.api.Assertions;
@@ -75,7 +76,8 @@ void testWrappedBuffer() {
channelBuffer = ChannelBuffers.wrappedBuffer(byteBuffer);
Assertions.assertTrue(channelBuffer instanceof ByteBufferBackedChannelBuffer);
- byteBuffer.position(byteBuffer.limit());
+ // be compatible with jdk8 by casting byteBuffer's type to its parent class - `java.nio.Buffer`.
+ ((Buffer) byteBuffer).position(byteBuffer.limit());
channelBuffer = ChannelBuffers.wrappedBuffer(byteBuffer);
Assertions.assertEquals(channelBuffer, EMPTY_BUFFER);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
index 7b57a71d253a..e7001d00b80a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleCustomerProtocolWrapper.java
@@ -19,6 +19,7 @@
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.common.utils.CollectionUtils;
+import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -67,7 +68,8 @@ public static int readRawVarint32(ByteBuffer byteBuffer) {
val = val << 7;
val = val | (byteBuffer.get(index) & 0x7F);
}
- byteBuffer.position(currentPosition + varIntLength);
+ // be compatible with jdk8 by casting byteBuffer's type to its parent class - `java.nio.Buffer`.
+ ((Buffer) byteBuffer).position(currentPosition + varIntLength);
return val;
}
diff --git a/pom.xml b/pom.xml
index b2bc12a8d5b3..d680a9418f89 100644
--- a/pom.xml
+++ b/pom.xml
@@ -811,22 +811,7 @@
-
- jdk9-compile
-
- [1.9,)
-
-
-
-
- maven-compiler-plugin
-
- 8
-
-
-
-
-
+
jdk9-jdk11-spotless