layout | title |
---|---|
global | Running Spark on Mesos |
Spark can run on hardware clusters managed by Apache Mesos.
The advantages of deploying Spark with Mesos include:
- dynamic partitioning between Spark and other frameworks
- scalable partitioning between multiple instances of Spark
In a standalone cluster deployment, the cluster manager in the diagram below is a Spark master instance. When using Mesos, the Mesos master replaces the Spark master as the cluster manager.
Now when a driver creates a job and starts issuing tasks for scheduling, Mesos determines which machines handle which tasks. Because it takes other frameworks into account when scheduling these many short-lived tasks, multiple frameworks can coexist on the same cluster without resorting to static partitioning of resources.
To get started, follow the steps below to install Mesos and deploy Spark jobs via Mesos.
Spark {{site.SPARK_VERSION}} is designed for use with Mesos {{site.MESOS_VERSION}} or newer and does not require any special patches of Mesos. File and environment-based secrets support requires Mesos 1.3.0 or newer.
If you already have a Mesos cluster running, you can skip this Mesos installation step.
Otherwise, installing Mesos for Spark is no different than installing Mesos for use by other frameworks. You can install Mesos either from source or using prebuilt packages.
To install Apache Mesos from source, follow these steps:
- Download a Mesos release from a mirror
- Follow the Mesos Getting Started page for compiling and installing Mesos
Note: If you want to run Mesos without installing it into the default paths on your system (e.g., if you lack administrative privileges to install it), pass the `--prefix` option to `configure` to tell it where to install. For example, pass `--prefix=/home/me/mesos`. By default the prefix is `/usr/local`.
The Apache Mesos project only publishes source releases, not binary packages. However, third-party projects publish binary releases that may be helpful in setting Mesos up.
One of those is Mesosphere. To install Mesos using the binary releases provided by Mesosphere:
- Download the Mesos installation package from the downloads page
- Follow their instructions for installation and configuration
The Mesosphere installation documents suggest setting up ZooKeeper to handle Mesos master failover, but Mesos can be run without ZooKeeper using a single master as well.
To verify that the Mesos cluster is ready for Spark, navigate to the Mesos master web UI at port `:5050`. Confirm that all expected machines are present in the slaves tab.
To use Mesos from Spark, you need a Spark binary package available in a place accessible by Mesos, and a Spark driver program configured to connect to Mesos.
Alternatively, you can also install Spark in the same location on all the Mesos slaves, and configure `spark.mesos.executor.home` (defaults to `SPARK_HOME`) to point to that location.
When Mesos Framework authentication is enabled it is necessary to provide a principal and secret by which to authenticate Spark to Mesos. Each Spark job will register with Mesos as a separate framework.
Depending on your deployment environment you may wish to create a single set of framework credentials that are shared across all users or create framework credentials for each user. Creating and managing framework credentials should be done following the Mesos Authentication documentation.
Framework credentials may be specified in a variety of ways depending on your deployment environment and security requirements. The simplest way is to specify the `spark.mesos.principal` and `spark.mesos.secret` values directly in your Spark configuration. Alternatively, you may specify these values indirectly by setting `spark.mesos.principal.file` and `spark.mesos.secret.file` instead; these settings point to files containing the principal and secret. These files must be plaintext files in UTF-8 encoding. Combined with appropriate file ownership and mode/ACLs, this provides a more secure way to specify these credentials.
Additionally, if you prefer to use environment variables, you can specify all of the above via environment variables instead. The environment variable names are simply the configuration settings uppercased with `.` replaced with `_`, e.g. `SPARK_MESOS_PRINCIPAL`.
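The naming rule above can be sketched in a few lines of shell. The helper name `to_env_name` is hypothetical and exists only for illustration; it is not part of Spark.

```shell
# Hypothetical helper: derive the environment variable name for a setting
# by uppercasing it and replacing each "." with "_".
to_env_name() {
  printf '%s\n' "$1" | tr 'a-z.' 'A-Z_'
}

# Print the mapping for the four credential settings named in this section.
for key in spark.mesos.principal spark.mesos.secret \
           spark.mesos.principal.file spark.mesos.secret.file; do
  printf '%s -> %s\n' "$key" "$(to_env_name "$key")"
done
```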
Please note that if you specify multiple ways to obtain the credentials then the following preference order applies. Spark will use the first valid value found and any subsequent values are ignored:
- `spark.mesos.principal` configuration setting
- `SPARK_MESOS_PRINCIPAL` environment variable
- `spark.mesos.principal.file` configuration setting
- `SPARK_MESOS_PRINCIPAL_FILE` environment variable
An equivalent order applies for the secret. Essentially we prefer the configuration to be specified directly rather than indirectly by files, and we prefer that configuration settings are used over environment variables.
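As a rough illustration of this preference order, the following shell sketch picks a principal from the four sources. It is not Spark's implementation: the function name `resolve_principal` is hypothetical, and treating "valid" as "non-empty value" or "readable file" is an assumption made for the example.

```shell
# Illustrative only: mirrors the documented order, not Spark's actual code.
# $1 = value of spark.mesos.principal (may be empty)
# $2 = value of spark.mesos.principal.file (may be empty)
resolve_principal() {
  if [ -n "$1" ]; then
    # 1. direct configuration setting
    printf '%s\n' "$1"
  elif [ -n "${SPARK_MESOS_PRINCIPAL:-}" ]; then
    # 2. direct environment variable
    printf '%s\n' "$SPARK_MESOS_PRINCIPAL"
  elif [ -n "$2" ] && [ -r "$2" ]; then
    # 3. file named by the configuration setting
    cat "$2"
  elif [ -n "${SPARK_MESOS_PRINCIPAL_FILE:-}" ] && [ -r "$SPARK_MESOS_PRINCIPAL_FILE" ]; then
    # 4. file named by the environment variable
    cat "$SPARK_MESOS_PRINCIPAL_FILE"
  else
    return 1
  fi
}

# With neither configuration value set, the result depends on the environment.
resolve_principal "" "" || echo "no principal configured"
```

The same shape would apply to the secret, with the `SECRET` names substituted.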
When Mesos runs a task on a Mesos slave for the first time, that slave must have a Spark binary package for running the Spark Mesos executor backend.
The Spark package can be hosted at any Hadoop-accessible URI, including HTTP via `http://`, Amazon Simple Storage Service via `s3n://`, or HDFS via `hdfs://`.
To use a precompiled package:
- Download a Spark binary package from the Spark download page
- Upload to hdfs/http/s3
To host on HDFS, use the Hadoop fs put command: `hadoop fs -put spark-{{site.SPARK_VERSION}}.tar.gz /path/to/spark-{{site.SPARK_VERSION}}.tar.gz`
Or if you are using a custom-compiled version of Spark, you will need to create a package using the `dev/make-distribution.sh` script included in a Spark source tarball/checkout.
- Download and build Spark using the instructions here
- Create a binary package using `./dev/make-distribution.sh --tgz`.
- Upload archive to http/s3/hdfs
The Master URLs for Mesos are in the form `mesos://host:5050` for a single-master Mesos cluster, or `mesos://zk://host1:2181,host2:2181,host3:2181/mesos` for a multi-master Mesos cluster using ZooKeeper.
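The two URL forms can be assembled mechanically. The sketch below uses two hypothetical helper functions (`single_master_url` and `zk_master_url`, neither of which ships with Spark or Mesos) and the placeholder host names from the text above.

```shell
# Hypothetical helpers for building Mesos master URLs.

# single_master_url HOST [PORT]   (port defaults to 5050, as in the text)
single_master_url() {
  printf 'mesos://%s:%s\n' "$1" "${2:-5050}"
}

# zk_master_url ZNODE ZK_HOST:PORT [ZK_HOST:PORT ...]
zk_master_url() {
  znode=$1
  shift
  # Join the remaining arguments with commas.
  hosts=$(IFS=,; printf '%s' "$*")
  printf 'mesos://zk://%s%s\n' "$hosts" "$znode"
}

single_master_url host
# mesos://host:5050
zk_master_url /mesos host1:2181 host2:2181 host3:2181
# mesos://zk://host1:2181,host2:2181,host3:2181/mesos
```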
In client mode, a Spark Mesos framework is launched directly on the client machine and waits for the driver output.
The driver needs some configuration in `spark-env.sh` to interact properly with Mesos:
- In `spark-env.sh` set some environment variables:
  - `export MESOS_NATIVE_JAVA_LIBRARY=<path to libmesos.so>`. This path is typically `<prefix>/lib/libmesos.so` where the prefix is `/usr/local` by default. See Mesos installation instructions above. On Mac OS X, the library is called `libmesos.dylib` instead of `libmesos.so`.
  - `export SPARK_EXECUTOR_URI=<URL of spark-{{site.SPARK_VERSION}}.tar.gz uploaded above>`.
- Also set `spark.executor.uri` to `<URL of spark-{{site.SPARK_VERSION}}.tar.gz>`.
Now when starting a Spark application against the cluster, pass a `mesos://` URL as the master when creating a `SparkContext`. For example:
{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://HOST:5050")
  .setAppName("My app")
  .set("spark.executor.uri", "<path to spark-{{site.SPARK_VERSION}}.tar.gz uploaded above>")
val sc = new SparkContext(conf)
{% endhighlight %}
(You can also use `spark-submit` and configure `spark.executor.uri` in the conf/spark-defaults.conf file.)
When running a shell, the `spark.executor.uri` parameter is inherited from `SPARK_EXECUTOR_URI`, so it does not need to be redundantly passed in as a system property.
{% highlight bash %}
./bin/spark-shell --master mesos://host:5050
{% endhighlight %}
Spark on Mesos also supports cluster mode, where the driver is launched in the cluster and the client can find the results of the driver from the Mesos Web UI.
To use cluster mode, you must start the `MesosClusterDispatcher` in your cluster via the `sbin/start-mesos-dispatcher.sh` script, passing in the Mesos master URL (e.g. `mesos://host:5050`). This starts the `MesosClusterDispatcher` as a daemon running on the host.
By setting the Mesos proxy config property (requires Mesos version 1.4 or newer), `--conf spark.mesos.proxy.baseURL=http://localhost:5050`, when launching the dispatcher, the Mesos sandbox URI for each driver is added to the Mesos dispatcher UI.
If you would like to run the `MesosClusterDispatcher` with Marathon, you need to run the `MesosClusterDispatcher` in the foreground (i.e. `bin/spark-class org.apache.spark.deploy.mesos.MesosClusterDispatcher`). Note that the `MesosClusterDispatcher` does not yet support multiple instances for HA.
The `MesosClusterDispatcher` also supports writing recovery state into ZooKeeper. This allows the `MesosClusterDispatcher` to recover all submitted and running containers on relaunch. To enable this recovery mode, set `SPARK_DAEMON_JAVA_OPTS` in spark-env by configuring `spark.deploy.recoveryMode` and the related `spark.deploy.zookeeper.*` configurations.
For more information about these configurations please refer to the configurations doc.
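A minimal sketch of such a spark-env fragment follows. The ZooKeeper host list and the znode directory are placeholders chosen for illustration, and the exact set of `spark.deploy.zookeeper.*` properties you need should be checked against the configurations doc.

```shell
# conf/spark-env.sh (fragment)
# Enable ZooKeeper-backed recovery for the dispatcher daemon.
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER"
# Placeholder ZooKeeper ensemble; replace with your own hosts.
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.deploy.zookeeper.url=host1:2181,host2:2181,host3:2181"
# Placeholder znode directory under which recovery state is stored.
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.deploy.zookeeper.dir=/spark_mesos_dispatcher"
export SPARK_DAEMON_JAVA_OPTS
```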
You can also specify any additional jars required by the `MesosClusterDispatcher` in the classpath by setting the environment variable `SPARK_DAEMON_CLASSPATH` in spark-env.
From the client, you can submit a job to the Mesos cluster by running `spark-submit` and setting the master URL to the URL of the `MesosClusterDispatcher` (e.g. `mesos://dispatcher:7077`). You can view driver statuses on the Spark cluster Web UI.
For example:
{% highlight bash %}
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master mesos://207.184.161.138:7077 \
  --deploy-mode cluster \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  http://path/to/examples.jar \
  1000
{% endhighlight %}
Note that jars or python files that are passed to spark-submit should be URIs reachable by Mesos slaves, as the Spark driver doesn't automatically upload local jars.
Spark can run over Mesos in two modes: "coarse-grained" (default) and "fine-grained" (deprecated).
In "coarse-grained" mode, each Spark executor runs as a single Mesos task. Spark executors are sized according to the following configuration variables:
- Executor memory: `spark.executor.memory`
- Executor cores: `spark.executor.cores`
- Number of executors: `spark.cores.max`/`spark.executor.cores`
Please see the Spark Configuration page for details and default values.
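As a worked example of the sizing rules above, the sketch below computes the executor count and per-executor memory for one set of values. The numbers are made up for illustration. Rounding the executor count down is an assumption, and the overhead rule is the default described for `spark.mesos.executor.memoryOverhead` in the configuration table on this page (the larger of 384 MB or 10% of executor memory). The actual number of executors also depends on the offers Mesos makes.

```shell
# Illustrative values, not defaults.
cores_max=100          # spark.cores.max
executor_cores=8       # spark.executor.cores
executor_mem_mb=20480  # spark.executor.memory (20g) expressed in MB

# Number of executors: spark.cores.max / spark.executor.cores (rounded down).
executors=$(( cores_max / executor_cores ))

# Default memory overhead: the larger of 384 MB or 10% of executor memory.
overhead_mb=$(( executor_mem_mb / 10 ))
if [ "$overhead_mb" -lt 384 ]; then
  overhead_mb=384
fi
mem_per_executor_mb=$(( executor_mem_mb + overhead_mb ))

echo "executors=$executors mem_per_executor_mb=$mem_per_executor_mb"
# executors=12 mem_per_executor_mb=22528
```

With these values, 4 of the 100 cores are left over because they cannot fill another 8-core executor.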
Executors are brought up eagerly when the application starts, until `spark.cores.max` is reached. If you don't set `spark.cores.max`, the Spark application will consume all resources offered to it by Mesos, so we of course urge you to set this variable in any sort of multi-tenant cluster, including one which runs multiple concurrent Spark applications.
The scheduler will start executors round-robin on the offers Mesos gives it, but there are no spread guarantees, as Mesos does not provide such guarantees on the offer stream.
In this mode Spark executors will honor port allocation if it is provided by the user. Specifically, if the user defines `spark.blockManager.port` in the Spark configuration, the Mesos scheduler will check the available offers for a valid port range containing the port numbers. If no such range is available, it will not launch any task. If the user imposes no restriction on port numbers, ephemeral ports are used as usual. This port-honoring implementation implies one task per host if the user defines a port. In the future network isolation shall be supported.
The benefit of coarse-grained mode is much lower startup overhead, but at the cost of reserving Mesos resources for the complete duration of the application. To configure your job to dynamically adjust to its resource requirements, look into Dynamic Allocation.
NOTE: Fine-grained mode is deprecated as of Spark 2.0.0. Consider using Dynamic Allocation for some of the benefits. For a full explanation see SPARK-11857.
In "fine-grained" mode, each Spark task inside the Spark executor runs as a separate Mesos task. This allows multiple instances of Spark (and other frameworks) to share cores at a very fine granularity, where each application gets more or fewer cores as it ramps up and down, but it comes with an additional overhead in launching each task. This mode may be inappropriate for low-latency requirements like interactive queries or serving web requests.
Note that while Spark tasks in fine-grained mode will relinquish cores as they terminate, they will not relinquish memory, as the JVM does not give memory back to the operating system. Neither will executors terminate when they're idle.
To run in fine-grained mode, set the `spark.mesos.coarse` property to false in your SparkConf:
{% highlight scala %}
conf.set("spark.mesos.coarse", "false")
{% endhighlight %}
You may also make use of `spark.mesos.constraints` to set attribute-based constraints on Mesos resource offers. By default, all resource offers will be accepted.
{% highlight scala %}
conf.set("spark.mesos.constraints", "os:centos7;us-east-1:false")
{% endhighlight %}
For example, if `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`, then the resource offers will be checked to see if they meet both these constraints, and only then will they be accepted to start new executors.
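To make the structure of the constraint string concrete, the sketch below splits it into its attribute/value pairs: pairs are separated by `;`, and each pair separates the attribute name from its value with `:`. The helper `list_constraints` is hypothetical and only parses the string; it does not reproduce how Spark matches constraints against offers.

```shell
# Hypothetical helper: print each attribute/value pair in a constraint string.
list_constraints() {
  # Put each ";"-separated pair on its own line, then split on the first ":".
  printf '%s\n' "$1" | tr ';' '\n' | while IFS=: read -r attr value; do
    printf 'attribute=%s value=%s\n' "$attr" "$value"
  done
}

list_constraints 'os:centos7;us-east-1:false'
# attribute=os value=centos7
# attribute=us-east-1 value=false
```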
To constrain where driver tasks are run, use `spark.mesos.driver.constraints`.
Spark can make use of a Mesos Docker containerizer by setting the property `spark.mesos.executor.docker.image` in your SparkConf.
The Docker image used must have an appropriate version of Spark already part of the image, or you can have Mesos download Spark via the usual methods.
Requires Mesos version 0.20.1 or later.
Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image tags you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a separate service on the machines. To access Hadoop data from Spark, a full `hdfs://` URL is required (typically `hdfs://<namenode>:9000/path`, but you can find the right URL on your Hadoop Namenode web UI).
In addition, it is possible to also run Hadoop MapReduce on Mesos for better resource isolation and sharing between the two. In this case, Mesos will act as a unified scheduler that assigns cores to either Hadoop or Spark, as opposed to having them share resources via the Linux scheduler on each node. Please refer to Hadoop on Mesos.
In either case, HDFS runs separately from Hadoop MapReduce, without being scheduled through Mesos.
Mesos supports dynamic allocation only with coarse-grained mode, which can resize the number of executors based on statistics of the application. For general information, see Dynamic Resource Allocation.
The External Shuffle Service to use is the Mesos Shuffle Service. It provides shuffle data cleanup functionality on top of the Shuffle Service since Mesos doesn't yet support notifying another framework's termination. To launch it, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh` on all slave nodes, with `spark.shuffle.service.enabled` set to `true`.
This can also be achieved through Marathon, using a unique host constraint, and the following command: `bin/spark-class org.apache.spark.deploy.mesos.MesosExternalShuffleService`.
See the configuration page for information on Spark configurations. The following configs are specific for Spark on Mesos.
Property Name | Default | Meaning |
---|---|---|
spark.mesos.coarse | true | If set to `true`, runs over Mesos clusters in "coarse-grained" sharing mode, where Spark acquires one long-lived Mesos task on each machine. If set to `false`, runs over Mesos clusters in "fine-grained" sharing mode, where one Mesos task is created per Spark task. Detailed information in 'Mesos Run Modes'. |
spark.mesos.extra.cores | 0 | Set the extra number of cores for an executor to advertise. This does not result in more cores allocated. It instead means that an executor will "pretend" it has more cores, so that the driver will send it more tasks. Use this to increase parallelism. This setting is only used for Mesos coarse-grained mode. |
spark.mesos.mesosExecutor.cores | 1.0 | (Fine-grained mode only) Number of cores to give each Mesos executor. This does not include the cores used to run the Spark tasks. In other words, even if no Spark task is being run, each Mesos executor will occupy the number of cores configured here. The value can be a floating point number. |
spark.mesos.executor.docker.image | (none) | Set the name of the docker image that the Spark executors will run in. The selected image must have Spark installed, as well as a compatible version of the Mesos library. The installed path of Spark in the image can be specified with `spark.mesos.executor.home`; the installed path of the Mesos library can be specified with `spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY`. |
spark.mesos.executor.docker.forcePullImage | false | Force Mesos agents to pull the image specified in `spark.mesos.executor.docker.image`. By default Mesos agents will not pull images they already have cached. |
spark.mesos.executor.docker.parameters | (none) | Set the list of custom parameters which will be passed into the `docker run` command when launching the Spark executor on Mesos using the docker containerizer. The format of this property is a comma-separated list of key/value pairs. Example: |
spark.mesos.executor.docker.volumes | (none) | Set the list of volumes which will be mounted into the Docker image, which was set using `spark.mesos.executor.docker.image`. The format of this property is a comma-separated list of mappings following the form passed to `docker run -v`. That is they take the form: |
spark.mesos.task.labels | (none) | Set the Mesos labels to add to each task. Labels are free-form key-value pairs. Key-value pairs should be separated by a colon, and commas used to list more than one. If your label includes a colon or comma, you can escape it with a backslash. Ex. key:value,key2:a\:b. |
spark.mesos.executor.home | driver side `SPARK_HOME` | Set the directory in which Spark is installed on the executors in Mesos. By default, the executors will simply use the driver's Spark home directory, which may not be visible to them. Note that this is only relevant if a Spark binary package is not specified through `spark.executor.uri`. |
spark.mesos.executor.memoryOverhead | executor memory * 0.10, with minimum of 384 | The amount of additional memory, specified in MB, to be allocated per executor. By default, the overhead will be the larger of either 384 or 10% of `spark.executor.memory`. If set, the final overhead will be this value. |
spark.mesos.uris | (none) | A comma-separated list of URIs to be downloaded to the sandbox when driver or executor is launched by Mesos. This applies to both coarse-grained and fine-grained mode. |
spark.mesos.principal | (none) | Set the principal that the Spark framework will use to authenticate with Mesos. You can also specify this via the environment variable `SPARK_MESOS_PRINCIPAL`. |
spark.mesos.principal.file | (none) | Set the file containing the principal that the Spark framework will use to authenticate with Mesos. Allows specifying the principal indirectly in more security-conscious deployments. The file must be readable by the user launching the job and be UTF-8 encoded plaintext. You can also specify this via the environment variable `SPARK_MESOS_PRINCIPAL_FILE`. |
spark.mesos.secret | (none) | Set the secret that the Spark framework will use to authenticate with Mesos. Used, for example, when authenticating with the registry. You can also specify this via the environment variable `SPARK_MESOS_SECRET`. |
spark.mesos.secret.file | (none) | Set the file containing the secret that the Spark framework will use to authenticate with Mesos. Used, for example, when authenticating with the registry. Allows specifying the secret indirectly in more security-conscious deployments. The file must be readable by the user launching the job and be UTF-8 encoded plaintext. You can also specify this via the environment variable `SPARK_MESOS_SECRET_FILE`. |
spark.mesos.role | * | Set the role of this Spark framework for Mesos. Roles are used in Mesos for reservations and resource weight sharing. |
spark.mesos.constraints | (none) | Attribute-based constraints on Mesos resource offers. By default, all resource offers will be accepted. This setting applies only to executors. Refer to Mesos Attributes & Resources for more information on attributes. |
spark.mesos.driver.constraints | (none) | Same as `spark.mesos.constraints` except applied to drivers when launched through the dispatcher. By default, all offers with sufficient resources will be accepted. |
spark.mesos.containerizer | docker | This only affects docker containers, and must be one of "docker" or "mesos". Mesos supports two types of containerizers for docker: the "docker" containerizer, and the preferred "mesos" containerizer. Read more here: http://mesos.apache.org/documentation/latest/container-image/ |
spark.mesos.driver.webui.url | (none) | Set the Spark Mesos driver webui_url for interacting with the framework. If unset it will point to Spark's internal web UI. |
spark.mesos.driver.labels | (none) | Mesos labels to add to the driver. See `spark.mesos.task.labels` for formatting information. |
spark.mesos.driver.secret.values, spark.mesos.driver.secret.names, spark.mesos.executor.secret.values, spark.mesos.executor.secret.names | (none) | A secret is specified by its contents and destination. These properties specify a secret's contents. To specify a secret's destination, see the cell below. You can specify a secret's contents either (1) by value or (2) by reference. (1) To specify a secret by value, set the |
spark.mesos.driver.secret.envkeys, spark.mesos.driver.secret.filenames, spark.mesos.executor.secret.envkeys, spark.mesos.executor.secret.filenames | (none) | A secret is specified by its contents and destination. These properties specify a secret's destination. To specify a secret's contents, see the cell above. You can specify a secret's destination in the driver or executors as either (1) an environment variable or (2) as a file. (1) To make an environment-based secret, set the |
spark.mesos.driverEnv.[EnvironmentVariableName] | (none) | This only affects drivers submitted in cluster mode. Add the environment variable specified by EnvironmentVariableName to the driver process. The user can specify multiple of these to set multiple environment variables. |
spark.mesos.dispatcher.webui.url | (none) | Set the Spark Mesos dispatcher webui_url for interacting with the framework. If unset it will point to Spark's internal web UI. |
spark.mesos.dispatcher.driverDefault.[PropertyName] | (none) | Set default properties for drivers submitted through the dispatcher. For example, `spark.mesos.dispatcher.driverDefault.spark.executor.memory=32g` results in the executors for all drivers submitted in cluster mode running in 32g containers. |
spark.mesos.dispatcher.historyServer.url | (none) | Set the URL of the history server. The dispatcher will then link each driver to its entry in the history server. |
spark.mesos.gpus.max | 0 | Set the maximum number of GPU resources to acquire for this job. Note that executors will still launch when no GPU resources are found, since this configuration is just an upper limit and not a guaranteed amount. |
spark.mesos.network.name | (none) | Attach containers to the given named network. If this job is launched in cluster mode, also launch the driver in the given named network. See the Mesos CNI docs for more details. |
spark.mesos.network.labels | (none) | Pass network labels to CNI plugins. This is a comma-separated list of key-value pairs, where each key-value pair has the format key:value. Example: |
spark.mesos.fetcherCache.enable | false | If set to `true`, all URIs (example: `spark.executor.uri`, `spark.mesos.uris`) will be cached by the Mesos Fetcher Cache. |
spark.mesos.driver.failoverTimeout | 0.0 | The amount of time (in seconds) that the master will wait for the driver to reconnect, after being temporarily disconnected, before it tears down the driver framework by killing all its executors. The default value is zero, meaning no timeout: if the driver disconnects, the master immediately tears down the framework. |
spark.mesos.rejectOfferDuration | 120s | Time to consider unused resources refused. Serves as a fallback for `spark.mesos.rejectOfferDurationForUnmetConstraints` and `spark.mesos.rejectOfferDurationForReachedMaxCores`. |
spark.mesos.rejectOfferDurationForUnmetConstraints | `spark.mesos.rejectOfferDuration` | Time to consider unused resources refused with unmet constraints. |
spark.mesos.rejectOfferDurationForReachedMaxCores | `spark.mesos.rejectOfferDuration` | Time to consider unused resources refused when the maximum number of cores `spark.cores.max` is reached. |
A few places to look during debugging:
- Mesos master on port `:5050`
  - Slaves should appear in the slaves tab
  - Spark applications should appear in the frameworks tab
  - Tasks should appear in the details of a framework
  - Check the stdout and stderr of the sandbox of failed tasks
- Mesos logs
  - Master and slave logs are both in `/var/log/mesos` by default
And common pitfalls:
- Spark assembly not reachable/accessible
  - Slaves must be able to download the Spark binary package from the `http://`, `hdfs://` or `s3n://` URL you gave
- Firewall blocking communications
  - Check for messages about failed connections
  - Temporarily disable firewalls for debugging and then poke appropriate holes