Skip to content

Commit 0ffb953

Browse files
committed
Fix race condition in PosixPluginFrontend
1 parent afe76c8 commit 0ffb953

File tree

9 files changed

+75
-76
lines changed

9 files changed

+75
-76
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
key: ${{ runner.os }}-sbt-${{ hashFiles('**/*.sbt') }}
2626
- name: Compile and test
2727
run: |
28-
sbt test
28+
sbt bridgeStressTest/compile test
2929
shell: bash
3030
- name: Format check
3131
if: ${{ runner.os == 'Linux' }}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package protocbridge.frontend
2+
3+
import org.apache.commons.io.IOUtils
4+
import org.scalatest.flatspec.AnyFlatSpec
5+
import org.scalatest.matchers.must.Matchers
6+
import protocbridge.{ExtraEnv, ProtocCodeGenerator}
7+
8+
import java.io.ByteArrayOutputStream
9+
import scala.sys.process.ProcessIO
10+
import scala.util.Random
11+
12+
class PosixPluginFrontendSpec extends AnyFlatSpec with Matchers {
13+
if (!PluginFrontend.isWindows) {
14+
it must "execute a program that forwards input and output to given stream" in {
15+
val random = new Random()
16+
val toSend = Array.fill(123)(random.nextInt(256).toByte)
17+
val toReceive = Array.fill(456)(random.nextInt(256).toByte)
18+
val env = new ExtraEnv(secondaryOutputDir = "tmp")
19+
20+
val fakeGenerator = new ProtocCodeGenerator {
21+
override def run(request: Array[Byte]): Array[Byte] = {
22+
request mustBe (toSend ++ env.toByteArrayAsField)
23+
toReceive
24+
}
25+
}
26+
27+
// Repeat 10,000 times since named pipes on macOS are flaky.
28+
val repeatCount = 10000
29+
for (i <- 1 to repeatCount) {
30+
if (i % 100 == 1) println(s"Running iteration $i of $repeatCount at ${java.time.LocalDateTime.now}")
31+
32+
val (path, state) = PosixPluginFrontend.prepare(
33+
fakeGenerator,
34+
env
35+
)
36+
val actualOutput = new ByteArrayOutputStream()
37+
val process = sys.process
38+
.Process(path.toAbsolutePath.toString)
39+
.run(new ProcessIO(writeInput => {
40+
writeInput.write(toSend)
41+
writeInput.close()
42+
}, processOutput => {
43+
IOUtils.copy(processOutput, actualOutput)
44+
processOutput.close()
45+
}, _.close()))
46+
process.exitValue()
47+
actualOutput.toByteArray mustBe toReceive
48+
PosixPluginFrontend.cleanup(state)
49+
}
50+
}
51+
}
52+
}

bridge/src/main/scala/protocbridge/frontend/MacPluginFrontend.scala

-36
This file was deleted.

bridge/src/main/scala/protocbridge/frontend/PluginFrontend.scala

-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@ object PluginFrontend {
137137

138138
def newInstance: PluginFrontend = {
139139
if (isWindows) WindowsPluginFrontend
140-
else if (isMac) MacPluginFrontend
141140
else PosixPluginFrontend
142141
}
143142
}

bridge/src/main/scala/protocbridge/frontend/PosixPluginFrontend.scala

+10-9
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,13 @@ object PosixPluginFrontend extends PluginFrontend {
3939

4040
Future {
4141
blocking {
42+
// Each of the following two calls below block until the shell script opens the
43+
// corresponding pipe. This ensures that we are not writing to any of the pipes
4244
val fsin = Files.newInputStream(inputPipe)
45+
val fsout = Files.newOutputStream(outputPipe)
46+
4347
val response = PluginFrontend.runWithInputStream(plugin, fsin, env)
4448
fsin.close()
45-
46-
// Note that the output pipe must be opened after the input pipe is consumed.
47-
// Otherwise, there might be a deadlock that
48-
// - The shell script is stuck writing to the input pipe (which has a full buffer),
49-
// and doesn't open the write end of the output pipe.
50-
// - This thread is stuck waiting for the write end of the output pipe to be opened.
51-
val fsout = Files.newOutputStream(outputPipe)
5249
fsout.write(response)
5350
fsout.close()
5451
}
@@ -77,8 +74,12 @@ object PosixPluginFrontend extends PluginFrontend {
7774
"",
7875
s"""|#!$shell
7976
|set -e
80-
|cat /dev/stdin > "$inputPipe"
81-
|cat "$outputPipe"
77+
|exec 4> "$inputPipe"
78+
|exec 5< "$outputPipe"
79+
|cat /dev/stdin >&4
80+
|exec 4>&-
81+
|cat <&5
82+
|exec 5<&-
8283
""".stripMargin
8384
)
8485
val perms = new ju.HashSet[PosixFilePermission]

bridge/src/test/scala/protocbridge/ProtocIntegrationSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ class ProtocIntegrationSpec extends AnyFlatSpec with Matchers {
156156

157157
Await.result(
158158
Future.sequence(invocations),
159-
Duration(60, SECONDS)
159+
Duration(120, SECONDS)
160160
) must be(List.fill(parallelProtocInvocations)(0))
161161
}
162162
}

bridge/src/test/scala/protocbridge/frontend/MacPluginFrontendSpec.scala

-15
This file was deleted.

bridge/src/test/scala/protocbridge/frontend/PosixPluginFrontendSpec.scala

-13
This file was deleted.

build.sbt

+11
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ lazy val bridge: Project = project
4646
)
4747
)
4848

49+
lazy val bridgeStressTest = project
50+
.in(file("bridge-stress-test"))
51+
.dependsOn(bridge)
52+
.settings(
53+
name := "bridge-stress-test",
54+
libraryDependencies ++= Seq(
55+
"org.scalatest" %% "scalatest" % "3.2.19" % "test",
56+
"commons-io" % "commons-io" % "2.11.0" % "test"
57+
)
58+
)
59+
4960
lazy val protocCacheCoursier = project
5061
.in(file("protoc-cache-coursier"))
5162
.dependsOn(bridge)

0 commit comments

Comments
 (0)