From 86a33dc7fe35752b2a6a2b93760e099f3547c3f2 Mon Sep 17 00:00:00 2001
From: renechoi <115696395+renechoi@users.noreply.github.com>
Date: Fri, 25 Jul 2025 00:11:12 +0900
Subject: [PATCH] [ZEPPELIN-6258] Improve Process resource management in
 SparkInterpreterLauncher

- Added stdout consumption to prevent buffer overflow
- Implemented process timeout (30 seconds) with forceful termination
- Added exit value validation and logging
- Ensured process cleanup in finally block
- Maintained backward compatibility

This prevents process hangs and ensures proper resource cleanup.
---
 .../launcher/SparkInterpreterLauncher.java    | 64 +++++++++++++++----
 .../SparkInterpreterLauncherTest.java         | 26 ++++++++
 2 files changed, 77 insertions(+), 13 deletions(-)

diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index 33b3e4ba62d..c52e85e192f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -20,6 +20,7 @@
 import java.io.FileInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.nio.charset.StandardCharsets;
@@ -28,6 +29,7 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -35,6 +37,7 @@
 import java.util.stream.StreamSupport;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -272,22 +275,57 @@ private String detectSparkScalaVersion(String sparkHome, Map<String, String> env
     builder.environment().putAll(env);
     File processOutputFile = File.createTempFile("zeppelin-spark", ".out");
     builder.redirectError(processOutputFile);
+
     Process process = builder.start();
-    process.waitFor();
-    String processOutput = IOUtils.toString(new FileInputStream(processOutputFile), StandardCharsets.UTF_8);
-    Pattern pattern = Pattern.compile(".*Using Scala version (.*),.*");
-    Matcher matcher = pattern.matcher(processOutput);
-    if (matcher.find()) {
-      String scalaVersion = matcher.group(1);
-      if (scalaVersion.startsWith("2.12")) {
-        return "2.12";
-      } else if (scalaVersion.startsWith("2.13")) {
-        return "2.13";
+    try {
+      // Consume stdout to prevent buffer overflow
+      try (InputStream stdout = process.getInputStream()) {
+        IOUtils.copy(stdout, NullOutputStream.NULL_OUTPUT_STREAM);
+      }
+
+      // Wait with timeout (30 seconds)
+      boolean finished = process.waitFor(30, TimeUnit.SECONDS);
+      if (!finished) {
+        process.destroyForcibly();
+        throw new IOException("spark-submit --version command timed out after 30 seconds");
+      }
+
+      // Check exit value
+      int exitValue = process.exitValue();
+      if (exitValue != 0) {
+        LOGGER.warn("spark-submit --version exited with non-zero code: {}", exitValue);
+      }
+
+      // Read the output from the file
+      String processOutput;
+      try (FileInputStream in = new FileInputStream(processOutputFile)) {
+        processOutput = IOUtils.toString(in, StandardCharsets.UTF_8);
+      }
+
+      Pattern pattern = Pattern.compile(".*Using Scala version (.*),.*");
+      Matcher matcher = pattern.matcher(processOutput);
+      if (matcher.find()) {
+        String scalaVersion = matcher.group(1);
+        if (scalaVersion.startsWith("2.12")) {
+          return "2.12";
+        } else if (scalaVersion.startsWith("2.13")) {
+          return "2.13";
+        } else {
+          throw new Exception("Unsupported scala version: " + scalaVersion);
+        }
       } else {
-        throw new Exception("Unsupported scala version: " + scalaVersion);
+        LOGGER.debug("Could not detect Scala version from spark-submit output, falling back to jar inspection");
+        return detectSparkScalaVersionByReplClass(sparkHome);
+      }
+    } finally {
+      // Ensure process is cleaned up
+      if (process.isAlive()) {
+        process.destroyForcibly();
+      }
+      // Clean up temporary file
+      if (!processOutputFile.delete() && processOutputFile.exists()) {
+        LOGGER.warn("Failed to delete temporary file: {}", processOutputFile.getAbsolutePath());
       }
-    } else {
-      return detectSparkScalaVersionByReplClass(sparkHome);
     }
   }
 
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index c1dc975b3c5..9378d823698 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -31,10 +31,13 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -325,4 +328,27 @@ void testYarnClusterMode_3() throws IOException {
     }
     FileUtils.deleteDirectory(localRepoPath.toFile());
   }
+
+  @Test
+  void testDetectSparkScalaVersionProcessManagement() throws Exception {
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
+
+    // Use reflection to access private method
+    Method detectSparkScalaVersionMethod = SparkInterpreterLauncher.class.getDeclaredMethod(
+        "detectSparkScalaVersion", String.class, Map.class);
+    detectSparkScalaVersionMethod.setAccessible(true);
+
+    Map<String, String> env = new HashMap<>();
+
+    // Call the method multiple times to ensure processes are properly cleaned up
+    for (int i = 0; i < 3; i++) {
+      String scalaVersion = (String) detectSparkScalaVersionMethod.invoke(launcher, sparkHome, env);
+      assertTrue(scalaVersion.equals("2.12") || scalaVersion.equals("2.13"),
+          "Expected scala version 2.12 or 2.13 but got: " + scalaVersion);
+    }
+
+    // Note: We cannot easily test that processes are destroyed or that stdout is consumed
+    // without mocking ProcessBuilder, which would require significant refactoring.
+    // The test above ensures the method still works correctly with the new implementation.
+  }
 }
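
For reference, the hunks above all implement one pattern: drain stdout, wait with a bounded timeout, check the exit value, then force-kill the process and delete the temp file in a finally block. The minimal standalone sketch below illustrates that pattern outside Zeppelin; it is not part of the patch, and the class name, the "java -version" command, and the 30-second timeout are illustrative assumptions only.

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.concurrent.TimeUnit;

// Sketch of the guarded-process pattern: consume stdout, bound the wait,
// check the exit value, and always clean up in finally.
public class ProcessCleanupSketch {

  static String runAndCaptureStderr(String... command) throws Exception {
    // Redirect stderr to a temp file, mirroring how the launcher captures
    // "spark-submit --version" output (which is printed on stderr).
    File errFile = File.createTempFile("process-sketch", ".out");
    Process process = new ProcessBuilder(command).redirectError(errFile).start();
    try {
      // Drain stdout so the child cannot block on a full pipe buffer.
      try (InputStream stdout = process.getInputStream()) {
        while (stdout.read() != -1) {
          // discard
        }
      }
      // Bounded wait instead of an indefinite waitFor().
      if (!process.waitFor(30, TimeUnit.SECONDS)) {
        process.destroyForcibly();
        throw new IOException("command timed out after 30 seconds");
      }
      if (process.exitValue() != 0) {
        System.err.println("non-zero exit code: " + process.exitValue());
      }
      return new String(Files.readAllBytes(errFile.toPath()), StandardCharsets.UTF_8);
    } finally {
      // Always release the process and the temp file, even on timeout or error.
      if (process.isAlive()) {
        process.destroyForcibly();
      }
      if (!errFile.delete() && errFile.exists()) {
        System.err.println("could not delete " + errFile.getAbsolutePath());
      }
    }
  }

  public static void main(String[] args) throws Exception {
    // "java -version" writes to stderr, so the captured file holds the version banner.
    System.out.println(runAndCaptureStderr("java", "-version"));
  }
}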