diff --git a/README.md b/README.md index cf8e4e89..3374933e 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,8 @@ Peel offers the following features for your experiments. | HDFS | 2.7.2 | `hdfs-2.7.2` | | HDFS | 2.7.3 | `hdfs-2.7.3` | | HDFS | 2.8.0 | `hdfs-2.8.0` | +| HDFS | 3.1.1 | `hdfs-3.1.1` | +| Yarn | 3.1.1 | `yarn-3.1.1` | | Flink | 0.8.0 | `flink-0.8.0` | | Flink | 0.8.1 | `flink-0.8.1` | | Flink | 0.9.0 | `flink-0.9.0` | @@ -56,6 +58,10 @@ Peel offers the following features for your experiments. | Flink | 1.3.1 | `flink-1.3.1` | | Flink | 1.3.2 | `flink-1.3.2` | | Flink | 1.4.0 | `flink-1.4.0` | +| Flink Standalone Cluster | 1.7.0 | `flink-1.7.0` | +| Flink Standalone Cluster | 1.7.2 | `flink-1.7.2` | +| Flink Yarn Session | 1.7.0 | `flink-yarn-1.7.0` | +| Flink Yarn Session | 1.7.2 | `flink-yarn-1.7.2` | | MapReduce | 1.2.1 | `mapred-1.2.1` | | MapReduce | 2.4.1 | `mapred-2.4.1` | | Spark | 1.3.1 | `spark-1.3.1` | diff --git a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala index aa84d3f0..acfae2aa 100644 --- a/peel-core/src/main/scala/org/peelframework/core/config/Model.scala +++ b/peel-core/src/main/scala/org/peelframework/core/config/Model.scala @@ -26,6 +26,7 @@ import scala.collection.mutable.ListBuffer trait Model { case class Pair(name: String, value: Any) {} + case class Section(name: String, entries: util.List[Pair]) {} } @@ -76,6 +77,54 @@ object Model { } } + /** A model for INI type configuration files (e.g. etc/hadoop/container-executor.xml). + * + * Consists of a single entry `sections` which is constructed by collecting + * all direct children under the specified `prefix` path. Each child + * represents a section in the INI file and collects the (key, value) pairs + * below it. + * + * (key, value) pairs that should not appear in a section have to be + * listed under the child with a special name "_root_" (without quotes). + * + * See https://en.wikipedia.org/wiki/INI_file for details. + * + * @param c The HOCON config to use when constructing the model. + * @param prefix The prefix path which has to be rendered. + */ + class INI(val c: Config, val prefix: String) extends Model { + + val sections = { + val sectionBuffer = ListBuffer[Section]() + + def sanitize(s: String) = + s.stripPrefix(s"$prefix.") // remove prefix + + def fixRoot(s: String) = if (s == "_root_") null else s + + def collectPairs(c: Config, name: String): Unit = { + val buffer = ListBuffer[Pair]() + + for (e <- c.entrySet().asScala) { + val key = sanitize(e.getKey) + .replace("\"_root_\"", "_root_") + .stripPrefix(s"$name.") + buffer += Pair(key, c.getString(e.getKey)) + } + + sectionBuffer += Section(fixRoot(name), buffer.toList.asJava) + } + + for (e <- c.getObject(prefix).entrySet().asScala) { + val name = sanitize(e.getKey) + collectPairs(c.withOnlyPath(s"$prefix.$name"), name) + } + + sectionBuffer.toList.asJava + } + + } + /** A model for environment files (e.g., etc/hadoop/hadoop-env.sh). * * The children of the specified `prefix` path in the given `c` config are converted as (key, value) pairs in a @@ -99,7 +148,7 @@ object Model { } } - /** A model for (key, value) based yaml files (e.g. conf/flink-conf.yaml). + /** A model for (key, value) based files (e.g. conf/flink-conf.yaml or etc/hadoop/capacity-manager.xml). * * Consists of multiple (key, value) entries which are constructed by collecting all values under * the specified `prefix` path. Intermediate paths are thereby flattened, i.e. 
@@ -118,7 +167,7 @@ object Model { * @param c The HOCON config to use when constructing the model. * @param prefix The prefix path which has to be rendered. */ - class Yaml(val c: Config, val prefix: String) extends util.HashMap[String, Object] with Model { + class KeyValue(val c: Config, val prefix: String) extends util.HashMap[String, Object] with Model { // constructor { @@ -139,6 +188,11 @@ object Model { } } + /** Alias for Model.KeyValue. + * + */ + class Yaml(override val c: Config, override val prefix: String) extends KeyValue(c, prefix) { } + /** A model for list based hosts files (e.g. etc/hadoop/slaves). * * Consists of a single entry `hosts` which is constructed by extracting the string list value under the diff --git a/peel-core/src/main/scala/org/peelframework/core/graph/DependencyGraph.scala b/peel-core/src/main/scala/org/peelframework/core/graph/DependencyGraph.scala index 465173c4..03867783 100644 --- a/peel-core/src/main/scala/org/peelframework/core/graph/DependencyGraph.scala +++ b/peel-core/src/main/scala/org/peelframework/core/graph/DependencyGraph.scala @@ -173,7 +173,7 @@ class DependencyGraph[T: ClassTag] { throw new Exception("Cannot reverse empty Graph!") } - /** Collects descendants in a depth-first manner starting from the given set. + /** Collects descendants in a breadth-first manner starting from the given set. * * @param toVisit A set of nodes that are yet to be visited. * @param visited A list of already visited nodes. @@ -191,7 +191,7 @@ class DependencyGraph[T: ClassTag] { case x: Any => !visited.contains(x) } - collect(children ++ toVisit.tail, next :: visited, excluded) + collect(toVisit.tail ++ children, next :: visited, excluded) } } diff --git a/peel-extensions/src/main/resources/reference.flink-1.7.0.conf b/peel-extensions/src/main/resources/reference.flink-1.7.0.conf new file mode 100644 index 00000000..e0731977 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.flink-1.7.0.conf @@ -0,0 +1,19 @@ +# include common flink configuration +include "reference.flink.conf" + +system { + flink { + path { + archive.url = "http://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.12.tgz" + archive.md5 = "b66459379703adb25c64fbe20bf8f4b1" + archive.src = ${app.path.downloads}"/flink-1.7.0-bin-hadoop28-scala_2.12.tgz" + home = ${system.flink.path.archive.dst}"/flink-1.7.0" + } + config { + # flink.yaml entries + yaml { + env.pid.dir = "/tmp/flink-1.7.0-pid" + } + } + } +} diff --git a/peel-extensions/src/main/resources/reference.flink-1.7.2.conf b/peel-extensions/src/main/resources/reference.flink-1.7.2.conf new file mode 100644 index 00000000..ddf4d250 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.flink-1.7.2.conf @@ -0,0 +1,19 @@ +# include common flink configuration +include "reference.flink.conf" + +system { + flink { + path { + archive.url = "http://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-hadoop28-scala_2.12.tgz" + archive.md5 = "b9469d4d166520ec767bac0d82c165a7" + archive.src = ${app.path.downloads}"/flink-1.7.2-bin-hadoop28-scala_2.12.tgz" + home = ${system.flink.path.archive.dst}"/flink-1.7.2" + } + config { + # flink.yaml entries + yaml { + env.pid.dir = "/tmp/flink-1.7.2-pid" + } + } + } +} diff --git a/peel-extensions/src/main/resources/reference.flink-yarn-1.7.0.conf b/peel-extensions/src/main/resources/reference.flink-yarn-1.7.0.conf new file mode 100644 index 00000000..ec2dd218 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.flink-yarn-1.7.0.conf @@ -0,0 
+1,2 @@ +# include common flink configuration +include "reference.flink-1.7.0.conf" \ No newline at end of file diff --git a/peel-extensions/src/main/resources/reference.flink-yarn-1.7.2.conf b/peel-extensions/src/main/resources/reference.flink-yarn-1.7.2.conf new file mode 100644 index 00000000..1e7c71c8 --- /dev/null +++ b/peel-extensions/src/main/resources/reference.flink-yarn-1.7.2.conf @@ -0,0 +1,2 @@ +# include common flink configuration +include "reference.flink-1.7.2.conf" \ No newline at end of file diff --git a/peel-extensions/src/main/resources/reference.hadoop-3.x.conf b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf new file mode 100644 index 00000000..adc827ca --- /dev/null +++ b/peel-extensions/src/main/resources/reference.hadoop-3.x.conf @@ -0,0 +1,58 @@ +system { + hadoop-3 { + user = ${system.default.user} + group = ${system.default.group} + path { + isShared = ${system.default.path.isShared} + archive.dst = ${app.path.systems} + config = ${system.hadoop-3.path.home}"/etc/hadoop" + log = ${system.hadoop-3.path.home}"/logs" + input = ${system.hadoop-3.config.core.fs.default.name}"/tmp/input" + output = ${system.hadoop-3.config.core.fs.default.name}"/tmp/output" + } + format = true + startup { + max.attempts = ${system.default.startup.max.attempts} + polling { + counter = ${system.default.startup.polling.counter} + interval = ${system.default.startup.polling.interval} + } + } + config { + # put list of masters + masters = ${system.default.config.masters} + # put list of workers + workers = ${system.default.config.slaves} + # unfortunately, the slaves config key is hard-coded in Java code + slaves = ${system.default.config.slaves} + # hadoop-env.sh entries + env { + JAVA_HOME = ${system.default.config.java.home} + HADOOP_INSTALL = ${system.hadoop-3.path.home} + # directory where process IDs are stored + HADOOP_PID_DIR = "/tmp/hadoop-3/pid" + # avoids loading wrong native library in the default case + # override with /lib/native if lib exists + HADOOP_COMMON_LIB_NATIVE_DIR = "$HADOOP_INSTALL/lib/native" + } + # core-site.xml entries + core { + fs.default.name = "hdfs://localhost:9000" + } + # hdfs-site.xml entries + hdfs { + dfs.replication = 1 + dfs.namenode.name.dir = "file:///tmp/hdfs-3/name" + dfs.datanode.data.dir = "file:///tmp/hdfs-3/data" + } + # mapred-site.xml entries + mapred { } + # yarn-site.xml entries + yarn { } + # capacity-scheduler.xml entries + capacity-scheduler { } + # container-executor.cfg entries + container-executor { } + } + } +} diff --git a/peel-extensions/src/main/resources/reference.hdfs-3.1.1.conf b/peel-extensions/src/main/resources/reference.hdfs-3.1.1.conf new file mode 100644 index 00000000..588b423b --- /dev/null +++ b/peel-extensions/src/main/resources/reference.hdfs-3.1.1.conf @@ -0,0 +1,20 @@ +# include common hadoop-3.x configuration +include "reference.hadoop-3.x.conf" + +system { + hadoop-3 { + path { + archive.url = "http://archive.apache.org/dist/hadoop/core/hadoop-3.1.1/hadoop-3.1.1.tar.gz" + archive.md5 = "0b6ab06b59ae75f433de387783f19011" + archive.src = ${app.path.downloads}"/hadoop-3.1.1.tar.gz" + home = ${system.hadoop-3.path.archive.dst}"/hadoop-3.1.1" + } + config { + # hadoop-env.sh entries + env { + # directory where process IDs are stored + HADOOP_PID_DIR = "/tmp/hadoop-3.1.1-pid" + } + } + } +} \ No newline at end of file diff --git a/peel-extensions/src/main/resources/reference.yarn-3.1.1.conf b/peel-extensions/src/main/resources/reference.yarn-3.1.1.conf new file mode 100644 index 00000000..56baabe2 --- 
/dev/null +++ b/peel-extensions/src/main/resources/reference.yarn-3.1.1.conf @@ -0,0 +1,5 @@ +# include common yarn-3.x configuration +include "reference.yarn-3.x.conf" + +# pick up hdfs-3.1.1 configuration containing archive data +include "reference.hdfs-3.1.1.conf" \ No newline at end of file diff --git a/peel-extensions/src/main/resources/reference.yarn-3.x.conf b/peel-extensions/src/main/resources/reference.yarn-3.x.conf new file mode 100644 index 00000000..d752059b --- /dev/null +++ b/peel-extensions/src/main/resources/reference.yarn-3.x.conf @@ -0,0 +1,2 @@ +# include common hadoop-3.x configuration +include "reference.hadoop-3.x.conf" \ No newline at end of file diff --git a/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache b/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache new file mode 100644 index 00000000..a00b60a7 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/flink/1.7/conf/flink-conf.yaml.mustache @@ -0,0 +1,253 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + + +#============================================================================== +# Common +#============================================================================== + +# The external address of the host on which the JobManager runs and can be +# reached by the TaskManagers and any clients which want to connect. This setting +# is only used in Standalone mode and may be overwritten on the JobManager side +# by specifying the --host parameter of the bin/jobmanager.sh executable. +# In high availability mode, if you use the bin/start-cluster.sh script and setup +# the conf/masters file, this will be taken care of automatically. Yarn/Mesos +# automatically configure the host name based on the hostname of the node where the +# JobManager runs. + +{{#jobmanager.rpc.address}}jobmanager.rpc.address: {{jobmanager.rpc.address}}{{/jobmanager.rpc.address}}{{^jobmanager.rpc.address}}jobmanager.rpc.address: localhost{{/jobmanager.rpc.address}} + +# The RPC port where the JobManager is reachable. 
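The template lines above and below all follow one mustache idiom: the normal section `{{#key}}…{{/key}}` emits the experiment-supplied value when the key is present in the rendering scope, and the inverted section `{{^key}}…{{/key}}` falls back to the commented upstream default otherwise. A minimal sketch of that behaviour, assuming the mustache.java library for illustration (the simplified, dot-free key name and the value `8` are made up; the real templates use the full Flink keys shown above):

```scala
import java.io.{StringReader, StringWriter}
import com.github.mustachejava.DefaultMustacheFactory
import scala.collection.JavaConverters._

val factory  = new DefaultMustacheFactory()
val template = factory.compile(
  new StringReader(
    "{{#parallelism}}parallelism.default: {{parallelism}}{{/parallelism}}" +
    "{{^parallelism}}# parallelism.default: 1{{/parallelism}}"),
  "flink-conf-sketch")

// Key present in the scope: the configured value is rendered.
val withValue = new StringWriter()
template.execute(withValue, Map("parallelism" -> "8").asJava).flush()
// withValue.toString == "parallelism.default: 8"

// Key absent: the inverted section emits the commented default.
val withoutValue = new StringWriter()
template.execute(withoutValue, Map.empty[String, String].asJava).flush()
// withoutValue.toString == "# parallelism.default: 1"
```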
+ +{{#jobmanager.rpc.port}}jobmanager.rpc.port: {{jobmanager.rpc.port}}{{/jobmanager.rpc.port}}{{^jobmanager.rpc.port}}jobmanager.rpc.port: 6123{{/jobmanager.rpc.port}} + + +# The heap size for the JobManager JVM + +{{#jobmanager.heap.size}}jobmanager.heap.size: {{jobmanager.heap.size}}{{/jobmanager.heap.size}}{{^jobmanager.heap.size}}jobmanager.heap.size: 1024m{{/jobmanager.heap.size}} + + +# The heap size for the TaskManager JVM + +{{#taskmanager.heap.size}}taskmanager.heap.size: {{taskmanager.heap.size}}{{/taskmanager.heap.size}}{{^taskmanager.heap.size}}taskmanager.heap.size: 1024m{{/taskmanager.heap.size}} + + +# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. + +{{#taskmanager.numberOfTaskSlots}}taskmanager.numberOfTaskSlots: {{taskmanager.numberOfTaskSlots}}{{/taskmanager.numberOfTaskSlots}}{{^taskmanager.numberOfTaskSlots}}taskmanager.numberOfTaskSlots: 1{{/taskmanager.numberOfTaskSlots}} + +# The parallelism used for programs that did not specify and other parallelism. + +{{#parallelism.default}}parallelism.default: {{parallelism.default}}{{/parallelism.default}}{{^parallelism.default}}parallelism.default: 1{{/parallelism.default}} + +# The default file system scheme and authority. +# +# By default file paths without scheme are interpreted relative to the local +# root file system 'file:///'. Use this to override the default and interpret +# relative paths relative to a different file system, +# for example 'hdfs://mynamenode:12345' +# +# fs.default-scheme + +#============================================================================== +# High Availability +#============================================================================== + +# The high-availability mode. Possible options are 'NONE' or 'zookeeper'. +# +{{#high-availability}}high-availability: {{high-availability}}{{/high-availability}}{{^high-availability}}# high-availability: zookeeper{{/high-availability}} + +# The path where metadata for master recovery is persisted. While ZooKeeper stores +# the small ground truth for checkpoint and leader election, this location stores +# the larger objects, like persisted dataflow graphs. +# +# Must be a durable file system that is accessible from all nodes +# (like HDFS, S3, Ceph, nfs, ...) +# +{{#high-availability.storageDir}}high-availability.storageDir: {{high-availability.storageDir}}{{/high-availability.storageDir}}{{^high-availability.storageDir}}# high-availability.storageDir: hdfs:///flink/ha/{{/high-availability.storageDir}} + +# The list of ZooKeeper quorum peers that coordinate the high-availability +# setup. This must be a list of the form: +# "host1:clientPort,host2:clientPort,..." 
(default clientPort: 2181) +# +{{#high-availability.zookeeper.quorum}}high-availability.zookeeper.quorum: {{high-availability.zookeeper.quorum}}{{/high-availability.zookeeper.quorum}}{{^high-availability.zookeeper.quorum}}# high-availability.zookeeper.quorum: localhost:2181{{/high-availability.zookeeper.quorum}} + + +# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes +# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) +# The default value is "open" and it can be changed to "creator" if ZK security is enabled +# +{{#high-availability.zookeeper.client.acl}}high-availability.zookeeper.client.acl: {{high-availability.zookeeper.client.acl}}{{/high-availability.zookeeper.client.acl}}{{^high-availability.zookeeper.client.acl}}# high-availability.zookeeper.client.acl: open{{/high-availability.zookeeper.client.acl}} + +#============================================================================== +# Fault tolerance and checkpointing +#============================================================================== + +# The backend that will be used to store operator state checkpoints if +# checkpointing is enabled. +# +# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the +# . +# +{{#state.backend}}state.backend: {{state.backend}}{{/state.backend}}{{^state.backend}}# state.backend: filesystem{{/state.backend}} + +# Directory for checkpoints filesystem, when using any of the default bundled +# state backends. +# +{{#state.checkpoints.dir}}state.checkpoints.dir: {{state.checkpoints.dir}}{{/state.checkpoints.dir}}{{^state.checkpoints.dir}}# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints{{/state.checkpoints.dir}} + +# Default target directory for savepoints, optional. +# +{{#state.savepoints.dir}}state.savepoints.dir: {{state.savepoints.dir}}{{/state.savepoints.dir}}{{^state.savepoints.dir}}# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints{{/state.savepoints.dir}} + +# Flag to enable/disable incremental checkpoints for backends that +# support incremental checkpoints (like the RocksDB state backend). +# +{{#state.backend.incremental}}state.backend.incremental: {{state.backend.incremental}}{{/state.backend.incremental}}{{^state.backend.incremental}}# state.backend.incremental: false{{/state.backend.incremental}} + +#============================================================================== +# Web Frontend +#============================================================================== + +# The address under which the web-based runtime monitor listens. +# +#web.address: 0.0.0.0 + +# The port under which the web-based runtime monitor listens. +# A value of -1 deactivates the web server. + +{{#rest.port}}rest.port: {{rest.port}}{{/rest.port}}{{^rest.port}}rest.port: 8081{{/rest.port}} + +# Flag to specify whether job submission is enabled from the web-based +# runtime monitor. Uncomment to disable. + +#web.submit.enable: false + +#============================================================================== +# Advanced +#============================================================================== + +# Override the directories for temporary files. If not specified, the +# system-specific Java temporary directory (java.io.tmpdir property) is taken. +# +# For framework setups on Yarn or Mesos, Flink will automatically pick up the +# containers' temp directories without any need for configuration. 
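Relating to the `Yaml` → `KeyValue` rename in `Model.scala` earlier in this diff: the model backing this template is a flat `java.util.HashMap` built from the HOCON subtree under a prefix, with intermediate paths flattened into dotted keys. A minimal sketch, assuming `peel-core` and Typesafe Config on the classpath (the host name and slot count are made-up values):

```scala
import com.typesafe.config.ConfigFactory
import org.peelframework.core.config.Model

val config = ConfigFactory.parseString(
  """system.flink.config.yaml {
    |  jobmanager.rpc.address = "master-01"
    |  taskmanager.numberOfTaskSlots = 8
    |}""".stripMargin)

// `Yaml` is now just an alias for `KeyValue`; both flatten the subtree under the prefix.
val model = new Model.Yaml(config, "system.flink.config.yaml")
// model.get("jobmanager.rpc.address") yields the configured host under its
// flattened dotted key, which is what the mustache variables above look up.
```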
+# +# Add a delimited list for multiple directories, using the system directory +# delimiter (colon ':' on unix) or a comma, e.g.: +# /data1/tmp:/data2/tmp:/data3/tmp +# +{{#Note}}Note: {{Note}}{{/Note}}{{^Note}}# Note: Each directory entry is read from and written to by a different I/O{{/Note}} +# thread. You can include the same directory multiple times in order to create +# multiple I/O threads against that directory. This is for example relevant for +# high-throughput RAIDs. +# +{{#io.tmp.dirs}}io.tmp.dirs: {{io.tmp.dirs}}{{/io.tmp.dirs}}{{^io.tmp.dirs}}# io.tmp.dirs: /tmp{{/io.tmp.dirs}} + +# Specify whether TaskManager's managed memory should be allocated when starting +# up (true) or when memory is requested. +# +# We recommend to set this value to 'true' only in setups for pure batch +# processing (DataSet API). Streaming setups currently do not use the TaskManager's +{{#managed memory}}managed memory: {{managed memory}}{{/managed memory}}{{^managed memory}}# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,{{/managed memory}} +# while the 'memory' and 'filesystem' backends explicitly keep data as objects +# to save on serialization cost. +# +{{#taskmanager.memory.preallocate}}taskmanager.memory.preallocate: {{taskmanager.memory.preallocate}}{{/taskmanager.memory.preallocate}}{{^taskmanager.memory.preallocate}}# taskmanager.memory.preallocate: false{{/taskmanager.memory.preallocate}} + +# The classloading resolve order. Possible values are 'child-first' (Flink's default) +# and 'parent-first' (Java's default). +# +# Child first classloading allows users to use different dependency/library +# versions in their application than those in the classpath. Switching back +# to 'parent-first' may help with debugging dependency issues. +# +{{#classloader.resolve-order}}classloader.resolve-order: {{classloader.resolve-order}}{{/classloader.resolve-order}}{{^classloader.resolve-order}}# classloader.resolve-order: child-first{{/classloader.resolve-order}} + +# The amount of memory going to the network stack. These numbers usually need +# no tuning. Adjusting them may be necessary in case of an "Insufficient number +# of network buffers" error. The default min is 64MB, teh default max is 1GB. +# +{{#taskmanager.network.memory.fraction}}taskmanager.network.memory.fraction: {{taskmanager.network.memory.fraction}}{{/taskmanager.network.memory.fraction}}{{^taskmanager.network.memory.fraction}}# taskmanager.network.memory.fraction: 0.1{{/taskmanager.network.memory.fraction}} +{{#taskmanager.network.memory.min}}taskmanager.network.memory.min: {{taskmanager.network.memory.min}}{{/taskmanager.network.memory.min}}{{^taskmanager.network.memory.min}}# taskmanager.network.memory.min: 64mb{{/taskmanager.network.memory.min}} +{{#taskmanager.network.memory.max}}taskmanager.network.memory.max: {{taskmanager.network.memory.max}}{{/taskmanager.network.memory.max}}{{^taskmanager.network.memory.max}}# taskmanager.network.memory.max: 1gb{{/taskmanager.network.memory.max}} + +#============================================================================== +# Flink Cluster Security Configuration +#============================================================================== + +# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors - +# may be enabled in four steps: +# 1. configure the local krb5.conf file +# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit) +# 3. make the credentials available to various JAAS login contexts +# 4. 
configure the connector to use JAAS/SASL + +# The below configure how Kerberos credentials are provided. A keytab will be used instead of +# a ticket cache if the keytab path and principal are set. + +{{#security.kerberos.login.use-ticket-cache}}security.kerberos.login.use-ticket-cache: {{security.kerberos.login.use-ticket-cache}}{{/security.kerberos.login.use-ticket-cache}}{{^security.kerberos.login.use-ticket-cache}}# security.kerberos.login.use-ticket-cache: true{{/security.kerberos.login.use-ticket-cache}} +{{#security.kerberos.login.keytab}}security.kerberos.login.keytab: {{security.kerberos.login.keytab}}{{/security.kerberos.login.keytab}}{{^security.kerberos.login.keytab}}# security.kerberos.login.keytab: /path/to/kerberos/keytab{{/security.kerberos.login.keytab}} +{{#security.kerberos.login.principal}}security.kerberos.login.principal: {{security.kerberos.login.principal}}{{/security.kerberos.login.principal}}{{^security.kerberos.login.principal}}# security.kerberos.login.principal: flink-user{{/security.kerberos.login.principal}} + +# The configuration below defines which JAAS login contexts + +{{#security.kerberos.login.contexts}}security.kerberos.login.contexts: {{security.kerberos.login.contexts}}{{/security.kerberos.login.contexts}}{{^security.kerberos.login.contexts}}# security.kerberos.login.contexts: Client,KafkaClient{{/security.kerberos.login.contexts}} + +#============================================================================== +# ZK Security Configuration +#============================================================================== + +# Below configurations are applicable if ZK ensemble is configured for security + +# Override below configuration to provide custom ZK service name if configured +{{#zookeeper.sasl.service-name}}zookeeper.sasl.service-name: {{zookeeper.sasl.service-name}}{{/zookeeper.sasl.service-name}}{{^zookeeper.sasl.service-name}}# zookeeper.sasl.service-name: zookeeper{{/zookeeper.sasl.service-name}} + +# The configuration below must match one of the values set in "security.kerberos.login.contexts" +{{#zookeeper.sasl.login-context-name}}zookeeper.sasl.login-context-name: {{zookeeper.sasl.login-context-name}}{{/zookeeper.sasl.login-context-name}}{{^zookeeper.sasl.login-context-name}}# zookeeper.sasl.login-context-name: Client{{/zookeeper.sasl.login-context-name}} + +#============================================================================== +# HistoryServer +#============================================================================== + +# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop) + +# Directory to upload completed jobs to. Add this directory to the list of +# monitored directories of the HistoryServer as well (see below). +#jobmanager.archive.fs.dir: hdfs:///completed-jobs/ + +# The address under which the web-based HistoryServer listens. +#historyserver.web.address: 0.0.0.0 + +# The port under which the web-based HistoryServer listens. +#historyserver.web.port: 8082 + +# Comma separated list of directories to monitor for completed jobs. +#historyserver.archive.fs.dir: hdfs:///completed-jobs/ + +# Interval in milliseconds for refreshing the monitored directories. 
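Relating to the `DependencyGraph.collect` change earlier in this diff (`toVisit.tail ++ children` instead of `children ++ toVisit.tail`): appending the newly discovered children behind the remaining frontier turns the traversal into breadth-first order. A self-contained sketch of the same recursion over a toy graph (node names are made up; the exclusion handling of the real method is omitted):

```scala
import scala.annotation.tailrec

// Toy dependency edges: a depends on b and c, which both depend on d.
val children = Map(
  "a" -> Set("b", "c"),
  "b" -> Set("d"),
  "c" -> Set("d"),
  "d" -> Set.empty[String])

@tailrec
def collect(toVisit: Set[String], visited: List[String]): List[String] =
  if (toVisit.isEmpty) visited.reverse
  else {
    val next       = toVisit.head
    val newlyFound = children(next).filter(x => !visited.contains(x))
    // Remaining frontier first, then children: the whole current level is
    // processed before any of its descendants (breadth-first).
    collect(toVisit.tail ++ newlyFound, next :: visited)
  }

collect(Set("a"), Nil)
// Visits a, then b and c, and only then their shared descendant d.
// With the old `newlyFound ++ toVisit.tail` order, d would be visited
// immediately after the first of b/c, i.e. depth-first.
```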
+#historyserver.archive.fs.refresh-interval: 10000 + +# The directory where the PID files are stored +{{#env.pid.dir}}env.pid.dir: {{env.pid.dir}}{{/env.pid.dir}}{{^env.pid.dir}}env.pid.dir: /tmp{{/env.pid.dir}} + +# Hadoop / YARN configuration directory +{{#env.hadoop.conf.dir}}env.hadoop.conf.dir: {{env.hadoop.conf.dir}}{{/env.hadoop.conf.dir}} + +# Use accelerators provided by Hadoop +{{#taskmanager.useAccelerators}}taskmanager.useAccelerators: {{taskmanager.useAccelerators}}{{/taskmanager.useAccelerators}} \ No newline at end of file diff --git a/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j-yarn-session.properties.mustache b/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j-yarn-session.properties.mustache new file mode 100644 index 00000000..63331c44 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j-yarn-session.properties.mustache @@ -0,0 +1,43 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This affects logging for both user code and Flink +log4j.rootLogger=INFO, file + +# The following lines keep the log level of common libraries/connectors on +# log level INFO. The root logger does not override this. You have to manually +# change the log levels here. 
+log4j.logger.akka=INFO +log4j.logger.org.apache.kafka=INFO +log4j.logger.org.apache.hadoop=INFO +log4j.logger.org.apache.zookeeper=INFO + +# Log all infos in the given file +{{#log4j.appender.file._root_}}log4j.appender.file={{log4j.appender.file._root_}}{{/log4j.appender.file._root_}}{{^log4j.appender.file._root_}}log4j.appender.file=org.apache.log4j.FileAppender{{/log4j.appender.file._root_}} +{{#log4j.appender.file.file}}log4j.appender.file.file={{log4j.appender.file.file}}{{/log4j.appender.file.file}}{{^log4j.appender.file.file}}log4j.appender.file.file=${log.file}{{/log4j.appender.file.file}} +{{#log4j.appender.file.append}}log4j.appender.file.append={{log4j.appender.file.append}}{{/log4j.appender.file.append}}{{^log4j.appender.file.append}}log4j.appender.file.append=false{{/log4j.appender.file.append}} +{{#log4j.appender.file.layout._root_}}log4j.appender.file.layout={{log4j.appender.file.layout._root_}}{{/log4j.appender.file.layout._root_}}{{^log4j.appender.file.layout._root_}}log4j.appender.file.layout=org.apache.log4j.PatternLayout{{/log4j.appender.file.layout._root_}} +{{#log4j.appender.file.layout.ConversionPattern}}log4j.appender.file.layout.ConversionPattern={{log4j.appender.file.layout.ConversionPattern}}{{/log4j.appender.file.layout.ConversionPattern}}{{^log4j.appender.file.layout.ConversionPattern}}log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n{{/log4j.appender.file.layout.ConversionPattern}} + +# suppress the irrelevant (wrong) warnings from the netty channel handler +{{#log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline={{log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}{{/log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}{{^log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file{{/log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}} +{{#log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline={{log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}}{{/log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}}{{^log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, stdout{{/log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline}} +{{#log4j.logger.org.apache.zookeeper}}log4j.logger.org.apache.zookeeper={{log4j.logger.org.apache.zookeeper}}{{/log4j.logger.org.apache.zookeeper}}{{^log4j.logger.org.apache.zookeeper}}log4j.logger.org.apache.zookeeper=WARN, stdout{{/log4j.logger.org.apache.zookeeper}} +{{#log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}}log4j.logger.org.apache.flink.shaded.org.apache.curator.framework={{log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}}{{/log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}}{{^log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}}log4j.logger.org.apache.flink.shaded.org.apache.curator.framework=WARN, stdout{{/log4j.logger.org.apache.flink.shaded.org.apache.curator.framework}} 
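The `_root_` marker used in the template keys above is the same convention adopted by the new `Model.INI` in `peel-core` (added earlier in this diff) for entries that belong to no `[section]`, e.g. for `etc/hadoop/container-executor.cfg`. A minimal sketch, assuming `peel-core` on the classpath; the property names and values below are hypothetical:

```scala
import com.typesafe.config.ConfigFactory
import org.peelframework.core.config.Model

val config = ConfigFactory.parseString(
  """system.hadoop-3.config.container-executor {
    |  _root_ {
    |    yarn.nodemanager.linux-container-executor.group = hadoop
    |    banned.users = "hdfs,yarn"
    |  }
    |  docker {
    |    module.enabled = "false"
    |  }
    |}""".stripMargin)

val ini = new Model.INI(config, "system.hadoop-3.config.container-executor")
// ini.sections should yield a Section(null, ...) holding the two pairs that
// appear outside any [section], plus a Section("docker", ...) with module.enabled.
```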
+{{#log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}}log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils={{log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}}{{/log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}}{{^log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}}log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils=WARN, stdout{{/log4j.logger.org.apache.flink.runtime.util.ZooKeeperUtils}} +{{#log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}}log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService={{log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}}{{/log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}}{{^log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}}log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService=WARN, stdout{{/log4j.logger.org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService}} diff --git a/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j.properties.mustache b/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j.properties.mustache new file mode 100644 index 00000000..78f7a22e --- /dev/null +++ b/peel-extensions/src/main/resources/templates/flink/1.7/conf/log4j.properties.mustache @@ -0,0 +1,38 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# This affects logging for both user code and Flink +log4j.rootLogger=INFO, file + +# The following lines keep the log level of common libraries/connectors on +# log level INFO. The root logger does not override this. You have to manually +# change the log levels here. 
+log4j.logger.akka=INFO +log4j.logger.org.apache.kafka=INFO +log4j.logger.org.apache.hadoop=INFO +log4j.logger.org.apache.zookeeper=INFO + +# Log all infos in the given file +{{#log4j.appender.file._root_}}log4j.appender.file={{log4j.appender.file._root_}}{{/log4j.appender.file._root_}}{{^log4j.appender.file._root_}}log4j.appender.file=org.apache.log4j.FileAppender{{/log4j.appender.file._root_}} +{{#log4j.appender.file.file}}log4j.appender.file.file={{log4j.appender.file.file}}{{/log4j.appender.file.file}}{{^log4j.appender.file.file}}log4j.appender.file.file=${log.file}{{/log4j.appender.file.file}} +{{#log4j.appender.file.append}}log4j.appender.file.append={{log4j.appender.file.append}}{{/log4j.appender.file.append}}{{^log4j.appender.file.append}}log4j.appender.file.append=false{{/log4j.appender.file.append}} +{{#log4j.appender.file.layout._root_}}log4j.appender.file.layout={{log4j.appender.file.layout._root_}}{{/log4j.appender.file.layout._root_}}{{^log4j.appender.file.layout._root_}}log4j.appender.file.layout=org.apache.log4j.PatternLayout{{/log4j.appender.file.layout._root_}} +{{#log4j.appender.file.layout.ConversionPattern}}log4j.appender.file.layout.ConversionPattern={{log4j.appender.file.layout.ConversionPattern}}{{/log4j.appender.file.layout.ConversionPattern}}{{^log4j.appender.file.layout.ConversionPattern}}log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n{{/log4j.appender.file.layout.ConversionPattern}} + +# suppress the irrelevant (wrong) warnings from the netty channel handler +{{#log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline={{log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}{{/log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}{{^log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}}log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file{{/log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline}} diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/capacity-scheduler.xml.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/capacity-scheduler.xml.mustache new file mode 100644 index 00000000..6bda9cac --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/capacity-scheduler.xml.mustache @@ -0,0 +1,217 @@ + + + + + yarn.scheduler.capacity.maximum-applications + {{#yarn.scheduler.capacity.maximum-applications}}{{yarn.scheduler.capacity.maximum-applications}}{{/yarn.scheduler.capacity.maximum-applications}}{{^yarn.scheduler.capacity.maximum-applications}}10000{{/yarn.scheduler.capacity.maximum-applications}} + + Maximum number of applications that can be pending and running. + + + + + yarn.scheduler.capacity.maximum-am-resource-percent + {{#yarn.scheduler.capacity.maximum-am-resource-percent}}{{yarn.scheduler.capacity.maximum-am-resource-percent}}{{/yarn.scheduler.capacity.maximum-am-resource-percent}}{{^yarn.scheduler.capacity.maximum-am-resource-percent}}0.1{{/yarn.scheduler.capacity.maximum-am-resource-percent}} + + Maximum percent of resources in the cluster which can be used to run + application masters i.e. controls number of concurrent running + applications. 
+ + + + + yarn.scheduler.capacity.resource-calculator + {{#yarn.scheduler.capacity.resource-calculator}}{{yarn.scheduler.capacity.resource-calculator}}{{/yarn.scheduler.capacity.resource-calculator}}{{^yarn.scheduler.capacity.resource-calculator}}org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator{{/yarn.scheduler.capacity.resource-calculator}} + + The ResourceCalculator implementation to be used to compare + Resources in the scheduler. + The default i.e. DefaultResourceCalculator only uses Memory while + DominantResourceCalculator uses dominant-resource to compare + multi-dimensional resources such as Memory, CPU etc. + + + + + yarn.scheduler.capacity.root.queues + {{#yarn.scheduler.capacity.root.queues}}{{yarn.scheduler.capacity.root.queues}}{{/yarn.scheduler.capacity.root.queues}}{{^yarn.scheduler.capacity.root.queues}}default{{/yarn.scheduler.capacity.root.queues}} + + The queues at the this level (root is the root queue). + + + + + yarn.scheduler.capacity.root.default.capacity + {{#yarn.scheduler.capacity.root.default.capacity}}{{yarn.scheduler.capacity.root.default.capacity}}{{/yarn.scheduler.capacity.root.default.capacity}}{{^yarn.scheduler.capacity.root.default.capacity}}100{{/yarn.scheduler.capacity.root.default.capacity}} + Default queue target capacity. + + + + yarn.scheduler.capacity.root.default.user-limit-factor + {{#yarn.scheduler.capacity.root.default.user-limit-factor}}{{yarn.scheduler.capacity.root.default.user-limit-factor}}{{/yarn.scheduler.capacity.root.default.user-limit-factor}}{{^yarn.scheduler.capacity.root.default.user-limit-factor}}1{{/yarn.scheduler.capacity.root.default.user-limit-factor}} + + Default queue user limit a percentage from 0.0 to 1.0. + + + + + yarn.scheduler.capacity.root.default.maximum-capacity + {{#yarn.scheduler.capacity.root.default.maximum-capacity}}{{yarn.scheduler.capacity.root.default.maximum-capacity}}{{/yarn.scheduler.capacity.root.default.maximum-capacity}}{{^yarn.scheduler.capacity.root.default.maximum-capacity}}100{{/yarn.scheduler.capacity.root.default.maximum-capacity}} + + The maximum capacity of the default queue. + + + + + yarn.scheduler.capacity.root.default.state + {{#yarn.scheduler.capacity.root.default.state}}{{yarn.scheduler.capacity.root.default.state}}{{/yarn.scheduler.capacity.root.default.state}}{{^yarn.scheduler.capacity.root.default.state}}RUNNING{{/yarn.scheduler.capacity.root.default.state}} + + The state of the default queue. State can be one of RUNNING or STOPPED. + + + + + yarn.scheduler.capacity.root.default.acl_submit_applications + {{#yarn.scheduler.capacity.root.default.acl_submit_applications}}{{yarn.scheduler.capacity.root.default.acl_submit_applications}}{{/yarn.scheduler.capacity.root.default.acl_submit_applications}}{{^yarn.scheduler.capacity.root.default.acl_submit_applications}}*{{/yarn.scheduler.capacity.root.default.acl_submit_applications}} + + The ACL of who can submit jobs to the default queue. + + + + + yarn.scheduler.capacity.root.default.acl_administer_queue + {{#yarn.scheduler.capacity.root.default.acl_administer_queue}}{{yarn.scheduler.capacity.root.default.acl_administer_queue}}{{/yarn.scheduler.capacity.root.default.acl_administer_queue}}{{^yarn.scheduler.capacity.root.default.acl_administer_queue}}*{{/yarn.scheduler.capacity.root.default.acl_administer_queue}} + + The ACL of who can administer jobs on the default queue. 
+ + + + + yarn.scheduler.capacity.root.default.acl_application_max_priority + {{#yarn.scheduler.capacity.root.default.acl_application_max_priority}}{{yarn.scheduler.capacity.root.default.acl_application_max_priority}}{{/yarn.scheduler.capacity.root.default.acl_application_max_priority}}{{^yarn.scheduler.capacity.root.default.acl_application_max_priority}}*{{/yarn.scheduler.capacity.root.default.acl_application_max_priority}} + + The ACL of who can submit applications with configured priority. + For e.g, [user={name} group={name} max_priority={priority} default_priority={priority}] + + + + + yarn.scheduler.capacity.root.default.maximum-application-lifetime + {{#yarn.scheduler.capacity.root.default.maximum-application-lifetime}}{{yarn.scheduler.capacity.root.default.maximum-application-lifetime}}{{/yarn.scheduler.capacity.root.default.maximum-application-lifetime}}{{^yarn.scheduler.capacity.root.default.maximum-application-lifetime}}-1{{/yarn.scheduler.capacity.root.default.maximum-application-lifetime}} + + Maximum lifetime of an application which is submitted to a queue + in seconds. Any value less than or equal to zero will be considered as + disabled. + This will be a hard time limit for all applications in this + queue. If positive value is configured then any application submitted + to this queue will be killed after exceeds the configured lifetime. + User can also specify lifetime per application basis in + application submission context. But user lifetime will be + overridden if it exceeds queue maximum lifetime. It is point-in-time + configuration. + Note : Configuring too low value will result in killing application + sooner. This feature is applicable only for leaf queue. + + + + + yarn.scheduler.capacity.root.default.default-application-lifetime + {{#yarn.scheduler.capacity.root.default.default-application-lifetime}}{{yarn.scheduler.capacity.root.default.default-application-lifetime}}{{/yarn.scheduler.capacity.root.default.default-application-lifetime}}{{^yarn.scheduler.capacity.root.default.default-application-lifetime}}-1{{/yarn.scheduler.capacity.root.default.default-application-lifetime}} + + Default lifetime of an application which is submitted to a queue + in seconds. Any value less than or equal to zero will be considered as + disabled. + If the user has not submitted application with lifetime value then this + value will be taken. It is point-in-time configuration. + Note : Default lifetime can't exceed maximum lifetime. This feature is + applicable only for leaf queue. + + + + + yarn.scheduler.capacity.node-locality-delay + {{#yarn.scheduler.capacity.node-locality-delay}}{{yarn.scheduler.capacity.node-locality-delay}}{{/yarn.scheduler.capacity.node-locality-delay}}{{^yarn.scheduler.capacity.node-locality-delay}}40{{/yarn.scheduler.capacity.node-locality-delay}} + + Number of missed scheduling opportunities after which the CapacityScheduler + attempts to schedule rack-local containers. + When setting this parameter, the size of the cluster should be taken into account. + We use 40 as the default value, which is approximately the number of nodes in one rack. + Note, if this value is -1, the locality constraint in the container request + will be ignored, which disables the delay scheduling. 
+ + + + + yarn.scheduler.capacity.rack-locality-additional-delay + {{#yarn.scheduler.capacity.rack-locality-additional-delay}}{{yarn.scheduler.capacity.rack-locality-additional-delay}}{{/yarn.scheduler.capacity.rack-locality-additional-delay}}{{^yarn.scheduler.capacity.rack-locality-additional-delay}}-1{{/yarn.scheduler.capacity.rack-locality-additional-delay}} + + Number of additional missed scheduling opportunities over the node-locality-delay + ones, after which the CapacityScheduler attempts to schedule off-switch containers, + instead of rack-local ones. + Example: with node-locality-delay=40 and rack-locality-delay=20, the scheduler will + attempt rack-local assignments after 40 missed opportunities, and off-switch assignments + after 40+20=60 missed opportunities. + When setting this parameter, the size of the cluster should be taken into account. + We use -1 as the default value, which disables this feature. In this case, the number + of missed opportunities for assigning off-switch containers is calculated based on + the number of containers and unique locations specified in the resource request, + as well as the size of the cluster. + + + + + yarn.scheduler.capacity.queue-mappings + {{#yarn.scheduler.capacity.queue-mappings}}{{yarn.scheduler.capacity.queue-mappings}}{{/yarn.scheduler.capacity.queue-mappings}}{{^yarn.scheduler.capacity.queue-mappings}}{{/yarn.scheduler.capacity.queue-mappings}} + + A list of mappings that will be used to assign jobs to queues + The syntax for this list is [u|g]:[name]:[queue_name][,next mapping]* + Typically this list will be used to map users to queues, + for example, u:%user:%user maps all users to queues with the same name + as the user. + + + + + yarn.scheduler.capacity.queue-mappings-override.enable + {{#yarn.scheduler.capacity.queue-mappings-override.enable}}{{yarn.scheduler.capacity.queue-mappings-override.enable}}{{/yarn.scheduler.capacity.queue-mappings-override.enable}}{{^yarn.scheduler.capacity.queue-mappings-override.enable}}false{{/yarn.scheduler.capacity.queue-mappings-override.enable}} + + If a queue mapping is present, will it override the value specified + by the user? This can be used by administrators to place jobs in queues + that are different than the one specified by the user. + The default is false. + + + + + yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments + {{#yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}}{{yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}}{{/yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}}{{^yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}}1{{/yarn.scheduler.capacity.per-node-heartbeat.maximum-offswitch-assignments}} + + Controls the number of OFF_SWITCH assignments allowed + during a node's heartbeat. Increasing this value can improve + scheduling rate for OFF_SWITCH containers. Lower values reduce + "clumping" of applications on particular nodes. The default is 1. + Legal values are 1-MAX_INT. This config is refreshable. + + + + + yarn.scheduler.capacity.application.fail-fast + {{#yarn.scheduler.capacity.application.fail-fast}}{{yarn.scheduler.capacity.application.fail-fast}}{{/yarn.scheduler.capacity.application.fail-fast}}{{^yarn.scheduler.capacity.application.fail-fast}}false{{/yarn.scheduler.capacity.application.fail-fast}} + + Whether RM should fail during recovery if previous applications' + queue is no longer valid. 
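These capacity-scheduler properties are overridable from a Peel configuration: `reference.hadoop-3.x.conf` (earlier in this diff) declares an empty `capacity-scheduler { }` block under `system.hadoop-3.config`, and entries placed there are rendered into this template in the same way as the flink-conf entries above. A hypothetical experiment-level override, shown as HOCON embedded in a Typesafe Config call (the chosen values are examples only):

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical overrides; any property left unset keeps the commented default above.
val overrides = ConfigFactory.parseString(
  """system.hadoop-3.config.capacity-scheduler {
    |  yarn.scheduler.capacity.maximum-am-resource-percent = 0.5
    |  yarn.scheduler.capacity.resource-calculator = "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
    |}""".stripMargin)
```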
+ + + + diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/hadoop-env.sh.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/hadoop-env.sh.mustache new file mode 100644 index 00000000..3c85688a --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/hadoop-env.sh.mustache @@ -0,0 +1,440 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Set Hadoop-specific environment variables here. +export HADOOP_HOME={{HADOOP_INSTALL}} # TODO remove + +## +## THIS FILE ACTS AS THE MASTER FILE FOR ALL HADOOP PROJECTS. +## SETTINGS HERE WILL BE READ BY ALL HADOOP COMMANDS. THEREFORE, +## ONE CAN USE THIS FILE TO SET YARN, HDFS, AND MAPREDUCE +## CONFIGURATION OPTIONS INSTEAD OF xxx-env.sh. +## +## Precedence rules: +## +## {yarn-env.sh|hdfs-env.sh} > hadoop-env.sh > hard-coded defaults +## +## {YARN_xyz|HDFS_xyz} > HADOOP_xyz > hard-coded defaults +## + +# Many of the options here are built from the perspective that users +# may want to provide OVERWRITING values on the command line. +# For example: +# +# JAVA_HOME=/usr/java/testing hdfs dfs -ls +# +# Therefore, the vast majority (BUT NOT ALL!) of these defaults +# are configured for substitution and not append. If append +# is preferable, modify this file accordingly. + +### +# Generic settings for HADOOP +### + +# Technically, the only required environment variable is JAVA_HOME. +# All others are optional. However, the defaults are probably not +# preferred. Many sites configure these options outside of Hadoop, +# such as in /etc/profile.d + +# The java implementation to use. By default, this environment +# variable is REQUIRED on ALL platforms except OS X! +{{#JAVA_HOME}}export JAVA_HOME={{JAVA_HOME}}{{/JAVA_HOME}}{{^JAVA_HOME}}export JAVA_HOME=${JAVA_HOME}{{/JAVA_HOME}} + +#Hadoop paths # TODO Is this really necessary? Can this be generalized? +export HADOOP_INSTALL={{HADOOP_INSTALL}} +export PATH=$PATH:$HADOOP_INSTALL/bin +export PATH=$PATH:$HADOOP_INSTALL/sbin +export HADOOP_HOME=$HADOOP_INSTALL +export HADOOP_MAPRED_HOME=$HADOOP_INSTALL +export HADOOP_COMMON_HOME=$HADOOP_INSTALL +export HADOOP_HDFS_HOME=$HADOOP_INSTALL +export YARN_HOME=$HADOOP_INSTALL + +# Location of Hadoop. By default, Hadoop will attempt to determine +# this location based upon its execution path. +# export HADOOP_HOME= + +# Location of Hadoop's configuration information. i.e., where this +# file is living. If this is not defined, Hadoop will attempt to +# locate it based upon its execution path. +# +# NOTE: It is recommend that this variable not be set here but in +# /etc/profile.d or equivalent. Some options (such as +# --config) may react strangely otherwise. +# +# export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop + +# The maximum amount of heap to use (Java -Xmx). 
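`HADOOP_INSTALL` above is filled from `system.hadoop-3.config.env.HADOOP_INSTALL`, which `reference.hadoop-3.x.conf` earlier in this diff wires to `system.hadoop-3.path.home` via HOCON substitutions. A small sketch of how those path substitutions resolve, with hypothetical stand-ins for the `app.path.*` values that Peel normally supplies:

```scala
import com.typesafe.config.ConfigFactory

val resolved = ConfigFactory.parseString(
  """app.path.downloads = "/data/peel/downloads"   // hypothetical
    |app.path.systems   = "/data/peel/systems"     // hypothetical
    |system.hadoop-3.path {
    |  archive.dst = ${app.path.systems}
    |  archive.src = ${app.path.downloads}"/hadoop-3.1.1.tar.gz"
    |  home        = ${system.hadoop-3.path.archive.dst}"/hadoop-3.1.1"
    |  config      = ${system.hadoop-3.path.home}"/etc/hadoop"
    |}""".stripMargin).resolve()

resolved.getString("system.hadoop-3.path.home")    // "/data/peel/systems/hadoop-3.1.1"
resolved.getString("system.hadoop-3.path.config")  // "/data/peel/systems/hadoop-3.1.1/etc/hadoop"
```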
If no unit +# is provided, it will be converted to MB. Daemons will +# prefer any Xmx setting in their respective _OPT variable. +# There is no default; the JVM will autoscale based upon machine +# memory size. +# export HADOOP_HEAPSIZE_MAX= + +# The minimum amount of heap to use (Java -Xms). If no unit +# is provided, it will be converted to MB. Daemons will +# prefer any Xms setting in their respective _OPT variable. +# There is no default; the JVM will autoscale based upon machine +# memory size. +# export HADOOP_HEAPSIZE_MIN= + +# Enable extra debugging of Hadoop's JAAS binding, used to set up +# Kerberos security. +# export HADOOP_JAAS_DEBUG=true + +# Extra Java runtime options for all Hadoop commands. We don't support +# IPv6 yet/still, so by default the preference is set to IPv4. +# export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true" +{{#HADOOP_OPTS}}export HADOOP_OPTS={{HADOOP_OPTS}}{{/HADOOP_OPTS}}{{^HADOOP_OPTS}}export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true"{{/HADOOP_OPTS}} + +# For Kerberos debugging, an extended option set logs more invormation +# export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug" + +# Some parts of the shell code may do special things dependent upon +# the operating system. We have to set this here. See the next +# section as to why.... +export HADOOP_OS_TYPE=${HADOOP_OS_TYPE:-$(uname -s)} + + +# Under certain conditions, Java on OS X will throw SCDynamicStore errors +# in the system logs. +# See HADOOP-8719 for more information. If one needs Kerberos +# support on OS X, one will want to change/remove this extra bit. +case ${HADOOP_OS_TYPE} in +Darwin*) +export HADOOP_OPTS="${HADOOP_OPTS} -Djava.security.krb5.realm= " +export HADOOP_OPTS="${HADOOP_OPTS} -Djava.security.krb5.kdc= " +export HADOOP_OPTS="${HADOOP_OPTS} -Djava.security.krb5.conf= " +;; +esac + +# Extra Java runtime options for some Hadoop commands +# and clients (i.e., hdfs dfs -blah). These get appended to HADOOP_OPTS for +# such commands. In most cases, # this should be left empty and +# let users supply it on the command line. +# export HADOOP_CLIENT_OPTS="" + +# +# A note about classpaths. +# +# By default, Apache Hadoop overrides Java's CLASSPATH +# environment variable. It is configured such +# that it sarts out blank with new entries added after passing +# a series of checks (file/dir exists, not already listed aka +# de-deduplication). During de-depulication, wildcards and/or +# directories are *NOT* expanded to keep it simple. Therefore, +# if the computed classpath has two specific mentions of +# awesome-methods-1.0.jar, only the first one added will be seen. +# If two directories are in the classpath that both contain +# awesome-methods-1.0.jar, then Java will pick up both versions. + +# An additional, custom CLASSPATH. Site-wide configs should be +# handled via the shellprofile functionality, utilizing the +# hadoop_add_classpath function for greater control and much +# harder for apps/end-users to accidentally override. +# Similarly, end users should utilize ${HOME}/.hadooprc . +# This variable should ideally only be used as a short-cut, +# interactive way for temporary additions on the command line. +# export HADOOP_CLASSPATH="/some/cool/path/on/your/machine" + +# Should HADOOP_CLASSPATH be first in the official CLASSPATH? 
+# export HADOOP_USER_CLASSPATH_FIRST="yes" + +# If HADOOP_USE_CLIENT_CLASSLOADER is set, the classpath along +# with the main jar are handled by a separate isolated +# client classloader when 'hadoop jar', 'yarn jar', or 'mapred job' +# is utilized. If it is set, HADOOP_CLASSPATH and +# HADOOP_USER_CLASSPATH_FIRST are ignored. +# export HADOOP_USE_CLIENT_CLASSLOADER=true + +# HADOOP_CLIENT_CLASSLOADER_SYSTEM_CLASSES overrides the default definition of +# system classes for the client classloader when HADOOP_USE_CLIENT_CLASSLOADER +# is enabled. Names ending in '.' (period) are treated as package names, and +# names starting with a '-' are treated as negative matches. For example, +# export HADOOP_CLIENT_CLASSLOADER_SYSTEM_CLASSES="-org.apache.hadoop.UserClass,java.,javax.,org.apache.hadoop." + +# Enable optional, bundled Hadoop features +# This is a comma delimited list. It may NOT be overridden via .hadooprc +# Entries may be added/removed as needed. +# export HADOOP_OPTIONAL_TOOLS="hadoop-azure-datalake,hadoop-azure,hadoop-openstack,hadoop-kafka,hadoop-aws,hadoop-aliyun" + +### +# Options for remote shell connectivity +### + +# There are some optional components of hadoop that allow for +# command and control of remote hosts. For example, +# start-dfs.sh will attempt to bring up all NNs, DNS, etc. + +# Options to pass to SSH when one of the "log into a host and +# start/stop daemons" scripts is executed +# export HADOOP_SSH_OPTS="-o BatchMode=yes -o StrictHostKeyChecking=no -o ConnectTimeout=10s" + +# The built-in ssh handler will limit itself to 10 simultaneous connections. +# For pdsh users, this sets the fanout size ( -f ) +# Change this to increase/decrease as necessary. +# export HADOOP_SSH_PARALLEL=10 + +# Filename which contains all of the hosts for any remote execution +# helper scripts # such as workers.sh, start-dfs.sh, etc. +# export HADOOP_WORKERS="${HADOOP_CONF_DIR}/workers" + +### +# Options for all daemons +### +# + +# +# Many options may also be specified as Java properties. It is +# very common, and in many cases, desirable, to hard-set these +# in daemon _OPTS variables. Where applicable, the appropriate +# Java property is also identified. Note that many are re-used +# or set differently in certain contexts (e.g., secure vs +# non-secure) +# + +# Where (primarily) daemon log files are stored. +# ${HADOOP_HOME}/logs by default. +# Java property: hadoop.log.dir +# export HADOOP_LOG_DIR=${HADOOP_HOME}/logs + +# A string representing this instance of hadoop. $USER by default. +# This is used in writing log and pid files, so keep that in mind! +# Java property: hadoop.id.str +# export HADOOP_IDENT_STRING=$USER +{{#HADOOP_IDENT_STRING}}export HADOOP_IDENT_STRING={{HADOOP_IDENT_STRING}}{{/HADOOP_IDENT_STRING}}{{^HADOOP_IDENT_STRING}}export HADOOP_IDENT_STRING=$USER{{/HADOOP_IDENT_STRING}} + +# How many seconds to pause after stopping a daemon +# export HADOOP_STOP_TIMEOUT=5 + +# Where pid files are stored. /tmp by default. 
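The `{{#HADOOP_PID_DIR}}` block just below is fed from `system.hadoop-3.config.env.HADOOP_PID_DIR`: `reference.hadoop-3.x.conf` sets it to `/tmp/hadoop-3/pid` and `reference.hdfs-3.1.1.conf` overrides it to `/tmp/hadoop-3.1.1-pid` (both earlier in this diff). A sketch of a further, hypothetical host-specific override and the line it would produce:

```scala
import com.typesafe.config.ConfigFactory

// Hypothetical override of the PID directory; without it, the value from
// reference.hdfs-3.1.1.conf ("/tmp/hadoop-3.1.1-pid") applies.
val hostConf = ConfigFactory.parseString(
  """system.hadoop-3.config.env {
    |  HADOOP_PID_DIR = "/data/run/hadoop"
    |}""".stripMargin)

// Rendered via the mustache block below into etc/hadoop/hadoop-env.sh:
//   export HADOOP_PID_DIR=/data/run/hadoop
```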
+# export HADOOP_PID_DIR=/tmp +{{#HADOOP_PID_DIR}}export HADOOP_PID_DIR={{HADOOP_PID_DIR}}{{/HADOOP_PID_DIR}}{{^HADOOP_PID_DIR}}export HADOOP_PID_DIR=${HADOOP_PID_DIR}{{/HADOOP_PID_DIR}} + + +# Default log4j setting for interactive commands +# Java property: hadoop.root.logger +# export HADOOP_ROOT_LOGGER=INFO,console +{{#HADOOP_ROOT_LOGGER}}export HADOOP_ROOT_LOGGER={{HADOOP_ROOT_LOGGER}}{{/HADOOP_ROOT_LOGGER}}{{^HADOOP_ROOT_LOGGER}}export HADOOP_ROOT_LOGGER=INFO,console{{/HADOOP_ROOT_LOGGER}} + +# Default log4j setting for daemons spawned explicitly by +# --daemon option of hadoop, hdfs, mapred and yarn command. +# Java property: hadoop.root.logger +# export HADOOP_DAEMON_ROOT_LOGGER=INFO,RFA + +# Default log level and output location for security-related messages. +# You will almost certainly want to change this on a per-daemon basis via +# the Java property (i.e., -Dhadoop.security.logger=foo). (Note that the +# defaults for the NN and 2NN override this by default.) +# Java property: hadoop.security.logger +# export HADOOP_SECURITY_LOGGER=INFO,NullAppender + +# Default process priority level +# Note that sub-processes will also run at this level! +# export HADOOP_NICENESS=0 + +# Default name for the service level authorization file +# Java property: hadoop.policy.file +# export HADOOP_POLICYFILE="hadoop-policy.xml" + +# +# NOTE: this is not used by default! <----- +# You can define variables right here and then re-use them later on. +# For example, it is common to use the same garbage collection settings +# for all the daemons. So one could define: +# +# export HADOOP_GC_SETTINGS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps" +# +# .. and then use it as per the b option under the namenode. + +### +# Secure/privileged execution +### + +# +# Out of the box, Hadoop uses jsvc from Apache Commons to launch daemons +# on privileged ports. This functionality can be replaced by providing +# custom functions. See hadoop-functions.sh for more information. +# + +# The jsvc implementation to use. Jsvc is required to run secure datanodes +# that bind to privileged ports to provide authentication of data transfer +# protocol. Jsvc is not required if SASL is configured for authentication of +# data transfer protocol using non-privileged ports. +# export JSVC_HOME=/usr/bin + +# +# This directory contains pids for secure and privileged processes. +#export HADOOP_SECURE_PID_DIR=${HADOOP_PID_DIR} +{{#HADOOP_SECURE_PID_DIR}}export HADOOP_SECURE_PID_DIR={{HADOOP_SECURE_PID_DIR}}{{/HADOOP_SECURE_PID_DIR}}{{^HADOOP_SECURE_PID_DIR}}export HADOOP_SECURE_PID_DIR=${HADOOP_PID_DIR}{{/HADOOP_SECURE_PID_DIR}} + +# +# This directory contains the logs for secure and privileged processes. +# Java property: hadoop.log.dir +# export HADOOP_SECURE_LOG=${HADOOP_LOG_DIR} + +# +# When running a secure daemon, the default value of HADOOP_IDENT_STRING +# ends up being a bit bogus. Therefore, by default, the code will +# replace HADOOP_IDENT_STRING with HADOOP_xx_SECURE_USER. If one wants +# to keep HADOOP_IDENT_STRING untouched, then uncomment this line. +# export HADOOP_SECURE_IDENT_PRESERVE="true" + +### +# NameNode specific parameters +### + +# Default log level and output location for file system related change +# messages. 
For non-namenode daemons, the Java property must be set in +# the appropriate _OPTS if one wants something other than INFO,NullAppender +# Java property: hdfs.audit.logger +# export HDFS_AUDIT_LOGGER=INFO,NullAppender + +# Specify the JVM options to be used when starting the NameNode. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# a) Set JMX options +# export HDFS_NAMENODE_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=1026" +# +# b) Set garbage collection logs +# export HDFS_NAMENODE_OPTS="${HADOOP_GC_SETTINGS} -Xloggc:${HADOOP_LOG_DIR}/gc-rm.log-$(date +'%Y%m%d%H%M')" +# +# c) ... or set them directly +# export HDFS_NAMENODE_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:${HADOOP_LOG_DIR}/gc-rm.log-$(date +'%Y%m%d%H%M')" + +# this is the default: +# export HDFS_NAMENODE_OPTS="-Dhadoop.security.logger=INFO,RFAS" + +### +# SecondaryNameNode specific parameters +### +# Specify the JVM options to be used when starting the SecondaryNameNode. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# This is the default: +# export HDFS_SECONDARYNAMENODE_OPTS="-Dhadoop.security.logger=INFO,RFAS" + +### +# DataNode specific parameters +### +# Specify the JVM options to be used when starting the DataNode. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# This is the default: +# export HDFS_DATANODE_OPTS="-Dhadoop.security.logger=ERROR,RFAS" + +# On secure datanodes, user to run the datanode as after dropping privileges. +# This **MUST** be uncommented to enable secure HDFS if using privileged ports +# to provide authentication of data transfer protocol. This **MUST NOT** be +# defined if SASL is configured for authentication of data transfer protocol +# using non-privileged ports. +# This will replace the hadoop.id.str Java property in secure mode. +# export HDFS_DATANODE_SECURE_USER=hdfs + +# Supplemental options for secure datanodes +# By default, Hadoop uses jsvc which needs to know to launch a +# server jvm. +# export HDFS_DATANODE_SECURE_EXTRA_OPTS="-jvm server" + +### +# NFS3 Gateway specific parameters +### +# Specify the JVM options to be used when starting the NFS3 Gateway. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_NFS3_OPTS="" + +# Specify the JVM options to be used when starting the Hadoop portmapper. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_PORTMAP_OPTS="-Xmx512m" + +# Supplemental options for privileged gateways +# By default, Hadoop uses jsvc which needs to know to launch a +# server jvm. +# export HDFS_NFS3_SECURE_EXTRA_OPTS="-jvm server" + +# On privileged gateways, user to run the gateway as after dropping privileges +# This will replace the hadoop.id.str Java property in secure mode. +# export HDFS_NFS3_SECURE_USER=nfsserver + +### +# ZKFailoverController specific parameters +### +# Specify the JVM options to be used when starting the ZKFailoverController.
+# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_ZKFC_OPTS="" + +### +# QuorumJournalNode specific parameters +### +# Specify the JVM options to be used when starting the QuorumJournalNode. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_JOURNALNODE_OPTS="" + +### +# HDFS Balancer specific parameters +### +# Specify the JVM options to be used when starting the HDFS Balancer. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_BALANCER_OPTS="" + +### +# HDFS Mover specific parameters +### +# Specify the JVM options to be used when starting the HDFS Mover. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_MOVER_OPTS="" + +### +# Router-based HDFS Federation specific parameters +# Specify the JVM options to be used when starting the RBF Routers. +# These options will be appended to the options specified as HADOOP_OPTS +# and therefore may override any similar flags set in HADOOP_OPTS +# +# export HDFS_DFSROUTER_OPTS="" +### + +### +# Advanced Users Only! +### + +# +# When building Hadoop, one can add the class paths to the commands +# via this special env var: +# export HADOOP_ENABLE_BUILD_PATHS="true" + +# +# To prevent accidents, shell commands can be (superficially) locked +# to only allow certain users to execute certain subcommands. +# It uses the format of (command)_(subcommand)_USER. +# +# For example, to limit who can execute the namenode command, +# export HDFS_NAMENODE_USER=hdfs diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/hosts.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/hosts.mustache new file mode 100644 index 00000000..f70d5453 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/hosts.mustache @@ -0,0 +1,3 @@ +{{#hosts}} +{{{.}}} +{{/hosts}} diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/ini.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/ini.mustache new file mode 100644 index 00000000..9119b7b6 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/ini.mustache @@ -0,0 +1,7 @@ +{{#sections}}{{#name}}[{{name}}] +{{/name}} +{{#entries}} +{{name}}={{value}} +{{/entries}} + +{{/sections}} \ No newline at end of file diff --git a/peel-extensions/src/main/resources/templates/hadoop-3/conf/site.xml.mustache b/peel-extensions/src/main/resources/templates/hadoop-3/conf/site.xml.mustache new file mode 100644 index 00000000..68076ff3 --- /dev/null +++ b/peel-extensions/src/main/resources/templates/hadoop-3/conf/site.xml.mustache @@ -0,0 +1,12 @@ +<?xml version="1.0"?> + + + +<configuration> +{{#properties}} +    <property> +        <name>{{{name}}}</name> +        <value>{{{value}}}</value> +    </property> +{{/properties}} +</configuration> diff --git a/peel-extensions/src/main/scala/org/peelframework/extensions.scala b/peel-extensions/src/main/scala/org/peelframework/extensions.scala index a5583be2..d1d8d94e 100644 --- a/peel-extensions/src/main/scala/org/peelframework/extensions.scala +++ b/peel-extensions/src/main/scala/org/peelframework/extensions.scala @@ -18,8 +18,8 @@ package org.peelframework import com.samskivert.mustache.Mustache import org.peelframework.core.beans.system.Lifespan import
org.peelframework.dstat.beans.system.Dstat -import org.peelframework.flink.beans.system.Flink -import org.peelframework.hadoop.beans.system.{HDFS2, Yarn} +import org.peelframework.flink.beans.system.{Flink, FlinkStandaloneCluster, FlinkYarnSession} +import org.peelframework.hadoop.beans.system.{HDFS2, HDFS3, Yarn, Yarn3} import org.peelframework.spark.beans.system.Spark import org.peelframework.zookeeper.beans.system.Zookeeper import org.springframework.context.annotation.{Bean, Configuration} @@ -99,6 +99,14 @@ class extensions extends ApplicationContextAware { mc = ctx.getBean(classOf[Mustache.Compiler]) ) + @Bean(name = Array("hdfs-3.1.1")) + def `hdfs-3.1.1`: HDFS3 = new HDFS3( + version = "3.1.1", + configKey = "hadoop-3", + lifespan = Lifespan.SUITE, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + // Yarn @Bean(name = Array("yarn-2.4.1")) @@ -117,10 +125,18 @@ class extensions extends ApplicationContextAware { mc = ctx.getBean(classOf[Mustache.Compiler]) ) + @Bean(name = Array("yarn-3.1.1")) + def `yarn-3.1.1`: Yarn3 = new Yarn3( + version = "3.1.1", + configKey = "hadoop-3", + lifespan = Lifespan.SUITE, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + // Flink @Bean(name = Array("flink-0.8.0")) - def `flink-0.8.0`: Flink = new Flink( + def `flink-0.8.0`: FlinkStandaloneCluster = new FlinkStandaloneCluster( version = "0.8.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -128,7 +144,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.8.1")) - def `flink-0.8.1`: Flink = new Flink( + def `flink-0.8.1`: Flink = new FlinkStandaloneCluster( version = "0.8.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -136,7 +152,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.9.0")) - def `flink-0.9.0`: Flink = new Flink( + def `flink-0.9.0`: Flink = new FlinkStandaloneCluster( version = "0.9.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -144,7 +160,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.10.0")) - def `flink-0.10.0`: Flink = new Flink( + def `flink-0.10.0`: Flink = new FlinkStandaloneCluster( version = "0.10.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -152,7 +168,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.10.1")) - def `flink-0.10.1`: Flink = new Flink( + def `flink-0.10.1`: Flink = new FlinkStandaloneCluster( version = "0.10.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -160,7 +176,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-0.10.2")) - def `flink-0.10.2`: Flink = new Flink( + def `flink-0.10.2`: Flink = new FlinkStandaloneCluster( version = "0.10.2", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -168,7 +184,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.0.0")) - def `flink-1.0.0`: Flink = new Flink( + def `flink-1.0.0`: Flink = new FlinkStandaloneCluster( version = "1.0.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -176,7 +192,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.0.1")) - def `flink-1.0.1`: Flink = new Flink( + def `flink-1.0.1`: Flink = new FlinkStandaloneCluster( version = "1.0.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -184,7 +200,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.0.2")) - def `flink-1.0.2`: Flink = new Flink( + def `flink-1.0.2`: Flink = new 
FlinkStandaloneCluster( version = "1.0.2", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -192,7 +208,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.0.3")) - def `flink-1.0.3`: Flink = new Flink( + def `flink-1.0.3`: Flink = new FlinkStandaloneCluster( version = "1.0.3", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -200,7 +216,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.0")) - def `flink-1.1.0`: Flink = new Flink( + def `flink-1.1.0`: Flink = new FlinkStandaloneCluster( version = "1.1.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -208,7 +224,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.1")) - def `flink-1.1.1`: Flink = new Flink( + def `flink-1.1.1`: Flink = new FlinkStandaloneCluster( version = "1.1.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -216,7 +232,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.2")) - def `flink-1.1.2`: Flink = new Flink( + def `flink-1.1.2`: Flink = new FlinkStandaloneCluster( version = "1.1.2", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -224,7 +240,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.3")) - def `flink-1.1.3`: Flink = new Flink( + def `flink-1.1.3`: Flink = new FlinkStandaloneCluster( version = "1.1.3", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -232,7 +248,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.1.4")) - def `flink-1.1.4`: Flink = new Flink( + def `flink-1.1.4`: Flink = new FlinkStandaloneCluster( version = "1.1.4", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -240,7 +256,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.2.0")) - def `flink-1.2.0`: Flink = new Flink( + def `flink-1.2.0`: Flink = new FlinkStandaloneCluster( version = "1.2.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -248,7 +264,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.2.1")) - def `flink-1.2.1`: Flink = new Flink( + def `flink-1.2.1`: Flink = new FlinkStandaloneCluster( version = "1.2.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -256,7 +272,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.3.0")) - def `flink-1.3.0`: Flink = new Flink( + def `flink-1.3.0`: Flink = new FlinkStandaloneCluster( version = "1.3.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -264,7 +280,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.3.1")) - def `flink-1.3.1`: Flink = new Flink( + def `flink-1.3.1`: Flink = new FlinkStandaloneCluster( version = "1.3.1", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -272,7 +288,7 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.3.2")) - def `flink-1.3.2`: Flink = new Flink( + def `flink-1.3.2`: Flink = new FlinkStandaloneCluster( version = "1.3.2", configKey = "flink", lifespan = Lifespan.EXPERIMENT, @@ -280,13 +296,47 @@ class extensions extends ApplicationContextAware { ) @Bean(name = Array("flink-1.4.0")) - def `flink-1.4.0`: Flink = new Flink( + def `flink-1.4.0`: Flink = new FlinkStandaloneCluster( version = "1.4.0", configKey = "flink", lifespan = Lifespan.EXPERIMENT, mc = ctx.getBean(classOf[Mustache.Compiler]) ) + @Bean(name = Array("flink-1.7.0")) + def `flink-1.7.0`: Flink = new 
FlinkStandaloneCluster( + version = "1.7.0", + configKey = "flink", + lifespan = Lifespan.EXPERIMENT, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + + @Bean(name = Array("flink-1.7.2")) + def `flink-1.7.2`: Flink = new FlinkStandaloneCluster( + version = "1.7.2", + configKey = "flink", + lifespan = Lifespan.EXPERIMENT, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + + // Flink YARN session + + @Bean(name = Array("flink-yarn-1.7.0")) + def `flink-yarn-1.7.0`: Flink = new FlinkYarnSession( + version = "1.7.0", + configKey = "flink", + lifespan = Lifespan.EXPERIMENT, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + + @Bean(name = Array("flink-yarn-1.7.2")) + def `flink-yarn-1.7.2`: Flink = new FlinkYarnSession( + version = "1.7.2", + configKey = "flink", + lifespan = Lifespan.EXPERIMENT, + mc = ctx.getBean(classOf[Mustache.Compiler]) + ) + // Spark @Bean(name = Array("spark-1.3.1")) diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala index 60393578..3c6afada 100644 --- a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/Flink.scala @@ -30,9 +30,9 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.{Await, Future} import scala.util.matching.Regex -/** Wrapper class for Flink. +/** Base class for Flink systems. * - * Implements Flink as a Peel `System` and provides setup and teardown methods. + * Common base class to implement Flink Cluster and Flink YARN session environments as a Peel `System` and provides setup and teardown methods. * * @param version Version of the system (e.g. "7.1") * @param configKey The system configuration resides under `system.\${configKey}` @@ -40,7 +40,7 @@ import scala.util.matching.Regex * @param dependencies Set of dependencies that this system needs * @param mc The moustache compiler to compile the templates that are used to generate property files for the system */ -class Flink( +abstract class Flink( version : String, configKey : String, lifespan : Lifespan, @@ -48,113 +48,4 @@ class Flink( mc : Mustache.Compiler) extends System("flink", version, configKey, lifespan, dependencies, mc) with DistributedLogCollection { - // --------------------------------------------------- - // LogCollection. - // --------------------------------------------------- - - override def hosts = { - val master = config.getString(s"system.$configKey.config.yaml.jobmanager.rpc.address") - val slaves = config.getStringList(s"system.$configKey.config.slaves").asScala - master +: slaves - } - - /** The patterns of the log files to watch. */ - override protected def logFilePatterns(): Seq[Regex] = { - val user = Pattern.quote(config.getString(s"system.$configKey.user")) - hosts.map(Pattern.quote).flatMap(host => Seq( - s"flink-$user-jobmanager-\\d+-$host\\.log".r, - s"flink-$user-jobmanager-\\d+-$host\\.out".r, - s"flink-$user-taskmanager-\\d+-$host\\.log".r, - s"flink-$user-taskmanager-\\d+-$host\\.out".r)) - } - - // --------------------------------------------------- - // System. 
- // --------------------------------------------------- - - override def configuration() = SystemConfig(config, { - val conf = config.getString(s"system.$configKey.path.config") - List( - SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.slaves", s"$conf/slaves", templatePath("conf/hosts"), mc), - SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.yaml", s"$conf/flink-conf.yaml", templatePath("conf/flink-conf.yaml"), mc), - SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.log4j", s"$conf/log4j.properties", templatePath("conf/log4j.properties"), mc) - ) - }) - - override protected def start(): Unit = { - val user = config.getString(s"system.$configKey.user") - val logDir = config.getString(s"system.$configKey.path.log") - - val init = (host: String, paths: Seq[String]) => { - val cmd = paths.map(path => s"rm -Rf $path && mkdir -p $path").mkString(" && ") - s""" ssh $user@$host "$cmd" """ - } - - val hosts = config.getStringList(s"system.$configKey.config.slaves").asScala - val paths = config.getString(s"system.$configKey.config.yaml.taskmanager.tmp.dirs").split(':') - - val futureInitOps = Future.traverse(hosts)(host => Future { - logger.info(s"Initializing Flink tmp directories '${paths.mkString(":")}' at $host") - shell ! (init(host, paths), s"Unable to initialize Flink tmp directories '${paths.mkString(":")}' at $host.") - }) - - // await for all futureInitOps to finish - Await.result(futureInitOps, Math.max(30, 5 * hosts.size).seconds) - - var failedStartUpAttempts = 0 - while (!isUp) { - try { - val totl = config.getStringList(s"system.$configKey.config.slaves").size() - val init = 0 // Flink resets the job manager log on startup - - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/start-cluster.sh" - if (Version(version) < Version("1.0")) { - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/start-webclient.sh" - } - logger.info(s"Waiting for nodes to connect") - - var curr = init - var cntr = config.getInt(s"system.$configKey.startup.polling.counter") - while (curr - init < totl) { - logger.info(s"Connected ${curr - init} from $totl nodes") - // wait a bit - Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) - // get new values - if (Version(version) <= Version("0.6")) - curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Creating instance' | wc -l""").trim()) - else if (Version(version) <= Version("1.6")) - curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Registered TaskManager' | wc -l""").trim()) - else - curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-standalonesession-*.log | grep 'Registering TaskManager' | wc -l""").trim()) - // timeout if counter goes below zero - cntr = cntr - 1 - if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") - } - isUp = true - } catch { - case e: SetUpTimeoutException => - failedStartUpAttempts = failedStartUpAttempts + 1 - if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-cluster.sh" - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" - logger.info(s"Could not bring system '$toString' up in time, trying again...") - } else { - throw e - } - } - } - } - - override protected def stop() = { - shell ! 
s"${config.getString(s"system.$configKey.path.home")}/bin/stop-cluster.sh" - if (Version(version) < Version("1.0")) { - shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" - } - shell ! s"rm -f ${config.getString(s"system.$configKey.config.yaml.env.pid.dir")}/flink-*.pid" - isUp = false - } - - def isRunning = { - (shell ! s"""ps -p `cat ${config.getString(s"system.$configKey.config.yaml.env.pid.dir")}/flink-*.pid`""") == 0 - } } diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkStandaloneCluster.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkStandaloneCluster.scala new file mode 100644 index 00000000..a8ad31a1 --- /dev/null +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkStandaloneCluster.scala @@ -0,0 +1,158 @@ +/** + * Copyright (C) 2014 TU Berlin (peel@dima.tu-berlin.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.peelframework.flink.beans.system + +import java.util.regex.Pattern + +import com.samskivert.mustache.Mustache +import org.peelframework.core.beans.system.Lifespan.Lifespan +import org.peelframework.core.beans.system.{DistributedLogCollection, SetUpTimeoutException, System} +import org.peelframework.core.config.{Model, SystemConfig} +import org.peelframework.core.util.{Version, shell} + +import scala.collection.JavaConverters._ +import scala.collection.Seq +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.matching.Regex + +/** Wrapper class for Flink. + * + * Implements a Flink standalone cluster as a Peel `System` and provides setup and teardown methods. + * + * @param version Version of the system (e.g. "7.1") + * @param configKey The system configuration resides under `system.\${configKey}` + * @param lifespan `Lifespan` of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class FlinkStandaloneCluster( + version : String, + configKey : String, + lifespan : Lifespan, + dependencies : Set[System] = Set(), + mc : Mustache.Compiler) extends Flink(version, configKey, lifespan, dependencies, mc) { + + // --------------------------------------------------- + // LogCollection. + // --------------------------------------------------- + + override def hosts = { + val master = config.getString(s"system.$configKey.config.yaml.jobmanager.rpc.address") + val slaves = config.getStringList(s"system.$configKey.config.slaves").asScala + master +: slaves + } + + /** The patterns of the log files to watch. 
*/ + override protected def logFilePatterns(): Seq[Regex] = { + val user = Pattern.quote(config.getString(s"system.$configKey.user")) + hosts.map(Pattern.quote).flatMap(host => Seq( + s"flink-$user-standalonesession-\\d+-$host\\.log".r, + s"flink-$user-standalonesession-\\d+-$host\\.out".r, + s"flink-$user-jobmanager-\\d+-$host\\.log".r, + s"flink-$user-jobmanager-\\d+-$host\\.out".r, + s"flink-$user-taskmanager-\\d+-$host\\.log".r, + s"flink-$user-taskmanager-\\d+-$host\\.out".r)) + } + + // --------------------------------------------------- + // System. + // --------------------------------------------------- + + override def configuration() = SystemConfig(config, { + val conf = config.getString(s"system.$configKey.path.config") + List( + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.slaves", s"$conf/slaves", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.yaml", s"$conf/flink-conf.yaml", templatePath("conf/flink-conf.yaml"), mc), + SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.log4j", s"$conf/log4j.properties", templatePath("conf/log4j.properties"), mc) + ) + }) + + override protected def start(): Unit = { + val user = config.getString(s"system.$configKey.user") + val logDir = config.getString(s"system.$configKey.path.log") + + val init = (host: String, paths: Seq[String]) => { + val cmd = paths.map(path => s"rm -Rf $path && mkdir -p $path").mkString(" && ") + s""" ssh $user@$host "$cmd" """ + } + + val hosts = config.getStringList(s"system.$configKey.config.slaves").asScala + val paths = config.getString(s"system.$configKey.config.yaml.taskmanager.tmp.dirs").split(':') + + val futureInitOps = Future.traverse(hosts)(host => Future { + logger.info(s"Initializing Flink tmp directories '${paths.mkString(":")}' at $host") + shell ! (init(host, paths), s"Unable to initialize Flink tmp directories '${paths.mkString(":")}' at $host.") + }) + + // await for all futureInitOps to finish + Await.result(futureInitOps, Math.max(30, 5 * hosts.size).seconds) + + var failedStartUpAttempts = 0 + while (!isUp) { + try { + val totl = config.getStringList(s"system.$configKey.config.slaves").size() + val init = 0 // Flink resets the job manager log on startup + + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/start-cluster.sh" + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/start-webclient.sh" + logger.info(s"Waiting for nodes to connect") + + var curr = init + var cntr = config.getInt(s"system.$configKey.startup.polling.counter") + while (curr - init < totl) { + logger.info(s"Connected ${curr - init} from $totl nodes") + // wait a bit + Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) + // get new values + if (Version(version) < Version("0.6")) { + curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Creating instance' | wc -l""").trim()) + } else if (Version(version) < Version("1.6")) { + curr = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-jobmanager-*.log | grep 'Registered TaskManager' | wc -l""").trim()) + } else { + curr = Integer.parseInt((shell !! 
s"""cat $logDir/flink-$user-standalonesession-*.log | grep 'Registering TaskManager' | wc -l""").trim()) + } + // timeout if counter goes below zero + cntr = cntr - 1 + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-cluster.sh" + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + throw e + } + } + } + } + + override protected def stop() = { + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-cluster.sh" + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/stop-webclient.sh" + shell ! s"rm -f ${config.getString(s"system.$configKey.config.yaml.env.pid.dir")}/flink-*.pid" + isUp = false + } + + def isRunning = { + (shell ! s"""ps -p `cat ${config.getString(s"system.$configKey.config.yaml.env.pid.dir")}/flink-*.pid`""") == 0 + } +} diff --git a/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala new file mode 100644 index 00000000..661e1108 --- /dev/null +++ b/peel-extensions/src/main/scala/org/peelframework/flink/beans/system/FlinkYarnSession.scala @@ -0,0 +1,149 @@ +/** + * Copyright (C) 2014 TU Berlin (peel@dima.tu-berlin.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.peelframework.flink.beans.system + +import java.util.regex.Pattern + +import com.samskivert.mustache.Mustache +import org.peelframework.core.beans.system.Lifespan.Lifespan +import org.peelframework.core.beans.system.{DistributedLogCollection, SetUpTimeoutException, System} +import org.peelframework.core.config.{Model, SystemConfig} +import org.peelframework.core.util.{Version, shell} + +import scala.collection.JavaConverters._ +import scala.collection.Seq +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.matching.Regex + +/** Wrapper class for a Flink YARN session. + * + * Implements a Flink YARN session as a Peel `System` and provides setup and teardown methods. + * + * @param version Version of the system (e.g. 
"7.1") + * @param configKey The system configuration resides under `system.\${configKey}` + * @param lifespan `Lifespan` of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class FlinkYarnSession( + version : String, + configKey : String, + lifespan : Lifespan, + dependencies : Set[System] = Set(), + mc : Mustache.Compiler) extends Flink(version, configKey, lifespan, dependencies, mc) { + + // --------------------------------------------------- + // LogCollection. + // --------------------------------------------------- + + override def hosts: Seq[String] = Seq(config.getString(s"system.$configKey.config.yaml.jobmanager.rpc.address")) + + /** The patterns of the log files to watch. */ + override protected def logFilePatterns(): Seq[Regex] = { + val user = Pattern.quote(config.getString(s"system.$configKey.user")) + hosts.map(Pattern.quote).flatMap(host => Seq( + s"flink-$user-yarn-session-\\d+-$host\\.log".r, + s"flink-$user-yarn-session-\\d+-$host\\.out".r)) + } + + // --------------------------------------------------- + // System. + // --------------------------------------------------- + + override def configuration() = SystemConfig(config, { + val conf = config.getString(s"system.$configKey.path.config") + List( + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.slaves", s"$conf/slaves", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.yaml", s"$conf/flink-conf.yaml", templatePath("conf/flink-conf.yaml"), mc), + SystemConfig.Entry[Model.Yaml](s"system.$configKey.config.log4j", s"$conf/log4j-yarn-session.properties", templatePath("conf/log4j-yarn-session.properties"), mc) + ) + }) + + override protected def start(): Unit = { + val user = config.getString(s"system.$configKey.user") + val logDir = config.getString(s"system.$configKey.path.log") + + val init = (host: String, paths: Seq[String]) => { + val cmd = paths.map(path => s"rm -Rf $path && mkdir -p $path").mkString(" && ") + s""" ssh $user@$host "$cmd" """ + } + + val hosts = config.getStringList(s"system.$configKey.config.slaves").asScala + val paths = config.getString(s"system.$configKey.config.yaml.taskmanager.tmp.dirs").split(':') + + val futureInitOps = Future.traverse(hosts)(host => Future { + logger.info(s"Initializing Flink tmp directories '${paths.mkString(":")}' at $host") + shell ! (init(host, paths), s"Unable to initialize Flink tmp directories '${paths.mkString(":")}' at $host.") + }) + + // await for all futureInitOps to finish + Await.result(futureInitOps, Math.max(30, 5 * hosts.size).seconds) + val numberOfTaskSlots = config.getString(s"system.$configKey.config.yaml.taskmanager.numberOfTaskSlots") + + var failedStartUpAttempts = 0 + while (!isUp) { + try { + var done = false + + shell ! s"${config.getString(s"system.$configKey.path.home")}/bin/yarn-session.sh -n ${hosts.size} -s $numberOfTaskSlots -d" + + var cntr = config.getInt(s"system.$configKey.startup.polling.counter") + while (!done) { + logger.info(s"Waiting for session to start") + // wait a bit + Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) + // get new values + done = Integer.parseInt((shell !! s"""cat $logDir/flink-$user-yarn-session-*.log | grep 'YARN application has been deployed successfully.' 
| wc -l""").trim) == 1 + // timeout if counter goes below zero + cntr = cntr - 1 + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + // Flink yarn-session.sh does not create the Flink PID directory (that happens in config.sh and flink-daemon.sh). + // However, the application ID is stored in /tmp/.yarn-properties-$user + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { + // should Peel try to stop the system here? + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + shell ! s"""rm -f /tmp/.yarn-properties-$user""" + throw e + } + } + } + } + + override protected def stop(): Unit = { + val user = config.getString(s"system.$configKey.user") + val appId = (shell !! s"""grep applicationID /tmp/.yarn-properties-$user | sed -e 's/applicationID=\\(.*\\).*/\\1/'""").trim() + shell ! s"""echo quit | ${config.getString(s"system.$configKey.path.home")}/bin/yarn-session.sh -id $appId""" + if (isRunning) { + logger.warn(s"Flink YARN session still appears to be running after attempted shutdown (file /tmp/.yarn-properties-$user exists)") + shell ! s"rm -f /tmp/.yarn-properties-$user" + } + isUp = false + } + + def isRunning: Boolean = { + // maybe query YARN rest API + val user = config.getString(s"system.$configKey.user") + (shell ! s"""ls /tmp/.yarn-properties-$user""") == 0 + } +} diff --git a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala new file mode 100644 index 00000000..4309054b --- /dev/null +++ b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/HDFS3.scala @@ -0,0 +1,180 @@ +/** + * Copyright (C) 2014 TU Berlin (peel@dima.tu-berlin.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.peelframework.hadoop.beans.system + +import java.net.URI +import java.util.regex.Pattern + +import com.samskivert.mustache.Mustache +import org.peelframework.core.beans.system.Lifespan.Lifespan +import org.peelframework.core.beans.system.{DistributedLogCollection, SetUpTimeoutException, System} +import org.peelframework.core.config.{Model, SystemConfig} +import org.peelframework.core.util.shell + +import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} +import scala.util.matching.Regex + +/** Wrapper class for HDFS3. + * + * Implements HDFS3 as a Peel `System` and provides setup and teardown methods. + * Additionally, it offers filesystem capabilities to interact with HDFS. + * + * @param version Version of the system (e.g. "7.1") + * @param configKey The system configuration resides under `system.\${configKey}`.
+ * @param lifespan `Lifespan` of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class HDFS3( + version : String, + configKey : String, + lifespan : Lifespan, + dependencies : Set[System] = Set(), + mc : Mustache.Compiler) extends System("hdfs-3", version, configKey, lifespan, dependencies, mc) + with HDFSFileSystem + with DistributedLogCollection { + + // --------------------------------------------------- + // LogCollection. + // --------------------------------------------------- + + override def hosts = { + val master = config.getString("runtime.hostname") + val workers = config.getStringList(s"system.$configKey.config.workers").asScala + master +: workers + } + + /** The patterns of the log files to watch. */ + override protected def logFilePatterns(): Seq[Regex] = { + val user = Pattern.quote(config.getString(s"system.$configKey.user")) + hosts.map(Pattern.quote).flatMap(host => Seq( + s"hadoop-$user-namenode-$host\\.log".r, + s"hadoop-$user-namenode-$host\\.out".r, + s"hadoop-$user-datanode-$host\\.log".r, + s"hadoop-$user-datanode-$host\\.out".r)) + } + + // --------------------------------------------------- + // System. + // --------------------------------------------------- + + override def configuration() = SystemConfig(config, { + val conf = config.getString(s"system.$configKey.path.config") + List( + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/workers", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/hdfs-workers", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Env](s"system.$configKey.config.env", s"$conf/hadoop-env.sh", templatePath("conf/hadoop-env.sh"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.core", s"$conf/core-site.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.hdfs", s"$conf/hdfs-site.xml", templatePath("conf/site.xml"), mc) + ) + }) + + /** Checks if all datanodes have connected and the system is out of safemode. */ + override protected def start(): Unit = { + if (config.getBoolean(s"system.$configKey.format")) format() + + val user = config.getString(s"system.$configKey.user") + val home = config.getString(s"system.$configKey.path.home") + val logDir = config.getString(s"system.$configKey.path.log") + val hostname = config.getString("app.hostname") + + var failedStartUpAttempts = 0 + while (!isUp) { + try { + val totl = config.getStringList(s"system.$configKey.config.workers").size() + var init = Integer.parseInt((shell !! s"""cat $logDir/hadoop-$user-namenode-$hostname.log | grep 'registerDatanode:' | wc -l""").trim()) + + shell ! s"$home/sbin/start-dfs.sh" + logger.info(s"Waiting for nodes to connect") + + var curr = init + var safe = !(shell !! s"$home/bin/hdfs dfsadmin -safemode get").toLowerCase.contains("off") + var cntr = config.getInt(s"system.$configKey.startup.polling.counter") + while (curr - init < totl || safe) { + logger.info(s"Connected ${curr - init} from $totl nodes, safemode is ${if (safe) "ON" else "OFF"}") + // wait a bit + Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) + // get new values + // depending on the log level its either in *.log or *.out (debug) + curr = Integer.parseInt((shell !! 
s"""cat $logDir/hadoop-$user-namenode-$hostname.out | grep 'registerDatanode:' | wc -l""").trim()) + if (curr == 0) { + curr = Integer.parseInt((shell !! s"""cat $logDir/hadoop-$user-namenode-$hostname.log | grep 'registerDatanode:' | wc -l""").trim()) + } + safe = !(shell !! s"$home/bin/hdfs dfsadmin -safemode get").toLowerCase.contains("off") + // timeout if counter goes below zero + cntr = cntr - 1 + if (curr - init < 0) init = 0 // protect against log reset on startup + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { + shell ! s"$home/sbin/stop-dfs.sh" + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + throw e + } + } + } + mkdir(config.getString(s"system.$configKey.path.input")) + } + + override protected def stop() = { + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/stop-dfs.sh" + if (config.getBoolean(s"system.$configKey.format")) format() + isUp = false + } + + def isRunning = { + val pidDir = config.getString(s"system.$configKey.config.env.HADOOP_PID_DIR") + (shell ! s""" ps -p `cat $pidDir/hadoop-*-namenode.pid` """) == 0 || + (shell ! s""" ps -p `cat $pidDir/hadoop-*-secondarynamenode.pid` """) == 0 || + (shell ! s""" ps -p `cat $pidDir/hadoop-*-datanode.pid` """) == 0 + } + + // --------------------------------------------------- + // Helper methods. + // --------------------------------------------------- + + private def format() = { + val user = config.getString(s"system.$configKey.user") + val home = config.getString(s"system.$configKey.path.home") + + logger.info(s"Formatting namenode") + shell ! (s"$home/bin/hdfs namenode -format -nonInteractive -force", "Unable to format namenode.") + + val init = (host: String, path: String) => + s""" ssh $user@$host "rm -Rf $path && mkdir -p $path/current" """ + + val list = for { + host <- config.getStringList(s"system.$configKey.config.workers").asScala + path <- config.getString(s"system.$configKey.config.hdfs.dfs.datanode.data.dir").split(',') + } yield (host, new URI(path).getPath) + + val futureInitOps = Future.traverse(list)(((host: String, path: String) => Future { + logger.info(s"Initializing HDFS data directory '$path' at $host") + shell ! (init(host, path), s"Unable to initialize HDFS data directory '$path' at $host.") + }).tupled) + + // await for all futureInitOps to finish + Await.result(futureInitOps, Math.max(30, 5 * list.size).seconds) + } +} diff --git a/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala new file mode 100644 index 00000000..3c94ea27 --- /dev/null +++ b/peel-extensions/src/main/scala/org/peelframework/hadoop/beans/system/Yarn3.scala @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2014 TU Berlin (peel@dima.tu-berlin.de) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.peelframework.hadoop.beans.system + +import java.util.regex.Pattern + +import com.samskivert.mustache.Mustache +import org.peelframework.core.beans.system.Lifespan.Lifespan +import org.peelframework.core.beans.system.{LogCollection, SetUpTimeoutException, System} +import org.peelframework.core.config.{Model, SystemConfig} +import org.peelframework.core.util.shell + +import scala.collection.JavaConverters._ +import scala.util.matching.Regex + +/** Wrapper class for Yarn. + * + * Implements Yarn as a Peel `System` and provides setup and teardown methods. + * + * @param version Version of the system (e.g. "7.1") + * @param configKey The system configuration resides under `system.\${configKey}` + * @param lifespan `Lifespan` of the system + * @param dependencies Set of dependencies that this system needs + * @param mc The moustache compiler to compile the templates that are used to generate property files for the system + */ +class Yarn3( + version : String, + configKey : String, + lifespan : Lifespan, + dependencies : Set[System] = Set(), + mc : Mustache.Compiler) extends System("yarn", version, configKey, lifespan, dependencies, mc) + with LogCollection { + + // --------------------------------------------------- + // LogCollection. + // --------------------------------------------------- + + /** The patterns of the log files to watch. */ + override protected def logFilePatterns(): Seq[Regex] = { + // TODO: rework based on http://hortonworks.com/blog/simplifying-user-logs-management-and-access-in-yarn/ + val user = Pattern.quote(config.getString(s"system.$configKey.user")) + config.getStringList(s"system.$configKey.config.workers").asScala.map(Pattern.quote).map(slave => + s"hadoop-$user-resourcemanager-$slave\\.log".r) + } + + // --------------------------------------------------- + // System. 
+ // --------------------------------------------------- + + override def configuration() = SystemConfig(config, { + val conf = config.getString(s"system.$configKey.path.config") + List( + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/workers", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Hosts](s"system.$configKey.config.workers", s"$conf/yarn-workers", templatePath("conf/hosts"), mc), + SystemConfig.Entry[Model.Env](s"system.$configKey.config.env", s"$conf/hadoop-env.sh", templatePath("conf/hadoop-env.sh"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.core", s"$conf/core-site.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.yarn", s"$conf/yarn-site.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.mapred", s"$conf/mapred-site.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.Site](s"system.$configKey.config.resource-types", s"$conf/resource-types.xml", templatePath("conf/site.xml"), mc), + SystemConfig.Entry[Model.KeyValue](s"system.$configKey.config.capacity-scheduler", s"$conf/capacity-scheduler.xml", templatePath("conf/capacity-scheduler.xml"), mc), + SystemConfig.Entry[Model.INI](s"system.$configKey.config.container-executor", s"$conf/container-executor.cfg", templatePath("conf/ini"), mc) + ) + }) + + override def start(): Unit = { + val user = config.getString(s"system.$configKey.user") + val logDir = config.getString(s"system.$configKey.path.log") + + var failedStartUpAttempts = 0 + while(!isUp) { + try { + val total = config.getStringList(s"system.$configKey.config.workers").size() + // yarn does not reset the resourcemanagers log at startup + val init = Integer.parseInt((shell !! s"""cat $logDir/hadoop-$user-resourcemanager-*.log | grep 'registered with capability:' | wc -l""").trim()) + + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/start-yarn.sh" + logger.info(s"Waiting for nodes to connect") + + var curr = init + var cntr = config.getInt(s"system.$configKey.startup.polling.counter") + while (curr - init < total) { + logger.info(s"Connected ${curr - init} from $total nodes") + // wait a bit + Thread.sleep(config.getInt(s"system.$configKey.startup.polling.interval")) + // get new values + curr = Integer.parseInt((shell !! s"""cat $logDir/hadoop-$user-resourcemanager-*.log | grep 'registered with capability:' | wc -l""").trim()) + // timeout if counter goes below zero + cntr = cntr - 1 + if (cntr < 0) throw new SetUpTimeoutException(s"Cannot start system '$toString'; node connection timeout at system ") + } + logger.info(s"Connected ${curr - init} from $total nodes") + isUp = true + } catch { + case e: SetUpTimeoutException => + failedStartUpAttempts = failedStartUpAttempts + 1 + if (failedStartUpAttempts < config.getInt(s"system.$configKey.startup.max.attempts")) { + stop() + logger.info(s"Could not bring system '$toString' up in time, trying again...") + } else { + throw e + } + } + } + } + + override def stop(): Unit = { + shell ! s"${config.getString(s"system.$configKey.path.home")}/sbin/stop-yarn.sh" + isUp = false + } + + def isRunning = { + (shell ! s""" ps -p `cat ${config.getString(s"system.$configKey.config.env.HADOOP_PID_DIR")}/hadoop-*-resourcemanager.pid` """) == 0 || + (shell ! 
s""" ps -p `cat ${config.getString(s"system.$configKey.config.env.HADOOP_PID_DIR")}/hadoop-*-nodemanager.pid` """) == 0 + } +} diff --git a/pom.xml b/pom.xml index 657c3f07..66b7c2bc 100644 --- a/pom.xml +++ b/pom.xml @@ -120,7 +120,7 @@ 1.3.2 3.4 - 1.9 + 1.15 0.6.0
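As a usage sketch, not part of the patch itself: the systems introduced above can be combined in a downstream Peel bundle so that a Flink YARN session runs on top of HDFS 3 and YARN 3. The enclosing `@Configuration` class and the bean names below are illustrative assumptions; the constructors, config keys, and lifespans mirror the ones registered in extensions.scala above, with the Hadoop systems kept at SUITE lifespan and the session restarted per experiment.

import com.samskivert.mustache.Mustache
import org.peelframework.core.beans.system.Lifespan
import org.peelframework.flink.beans.system.FlinkYarnSession
import org.peelframework.hadoop.beans.system.{HDFS3, Yarn3}
import org.springframework.context.annotation.{Bean, Configuration}
import org.springframework.context.{ApplicationContext, ApplicationContextAware}

/** Hypothetical downstream bundle configuration (names are illustrative). */
@Configuration
class systems extends ApplicationContextAware {

  var ctx: ApplicationContext = _

  def setApplicationContext(applicationContext: ApplicationContext): Unit = {
    ctx = applicationContext
  }

  // HDFS 3 and YARN 3 share the `hadoop-3` config key and live for the whole suite.
  @Bean(name = Array("hdfs-3.1.1@my-bundle"))
  def hdfs: HDFS3 = new HDFS3(
    version   = "3.1.1",
    configKey = "hadoop-3",
    lifespan  = Lifespan.SUITE,
    mc        = ctx.getBean(classOf[Mustache.Compiler])
  )

  @Bean(name = Array("yarn-3.1.1@my-bundle"))
  def yarn: Yarn3 = new Yarn3(
    version   = "3.1.1",
    configKey = "hadoop-3",
    lifespan  = Lifespan.SUITE,
    mc        = ctx.getBean(classOf[Mustache.Compiler])
  )

  // The YARN session is torn down after each experiment and depends on the running HDFS and YARN systems.
  @Bean(name = Array("flink-yarn-1.7.2@my-bundle"))
  def flink: FlinkYarnSession = new FlinkYarnSession(
    version      = "1.7.2",
    configKey    = "flink",
    lifespan     = Lifespan.EXPERIMENT,
    dependencies = Set(hdfs, yarn),
    mc           = ctx.getBean(classOf[Mustache.Compiler])
  )
}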