Use process pool for test runner #4614
Conversation
Force-pushed from c09aa01 to 20eccb3
Local runs on my machine yield similar test times.
Let's go with the multiple-JVMs-one-thread-each architecture described in the original ticket, rather than the one-JVM-multiple-threads architecture you have in this PR. Having each JVM be single-threaded running tests makes things like resource usage much more attributable and deterministic, since the resource footprint of each JVM can only be due to the one test it is currently running. Whereas if you put a bunch of tests into a single JVM and it crashes due to OOM, you have no idea which test is at fault. In terms of performance, I think the goal should be:
Hmm, looks like I misunderstood the idea of the original ticket. From what you said, it seems like each test group can now spawn multiple subprocesses (with respect to the total)
@HollandDM that's right. In terms of the (2) benchmarks, feel free to include it in this PR as an integration test. You can have the test code generate the necessary project files before running the test to avoid having to commit all the boilerplate code.
Force-pushed from 040daa7 to d96c887
@lihaoyi I've updated this to the multiple-JVMs-one-thread-each architecture, with work-stealing capability.
Force-pushed from 6612052 to c9ca9c4
@HollandDM can you write an English summary of how the implementation works? That would make it much easier to review vs. trying to reverse engineer your diff from scratch.
At first glance, the IPC protocol looks far too complicated for what this task requires. For example:
Before coding everything up, it's worth sketching out exactly what dataflows are necessary between the parent and worker processes. What I can think of are:
I think those are the only ways in which the parent and worker processes need to interact? If we limit our implementation to these dataflows, that will greatly simplify the implementation: Parent:
Worker:
If we want more features in the future that our implementation doesn't support, we can refactor the code when the time comes.
Let me explain the IPC protocol, as it is the heart of this implementation:
For the host: at first it spawns one subprocess to handle all the test cases. As time goes on, if any subprocess wants to steal, the host will find another subprocess to be the victim, prioritizing blocked ones. The host also keeps track of the state each subprocess is in at the moment: if more subprocesses are blocked than stealing, the host will try to spawn a new one to accommodate the need to offload work; if more are stealing than blocked, it will signal denial to the stealers. The subprocesses periodically notify the host of their running state. If they're allowed to steal from a victim, a second channel is used between them to coordinate the steal (write the tests to steal to disk, and signal the other side to check it). If they are denied from stealing, they can keep retrying for a while, but ultimately they will stop. Yes, this is quite a complicated work-stealing system. The reasons I went this way are:
Your suggestion is way simpler, and should work fine. The only thing I'm not too sure about is how many tests we should write to a single file for the subprocesses to pick up. If it is one, then subprocesses can end up scanning and contending for files on disk heavily. If it is more, then quick test cases can be blocked by one slow test case, and I think work stealing is required in that situation, which will lead to something similar to this.
Got it. What you say makes sense, but let's go with the approach I suggested and see if we can make that work without the complexity of a full peer-to-peer work-stealing model.
Most things work on the granularity of test classes, so that would be the natural unit of work here. In that case the workers can just pick work directly from the shared queue folder, until the folder is empty. I don't think we need to worry too much about the performance of the disk queue; we won't have more than a few thousand test classes per module, and the operations we need to do are cheap ones (just checking whether the folder is empty and trying a move).
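To make the agreed design concrete, here is a minimal sketch of the two roles using os-lib. The helper names (`enqueueTestClasses`, `claimTestClass`, `workerLoop`, `runTestClass`) and the one-file-per-class layout are illustrative assumptions, not the PR's actual code:

```scala
import scala.util.Try

// Parent: write one file per test class into a shared "selectors" folder.
def enqueueTestClasses(selectorFolder: os.Path, testClasses: Seq[String]): Unit = {
  os.makeDir.all(selectorFolder)
  testClasses.foreach(cls => os.write.over(selectorFolder / cls, cls))
}

// Worker: claim a test class by moving its file out of the shared folder.
// If two workers race for the same file, only one os.move succeeds; the loser's
// Try fails and it simply tries the next file.
def claimTestClass(selectorFolder: os.Path, workerFolder: os.Path): Option[String] = {
  os.makeDir.all(workerFolder)
  os.list(selectorFolder).iterator
    .flatMap(file => Try(os.move(file, workerFolder / file.last)).toOption.map(_ => file.last))
    .nextOption()
}

// Worker main loop: keep claiming and running classes until the folder is drained.
// `runTestClass` stands in for whatever actually executes a test class.
def workerLoop(selectorFolder: os.Path, workerFolder: os.Path, runTestClass: String => Unit): Unit =
  Iterator.continually(claimTestClass(selectorFolder, workerFolder))
    .takeWhile(_.nonEmpty)
    .flatten
    .foreach(runTestClass)
```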
Force-pushed from c9ca9c4 to 33ba34b
@lihaoyi I updated the PR to use the simplified version you came up with.
@HollandDM thanks, will take a look.
if (files.nonEmpty) {
  var shouldRetry = false
  val offset = Random.nextInt(files.size)
  try {
We can do `val shouldRetry = try {` to avoid the `var` here.
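For illustration, the suggested shape is roughly the following, with `claimSelectedFile()` as a hypothetical stand-in for the body of the existing try block:

```scala
// Compute shouldRetry directly from the try expression instead of mutating a var.
val shouldRetry =
  try {
    claimSelectedFile() // hypothetical: attempt to claim the randomly-selected file
    false               // claim succeeded, nothing to retry
  } catch {
    case _: java.io.IOException =>
      true              // another worker got there first; retry with the remaining files
  }
```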
val taskQueue = mutable.Queue.empty[Task]

@tailrec
def stealFromSelectorFolder(): Boolean = {
Can we move `stealFromSelectorFolder` out of `stealTasks` into its own top-level method and make it return an `Option[String]` with the successfully stolen class name? That will help encapsulate things a bit more and keep the code easier to understand.
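A possible shape for that refactor, based on the snippets quoted in this review (the parameters and the `selector-stolen` file name mirror the quoted code, but this is only a sketch, not the final implementation):

```scala
import scala.util.Try

// Top-level steal method: returns the stolen test class selector, or None if the
// shared selector folder has nothing left to claim.
def stealFromSelectorFolder(selectorFolder: os.Path, stealFolder: os.Path): Option[String] = {
  os.list(selectorFolder).iterator
    .flatMap { file =>
      // Moving the file out of the shared folder is the atomic "claim"; if another
      // worker already took it, the move fails and we simply try the next file.
      Try(os.move(file, stealFolder / "selector-stolen", replaceExisting = true)).toOption
        .map(_ => os.read(stealFolder / "selector-stolen").trim)
    }
    .nextOption()
}
```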
// we successfully stole a selector
val selectors = os.read.lines(stealFolder / s"selector-stolen")
val classFilter = TestRunnerUtils.globFilter(selectors)
val tasks = runner.tasks(
Is `tasks` always of length 1 here? If so we can add an assert and simplify the code accordingly.
I'm not really sure if the tasks returned always have the same length as the taskdefs input.
@tailrec
def stealTaskLoop(): Unit = {
  if (taskQueue.nonEmpty) {
    val next = taskQueue.dequeue().execute(
      new EventHandler {
        def handle(event: Event) = {
          testReporter.logStart(event)
          events.add(event)
          testReporter.logFinish(event)
        }
      },
      Array(new Logger {
        def debug(msg: String) = ctx.log.outputStream.println(msg)
        def error(msg: String) = ctx.log.outputStream.println(msg)
        def ansiCodesSupported() = true
        def warn(msg: String) = ctx.log.outputStream.println(msg)
        def trace(t: Throwable) = t.printStackTrace(ctx.log.outputStream)
        def info(msg: String) = ctx.log.outputStream.println(msg)
      })
    )

    taskQueue.enqueueAll(next)
    stealTaskLoop()
  } else if (stealFromSelectorFolder()) {
    stealTaskLoop()
  } else {
    ()
  }
}
This `@tailrec` method can probably be more easily understood written as nested loops, something like:
while({
stealFromSelectorFolder() match{
case Some(className) =>
for(cls <- classes) executeTaskForCls(cls)
true
case None => false
}
})()
The inner loop is always over a fixed collection of elements while the outer loop runs until a condition is met, so having it be two separate loops makes these properties a bit more obvious than having them mixed together in one big tailrec function
This is more about the coding style used in these repositories. In my other open-source PRs, people tend to prefer tailrec over while loops. It looks like it's the other way around here. I'm happy to change all the tailrec to while loops if you'd like. I don't really have a preference.
val events = new ConcurrentLinkedQueue[Event]()
val doneMessage = {

val taskQueue = mutable.Queue.empty[Task]
With the changes suggested for `stealFromSelectorFolder` and `stealTaskLoop`, we should be able to remove this mutable `taskQueue` as well.
val selectorFolder = base / "selectors"
os.makeDir.all(selectorFolder)
selectors2.zipWithIndex.foreach { case (s, i) =>
  os.write.over(selectorFolder / s"selector-$i", s)
How about we name the files according to the class name `s` rather than the index `i`? If we need to debug things, being able to `ls` to see the queued test classes would be much easier than having to dig into each file individually.
def runTestRunnerSubprocesses(selectors2: Seq[String], base: os.Path) = {
  val selectorFolder = prepareTestSelectorFolder(selectors2, base)

  val resultFutures = Range(0, selectors2.length).map { index =>
    val indexStr = index.toString
    Task.fork.async(base / indexStr, indexStr, s"Test process $indexStr") {
      (indexStr, runTestRunnerSubprocess(index, base / indexStr, selectorFolder))
    }
  }
We can use some heuristics to optimize the way we spawn the worker processes here and avoid the large number of processes you mentioned:

- We don't ever want to have more processes per group than `testClassList.length`, since they won't have anything to do even if every process takes one test class
- We don't ever want to have more processes per group than `ctx.jobs`, since that's the parallelism limit
- For each `Task.fork.async` we can check the `selectors` folder once before running `runTestRunnerSubprocess`, so that if that group's tests are all claimed before the process starts we can skip the process overhead

There are probably some other heuristics we can add, but all together this should be enough to mitigate the problem with too many processes: any `Task.fork.async`s that do not have any work to do will be cheap and exit quickly without subprocess overhead, so even if we queue up a lot of them there shouldn't be much cost.
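A rough sketch of those heuristics applied to the quoted snippet (names like `selectors2`, `ctx.jobs`, `base`, `selectorFolder`, and `runTestRunnerSubprocess` come from the code and comments above; this is only an outline, not the PR's final code):

```scala
// Cap worker count at both the number of test classes and the --jobs parallelism limit.
val workerCount = math.min(selectors2.length, ctx.jobs)

val resultFutures = Range(0, workerCount).map { index =>
  val indexStr = index.toString
  Task.fork.async(base / indexStr, indexStr, s"Test process $indexStr") {
    // Cheap pre-check: if every test class has already been claimed by another worker,
    // return immediately without paying the subprocess startup cost.
    if (os.list(selectorFolder).isEmpty) (indexStr, None)
    else (indexStr, Some(runTestRunnerSubprocess(index, base / indexStr, selectorFolder)))
  }
}
```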
The implementation already has the check before spawning, so it's true that the number of meaningful threads that spawn subprocesses is small.
What I meant was that when using `testGroup`, the server calls `runTestRunnerSubprocesses` for each test group. These `Future`s then only `await` for `runTestRunnerSubprocess` (the real test-running work) to return. So we get a lot of awaiting `Future`s hanging around, and these will all show up on the prompt.
It looks exactly like the thing you saw in #4611
Got it. Since `runTestRunnerSubprocesses` already spawns `fork.async` blocks, what if we move the caller out into normal synchronous code? So instead of
Task.fork.async(Task.dest / folderName, paddedIndex, groupPromptMessage) {
  (folderName, runTestRunnerSubprocesses(testClassList, Task.dest / folderName))
}
Just
(folderName, runTestRunnerSubprocesses(testClassList, Task.dest / folderName))
That way there won't be a middle-layer of async blocks that do nothing except wait for their children, which should reduce the number of blocked tasks showing up in the command line prompt
This can make `testGroup`s run sequentially with each other, rather than concurrently.
But I understand your thinking now. I think I can make use of this; maybe I'll flatten all the work to the outer layer and fork from there.
Force-pushed from b2edc51 to 8af8cbc
@lihaoyi Updated, I replaced the
Force-pushed from d346139 to 9ae5330
False alarm.
@HollandDM could you elaborate on the CI failures? AFAIK the example tests already check output in a non-ordered fashion: as long as each line in the example expected output appears in the actual output, the test passes, regardless of what order the actual output is in. So if your tests are failing, it seems like there must be some other issue there other than the difference in output ordering.
Oh true, now that I check the code carefully, it does check existence. I guess looking at all the
new Runnable {
  override def run(): Unit = {
I think this can be shortened using the lambda syntax `() => {...}` without explicitly instantiating an anonymous subclass of `Runnable`.
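For illustration, the difference is purely syntactic (the body here is elided):

```scala
// Anonymous subclass of Runnable:
val worker = new Runnable {
  override def run(): Unit = {
    // ... do the work ...
  }
}

// Equivalent lambda, since Runnable is a single-abstract-method type (Scala 2.12+):
val workerAsLambda: Runnable = () => {
  // ... do the work ...
}
```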
  stealFolder: os.Path,
  selectorFolder: os.Path
): Array[Task] = {
  val stealLog = stealFolder / os.up / "steal.log"
This can now be written `stealFolder / "../steal.log"`, or rather `stealFolder / "../status.txt"` since it's not really a log file but a snapshot of the current status.
The file is appended to while the runner is working, so it can be used to check the order of tests executed by the runner.
I also like "/ os.up" more, I think it more verbose and explicit
Got it, it makes sense then. We can leave it as is
Maybe have a comment somewhere explaining how this works?
while (true) {
  val files = os.list(selectorFolder)
I think we can simplify this further given the current design: we just need to run `os.list` once and then loop over all the tests to try and steal each one in order. Since the set of files in `selectorFolder` never grows, running `os.list` a second time will never pick up anything the first list didn't.

Let's also get rid of the `Random.nextInt` selection and just pick up the tests in order from top to bottom. That will give a bit more determinism to how things run, and although it's still nondeterministic due to parallelism, it would help people follow the order that tests are being run in.
I did it like this because I want to check if we have any files left, and only steal from the leftovers.
The `os.move` already effectively checks whether the file is still left before taking it, so we don't need to run an entire `os.list` each time we steal a single file. We could add an `os.exists` check before `os.move` if we want to avoid throwing and catching an error in the common case.
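Combining the two points above, the steal loop could be reduced to something like this sketch (with `runStolenClass` as a hypothetical stand-in for executing the claimed test class; not the PR's final code):

```scala
// os.list returns entries in sorted order, and the selector folder never gains new
// files, so listing once and walking it top-to-bottom is enough (no Random.nextInt).
for (file <- os.list(selectorFolder)) {
  // Cheap guard so the common "already claimed" case doesn't throw and catch...
  if (os.exists(file)) {
    // ...while os.move remains the real atomic claim; it fails if we lost the race.
    val claimed =
      scala.util.Try(os.move(file, stealFolder / "selector-stolen", replaceExisting = true)).isSuccess
    if (claimed) runStolenClass(os.read(stealFolder / "selector-stolen").trim)
  }
}
```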
val paddedGroupIndex = mill.internal.Util.leftPad(groupIndex.toString, maxGroupLength, '0')
val paddedProcessIndex =
  mill.internal.Util.leftPad(processIndex.toString, maxProcessLength, '0')
val processFolder = baseFolder / processIndex.toString
Let's call this `baseFolder / s"worker-$processIndex"`, so if someone looks on disk it's a bit more obvious what is going on.
Right now the
I think this looks pretty good. Next steps are probably:
Once both PRs are ready we can merge them and close out the bounty.
I think there is a bug here. I tried a simple fix like this:

private[mill] override def message = logger1.message ++ logger2.message
private[mill] override def keySuffix = logger1.keySuffix ++ logger2.keySuffix

And the log is good again. But I'm not sure this is the right way to do it. Would you like me to include these fixes in this PR? If not, I can skip the problem because it'll be fixed in another PR.
@HollandDM your proposed fix looks reasonable. Let's include it in this PR.
Force-pushed from 747de59 to 47dff00
@lihaoyi updated, please check if you are happy with the current state of the PR. In the meantime, I'll assume the PR is good to go, and create a backport for the 0.12.x branch using this PR as the source.
@Test
public void test1() throws Exception {
  testGreeting("Storm", 38);
}

@Test
public void test2() throws Exception {
  testGreeting("Bella", 25);
}

@Test
public void test3() throws Exception {
  testGreeting("Cameron", 32);
}
Do we really need more than one test per file? If not we could consolidate each file to a single test case to greatly reduce the verbosity of these test examples
// Test stealing is an opt-in, powerful feature that enables parallel test execution while maintaining complete reliability and debuggability. |
"while maintaining complete reliability and debuggability" is meaningless. You should write why people would turn this on: because it provides both the speedups from parallelism and the efficiency of re-using JVMs between tests, allowing it to be used on all sorts of test suites without downside. And we will likely make it the default in future.
Force-pushed from 75f6fa8 to 43225cf
Updated the test examples to only have one test case each, and fixed some wording in the adoc.
I think this looks good, thanks @HollandDM! Let's get #4679 green and merged as well, and send me your international bank transfer details and I will send you the bounty.
Ported for 0.12.x from #4614.
Turns on #4614 by default in 0.13.x, leaving the default as off in 0.12.x.
…nc task and de-prioritizing subsequent ones (#4714)

This is a follow-up improvement to #4701 which uses an explicit priority queue rather than relying on ad-hoc LIFO ordering. By prioritizing the first async task and de-prioritizing subsequent async tasks, this encourages Mill to re-use the same async task JVM to do more work, minimizing JVM startup overhead.

When testing this out on the Netty example, `testParallelism` prioritizing the first testrunner JVM over all others does seem to make a material difference. I manually performed ad-hoc benchmarks of the time taken to run the following command, basically running all the tests that we run in the example integration test:

```
/Users/lihaoyi/Github/mill/out/dist/launcher.dest/run 'codec-{dns,haproxy,http,http2,memcache,mqtt,redis,smtp,socks,stomp,xml}.test' + 'transport-{blockhound-tests,native-unix-common,sctp}.test'
```

- `priority = -1` (`fork.async`s always run before other tasks): ~20s
- `priority = 1` (`fork.async`s always run after other tasks): ~20s
- `priority = if (processIndex == 0) -1 else processIndex` (the first `fork.async` in a test module runs before all other tasks, subsequent `fork.async`s run after all other tasks): ~11s
- As a comparison, `testForkGrouping` with 1-element groups: 50s

Notably, this first-JVM priority has similar timings with `testParallelism = false` on this benchmark, where the previous approach had some penalty (though not as bad as `testForkGrouping`). With these improvements, the parallel test runner should be self-tuning enough to turn on by default (#4614) without needing manual configuration in most cases.

This also lays the foundation for other prioritization strategies later. For example, we may want to prioritize historically-slower tasks over faster tasks, all else being equal, to avoid long-running stragglers delaying a command from completing after everything else has finished. But such improvements can come later.
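The prioritization rule described above boils down to something like the following sketch; the wrapper name `asyncPriority` and treating `processIndex` as the zero-based index of each `fork.async` within a test module are assumptions for illustration:

```scala
// First test-runner JVM in a module runs before other tasks; later ones run after
// other tasks, so Mill prefers feeding more work to a JVM that is already running.
def asyncPriority(processIndex: Int): Int =
  if (processIndex == 0) -1 else processIndex
```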
Fixes #4590.

This PR introduces a `testEnableWorkStealing` setting for `TestModule`. When enabled, tests in the module will be distributed by a work-stealing process pool and run in parallel with each other.

Internally, when `testEnableWorkStealing` is set to `true`, Mill will try to spawn at most `--jobs` test runners for each test group. Each runner competes to "steal" test classes from a shared folder to run. This increases the throughput of the whole test group, as slow tests no longer block fast tests from running. Each test runner has its own working folder and log files used to communicate with the Mill parent process.
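For context, opting in from a build file would look roughly like the sketch below. The module layout is hypothetical, and it assumes `testEnableWorkStealing` is an ordinary boolean task that can be overridden like other `TestModule` settings; the exact name and type may differ in the final version of this PR.

```scala
// build.mill (hypothetical example project)
package build
import mill._, javalib._

object foo extends JavaModule {
  object test extends JavaTests with TestModule.Junit4 {
    // Opt in to the work-stealing test runner pool introduced by this PR
    // (assumed here to be a T[Boolean] that defaults to false).
    def testEnableWorkStealing = Task { true }
  }
}
```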