Skip to content

Commit fb268bb

Browse files
committed
Initial support for virtual thread migration through thread pools
1 parent a4443cc commit fb268bb

12 files changed

+993
-20
lines changed

build-release-17

Whitespace-only changes.

pom.xml

+49-20
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,6 @@
8686
<artifactId>nativeimage</artifactId>
8787
<version>24.1.2</version>
8888
<scope>provided</scope>
89-
<!-- We only want the included annotations -->
90-
<exclusions>
91-
<exclusion>
92-
<artifactId>*</artifactId>
93-
<groupId>*</groupId>
94-
</exclusion>
95-
</exclusions>
9689
</dependency>
9790
<dependency>
9891
<groupId>org.jboss.logging</groupId>
@@ -182,19 +175,55 @@
182175
</plugin>
183176
<plugin>
184177
<artifactId>maven-compiler-plugin</artifactId>
185-
<configuration>
186-
<annotationProcessorPaths>
187-
<annotationProcessorPath>
188-
<groupId>org.jboss.logging</groupId>
189-
<artifactId>jboss-logging-processor</artifactId>
190-
<version>${version.jboss.logging.tools}</version>
191-
</annotationProcessorPath>
192-
</annotationProcessorPaths>
193-
<compilerArgs>
194-
<!-- This is for SVM dependency -->
195-
<compilerArg>--add-reads=org.jboss.threads=ALL-UNNAMED</compilerArg>
196-
</compilerArgs>
197-
</configuration>
178+
<executions>
179+
<execution>
180+
<id>default-compile</id>
181+
<configuration>
182+
<annotationProcessorPaths>
183+
<annotationProcessorPath>
184+
<groupId>org.jboss.logging</groupId>
185+
<artifactId>jboss-logging-processor</artifactId>
186+
<version>${version.jboss.logging.tools}</version>
187+
</annotationProcessorPath>
188+
</annotationProcessorPaths>
189+
<compilerArgs>
190+
<compilerArg>--add-exports=java.base/jdk.internal.vm=ALL-UNNAMED,org.jboss.threads</compilerArg>
191+
</compilerArgs>
192+
<source>21</source>
193+
<target>21</target>
194+
</configuration>
195+
</execution>
196+
<execution>
197+
<id>compile-virtual-threads</id>
198+
<phase>compile</phase>
199+
<goals>
200+
<goal>compile</goal>
201+
</goals>
202+
<configuration>
203+
<annotationProcessorPaths>
204+
<annotationProcessorPath>
205+
<groupId>org.jboss.logging</groupId>
206+
<artifactId>jboss-logging-processor</artifactId>
207+
<version>${version.jboss.logging.tools}</version>
208+
</annotationProcessorPath>
209+
</annotationProcessorPaths>
210+
<excludes>
211+
<exclude>**/org/jboss/threads/virtual/*.java</exclude>
212+
</excludes>
213+
<release>17</release>
214+
</configuration>
215+
</execution>
216+
<execution>
217+
<id>default-testCompile</id>
218+
<phase>test-compile</phase>
219+
<goals>
220+
<goal>testCompile</goal>
221+
</goals>
222+
<configuration>
223+
<release>17</release>
224+
</configuration>
225+
</execution>
226+
</executions>
198227
</plugin>
199228
<plugin>
200229
<artifactId>maven-source-plugin</artifactId>

src/main/java/module-info.java

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
requires jdk.unsupported;
44
requires org.jboss.logging;
55
requires static org.jboss.logging.annotations;
6+
requires static org.graalvm.nativeimage;
67
requires org.wildfly.common;
78
requires io.smallrye.common.annotation;
89
requires io.smallrye.common.constraint;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package org.jboss.threads.virtual;
2+
3+
import java.lang.invoke.MethodHandle;
4+
import java.lang.invoke.MethodHandles;
5+
import java.lang.invoke.MethodType;
6+
import java.lang.reflect.UndeclaredThrowableException;
7+
import java.util.concurrent.Executor;
8+
9+
import jdk.internal.vm.ThreadContainer;
10+
11+
/**
12+
* Access methods for virtual thread internals.
13+
*/
14+
final class Access {
15+
private static final MethodHandle currentCarrierThread;
16+
private static final MethodHandle virtualThreadFactory;
17+
private static final MethodHandle threadStartWithContainer;
18+
private static final MethodHandle schedulerGetter;
19+
private static final MethodHandle continuationGetter;
20+
21+
static {
22+
MethodHandle ct;
23+
MethodHandle vtf;
24+
MethodHandle tswc;
25+
MethodHandle sg;
26+
MethodHandle cg;
27+
try {
28+
MethodHandles.Lookup thr = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup());
29+
ct = thr.findStatic(Thread.class, "currentCarrierThread", MethodType.methodType(Thread.class));
30+
Class<?> vtbClass = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder", false, null);
31+
vtf = thr.findConstructor(vtbClass, MethodType.methodType(void.class, Executor.class));
32+
// create efficient transformer
33+
vtf = vtf.asType(MethodType.methodType(Thread.Builder.OfVirtual.class, Executor.class));
34+
// todo: maybe instead, we can directly call `java.lang.ThreadBuilders.newVirtualThread`
35+
//void start(jdk.internal.vm.ThreadContainer container)
36+
tswc = thr.findVirtual(Thread.class, "start", MethodType.methodType(void.class, ThreadContainer.class));
37+
Class<?> vtc = thr.findClass("java.lang.VirtualThread");
38+
MethodHandles.Lookup vthr = MethodHandles.privateLookupIn(vtc, MethodHandles.lookup());
39+
sg = vthr.findGetter(vtc, "scheduler", Executor.class).asType(MethodType.methodType(Executor.class, Thread.class));
40+
cg = vthr.findGetter(vtc, "runContinuation", Runnable.class).asType(MethodType.methodType(Runnable.class, Thread.class));
41+
} catch (Throwable e) {
42+
// no good
43+
throw new InternalError("Cannot initialize virtual threads", e);
44+
}
45+
currentCarrierThread = ct;
46+
virtualThreadFactory = vtf;
47+
threadStartWithContainer = tswc;
48+
schedulerGetter = sg;
49+
continuationGetter = cg;
50+
}
51+
52+
static Thread currentCarrier() {
53+
try {
54+
return (Thread) currentCarrierThread.invokeExact();
55+
} catch (RuntimeException | Error e) {
56+
throw e;
57+
} catch (Throwable e) {
58+
throw new UndeclaredThrowableException(e);
59+
}
60+
}
61+
62+
static Thread.Builder.OfVirtual threadBuilder(ThreadScheduler threadScheduler) {
63+
try {
64+
return (Thread.Builder.OfVirtual) virtualThreadFactory.invokeExact((Executor) threadScheduler);
65+
} catch (RuntimeException | Error e) {
66+
throw e;
67+
} catch (Throwable e) {
68+
throw new UndeclaredThrowableException(e);
69+
}
70+
}
71+
72+
static void startThread(Thread thread, ThreadContainer threadContainer) {
73+
try {
74+
threadStartWithContainer.invokeExact(thread, threadContainer);
75+
} catch (RuntimeException | Error e) {
76+
throw e;
77+
} catch (Throwable e) {
78+
throw new UndeclaredThrowableException(e);
79+
}
80+
}
81+
82+
static UserThreadScheduler schedulerOf(Thread thread) {
83+
try {
84+
return (UserThreadScheduler) (Executor) schedulerGetter.invokeExact(thread);
85+
} catch (RuntimeException | Error e) {
86+
throw e;
87+
} catch (Throwable e) {
88+
throw new UndeclaredThrowableException(e);
89+
}
90+
}
91+
92+
static Runnable continuationOf(Thread thread) {
93+
try {
94+
return (Runnable) continuationGetter.invokeExact(thread);
95+
} catch (RuntimeException | Error e) {
96+
throw e;
97+
} catch (Throwable e) {
98+
throw new UndeclaredThrowableException(e);
99+
}
100+
}
101+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package org.jboss.threads.virtual;
2+
3+
import java.util.concurrent.locks.LockSupport;
4+
5+
import io.smallrye.common.annotation.Experimental;
6+
7+
/**
8+
* An event loop for a virtual thread scheduler.
9+
* There will be one instance per I/O thread within an event loop group.
10+
*/
11+
@Experimental("Experimental virtual thread support")
12+
public abstract class EventLoop {
13+
/**
14+
* Construct a new instance.
15+
*/
16+
protected EventLoop() {}
17+
18+
/**
19+
* Unpark all ready threads and return,
20+
* possibly waiting for some amount of time if no threads are ready.
21+
* The wait time may be {@code 0}, in which case this method should return immediately if no threads are ready,
22+
* or {@code -1}, in which case the method should wait indefinitely for threads to become ready.
23+
* Otherwise, the wait time is the maximum number of nanoseconds to wait for threads to become ready before returning.
24+
* <p>
25+
* Regardless of the wait time, the method should park or return immediately if the {@link #wakeup()} method is invoked
26+
* from any thread.
27+
* <p>
28+
* This method will be called in a loop (the event loop, in fact).
29+
* After each invocation of this method, up to one other waiting thread will be continued.
30+
* Since this generally would lead to busy-looping,
31+
* the implementation of this method <em>should</em> {@linkplain LockSupport#parkNanos(long) park} for some amount of time before returning.
32+
* While the event loop method is parked,
33+
* other threads will be allowed to run.
34+
* If the set of ready threads is exhausted before that time elapses,
35+
* the event loop thread will automatically be unparked,
36+
* allowing the loop to be re-entered from the top to wait for ready events.
37+
* <p>
38+
* Note that {@linkplain Thread#sleep(long) sleeping} instead of parking may cause latency spikes,
39+
* so it is not recommended.
40+
* <p>
41+
* This method should only be called from the event loop virtual thread.
42+
*
43+
* @param waitTime {@code 0} to return immediately after unparking any ready threads (even if there are none),
44+
* {@code -1} unpark any ready threads or to wait indefinitely for a thread to become ready,
45+
* or any positive integer to unpark any ready threads or to wait for no more than that number of nanoseconds
46+
* @throws InterruptedException if some interruptible operation was interrupted
47+
*/
48+
protected abstract void unparkAny(long waitTime) throws InterruptedException;
49+
50+
/**
51+
* Forcibly awaken the event loop, if it is currently blocked in {@link #unparkAny(long)}.
52+
* This method may be called from any thread.
53+
*/
54+
protected abstract void wakeup();
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package org.jboss.threads.virtual;
2+
3+
import java.util.concurrent.ScheduledFuture;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.locks.LockSupport;
6+
7+
/**
8+
* The thread scheduler for an event loop thread.
9+
*/
10+
final class EventLoopThreadScheduler extends ThreadScheduler {
11+
private final IoThread ioThread;
12+
private volatile boolean ready = true;
13+
14+
EventLoopThreadScheduler(final Scheduler scheduler, final Runnable task, final IoThread ioThread, final long idx) {
15+
super(scheduler, task, "Event loop", idx);
16+
this.ioThread = ioThread;
17+
}
18+
19+
boolean ready() {
20+
return ready;
21+
}
22+
23+
void makeReady() {
24+
ready = true;
25+
LockSupport.unpark(virtualThread());
26+
}
27+
28+
public void run() {
29+
ready = false;
30+
try {
31+
super.run();
32+
} finally {
33+
ioThread.setYielded();
34+
}
35+
}
36+
37+
void start() {
38+
super.start();
39+
}
40+
41+
public void execute(final Runnable command) {
42+
ready = true;
43+
}
44+
45+
IoThread ioThread() {
46+
return ioThread;
47+
}
48+
49+
public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit) {
50+
return ioThread.schedule(command, unit.convert(delay, TimeUnit.NANOSECONDS));
51+
}
52+
}

0 commit comments

Comments
 (0)