Skip to content

Commit

Permalink
chore: add a helper method to create a dedicated virtual thread sched…
Browse files Browse the repository at this point in the history
…uler. (#164)

Motivation: 
Add a helper method to create a separate virtual thread scheduler.
  • Loading branch information
He-Pin authored Feb 2, 2025
1 parent 9497af4 commit b27a63e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
49 changes: 47 additions & 2 deletions cask/src/cask/internal/Util.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import java.io.{InputStream, PrintWriter, StringWriter}
import scala.collection.generic.CanBuildFrom
import scala.collection.mutable
import java.io.OutputStream
import java.lang.invoke.{MethodHandles, MethodType}
import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ThreadFactory}
import java.lang.invoke.{MethodHandle, MethodHandles, MethodType}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ForkJoinWorkerThread, ThreadFactory}
import scala.annotation.switch
import scala.concurrent.duration.TimeUnit
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
import scala.util.control.NonFatal
Expand Down Expand Up @@ -69,6 +71,49 @@ object Util {
}
}

/**
* A helper class to create the carrier thread for the virtual thread,
* Require Java 21 or above.
* */
private object CarrierThreadFactory extends ForkJoinPool.ForkJoinWorkerThreadFactory {
private val counter = new AtomicInteger(0)
private val clazz = lookup.findClass("jdk.internal.misc.CarrierThread")
private val constructor: MethodHandle = lookup.findConstructor(
clazz,
MethodType.methodType(classOf[Unit], classOf[ForkJoinPool]))

override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
val carrierThread = constructor.invoke(pool).asInstanceOf[ForkJoinWorkerThread]
// Set the name of the carrier thread
carrierThread.setName("cask-carrier-thread-" + counter.incrementAndGet())
carrierThread
}
}

/**
* Create a dedicated forkjoin based scheduler for the virtual thread.
* NOTE: you can use other threads pool as scheduler too, this method just integrated the `CarrierThreadFactory`
* when creating the ForkJoinPool.
* */
def createForkJoinPoolBasedScheduler(parallelism: Int,
corePoolSize: Int,
maximumPoolSize: Int,
keepAliveTime: Int,
timeUnit: TimeUnit): Executor = {
new ForkJoinPool(
parallelism,
CarrierThreadFactory,
(_: Thread, _: Throwable) => {}, // ignored for carrier thread
true, //FIFO
corePoolSize,
maximumPoolSize,
parallelism / 2,
(_: ForkJoinPool) => true, //which is needed for virtual thread
keepAliveTime,
timeUnit
)
}

/**
* Create a virtual thread factory with a executor, the executor will be used as the scheduler of
* virtual thread.
Expand Down
1 change: 1 addition & 0 deletions docs/pages/1 - Cask - a Scala HTTP micro-framework.md
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ Cask can support using Virtual Threads to handle the request out of the box, you
1. You can change the default scheduler of the carrier threads with `cask.internal.Util.createVirtualThreadExecutor` method, but keep in mind, that's not officially supported by JDK for now.
2. You can supply your own `Executor` by override the `handlerExecutor()` method in your `cask.Main` object, which will be called only once when the server starts.
3. You can use `jdk.internal.misc.Blocker`'s `begin` and `end` methods to help the `ForkJoinPool` when needed.
4. You can use `Util.createVirtualThreadScheduler` to create separate `ForkJoinPool` as scheduler for the virtual threads.
**NOTE**:
Expand Down

0 comments on commit b27a63e

Please sign in to comment.