Add support to run Spark interpreter on a Kubernetes cluster #2637

Status: Closed (wanted to merge 1 commit)
10 changes: 7 additions & 3 deletions bin/common.sh
```diff
@@ -122,10 +122,14 @@
 JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
 export JAVA_OPTS

 JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
-if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
-  JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
-else
+
+if [[ ! -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
   JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
+
+elif [[ ! -z "${ZEPPELIN_SPARK_K8_CLUSTER}" ]]; then
+  JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_k8_cluster.properties"
+else
+  JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
 fi
 export JAVA_INTP_OPTS
```

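The log4j selection logic this hunk introduces can be exercised on its own. The sketch below wraps it in a hypothetical `select_log4j_config` helper (an assumed name; the real code runs inline in `bin/common.sh`):

```shell
# Mirrors the branch added to bin/common.sh: pick the log4j configuration
# based on which cluster-mode variable is set. Helper name is illustrative.
select_log4j_config() {
  if [[ ! -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
    echo "log4j_yarn_cluster.properties"
  elif [[ ! -z "${ZEPPELIN_SPARK_K8_CLUSTER}" ]]; then
    echo "log4j_k8_cluster.properties"
  else
    echo "file://${ZEPPELIN_CONF_DIR}/log4j.properties"
  fi
}
```

With neither variable set, the default file-based configuration wins; setting `ZEPPELIN_SPARK_K8_CLUSTER` switches the interpreter to the console-only `log4j_k8_cluster.properties` added below, so the output is collectable with `kubectl logs`.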
6 changes: 5 additions & 1 deletion bin/interpreter.sh
```diff
@@ -105,7 +105,10 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]]; then
   ZEPPELIN_LOGFILE+="${ZEPPELIN_IMPERSONATE_USER}-"
 fi
 ZEPPELIN_LOGFILE+="${ZEPPELIN_IDENT_STRING}-${HOSTNAME}.log"
-JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
+
+if [[ -z "${ZEPPELIN_SPARK_K8_CLUSTER}" ]]; then
+  JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
+fi

 if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
   echo "Log dir doesn't exist, create ${ZEPPELIN_LOG_DIR}"
@@ -228,6 +231,7 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUB
   INTERPRETER_RUN_COMMAND+="'"
 fi

+echo $INTERPRETER_RUN_COMMAND
 eval $INTERPRETER_RUN_COMMAND &
 pid=$!
```
23 changes: 23 additions & 0 deletions conf/log4j_k8_cluster.properties
```properties
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

log4j.rootLogger = INFO, stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
```

163 changes: 163 additions & 0 deletions docs/interpreter/spark-interpreter-k8s.md
---
layout: page
> **Review comment (Contributor):** We could put this in spark.md or at least add a reference link to this doc in spark.md.
>
> **Reply (Author):** Added a link to this page in spark.md.
title: "Apache Spark Interpreter for Apache Zeppelin on Kubernetes"
description: "Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. This interpreter runs on the https://github.com/apache-spark-on-k8s/spark fork of Spark"
group: interpreter
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
{% include JB/setup %}

# How to run Zeppelin Spark notebooks on a Kubernetes cluster

<div id="toc"></div>

## Prerequisites

The following tools are required:

- A Kubernetes cluster and kubectl. For local testing, Minikube can be used to create a single-node cluster: https://kubernetes.io/docs/tasks/tools/install-minikube/
- Docker: https://docs.docker.com/install/

This documentation uses pre-built Spark 2.2 Docker images; however, you may also build these images yourself as described here: https://github.com/apache-spark-on-k8s/spark/blob/branch-2.2-kubernetes/resource-managers/kubernetes/README.md

## Check out Zeppelin source code

Check out the latest source code from https://github.com/apache/zeppelin, then apply the changes from the [Add support to run Spark interpreter on a Kubernetes cluster](https://github.com/apache/zeppelin/pull/2637) pull request.

## Build Zeppelin
- `./dev/change_scala_version.sh 2.11`
- `mvn clean install -DskipTests -Pspark-2.2 -Phadoop-2.4 -Pyarn -Ppyspark -Pscala-2.11`


## Create distribution
- `cd zeppelin-distribution`
- `mvn org.apache.maven.plugins:maven-assembly-plugin:3.0.0:single -P apache-release`

## Create Zeppelin Dockerfile in Zeppelin distribution target folder
```
cd {zeppelin_source}/zeppelin-distribution/target/zeppelin-0.8.0-SNAPSHOT
cat > Dockerfile <<EOF
FROM kubespark/spark-base:v2.2.0-kubernetes-0.5.0
COPY zeppelin-0.8.0-SNAPSHOT /opt/zeppelin
ADD https://storage.googleapis.com/kubernetes-release/release/v1.7.4/bin/linux/amd64/kubectl /usr/local/bin
WORKDIR /opt/zeppelin
ENTRYPOINT bin/zeppelin.sh
EOF
```

## Create / Start a Kubernetes cluster
If you are using Minikube on Linux with KVM:

`minikube start --vm-driver=kvm --cpus={nr_of_cpus} --memory={mem}`

You can check the Kubernetes dashboard address by running: `minikube dashboard`.

Initialize the Docker environment so that images are built into Minikube's Docker daemon: `eval $(minikube docker-env)`

## Build & tag Docker image

```
docker build -t zeppelin-server:v2.2.0-kubernetes -f Dockerfile .
```

You can retrieve the image ID by running `docker images`.

## Start ResourceStagingServer for spark-submit

Spark-submit will use the ResourceStagingServer to distribute resources (in our case the Zeppelin Spark interpreter JAR) across the Spark driver and executors.

```
# fetch the raw file (not the GitHub HTML page)
wget https://raw.githubusercontent.com/apache-spark-on-k8s/spark/branch-2.2-kubernetes/conf/kubernetes-resource-staging-server.yaml
kubectl create -f kubernetes-resource-staging-server.yaml
```

## Create a Kubernetes service to reach Zeppelin server from outside the cluster

```
cat > zeppelin-service.yaml <<EOF
apiVersion: v1
kind: Service
metadata:
  name: zeppelin-k8-service
  labels:
    app: zeppelin-server
spec:
  ports:
  - port: 8080
    targetPort: 8080
  selector:
    app: zeppelin-server
  type: NodePort
EOF

kubectl create -f zeppelin-service.yaml

```

## Create the Zeppelin server pod definition

```
cat > zeppelin-pod-local.yaml <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: zeppelin-server
  labels:
    app: zeppelin-server
spec:
  containers:
  - name: zeppelin-server
    image: zeppelin-server:v2.2.0-kubernetes
    env:
    - name: SPARK_SUBMIT_OPTIONS
      value: >-
        --kubernetes-namespace default
        --conf spark.executor.instances=1
        --conf spark.kubernetes.resourceStagingServer.uri=http://{RESOURCE_STAGING_SERVER_ADDRESS}:10000
        --conf spark.kubernetes.resourceStagingServer.internal.uri=http://{RESOURCE_STAGING_SERVER_ADDRESS}:10000
        --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0
        --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.2.0-kubernetes-0.5.0
        --conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.5.0
    ports:
    - containerPort: 8080
EOF
```

## Edit SPARK_SUBMIT_OPTIONS

- Set RESOURCE_STAGING_SERVER_ADDRESS by retrieving the address either from the Kubernetes dashboard or by running:

`kubectl get svc spark-resource-staging-service -o jsonpath='{.spec.clusterIP}'`
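A small helper (assumed, not part of the PR) can substitute the retrieved address into the pod definition; `{RESOURCE_STAGING_SERVER_ADDRESS}` is the placeholder used in `zeppelin-pod-local.yaml` above:

```shell
# Hypothetical helper: replace the {RESOURCE_STAGING_SERVER_ADDRESS}
# placeholder with a concrete cluster IP, writing the result to a new file.
substitute_staging_address() {
  local addr="$1" infile="$2" outfile="$3"
  sed "s/{RESOURCE_STAGING_SERVER_ADDRESS}/${addr}/g" "$infile" > "$outfile"
}
```

For example: `substitute_staging_address "$(kubectl get svc spark-resource-staging-service -o jsonpath='{.spec.clusterIP}')" zeppelin-pod-local.yaml zeppelin-pod.yaml`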

## Start Zeppelin server

`kubectl create -f zeppelin-pod-local.yaml`

You can retrieve the Zeppelin server address either from the Kubernetes dashboard or using kubectl.
The Zeppelin server should be reachable from outside the cluster at the Kubernetes node address (the same KUBERNETES_NODE_ADDRESS as in the Kubernetes master URL) and the nodePort returned by running:

`kubectl get svc --selector=app=zeppelin-server -o jsonpath='{.items[0].spec.ports}'`

## Edit Spark interpreter settings

Set the master URL to point to your Kubernetes cluster, e.g. `k8s://https://x.x.x.x:8443`, or use the default address, which works from inside a Kubernetes cluster: `k8s://https://kubernetes:443`.
Add the property `spark.submit.deployMode` and set its value to `cluster`.
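For orientation, the rule this PR adds to `InterpreterSetting` (shown further down in the diff) only picks the Kubernetes launcher when both of these settings are present. A shell mirror of that rule, for illustration only:

```shell
# Illustrative mirror of the Java launcher-selection rule: the Kubernetes
# launcher is chosen only when deployMode is "cluster" AND the master URL
# starts with "k8s://"; anything else falls back to the regular launcher.
select_launcher() {
  local master="$1" deploy_mode="$2"
  if [[ "${deploy_mode}" == "cluster" && "${master}" == k8s://* ]]; then
    echo "SparkK8SInterpreterLauncher"
  else
    echo "SparkInterpreterLauncher"
  fi
}
```

So a `k8s://...` master with client deploy mode, or cluster deploy mode with a YARN master, still uses the existing `SparkInterpreterLauncher`.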


## Run the 'Zeppelin Tutorial/Basic Features (Spark)' notebook

In case of problems, you can check the spark-submit output in the Zeppelin logs after logging into the zeppelin-server pod, then restart the Spark interpreter to try again.

`kubectl exec -it zeppelin-server bash`

Log files are in the `/opt/zeppelin/logs` folder.
7 changes: 7 additions & 0 deletions docs/interpreter/spark.md
```diff
@@ -204,9 +204,16 @@ Zeppelin support both yarn client and yarn cluster mode (yarn cluster mode is su
 You can either specify them in `zeppelin-env.sh`, or in interpreter setting page. Specifying them in `zeppelin-env.sh` means you can use only one version of `spark` & `hadoop`. Specifying them
 in interpreter setting page means you can use multiple versions of `spark` & `hadoop` in one zeppelin instance.

 ### 4. New Version of SparkInterpreter
 There's one new version of SparkInterpreter starting with better spark support and code completion from Zeppelin 0.8.0, by default we still use the old version of SparkInterpreter.
 If you want to use the new one, you can configure `zeppelin.spark.useNew` as `true` in its interpreter setting.
+
+### 5. Kubernetes cluster modules
+
+Zeppelin supports running Spark notebooks on Kubernetes in cluster mode; you can find a more detailed description here: [How to run Zeppelin Spark notebooks on a Kubernetes cluster](../interpreter/spark-interpreter-k8s.html)

 ## SparkContext, SQLContext, SparkSession, ZeppelinContext
 SparkContext, SQLContext and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext` and `z`, respectively, in Scala, Python and R environments.
```
```diff
@@ -260,13 +260,16 @@ public static void main(String[] args)
     String callbackHost = null;
     int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
     String portRange = ":";
-    if (args.length > 0) {
+    if (args.length == 1) {
+      port = Integer.parseInt(args[0]);
+    } else if (args.length > 0) {
       callbackHost = args[0];
       port = Integer.parseInt(args[1]);
       if (args.length > 2) {
         portRange = args[2];
       }
     }
+
     RemoteInterpreterServer remoteInterpreterServer =
         new RemoteInterpreterServer(callbackHost, port, portRange);
     remoteInterpreterServer.start();
```
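The new argument convention can be summarized: a single argument is interpreted as a port only (leaving the callback host unset, as when the server is launched inside a Kubernetes pod), two arguments as callback host plus port, and an optional third argument as a port range. A shell mirror, for illustration only:

```shell
# Illustrative mirror of the Java argument handling: 1 arg -> port only;
# 2 args -> callback host + port; 3 args -> adds a port range (default ":").
parse_interpreter_args() {
  local host="" port="" range=":"
  if [[ $# -eq 1 ]]; then
    port="$1"
  elif [[ $# -gt 0 ]]; then
    host="$1"
    port="$2"
    [[ $# -gt 2 ]] && range="$3"
  fi
  echo "host=${host} port=${port} range=${range}"
}
```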
31 changes: 31 additions & 0 deletions zeppelin-zengine/pom.xml
```diff
@@ -257,6 +257,37 @@
       <version>1.5</version>
     </dependency>

+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <version>3.4.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>3.0.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.module</groupId>
+          <artifactId>jackson-module-jaxb-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+
   </dependencies>

   <build>
```
```diff
@@ -37,6 +37,7 @@
 import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
 import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
 import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
+import org.apache.zeppelin.interpreter.launcher.SparkK8SInterpreterLauncher;
 import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
 import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -295,9 +296,16 @@ public InterpreterSetting(InterpreterSetting o) {
     this.conf = o.getConf();
   }

-  private void createLauncher() {
+  private void createLauncher(Properties properties) {
     if (group.equals("spark")) {
-      this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
+      String deployMode = properties.getProperty("spark.submit.deployMode");
+      String masterUrl = properties.getProperty("master");
+      if (deployMode != null && deployMode.equals("cluster") &&
+          masterUrl != null && masterUrl.startsWith("k8s://")) {
+        this.launcher = new SparkK8SInterpreterLauncher(this.conf, this.recoveryStorage);
+      } else {
+        this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
+      }
     } else {
       this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage);
     }
@@ -709,7 +717,7 @@ synchronized RemoteInterpreterProcess createInterpreterProcess(String interprete
       Properties properties)
       throws IOException {
     if (launcher == null) {
-      createLauncher();
+      createLauncher(properties);
     }
     InterpreterLaunchContext launchContext = new
         InterpreterLaunchContext(properties, option, interpreterRunner, userName,
```