[SPARK-18278] [Scheduler] Spark on Kubernetes - Basic Scheduler Backend #19468
pom.xml

@@ -2648,6 +2648,13 @@
```xml
    </modules>
  </profile>

  <profile>
    <id>kubernetes</id>
    <modules>
      <module>resource-managers/kubernetes/core</module>
```
Is this going to be a multi-module project later? If so, you might want to create a parent pom in `resource-managers/kubernetes`.

We expected it would be multi-module (https://github.com/apache-spark-on-k8s/spark/tree/branch-2.2-kubernetes/resource-managers/kubernetes), the other modules being configuration files for the Docker images and integration tests. The Docker files are pretty much static configuration files, so moving those to a plain configuration directory instead may make more sense.

Based on a discussion in last week's meeting with Shane Knapp from RISELab, we want to keep the integration tests as a sub-module here, in the interest of keeping test code together. We should have the additional parent pom to facilitate that.

@vanzin this isn't a multi-module project in the sense that the Kubernetes cluster manager and spark-submit implementation are split across multiple projects; rather, there is one module for said cluster manager plus spark-submit implementation, and then there are modules for integration testing that code. @foxish the Dockerfiles feel more like application code than static configuration, but that might just be a matter of implementation.

Why are integration tests in a separate module? Maven, for example, has an `integration-test` phase for exactly this purpose. That would mean keeping the test code in the same module as the core code, not in a separate module.

It's not absolutely necessary to have integration tests in a specific separate module, but there are some nice organizational benefits. For example, integration tests living in the same module as the core code would need a specific package namespace that is omitted from the unit test runs, and a separate module is also, IMO, easier to read (see the sbt sketch after this diff).

We definitely don't want to run these during unit tests: they are relatively expensive, require building Docker images, and require Minikube to be pre-installed on the given machine. Having them in at least a separate integration test phase makes these differences clear.

That (keeping them separate) is actually pretty useful for SBT.

Both YARN and Mesos don't have a sub-directory called `core`.

See #19468 (comment)
```xml
    </modules>
  </profile>

  <profile>
    <id>hive-thriftserver</id>
    <modules>
```
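To make the trade-off above concrete, here is a minimal sbt sketch of the separate-module layout under discussion. The project names and the `integration-tests` path are hypothetical illustrations, not part of this diff.

```scala
// Hypothetical build.sbt fragment. With integration tests in their own project,
// a plain `sbt kubernetes/test` never compiles or runs them, while
// `sbt kubernetesIntegrationTests/test` runs them explicitly (after Docker
// images and Minikube have been prepared out of band).
lazy val kubernetes = project
  .in(file("resource-managers/kubernetes/core"))

lazy val kubernetesIntegrationTests = project
  .in(file("resource-managers/kubernetes/integration-tests"))
  .dependsOn(kubernetes % "test->test;compile->compile")
```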
resource-managers/kubernetes/core/pom.xml

@@ -0,0 +1,102 @@
```xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.3.0-SNAPSHOT</version>
    <relativePath>../../../pom.xml</relativePath>
  </parent>

  <artifactId>spark-kubernetes_2.11</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project Kubernetes</name>
  <properties>
    <sbt.project.name>kubernetes</sbt.project.name>
    <kubernetes.client.version>2.2.13</kubernetes.client.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>io.fabric8</groupId>
      <artifactId>kubernetes-client</artifactId>
```
These dependencies need to be listed; please see the dependency manifests under `dev/deps`.

The dependency needs to be added to those manifest files as well.

Listed in f8e3249.
```xml
      <version>${kubernetes.client.version}</version>
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-core</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-annotations</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
          <artifactId>jackson-dataformat-yaml</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Required by kubernetes-client but we exclude it -->
    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-yaml</artifactId>
      <version>${fasterxml.jackson.version}</version>
    </dependency>

    <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <!-- End of shaded deps. -->

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  </build>

</project>
```
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala

@@ -0,0 +1,37 @@
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.k8s

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging

private[spark] object ConfigurationUtils extends Logging {

  def parsePrefixedKeyValuePairs(
```
We should add a comment to explain what this function does: it not only returns the configs, but also ensures that no duplicate configs are set.

Done.
```scala
      sparkConf: SparkConf,
      prefix: String,
      configType: String): Map[String, String] = {
    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
    fromPrefix.groupBy(_._1).foreach {
      case (key, values) =>
        require(values.size == 1,
          s"Cannot have multiple values for a given $configType key, got key $key with" +
            s" values $values")
    }
    fromPrefix.toMap
  }
}
```
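As a quick illustration of the behavior the comment above asks to document, here is a hypothetical usage sketch; the label prefix and keys are examples for illustration, not taken from this diff.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils

val conf = new SparkConf(false)
  .set("spark.kubernetes.driver.label.app", "my-app")
  .set("spark.kubernetes.driver.label.version", "2.3.0")

// Strips the prefix and returns Map("app" -> "my-app", "version" -> "2.3.0").
// If the same suffix ever appeared twice under the prefix, the require() above
// would fail with a message naming the duplicated "pod label" key.
val labels = ConfigurationUtils.parsePrefixedKeyValuePairs(
  conf, "spark.kubernetes.driver.label.", "pod label")
```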
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/OptionRequirements.scala

@@ -0,0 +1,40 @@
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

private[spark] object OptionRequirements {

  def requireBothOrNeitherDefined(
      opt1: Option[_],
      opt2: Option[_],
      errMessageWhenFirstIsMissing: String,
      errMessageWhenSecondIsMissing: String): Unit = {
    requireSecondIfFirstIsDefined(opt1, opt2, errMessageWhenSecondIsMissing)
    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
  }

  def requireSecondIfFirstIsDefined(
      opt1: Option[_], opt2: Option[_], errMessageWhenSecondIsMissing: String): Unit = {
    opt1.foreach { _ =>
      require(opt2.isDefined, errMessageWhenSecondIsMissing)
    }
  }

  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
  }
}
```
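A short, hypothetical usage sketch of the helpers above, using client TLS options as the example pair:

```scala
// Hypothetical values: a client key is set but its certificate is not.
val clientKeyFile: Option[String] = Some("/path/to/key.pem")
val clientCertFile: Option[String] = None

// Fails with an IllegalArgumentException: the first option is defined while the
// second is missing, so the errMessageWhenSecondIsMissing argument is used.
OptionRequirements.requireBothOrNeitherDefined(
  clientKeyFile,
  clientCertFile,
  "Client cert was specified without a client key.",
  "Client key was specified without a client cert.")
```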
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala

@@ -0,0 +1,103 @@
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.utils.HttpClientUtils
import okhttp3.Dispatcher
```
This seems new; should it be listed in the pom file?

And in the license listing.

Fixed in f8e3249.
```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.config._
import org.apache.spark.util.ThreadUtils

/**
 * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
 * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
 * options for different components.
 */
private[spark] object SparkKubernetesClientFactory {

  def createKubernetesClient(
      master: String,
      namespace: Option[String],
      kubernetesAuthConfPrefix: String,
      sparkConf: SparkConf,
      maybeServiceAccountToken: Option[File],
      maybeServiceAccountCaCert: Option[File]): KubernetesClient = {
    val oauthTokenFileConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_FILE_CONF_SUFFIX"
    val oauthTokenConf = s"$kubernetesAuthConfPrefix.$OAUTH_TOKEN_CONF_SUFFIX"
    val oauthTokenFile = sparkConf.getOption(oauthTokenFileConf)
```
Why not create constants for these, like for the other config options?

This lacks context from the spark-submit implementation, which is not part of this PR. We intend to have two different sets of authentication options for the Kubernetes API. The first is the credentials for creating a driver pod and all the Kubernetes resources that the application requires outside of executor pods. The second is a set of credentials that the driver can use to create executor pods. These options will have shared suffixes in the configuration keys but different prefixes (a hypothetical sketch of the resulting keys follows this file's listing). The reasoning for having two sets of credentials is twofold.
```scala
      .map(new File(_))
      .orElse(maybeServiceAccountToken)
    val oauthTokenValue = sparkConf.getOption(oauthTokenConf)
    OptionRequirements.requireNandDefined(
      oauthTokenFile,
      oauthTokenValue,
      s"Cannot specify OAuth token through both a file $oauthTokenFileConf and a" +
        s" value $oauthTokenConf.")

    val caCertFile = sparkConf
      .getOption(s"$kubernetesAuthConfPrefix.$CA_CERT_FILE_CONF_SUFFIX")
      .orElse(maybeServiceAccountCaCert.map(_.getAbsolutePath))
    val clientKeyFile = sparkConf
      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_KEY_FILE_CONF_SUFFIX")
    val clientCertFile = sparkConf
      .getOption(s"$kubernetesAuthConfPrefix.$CLIENT_CERT_FILE_CONF_SUFFIX")
    val dispatcher = new Dispatcher(
      ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
    val config = new ConfigBuilder()
      .withApiVersion("v1")
      .withMasterUrl(master)
      .withWebsocketPingInterval(0)
      .withOption(oauthTokenValue) {
        (token, configBuilder) => configBuilder.withOauthToken(token)
      }.withOption(oauthTokenFile) {
        (file, configBuilder) =>
          configBuilder.withOauthToken(Files.toString(file, Charsets.UTF_8))
      }.withOption(caCertFile) {
        (file, configBuilder) => configBuilder.withCaCertFile(file)
      }.withOption(clientKeyFile) {
        (file, configBuilder) => configBuilder.withClientKeyFile(file)
      }.withOption(clientCertFile) {
        (file, configBuilder) => configBuilder.withClientCertFile(file)
      }.withOption(namespace) {
        (ns, configBuilder) => configBuilder.withNamespace(ns)
      }.build()
    val baseHttpClient = HttpClientUtils.createHttpClient(config)
    val httpClientWithCustomDispatcher = baseHttpClient.newBuilder()
      .dispatcher(dispatcher)
      .build()
    new DefaultKubernetesClient(httpClientWithCustomDispatcher, config)
  }

  private implicit class OptionConfigurableConfigBuilder(configBuilder: ConfigBuilder) {

    def withOption[T]
        (option: Option[T])
        (configurator: ((T, ConfigBuilder) => ConfigBuilder)): OptionConfigurableConfigBuilder = {
      new OptionConfigurableConfigBuilder(option.map { opt =>
        configurator(opt, configBuilder)
      }.getOrElse(configBuilder))
    }

    def build(): Config = configBuilder.build()
  }
}
```
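To make the two-prefix scheme from the review discussion above concrete, here is a hypothetical sketch of the resulting configuration keys. The exact key names are assumptions for illustration; only the shared-suffix/different-prefix idea comes from the comment.

```scala
import org.apache.spark.SparkConf

// Hypothetical prefixes: one credential set for spark-submit, which creates the
// driver pod, and one for the driver itself, which creates executor pods. Both
// share the "oauthTokenFile" suffix; createKubernetesClient resolves the suffix
// against whichever prefix it is handed via kubernetesAuthConfPrefix.
val conf = new SparkConf(false)
  .set("spark.kubernetes.authenticate.submission.oauthTokenFile", "/secrets/submission-token")
  .set("spark.kubernetes.authenticate.driver.oauthTokenFile", "/secrets/driver-token")
```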
This new profile is not used in the build script; shall we add it so that the new contributions in this PR are built and tested?

We should also change the sbt file to make it work with sbt.

Updated. PTAL.