diff --git a/bin/common.sh b/bin/common.sh
index 6447ec8daf9..9380f5d2799 100644
--- a/bin/common.sh
+++ b/bin/common.sh
@@ -122,10 +122,14 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
export JAVA_OPTS
JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
-if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
- JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
-else
+
+# Choose the log4j configuration for the interpreter JVM. The YARN cluster flag
+# is checked first, then the Kubernetes cluster flag; both use a bare properties
+# file name, while the default case points at the file in ZEPPELIN_CONF_DIR.
+if [[ -n "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
+
+elif [[ -n "${ZEPPELIN_SPARK_K8_CLUSTER}" ]]; then
+  JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_k8_cluster.properties"
+else
+  JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
fi
export JAVA_INTP_OPTS
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index a8375afed2b..05eca544059 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -105,7 +105,10 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]]; then
ZEPPELIN_LOGFILE+="${ZEPPELIN_IMPERSONATE_USER}-"
fi
ZEPPELIN_LOGFILE+="${ZEPPELIN_IDENT_STRING}-${HOSTNAME}.log"
-JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
+
+# Only pass the interpreter log file location when not running on Kubernetes.
+# NOTE(review): presumably because conf/log4j_k8_cluster.properties logs to the
+# console only, so zeppelin.log.file would be unused there - confirm.
+if [[ -z "${ZEPPELIN_SPARK_K8_CLUSTER}" ]]; then
+ JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
+fi
if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
echo "Log dir doesn't exist, create ${ZEPPELIN_LOG_DIR}"
@@ -228,6 +231,7 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUB
INTERPRETER_RUN_COMMAND+="'"
fi
+# Log the command before it is evaluated. Quoted so that whitespace and glob
+# characters in the command are printed verbatim instead of being word-split
+# and expanded by the shell.
+echo "${INTERPRETER_RUN_COMMAND}"
eval $INTERPRETER_RUN_COMMAND &
pid=$!
diff --git a/conf/log4j_k8_cluster.properties b/conf/log4j_k8_cluster.properties
new file mode 100644
index 00000000000..532fc5ef5f1
--- /dev/null
+++ b/conf/log4j_k8_cluster.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger = INFO, stdout
+
+log4j.appender.stdout = org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
+
diff --git a/docs/interpreter/spark-interpreter-k8s.md b/docs/interpreter/spark-interpreter-k8s.md
new file mode 100644
index 00000000000..9ce576b8580
--- /dev/null
+++ b/docs/interpreter/spark-interpreter-k8s.md
@@ -0,0 +1,163 @@
+---
+layout: page
+title: "Apache Spark Interpreter for Apache Zeppelin on Kubernetes"
+description: "Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. This interpreter runs on the https://github.com/apache-spark-on-k8s/spark version of Spark."
+group: interpreter
+---
+
+{% include JB/setup %}
+
+# How to run Zeppelin Spark notebooks on a Kubernetes cluster
+
+
+
+## Prerequisites
+
+The following tools are required:
+
+ - Kubernetes cluster & kubectl
+
+ For local testing Minikube can be used to create a single node cluster: https://kubernetes.io/docs/tasks/tools/install-minikube/
+
+ - Docker https://docs.docker.com/get-docker/
+
+ This documentation uses pre-built Spark 2.2 Docker images; however, you may also build these images yourself as described here: https://github.com/apache-spark-on-k8s/spark/blob/branch-2.2-kubernetes/resource-managers/kubernetes/README.md
+
+## Checkout Zeppelin source code
+
+Checkout the latest source code from https://github.com/apache/zeppelin then apply changes from the [Add support to run Spark interpreter on a Kubernetes cluster](https://github.com/apache/zeppelin/pull/2637) pull request.
+
+## Build Zeppelin
+- `./dev/change_scala_version.sh 2.11`
+- `mvn clean install -DskipTests -Pspark-2.2 -Phadoop-2.4 -Pyarn -Ppyspark -Pscala-2.11`
+
+
+## Create distribution
+- `cd zeppelin-distribution`
+- `mvn org.apache.maven.plugins:maven-assembly-plugin:3.0.0:single -P apache-release`
+
+## Create Zeppelin Dockerfile in Zeppelin distribution target folder
+```
+cd {zeppelin_source}/zeppelin-distribution/target/zeppelin-0.8.0-SNAPSHOT
+cat > Dockerfile < zeppelin-service.yaml < zeppelin-pod-local.yaml <>>>>>> ZEPPELIN-3021. Add support to run Spark interpreter on a Kubernetes cluster
## SparkContext, SQLContext, SparkSession, ZeppelinContext
SparkContext, SQLContext and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext` and `z`, respectively, in Scala, Python and R environments.
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index e4db4696ce1..aa8f9011b99 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -260,13 +260,16 @@ public static void main(String[] args)
String callbackHost = null;
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
String portRange = ":";
- if (args.length > 0) {
+ if (args.length == 1) {
+ port = Integer.parseInt(args[0]);
+ } else if (args.length > 0) {
callbackHost = args[0];
port = Integer.parseInt(args[1]);
if (args.length > 2) {
portRange = args[2];
}
}
+
RemoteInterpreterServer remoteInterpreterServer =
new RemoteInterpreterServer(callbackHost, port, portRange);
remoteInterpreterServer.start();
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index d37f6eab88d..a48112e4ccd 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -257,6 +257,37 @@
1.5
+
+ org.mongodb
+ mongo-java-driver
+ 3.4.1
+
+
+
+ io.fabric8
+ kubernetes-client
+ 3.0.0
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.module
+ jackson-module-jaxb-annotations
+
+
+
+
+
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index 04a87fdef37..e7ddeef2b87 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -37,6 +37,7 @@
import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
+import org.apache.zeppelin.interpreter.launcher.SparkK8SInterpreterLauncher;
import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -295,9 +296,16 @@ public InterpreterSetting(InterpreterSetting o) {
this.conf = o.getConf();
}
- private void createLauncher() {
+ private void createLauncher(Properties properties) {
if (group.equals("spark")) {
- this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
+ String deployMode = properties.getProperty("spark.submit.deployMode");
+ String masterUrl = properties.getProperty("master");
+ if (deployMode != null && deployMode.equals("cluster") &&
+ masterUrl != null && masterUrl.startsWith("k8s://")) {
+ this.launcher = new SparkK8SInterpreterLauncher(this.conf, this.recoveryStorage);
+ } else {
+ this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
+ }
} else {
this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage);
}
@@ -709,7 +717,7 @@ synchronized RemoteInterpreterProcess createInterpreterProcess(String interprete
Properties properties)
throws IOException {
if (launcher == null) {
- createLauncher();
+ createLauncher(properties);
}
InterpreterLaunchContext launchContext = new
InterpreterLaunchContext(properties, option, interpreterRunner, userName,
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkK8SInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkK8SInterpreterLauncher.java
new file mode 100644
index 00000000000..04f2c13acf8
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkK8SInterpreterLauncher.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.InterpreterRunner;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
+import org.apache.zeppelin.interpreter.remote.SparkK8SRemoteInterpreterManagedProcess;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.Map;
+
+/**
+ * Interpreter launcher which uses the interpreter shell script to launch the Spark
+ * interpreter process on a Kubernetes cluster.
+ */
+public class SparkK8SInterpreterLauncher extends SparkInterpreterLauncher {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(SparkK8SInterpreterLauncher.class);
+  public static final String SPARK_KUBERNETES_DRIVER_LABEL_INTERPRETER_PROCESS_ID =
+      "spark.kubernetes.driver.label.interpreter-processId";
+  public static final String SPARK_APP_NAME = "spark.app.name";
+  public static final String SPARK_METRICS_NAMESPACE = "spark.metrics.namespace";
+
+  /** Kubernetes label values may be at most 63 characters long. */
+  private static final int MAX_POD_LABEL_LENGTH = 63;
+  /** Maximum length of the interpreter group id appended to the Spark application name. */
+  private static final int MAX_GROUP_ID_LENGTH = 50;
+
+  /** Accepts the files whose name ends with "jar". */
+  private static final FilenameFilter JAR_FILTER = new FilenameFilter() {
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.endsWith("jar");
+    }
+  };
+
+  public SparkK8SInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
+    super(zConf, recoveryStorage);
+  }
+
+  /**
+   * Creates the client for the interpreter process described by the launch context.
+   *
+   * @param context launch context providing the properties, option, runner and ids
+   * @return a {@link RemoteInterpreterRunningProcess} when the option points to an existing
+   *         process, otherwise a {@link SparkK8SRemoteInterpreterManagedProcess}
+   */
+  @Override
+  public InterpreterClient launch(InterpreterLaunchContext context) {
+    LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
+    this.properties = context.getProperties();
+    InterpreterOption option = context.getOption();
+    InterpreterRunner runner = context.getRunner();
+    String groupName = context.getInterpreterSettingGroup();
+
+    int connectTimeout =
+        zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+    if (option.isExistingProcess()) {
+      return new RemoteInterpreterRunningProcess(
+          context.getInterpreterSettingName(),
+          connectTimeout,
+          option.getHost(),
+          option.getPort());
+    }
+
+    // create new remote process
+    String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+        + context.getInterpreterSettingId();
+
+    String groupId = context.getInterpreterGroupId();
+    String processIdLabel = generatePodLabelId(groupId);
+    properties.put(SPARK_KUBERNETES_DRIVER_LABEL_INTERPRETER_PROCESS_ID, processIdLabel);
+    groupId = formatId(groupId, MAX_GROUP_ID_LENGTH);
+    // add groupId to app name, this will be the prefix for driver pod name if it's not
+    // explicitly specified; without a configured app name the group id is used on its own
+    // instead of producing a "null-" prefix
+    String appName = properties.getProperty(SPARK_APP_NAME);
+    String driverPodNamePrefix =
+        (appName == null || appName.trim().isEmpty()) ? groupId : appName + "-" + groupId;
+    properties.put(SPARK_APP_NAME, driverPodNamePrefix);
+    // set same id for metrics namespace to be able to identify metrics of a specific app
+    properties.put(SPARK_METRICS_NAMESPACE, driverPodNamePrefix);
+
+    Map<String, String> env = super.buildEnvFromProperties(context);
+    String sparkConf = buildSparkConf(localRepoPath, env);
+    LOGGER.debug(sparkConf);
+    env.put("ZEPPELIN_SPARK_CONF", sparkConf);
+    env.put("ZEPPELIN_SPARK_K8_CLUSTER", "true");
+
+    return new SparkK8SRemoteInterpreterManagedProcess(
+        runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
+        zConf.getCallbackPortRange(),
+        zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
+        env, connectTimeout, processIdLabel, context.getInterpreterSettingName(),
+        option.isUserImpersonate());
+  }
+
+  /**
+   * Extends the ZEPPELIN_SPARK_CONF value found in env with a --jars option listing the jars
+   * of the interpreter lib directory and of the local repository, and with a --files option
+   * for the Kubernetes log4j configuration.
+   *
+   * @param localRepoPath local repository directory of the interpreter setting
+   * @param env environment built for the interpreter process
+   * @return the extended spark configuration string
+   */
+  private String buildSparkConf(String localRepoPath, Map<String, String> env) {
+    StringBuilder sparkJarsBuilder = new StringBuilder();
+    appendJars(sparkJarsBuilder, new File(zConf.getZeppelinHome() + "/lib/interpreter"));
+    appendJars(sparkJarsBuilder, new File(localRepoPath));
+
+    // a missing ZEPPELIN_SPARK_CONF is treated as an empty configuration
+    String baseConf = env.get("ZEPPELIN_SPARK_CONF");
+    StringBuilder sparkConfBuilder = new StringBuilder(baseConf == null ? "" : baseConf);
+    if (sparkJarsBuilder.length() > 0) {
+      sparkConfBuilder.append(" --jars ").append(sparkJarsBuilder.toString());
+    }
+    sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_k8_cluster" +
+        ".properties");
+    return sparkConfBuilder.toString();
+  }
+
+  /**
+   * Appends the path of every jar found directly in dir to the comma separated list.
+   * The separator is only written together with a jar, so files that are not jars never
+   * leave empty entries in the list.
+   */
+  private void appendJars(StringBuilder jars, File dir) {
+    if (!dir.isDirectory()) {
+      return;
+    }
+    File[] jarFiles = dir.listFiles(JAR_FILTER);
+    if (jarFiles == null) {
+      // listFiles returns null when the directory can not be read
+      return;
+    }
+    for (File jar : jarFiles) {
+      if (jars.length() > 0) {
+        jars.append(",");
+      }
+      jars.append(jar.getPath());
+    }
+  }
+
+  /**
+   * Id for spark submit must be formatted to contain only alphanumeric chars: every other
+   * character is replaced with '-' and the result is lower-cased.
+   * @param str id to format
+   * @param maxLength maximum length of the returned id, longer ids are truncated
+   * @return the formatted id, at most maxLength characters long
+   */
+  private String formatId(String str, int maxLength) {
+    str = str.replaceAll("[^a-zA-Z0-9]", "-").toLowerCase();
+    if (str.length() > maxLength) {
+      str = str.substring(0, maxLength);
+    }
+    return str;
+  }
+
+  /**
+   * Builds the value of the driver pod label from the interpreter group id and the
+   * current time, formatted with {@link #formatId(String, int)}.
+   */
+  private String generatePodLabelId(String interpreterGroupId) {
+    return formatId(interpreterGroupId + "_" + System.currentTimeMillis(),
+        MAX_POD_LABEL_LENGTH);
+  }
+
+}
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/SparkK8SRemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/SparkK8SRemoteInterpreterManagedProcess.java
new file mode 100755
index 00000000000..141e95bb771
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/SparkK8SRemoteInterpreterManagedProcess.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodStatus;
+import io.fabric8.kubernetes.api.model.Service;
+import io.fabric8.kubernetes.api.model.ServiceBuilder;
+import io.fabric8.kubernetes.client.Config;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watch;
+import io.fabric8.kubernetes.client.Watcher;
+import org.apache.commons.exec.CommandLine;
+import org.apache.commons.exec.DefaultExecutor;
+import org.apache.commons.exec.ExecuteException;
+import org.apache.commons.exec.ExecuteResultHandler;
+import org.apache.commons.exec.ExecuteWatchdog;
+import org.apache.commons.exec.LogOutputStream;
+import org.apache.commons.exec.PumpStreamHandler;
+import org.apache.commons.exec.environment.EnvironmentUtils;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This class manages start / stop of Spark remote interpreter process on a Kubernetes cluster.
+ * After Spark Driver started by spark-submit is in Running state, creates a Kubernetes service
+ * to connect to RemoteInterpreterServer running inside Spark Driver.
+ */
+public class SparkK8SRemoteInterpreterManagedProcess extends RemoteInterpreterProcess
+    implements ExecuteResultHandler {
+
+  private static final Logger logger = LoggerFactory.getLogger(
+      SparkK8SRemoteInterpreterManagedProcess.class);
+
+  private static final String SPARK_APP_SELECTOR = "spark-app-selector";
+  private static final String DRIVER_SERVICE_NAME_SUFFIX = "-ri-svc";
+  private static final String KUBERNETES_NAMESPACE = "default";
+  private static final String INTERPRETER_PROCESS_ID = "interpreter-processId";
+
+  /**
+   * Default url for Kubernetes inside of an Kubernetes cluster.
+   */
+  private static final String K8_URL = "https://kubernetes:443";
+
+  /** Number of attempts made to reach the endpoint once the driver service exists. */
+  private static final int CONNECT_RETRY_COUNT = 10;
+  /** Pause between two connection attempts, in milliseconds. */
+  private static final long CONNECT_RETRY_INTERVAL_MS = 500;
+
+  protected final String interpreterRunner;
+  protected final String portRange;
+  private DefaultExecutor executor;
+  protected ProcessLogOutputStream processOutput;
+  private ExecuteWatchdog watchdog;
+  protected AtomicBoolean running = new AtomicBoolean(false);
+  protected String host = "localhost";
+  protected int port = -1;
+  protected final String interpreterDir;
+  protected final String localRepoDir;
+
+  protected Map<String, String> env;
+  private final String processLabelId;
+  private final String interpreterSettingName;
+  private final boolean isUserImpersonated;
+
+  private KubernetesClient kubernetesClient;
+
+  // Written from the pod watcher callback and read by the thread that calls start()/stop().
+  private volatile String driverPodName;
+  private volatile PodStatus podStatus;
+  private Service driverService;
+
+  // Tells whether the driver service exists; also the monitor start() waits on.
+  private final AtomicBoolean serviceRunning = new AtomicBoolean(false);
+  private Watch podWatch;
+
+  public SparkK8SRemoteInterpreterManagedProcess(String intpRunner,
+                                                 String portRange,
+                                                 String intpDir,
+                                                 String localRepoDir,
+                                                 Map<String, String> env,
+                                                 int connectTimeout,
+                                                 String processLabelId,
+                                                 String interpreterSettingName,
+                                                 boolean isUserImpersonated) {
+
+    super(connectTimeout);
+    this.interpreterRunner = intpRunner;
+    this.portRange = portRange;
+    this.env = env;
+    this.interpreterDir = intpDir;
+    this.localRepoDir = localRepoDir;
+    this.processLabelId = processLabelId;
+    // NOTE(review): the port is fixed here; portRange is stored but not used by this class.
+    this.port = 30000;
+    this.interpreterSettingName = interpreterSettingName;
+    this.isUserImpersonated = isUserImpersonated;
+  }
+
+
+  @Override
+  public String getInterpreterSettingName() {
+    return interpreterSettingName;
+  }
+
+  /**
+   * Runs the interpreter runner script, watches the pod labelled with the process label id
+   * until it is running (or failed, or the connect timeout elapsed) and then tries to reach
+   * the endpoint exposed through the driver service.
+   *
+   * @param userName user passed to the runner script when impersonation is enabled
+   * @throws RuntimeException when the endpoint could not be reached; stop() is called first
+   */
+  @Override
+  public void start(String userName) {
+    CommandLine cmdLine = CommandLine.parse(interpreterRunner);
+    cmdLine.addArgument("-d", false);
+    cmdLine.addArgument(interpreterDir, false);
+    cmdLine.addArgument("-p", false);
+    cmdLine.addArgument(Integer.toString(port), false);
+    if (isUserImpersonated && !userName.equals("anonymous")) {
+      cmdLine.addArgument("-u", false);
+      cmdLine.addArgument(userName, false);
+    }
+    cmdLine.addArgument("-l", false);
+    cmdLine.addArgument(localRepoDir, false);
+    cmdLine.addArgument("-g", false);
+    cmdLine.addArgument(interpreterSettingName, false);
+
+    ByteArrayOutputStream cmdOut = executeCommand(cmdLine);
+
+    podWatch = getKubernetesClient().pods().inNamespace(KUBERNETES_NAMESPACE)
+        .withLabel(INTERPRETER_PROCESS_ID, processLabelId).watch(new Watcher<Pod>() {
+
+          @Override
+          public void eventReceived(Action action, Pod pod) {
+            onDriverPodEvent(pod);
+          }
+
+          @Override
+          public void onClose(KubernetesClientException cause) {
+            logger.debug("Watcher close due to " + cause);
+          }
+
+        });
+
+    synchronized (serviceRunning) {
+      // The watcher may have reported a running driver before this point; waiting then
+      // would miss that notification and block for the whole connect timeout.
+      if (!serviceRunning.get()) {
+        try {
+          serviceRunning.wait(getConnectTimeout());
+        } catch (InterruptedException e) {
+          logger.error("wait for connect interrupted", e);
+        }
+      }
+    }
+    podWatch.close();
+
+    // try to connect if service is started
+    if (serviceRunning.get()) {
+      for (int retryCount = 0;
+           !running.get() && retryCount < CONNECT_RETRY_COUNT;
+           retryCount++) {
+        if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(host, port)) {
+          logger.info("Remote endpoint accessible at: {}:{}", host, port);
+          running.set(true);
+        } else {
+          // only pause between failed attempts
+          try {
+            Thread.sleep(CONNECT_RETRY_INTERVAL_MS);
+          } catch (InterruptedException e) {
+            logger.error("wait for connect interrupted", e);
+          }
+        }
+      }
+    }
+
+    if (!running.get()) {
+      String commandOutput = new String(cmdOut.toByteArray());
+      PodStatus lastStatus = podStatus;
+      String errorMessage;
+      if (lastStatus != null) {
+        errorMessage = "Unable to start SparkK8RemoteInterpreterManagedProcess: " +
+            "Not able to connect to endpoint, Spark Driver status: " + lastStatus + "\n"
+            + commandOutput;
+      } else {
+        errorMessage = "Unable to start SparkK8RemoteInterpreterManagedProcess: " +
+            "Spark Driver not found." + "\n" + commandOutput;
+      }
+      stop();
+      throw new RuntimeException(errorMessage);
+    }
+
+  }
+
+  /**
+   * Handles one event of the driver pod watch: records the pod name and status, creates the
+   * endpoint service once the pod is running and wakes up start() when the pod is running
+   * or failed.
+   */
+  private void onDriverPodEvent(Pod pod) {
+    driverPodName = pod.getMetadata().getName();
+    PodStatus currentStatus = pod.getStatus();
+    podStatus = currentStatus;
+    String phase = currentStatus == null ? null : currentStatus.getPhase();
+    logger.debug("Driver Pod {} Status: {}", driverPodName, phase);
+
+    if ("running".equalsIgnoreCase(phase)) {
+      Service service;
+      try {
+        service = getOrCreateEndpointService(pod);
+      } catch (KubernetesClientException e) {
+        // keep the failure out of the watcher; start() gives up when its timeout elapses
+        logger.error("Unable to create service for driver pod " + driverPodName, e);
+        return;
+      }
+      if (service != null) {
+        host = service.getSpec().getClusterIP();
+        logger.info("Driver Service created: {}:{}", host, port);
+        synchronized (serviceRunning) {
+          serviceRunning.set(true);
+          serviceRunning.notifyAll();
+        }
+      }
+    } else if ("failed".equalsIgnoreCase(phase)) {
+      synchronized (serviceRunning) {
+        serviceRunning.set(false);
+        serviceRunning.notifyAll();
+      }
+    }
+  }
+
+  /** Returns the Kubernetes client, creating it for K8_URL on first use. */
+  private KubernetesClient getKubernetesClient() {
+    if (kubernetesClient == null) {
+      Config config = new ConfigBuilder().withMasterUrl(K8_URL).build();
+      logger.info("Connect to Kubernetes cluster at: {}", K8_URL);
+      kubernetesClient = new DefaultKubernetesClient(config);
+    }
+    return kubernetesClient;
+  }
+
+  /** Looks up the service with the given name in the namespace used by this class. */
+  private Service getEndpointService(String serviceName)
+      throws KubernetesClientException {
+    logger.debug("Check if RemoteInterpreterServer service {} exists", serviceName);
+    return getKubernetesClient().services().inNamespace(KUBERNETES_NAMESPACE).withName(serviceName)
+        .get();
+  }
+
+  /**
+   * Returns the service named after the driver pod, creating a ClusterIP service which
+   * selects the pod by its spark-app-selector label when it does not exist yet.
+   */
+  private Service getOrCreateEndpointService(Pod driverPod)
+      throws KubernetesClientException {
+    String serviceName = driverPodName + DRIVER_SERVICE_NAME_SUFFIX;
+    driverService = getEndpointService(serviceName);
+
+    // create endpoint service for RemoteInterpreterServer
+    if (driverService == null) {
+      Map<String, String> labels = driverPod.getMetadata().getLabels();
+      String label = labels.get(SPARK_APP_SELECTOR);
+      logger.info("Create RemoteInterpreterServer service for spark-app-selector: {}", label);
+      driverService = new ServiceBuilder().withNewMetadata()
+          .withName(serviceName).endMetadata()
+          .withNewSpec().addNewPort().withProtocol("TCP")
+          .withPort(getPort()).withNewTargetPort(getPort()).endPort()
+          .addToSelector(SPARK_APP_SELECTOR, label)
+          .withType("ClusterIP")
+          .endSpec().build();
+      driverService = getKubernetesClient().services().inNamespace(KUBERNETES_NAMESPACE)
+          .create(driverService);
+    }
+
+    return driverService;
+  }
+
+  /** Deletes the driver service when one was found or created. */
+  private void deleteEndpointService()
+      throws KubernetesClientException {
+    if (driverService != null) {
+      boolean result = getKubernetesClient().services().inNamespace(KUBERNETES_NAMESPACE)
+          .delete(driverService);
+      logger.info("Delete RemoteInterpreterServer service {} : {}",
+          driverService.getMetadata().getName(), result);
+    }
+  }
+
+  /** Deletes the first pod carrying the process label id, if there is one. */
+  private void deleteDriverPod() {
+    List<Pod> podList = getKubernetesClient().pods().inNamespace(KUBERNETES_NAMESPACE)
+        .withLabel(INTERPRETER_PROCESS_ID, processLabelId).list().getItems();
+    if (!podList.isEmpty()) {
+      Pod driverPod = podList.get(0);
+      String podName = driverPod.getMetadata().getName();
+      logger.debug("Delete Driver pod {} if Running, with status: {}", podName,
+          driverPod.getStatus().getPhase());
+      getKubernetesClient().pods().inNamespace(KUBERNETES_NAMESPACE).delete(driverPod);
+    } else {
+      logger.debug("Pod not found!");
+    }
+  }
+
+  /**
+   * Deletes the driver service when a driver pod was seen and releases the Kubernetes
+   * client. The client is closed even when no pod event was ever received, because start()
+   * creates it before the first event arrives.
+   */
+  protected void stopEndPoint() {
+    try {
+      if (driverPodName != null) {
+        deleteEndpointService();
+        //deleteDriverPod();
+      }
+    } catch (KubernetesClientException e) {
+      logger.error(e.getMessage(), e);
+    } finally {
+      if (kubernetesClient != null) {
+        kubernetesClient.close();
+        kubernetesClient = null;
+      }
+    }
+  }
+
+  /**
+   * Starts the command asynchronously with the process environment extended by env.
+   *
+   * @param cmdLine command to run
+   * @return stream collecting a copy of the command output
+   * @throws RuntimeException wrapping the IOException raised when the command can not be run
+   */
+  protected ByteArrayOutputStream executeCommand(CommandLine cmdLine) {
+
+    executor = new DefaultExecutor();
+
+    ByteArrayOutputStream cmdOut = new ByteArrayOutputStream();
+    processOutput = new ProcessLogOutputStream(logger);
+    processOutput.setOutputStream(cmdOut);
+
+    executor.setStreamHandler(new PumpStreamHandler(processOutput));
+    watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
+    executor.setWatchdog(watchdog);
+
+    try {
+      Map procEnv = EnvironmentUtils.getProcEnvironment();
+      procEnv.putAll(env);
+
+      logger.info("Run interpreter process {}", cmdLine);
+      executor.execute(cmdLine, procEnv, this);
+    } catch (IOException e) {
+      running.set(false);
+      throw new RuntimeException(e);
+    }
+
+    return cmdOut;
+  }
+
+  /**
+   * Shuts down the event poller, asks a running remote interpreter to shut down, removes
+   * the endpoint service and destroys the launcher process.
+   */
+  public void stop() {
+    // shutdown EventPoller first.
+    getRemoteInterpreterEventPoller().shutdown();
+    if (isRunning()) {
+      logger.info("kill interpreter process");
+      try {
+        callRemoteFunction(new RemoteFunction<Void>() {
+          @Override
+          public Void call(RemoteInterpreterService.Client client) throws Exception {
+            client.shutdown();
+            return null;
+          }
+        });
+      } catch (Exception e) {
+        logger.warn("ignore the exception when shutting down", e);
+      }
+    }
+    stopEndPoint();
+    executor = null;
+    if (watchdog != null) {
+      watchdog.destroyProcess();
+      watchdog = null;
+    }
+    running.set(false);
+    logger.info("Remote process terminated");
+  }
+
+  /** Called when the launcher process exits; wakes up a start() call that is still waiting. */
+  public void onProcessComplete(int exitValue) {
+    logger.info("Interpreter process exited {}", exitValue);
+    running.set(false);
+    synchronized (serviceRunning) {
+      serviceRunning.notifyAll();
+    }
+  }
+
+  /** Called when the launcher process fails; wakes up a start() call that is still waiting. */
+  public void onProcessFailed(ExecuteException e) {
+    // the exception is passed as throwable, not as a message parameter
+    logger.info("Interpreter process failed", e);
+    running.set(false);
+    synchronized (serviceRunning) {
+      serviceRunning.notifyAll();
+    }
+  }
+
+  @Override
+  public String getHost() {
+    return host;
+  }
+
+  @Override
+  public int getPort() {
+    return port;
+  }
+
+  @VisibleForTesting
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  @VisibleForTesting
+  public String getLocalRepoDir() {
+    return localRepoDir;
+  }
+
+  @VisibleForTesting
+  public String getInterpreterDir() {
+    return interpreterDir;
+  }
+
+  @VisibleForTesting
+  public String getInterpreterRunner() {
+    return interpreterRunner;
+  }
+
+  public boolean isRunning() {
+    return running.get();
+  }
+
+  /**
+   * Log stream which writes every line of the process output to the logger at debug level
+   * and mirrors the raw bytes to an optional second stream.
+   */
+  protected static class ProcessLogOutputStream extends LogOutputStream {
+
+    private final Logger logger;
+    // read without the lock as a fast path, hence volatile
+    volatile OutputStream out;
+
+    public ProcessLogOutputStream(Logger logger) {
+      this.logger = logger;
+    }
+
+    @Override
+    protected void processLine(String s, int i) {
+      this.logger.debug(s);
+    }
+
+    @Override
+    public void write(byte [] b) throws IOException {
+      // Delegate to the ranged variant, which already mirrors to out. Calling
+      // super.write(b) here would dispatch to that variant as well and the bytes would
+      // reach the mirror stream twice.
+      write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte [] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+
+      if (out != null) {
+        synchronized (this) {
+          if (out != null) {
+            out.write(b, offset, len);
+          }
+        }
+      }
+    }
+
+    public void setOutputStream(OutputStream out) {
+      synchronized (this) {
+        this.out = out;
+      }
+    }
+  }
+
+}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkK8SInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkK8SInterpreterLauncherTest.java
new file mode 100644
index 00000000000..8353bae29f6
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkK8SInterpreterLauncherTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterOption;
+import org.apache.zeppelin.interpreter.remote.SparkK8SRemoteInterpreterManagedProcess;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class SparkK8SInterpreterLauncherTest {
+
+  /**
+   * Launching with a default interpreter option must create a managed Kubernetes process
+   * carrying the expected directories, runner path and environment.
+   */
+  @Test
+  public void testLauncher() {
+    // arrange
+    Properties launchProperties = new Properties();
+    launchProperties.setProperty("ENV_1", "VALUE_1");
+    launchProperties.setProperty("property_1", "value_1");
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    InterpreterLaunchContext launchContext = new InterpreterLaunchContext(
+        launchProperties, new InterpreterOption(), null,
+        "user1", "settingsId", "settingsGroupId",
+        "settingsGroupName", "settingsName");
+
+    // act
+    InterpreterClient launched =
+        new SparkK8SInterpreterLauncher(zConf, null).launch(launchContext);
+
+    // assert
+    assertTrue(launched instanceof SparkK8SRemoteInterpreterManagedProcess);
+    SparkK8SRemoteInterpreterManagedProcess k8sProcess =
+        (SparkK8SRemoteInterpreterManagedProcess) launched;
+    assertEquals(".//interpreter/settingsGroupName", k8sProcess.getInterpreterDir());
+    assertEquals(".//local-repo/settingsGroupId", k8sProcess.getLocalRepoDir());
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), k8sProcess.getInterpreterRunner());
+    assertEquals(3, k8sProcess.getEnv().size());
+    assertEquals(Boolean.TRUE.toString(),
+        k8sProcess.getEnv().get("ZEPPELIN_SPARK_K8_CLUSTER"));
+  }
+}