Skip to content

Commit a088a4d

Browse files
committed
Initial support for virtual thread migration through thread pools
1 parent a4443cc commit a088a4d

13 files changed

+1188
-20
lines changed

build-release-17

Whitespace-only changes.

pom.xml

+49-20
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,6 @@
8686
<artifactId>nativeimage</artifactId>
8787
<version>24.1.2</version>
8888
<scope>provided</scope>
89-
<!-- We only want the included annotations -->
90-
<exclusions>
91-
<exclusion>
92-
<artifactId>*</artifactId>
93-
<groupId>*</groupId>
94-
</exclusion>
95-
</exclusions>
9689
</dependency>
9790
<dependency>
9891
<groupId>org.jboss.logging</groupId>
@@ -182,19 +175,55 @@
182175
</plugin>
183176
<plugin>
184177
<artifactId>maven-compiler-plugin</artifactId>
185-
<configuration>
186-
<annotationProcessorPaths>
187-
<annotationProcessorPath>
188-
<groupId>org.jboss.logging</groupId>
189-
<artifactId>jboss-logging-processor</artifactId>
190-
<version>${version.jboss.logging.tools}</version>
191-
</annotationProcessorPath>
192-
</annotationProcessorPaths>
193-
<compilerArgs>
194-
<!-- This is for SVM dependency -->
195-
<compilerArg>--add-reads=org.jboss.threads=ALL-UNNAMED</compilerArg>
196-
</compilerArgs>
197-
</configuration>
178+
<executions>
179+
<execution>
180+
<id>default-compile</id>
181+
<configuration>
182+
<annotationProcessorPaths>
183+
<annotationProcessorPath>
184+
<groupId>org.jboss.logging</groupId>
185+
<artifactId>jboss-logging-processor</artifactId>
186+
<version>${version.jboss.logging.tools}</version>
187+
</annotationProcessorPath>
188+
</annotationProcessorPaths>
189+
<compilerArgs>
190+
<compilerArg>--add-exports=java.base/jdk.internal.vm=ALL-UNNAMED,org.jboss.threads</compilerArg>
191+
</compilerArgs>
192+
<source>21</source>
193+
<target>21</target>
194+
</configuration>
195+
</execution>
196+
<execution>
197+
<id>compile-virtual-threads</id>
198+
<phase>compile</phase>
199+
<goals>
200+
<goal>compile</goal>
201+
</goals>
202+
<configuration>
203+
<annotationProcessorPaths>
204+
<annotationProcessorPath>
205+
<groupId>org.jboss.logging</groupId>
206+
<artifactId>jboss-logging-processor</artifactId>
207+
<version>${version.jboss.logging.tools}</version>
208+
</annotationProcessorPath>
209+
</annotationProcessorPaths>
210+
<excludes>
211+
<exclude>**/org/jboss/threads/virtual/*.java</exclude>
212+
</excludes>
213+
<release>17</release>
214+
</configuration>
215+
</execution>
216+
<execution>
217+
<id>default-testCompile</id>
218+
<phase>test-compile</phase>
219+
<goals>
220+
<goal>testCompile</goal>
221+
</goals>
222+
<configuration>
223+
<release>17</release>
224+
</configuration>
225+
</execution>
226+
</executions>
198227
</plugin>
199228
<plugin>
200229
<artifactId>maven-source-plugin</artifactId>

src/main/java/module-info.java

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
requires jdk.unsupported;
44
requires org.jboss.logging;
55
requires static org.jboss.logging.annotations;
6+
requires static org.graalvm.nativeimage;
67
requires org.wildfly.common;
78
requires io.smallrye.common.annotation;
89
requires io.smallrye.common.constraint;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package org.jboss.threads.virtual;
2+
3+
import java.lang.invoke.MethodHandle;
4+
import java.lang.invoke.MethodHandles;
5+
import java.lang.invoke.MethodType;
6+
import java.lang.reflect.UndeclaredThrowableException;
7+
import java.util.concurrent.Executor;
8+
import java.util.concurrent.ScheduledExecutorService;
9+
10+
import jdk.internal.vm.ThreadContainer;
11+
12+
/**
13+
* Access methods for virtual thread internals.
14+
*/
15+
final class Access {
16+
private static final MethodHandle currentCarrierThread;
17+
private static final MethodHandle virtualThreadFactory;
18+
private static final MethodHandle threadStartWithContainer;
19+
private static final MethodHandle schedulerGetter;
20+
private static final MethodHandle continuationGetter;
21+
22+
static {
23+
MethodHandle ct;
24+
MethodHandle vtf;
25+
MethodHandle tswc;
26+
MethodHandle sg;
27+
MethodHandle cg;
28+
try {
29+
MethodHandles.Lookup thr = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup());
30+
ct = thr.findStatic(Thread.class, "currentCarrierThread", MethodType.methodType(Thread.class));
31+
Class<?> vtbClass = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder", false, null);
32+
try {
33+
vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, Executor.class));
34+
} catch (NoSuchMethodError ignored) {
35+
// patched JDK
36+
vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, ScheduledExecutorService.class));
37+
}
38+
// create efficient transformer
39+
vtf = vtf.asType(MethodType.methodType(Thread.Builder.OfVirtual.class, ThreadScheduler.class));
40+
// todo: maybe instead, we can directly call `java.lang.ThreadBuilders.newVirtualThread`
41+
//void start(jdk.internal.vm.ThreadContainer container)
42+
tswc = thr.findVirtual(Thread.class, "start", MethodType.methodType(void.class, ThreadContainer.class));
43+
Class<?> vtc = thr.findClass("java.lang.VirtualThread");
44+
MethodHandles.Lookup vthr = MethodHandles.privateLookupIn(vtc, MethodHandles.lookup());
45+
sg = vthr.findGetter(vtc, "scheduler", Executor.class).asType(MethodType.methodType(Executor.class, Thread.class));
46+
cg = vthr.findGetter(vtc, "runContinuation", Runnable.class).asType(MethodType.methodType(Runnable.class, Thread.class));
47+
} catch (Throwable e) {
48+
// no good
49+
throw new InternalError("Cannot initialize virtual threads", e);
50+
}
51+
currentCarrierThread = ct;
52+
virtualThreadFactory = vtf;
53+
threadStartWithContainer = tswc;
54+
schedulerGetter = sg;
55+
continuationGetter = cg;
56+
}
57+
58+
static Thread currentCarrier() {
59+
try {
60+
return (Thread) currentCarrierThread.invokeExact();
61+
} catch (RuntimeException | Error e) {
62+
throw e;
63+
} catch (Throwable e) {
64+
throw new UndeclaredThrowableException(e);
65+
}
66+
}
67+
68+
static Thread.Builder.OfVirtual threadBuilder(ThreadScheduler threadScheduler) {
69+
try {
70+
return (Thread.Builder.OfVirtual) virtualThreadFactory.invokeExact(threadScheduler);
71+
} catch (RuntimeException | Error e) {
72+
throw e;
73+
} catch (Throwable e) {
74+
throw new UndeclaredThrowableException(e);
75+
}
76+
}
77+
78+
static void startThread(Thread thread, ThreadContainer threadContainer) {
79+
try {
80+
threadStartWithContainer.invokeExact(thread, threadContainer);
81+
} catch (RuntimeException | Error e) {
82+
throw e;
83+
} catch (Throwable e) {
84+
throw new UndeclaredThrowableException(e);
85+
}
86+
}
87+
88+
static ThreadScheduler schedulerOf(Thread thread) {
89+
try {
90+
return (ThreadScheduler) (Executor) schedulerGetter.invokeExact(thread);
91+
} catch (RuntimeException | Error e) {
92+
throw e;
93+
} catch (Throwable e) {
94+
throw new UndeclaredThrowableException(e);
95+
}
96+
}
97+
98+
static Runnable continuationOf(Thread thread) {
99+
try {
100+
return (Runnable) continuationGetter.invokeExact(thread);
101+
} catch (RuntimeException | Error e) {
102+
throw e;
103+
} catch (Throwable e) {
104+
throw new UndeclaredThrowableException(e);
105+
}
106+
}
107+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.jboss.threads.virtual;
2+
3+
import java.util.concurrent.ScheduledFuture;
4+
5+
abstract class Dispatcher {
6+
abstract void execute(UserThreadScheduler continuation);
7+
8+
abstract ScheduledFuture<?> schedule(Runnable task, long nanos);
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package org.jboss.threads.virtual;
2+
3+
import java.util.concurrent.locks.LockSupport;
4+
5+
import io.smallrye.common.annotation.Experimental;
6+
7+
/**
8+
* An event loop for a virtual thread scheduler.
9+
* There will be one instance per I/O thread within an event loop group.
10+
*/
11+
@Experimental("Experimental virtual thread support")
12+
public abstract class EventLoop {
13+
/**
14+
* Construct a new instance.
15+
*/
16+
protected EventLoop() {}
17+
18+
/**
19+
* Unpark all ready threads and return,
20+
* possibly waiting for some amount of time if no threads are ready.
21+
* The wait time may be {@code 0}, in which case this method should return immediately if no threads are ready,
22+
* or {@code -1}, in which case the method should wait indefinitely for threads to become ready.
23+
* Otherwise, the wait time is the maximum number of nanoseconds to wait for threads to become ready before returning.
24+
* <p>
25+
* Regardless of the wait time, the method should park or return immediately if the {@link #wakeup()} method is invoked
26+
* from any thread.
27+
* <p>
28+
* This method will be called in a loop (the event loop, in fact).
29+
* After each invocation of this method, up to one other waiting thread will be continued.
30+
* Since this generally would lead to busy-looping,
31+
* the implementation of this method <em>should</em> {@linkplain LockSupport#parkNanos(long) park} for some amount of time before returning.
32+
* While the event loop method is parked,
33+
* other threads will be allowed to run.
34+
* If the set of ready threads is exhausted before that time elapses,
35+
* the event loop thread will automatically be unparked,
36+
* allowing the loop to be re-entered from the top to wait for ready events.
37+
* <p>
38+
* Note that {@linkplain Thread#sleep(long) sleeping} instead of parking may cause latency spikes,
39+
* so it is not recommended.
40+
* <p>
41+
* This method should only be called from the event loop virtual thread.
42+
*
43+
* @param waitTime {@code 0} to return immediately after unparking any ready threads (even if there are none),
44+
* {@code -1} unpark any ready threads or to wait indefinitely for a thread to become ready,
45+
* or any positive integer to unpark any ready threads or to wait for no more than that number of nanoseconds
46+
* @throws InterruptedException if some interruptible operation was interrupted
47+
*/
48+
protected abstract void unparkAny(long waitTime) throws InterruptedException;
49+
50+
/**
51+
* Forcibly awaken the event loop, if it is currently blocked in {@link #unparkAny(long)}.
52+
* This method may be called from any thread.
53+
*/
54+
protected abstract void wakeup();
55+
}

0 commit comments

Comments
 (0)