From 984ac77f3a601bb31ab83ba03cc3dd0d53fecfca Mon Sep 17 00:00:00 2001 From: Charlie Drage Date: Tue, 19 Apr 2016 16:27:53 -0400 Subject: [PATCH] test --- atomicapp/providers/kubernetes.py | 369 +++++++++++++++--------------- 1 file changed, 183 insertions(+), 186 deletions(-) diff --git a/atomicapp/providers/kubernetes.py b/atomicapp/providers/kubernetes.py index 50e32cdd..1a3e3395 100644 --- a/atomicapp/providers/kubernetes.py +++ b/atomicapp/providers/kubernetes.py @@ -20,18 +20,60 @@ import anymarkup import logging import os -from string import Template -from atomicapp.constants import (LOGGER_COCKPIT, +from atomicapp.constants import (ACCESS_TOKEN_KEY, + ANSWERS_FILE, + DEFAULT_NAMESPACE, LOGGER_DEFAULT, - PERSISTENT_STORAGE_FORMAT) + NAMESPACE_KEY, + PROVIDER_API_KEY, + PROVIDER_CA_KEY, + LOGGER_COCKPIT) from atomicapp.plugin import Provider, ProviderFailedException -from atomicapp.utils import Utils + +from pykube.config import KubeConfig +from pykube.http import HTTPClient, HTTPError +from pykube.objects import Pod, ReplicationController, Service, Namespace cockpit_logger = logging.getLogger(LOGGER_COCKPIT) logger = logging.getLogger(LOGGER_DEFAULT) +class KubernetesAPI(object): + + def __init__(self, config): + self.api = HTTPClient(config) + + def namespaces(self): + return Namespace.objects(self.api).all().response["items"] + + def create(self, kind, artifact): + if kind in ["pod", "po"]: + k8s = Pod(self.api, artifact) + elif kind in ["rc", "replicationcontroller"]: + k8s = ReplicationController(self.api, artifact) + elif kind == "service": + k8s = Service(self.api, artifact) + else: + raise ProviderFailedException( + "No Kubernetes API of that kind: %s" % kind) + k8s.create() + + def delete(self, kind, artifact): + kind = artifact['kind'] + if kind in ["pod", "po"]: + k8s = Pod(self.api, artifact) + elif kind in ["rc", "replicationcontroller"]: + k8s = ReplicationController(self.api, artifact) + k8s.scale(replicas=0) + elif kind == "service": + k8s = Service(self.api, artifact) + else: + raise ProviderFailedException( + "No Kubernetes API of that kind: %s" % kind) + k8s.delete() + + class KubernetesProvider(Provider): """Operations for Kubernetes provider is implemented in this class. @@ -39,226 +81,181 @@ class KubernetesProvider(Provider): Kubernetes provider. """ key = "kubernetes" + namespace = DEFAULT_NAMESPACE + k8s_artifacts = {} config_file = None - kubectl = None - def init(self): - self.namespace = "default" + # Kubernetes cmd line settings + providerapi = "https://127.0.0.1:8080" + access_token = None + provider_ca = None - self.k8s_manifests = [] + def init(self): + self.k8s_artifacts = {} logger.debug("Given config: %s", self.config) if self.config.get("namespace"): self.namespace = self.config.get("namespace") logger.info("Using namespace %s", self.namespace) - if self.container: - self.kubectl = self._find_kubectl(Utils.getRoot()) - kube_conf_path = "/etc/kubernetes" - host_kube_conf_path = os.path.join(Utils.getRoot(), kube_conf_path.lstrip("/")) - if not os.path.exists(kube_conf_path) and os.path.exists(host_kube_conf_path): - if self.dryrun: - logger.info("DRY-RUN: link %s from %s" % (kube_conf_path, host_kube_conf_path)) - else: - os.symlink(host_kube_conf_path, kube_conf_path) - else: - self.kubectl = self._find_kubectl() - if not self.dryrun: - if not os.access(self.kubectl, os.X_OK): - raise ProviderFailedException("Command: " + self.kubectl + " not found") + self._process_artifacts() - # Check if Kubernetes config file is accessible, but only - # if one was provided by the user; config file is optional. - if self.config_file: - self.checkConfigFile() + if self.config_file: + self.api = KubernetesAPI(KubeConfig.from_file(self.config_file)) + else: + self.api = KubernetesAPI(self._from_env()) + + self._check_namespaces() + + def _from_env(self): + result = {PROVIDER_API_KEY: self.providerapi, + ACCESS_TOKEN_KEY: self.access_token, + PROVIDER_CA_KEY: self.provider_ca} + + for k in [PROVIDER_API_KEY, ACCESS_TOKEN_KEY, NAMESPACE_KEY]: + if result[k] is None: + msg = "Kubernetes API access: You need to set %s in %s" % (k, ANSWERS_FILE) + logger.error(msg) + raise ProviderFailedException(msg) + + config = { + "clusters": [ + { + "name": "self", + "cluster": { + "server": self.providerapi, + "certificate-authority": self.provider_ca, + }, + }, + ], + "users": [ + { + "name": "self", + "user": { + "token": self.access_token, + }, + }, + ], + "contexts": [ + { + "name": "self", + "context": { + "cluster": "self", + "user": "self", + }, + } + ], + "current-context": "self", + } + return config + + def _check_namespaces(self): + namespace_list = self.api.namespaces() + logger.debug("There are currently %s namespaces in the cluster." % str(len(namespace_list))) + + namespaces = [] + for ns in namespace_list: + namespaces.append(ns["metadata"]["name"]) + + if self.namespace not in namespaces: + logger.error("%s namespace does not exist. Please create the namespace and try again.") + + def _process_artifacts(self): + """ + Parse each Kubernetes file and convert said format into an Object for + deployment. + """ + for artifact in self.artifacts: + logger.debug("Processing artifact: %s", artifact) + data = None + with open(os.path.join(self.path, artifact), "r") as fp: + data = anymarkup.parse(fp, force_types=None) - def _find_kubectl(self, prefix=""): - """Determine the path to the kubectl program on the host. - 1) Check the config for a provider_cli in the general section - remember to add /host prefix - 2) Search /usr/bin:/usr/local/bin + self._process_artifact_data(artifact, data) - Use the first valid value found + def _process_artifact_data(self, artifact, data): """ + Process the data for an artifact - if self.dryrun: - # Testing env does not have kubectl in it - return "/usr/bin/kubectl" + Args: + artifact (str): Artifact name + data (dict): Artifact data + """ - test_paths = ['/usr/bin/kubectl', '/usr/local/bin/kubectl'] - if self.config.get("provider_cli"): - logger.info("caller gave provider_cli: " + self.config.get("provider_cli")) - test_paths.insert(0, self.config.get("provider_cli")) + # Check that the artifact is using the correct API + self._identify_api(artifact, data) - for path in test_paths: - test_path = prefix + path - logger.info("trying kubectl at " + test_path) - kubectl = test_path - if os.access(kubectl, os.X_OK): - logger.info("found kubectl at " + test_path) - return kubectl + # Check if kind exists + if "kind" not in data.keys(): + raise ProviderFailedException( + "Error processing %s artifact. There is no kind" % artifact) - raise ProviderFailedException("No kubectl found in %s" % ":".join(test_paths)) + # Change to lower case so it's easier to parse + kind = data["kind"].lower() - def _call(self, cmd): - """Calls given command + if kind not in self.k8s_artifacts.keys(): + self.k8s_artifacts[kind] = [] - :arg cmd: Command to be called in a form of list - :raises: Exception - """ + # Change to the namespace specified on init() + data['metadata']['namespace'] = self.namespace + data['metadata']['labels']['namespace'] = self.namespace - if self.dryrun: - logger.info("DRY-RUN: %s", " ".join(cmd)) - else: - ec, stdout, stderr = Utils.run_cmd(cmd, checkexitcode=True) - return stdout + self.k8s_artifacts[kind].append(data) - def process_k8s_artifacts(self): - """Processes Kubernetes manifests files and checks if manifest under - process is valid. + def _identify_api(self, artifact, data): """ - for artifact in self.artifacts: - data = None - with open(os.path.join(self.path, artifact), "r") as fp: - logger.debug(os.path.join(self.path, artifact)) - try: - data = anymarkup.parse(fp) - except Exception: - msg = "Error processing %s artifcats, Error:" % os.path.join( - self.path, artifact) - cockpit_logger.error(msg) - raise - if "kind" in data: - self.k8s_manifests.append((data["kind"].lower(), artifact)) - else: - apath = os.path.join(self.path, artifact) - raise ProviderFailedException("Malformed kube file: %s" % apath) - - def _resource_identity(self, path): - """Finds the Kubernetes resource name / identity from resource manifest - and raises if manifest is not supported. - - :arg path: Absolute path to Kubernetes resource manifest - - :return: str -- Resource name / identity - - :raises: ProviderFailedException + Make sure that the artifact is using the correct API + + Args: + artifact (str): Artifact name + data (dict): Artifact data """ - data = anymarkup.parse_file(path) if data["apiVersion"] == "v1": - return data["metadata"]["name"] + pass elif data["apiVersion"] in ["v1beta3", "v1beta2", "v1beta1"]: - msg = ("%s is not supported API version, update Kubernetes " + msg = ("%s is not a supported API version, update Kubernetes " "artifacts to v1 API version. Error in processing " - "%s manifest." % (data["apiVersion"], path)) + "%s manifest." % (data["apiVersion"], artifact)) raise ProviderFailedException(msg) else: - raise ProviderFailedException("Malformed kube file: %s" % path) - - def _scale_replicas(self, path, replicas=0): - """Scales replicationController to specified replicas size - - :arg path: Path to replicationController manifest - :arg replicas: Replica size to scale to. - """ - rname = self._resource_identity(path) - cmd = [self.kubectl, "scale", "rc", rname, - "--replicas=%s" % str(replicas), - "--namespace=%s" % self.namespace] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - - self._call(cmd) + raise ProviderFailedException("Malformed kubernetes artifact: %s" % artifact) def run(self): - """Deploys the app by given resource manifests. + """ + Deploys the app by given resource artifacts. """ logger.info("Deploying to Kubernetes") - self.process_k8s_artifacts() - for kind, artifact in self.k8s_manifests: - if not artifact: - continue - - k8s_file = os.path.join(self.path, artifact) - - cmd = [self.kubectl, "create", "-f", k8s_file, "--namespace=%s" % self.namespace] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - self._call(cmd) + for kind, objects in self.k8s_artifacts.iteritems(): + for artifact in objects: + if self.dryrun: + logger.info("DRY-RUN: Deploying k8s KIND: %s, ARTIFACT: %s" + % (kind, artifact)) + else: + try: + self.api.create(kind, artifact) + except HTTPError as e: + msg = "Failed to deploy Kubernetes artifact kind: %s. " \ + "Error: %s" % (kind, e) + raise ProviderFailedException(msg) def stop(self): """Undeploys the app by given resource manifests. Undeploy operation first scale down the replicas to 0 and then deletes the resource from cluster. """ - logger.info("Undeploying from Kubernetes") - self.process_k8s_artifacts() - - for kind, artifact in self.k8s_manifests: - if not artifact: - continue - - path = os.path.join(self.path, artifact) - - if kind in ["ReplicationController", "rc", "replicationcontroller"]: - self._scale_replicas(path, replicas=0) - - cmd = [self.kubectl, "delete", "-f", path, "--namespace=%s" % self.namespace] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - self._call(cmd) - - def persistent_storage(self, graph, action): - """ - Actions are either: run, stop or uninstall as per the Requirements class - Curently run is the only function implemented for k8s persistent storage - """ + logger.info("Deploying to Kubernetes") - logger.debug("Persistent storage enabled! Running action: %s" % action) - - if graph["accessMode"] not in PERSISTENT_STORAGE_FORMAT: - raise ProviderFailedException("{} is an invalid storage format " - "(choose from {})" - .format(graph["accessMode"], - ', '.join(PERSISTENT_STORAGE_FORMAT))) - - if action not in ['run']: - logger.warning( - "%s action is not available for provider %s. Doing nothing." % - (action, self.key)) - return - - self._check_persistent_volumes() - - # Get the path of the persistent storage yaml file includes in /external - # Plug the information from the graph into the persistent storage file - base_path = os.path.dirname(os.path.realpath(__file__)) - template_path = os.path.join(base_path, - 'external/kubernetes/persistent_storage.yaml') - with open(template_path, 'r') as f: - content = f.read() - template = Template(content) - rendered_template = template.safe_substitute(graph) - - tmp_file = Utils.getTmpFile(rendered_template, '.yaml') - - # Pass the .yaml file and execute - if action is "run": - cmd = [self.kubectl, "create", "-f", tmp_file, "--namespace=%s" % self.namespace] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - self._call(cmd) - os.unlink(tmp_file) - - def _check_persistent_volumes(self): - cmd = [self.kubectl, "get", "pv"] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - lines = self._call(cmd) - - # If there are no persistent volumes to claim, warn the user - if not self.dryrun and len(lines.split("\n")) == 2: - logger.warning("No persistent volumes detected in Kubernetes. Volume claim will not " - "initialize unless persistent volumes exist.") + for kind, objects in self.k8s_artifacts.iteritems(): + for artifact in objects: + if self.dryrun: + logger.info("DRY-RUN: Deploying k8s KIND: %s, ARTIFACT: %s" + % (kind, artifact)) + else: + try: + self.api.delete(kind, artifact) + except HTTPError as e: + msg = "Failed to stop Kubernetes artifact kind: %s. " \ + "Error: %s" % (kind, e) + raise ProviderFailedException(msg)