From b27a63e0e232ff793ae22777b16f4c7dd4ada025 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Sun, 2 Feb 2025 20:18:51 +0800 Subject: [PATCH] chore: add a helper method to create a dedicated virtual thread scheduler. (#164) Motivation: Add a helper method to create a separate virtual thread scheduler. --- cask/src/cask/internal/Util.scala | 49 ++++++++++++++++++- ...1 - Cask - a Scala HTTP micro-framework.md | 1 + 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/cask/src/cask/internal/Util.scala b/cask/src/cask/internal/Util.scala index b3bb82d587..7f4706336b 100644 --- a/cask/src/cask/internal/Util.scala +++ b/cask/src/cask/internal/Util.scala @@ -4,9 +4,11 @@ import java.io.{InputStream, PrintWriter, StringWriter} import scala.collection.generic.CanBuildFrom import scala.collection.mutable import java.io.OutputStream -import java.lang.invoke.{MethodHandles, MethodType} -import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ThreadFactory} +import java.lang.invoke.{MethodHandle, MethodHandles, MethodType} +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ForkJoinWorkerThread, ThreadFactory} import scala.annotation.switch +import scala.concurrent.duration.TimeUnit import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.Try import scala.util.control.NonFatal @@ -69,6 +71,49 @@ object Util { } } + /** + * A helper class to create the carrier thread for the virtual thread, + * Require Java 21 or above. + * */ + private object CarrierThreadFactory extends ForkJoinPool.ForkJoinWorkerThreadFactory { + private val counter = new AtomicInteger(0) + private val clazz = lookup.findClass("jdk.internal.misc.CarrierThread") + private val constructor: MethodHandle = lookup.findConstructor( + clazz, + MethodType.methodType(classOf[Unit], classOf[ForkJoinPool])) + + override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { + val carrierThread = constructor.invoke(pool).asInstanceOf[ForkJoinWorkerThread] + // Set the name of the carrier thread + carrierThread.setName("cask-carrier-thread-" + counter.incrementAndGet()) + carrierThread + } + } + + /** + * Create a dedicated forkjoin based scheduler for the virtual thread. + * NOTE: you can use other threads pool as scheduler too, this method just integrated the `CarrierThreadFactory` + * when creating the ForkJoinPool. + * */ + def createForkJoinPoolBasedScheduler(parallelism: Int, + corePoolSize: Int, + maximumPoolSize: Int, + keepAliveTime: Int, + timeUnit: TimeUnit): Executor = { + new ForkJoinPool( + parallelism, + CarrierThreadFactory, + (_: Thread, _: Throwable) => {}, // ignored for carrier thread + true, //FIFO + corePoolSize, + maximumPoolSize, + parallelism / 2, + (_: ForkJoinPool) => true, //which is needed for virtual thread + keepAliveTime, + timeUnit + ) + } + /** * Create a virtual thread factory with a executor, the executor will be used as the scheduler of * virtual thread. diff --git a/docs/pages/1 - Cask - a Scala HTTP micro-framework.md b/docs/pages/1 - Cask - a Scala HTTP micro-framework.md index bea249aa66..6c88ef1266 100644 --- a/docs/pages/1 - Cask - a Scala HTTP micro-framework.md +++ b/docs/pages/1 - Cask - a Scala HTTP micro-framework.md @@ -477,6 +477,7 @@ Cask can support using Virtual Threads to handle the request out of the box, you 1. You can change the default scheduler of the carrier threads with `cask.internal.Util.createVirtualThreadExecutor` method, but keep in mind, that's not officially supported by JDK for now. 2. You can supply your own `Executor` by override the `handlerExecutor()` method in your `cask.Main` object, which will be called only once when the server starts. 3. You can use `jdk.internal.misc.Blocker`'s `begin` and `end` methods to help the `ForkJoinPool` when needed. +4. You can use `Util.createVirtualThreadScheduler` to create separate `ForkJoinPool` as scheduler for the virtual threads. **NOTE**: