diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..c01a5d7 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,28 @@ +--- +default_language_version: + # force all unspecified python hooks to run python3 + python: python3 +repos: + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v3.4.0 + hooks: + - id: trailing-whitespace + - id: mixed-line-ending + args: ['--fix', 'lf'] + exclude: '.*\.(svg)$' + - id: check-byte-order-marker + - id: check-executables-have-shebangs + - id: check-merge-conflict + - id: debug-statements + - id: check-yaml + files: .*\.(yaml|yml)$ + - repo: local + hooks: + - id: flake8 + name: flake8 + additional_dependencies: + - hacking>=3.0.1,<3.1.0 + language: python + entry: flake8 + files: '^.*\.py$' + exclude: '^(doc|releasenotes|tools)/.*$' diff --git a/.zuul.yaml b/.zuul.yaml index ba2458e..a5e8661 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -31,6 +31,7 @@ jobs: - otc-tox-pep8 - otc-tox-py38 + - otc-tox-functional - apimon-build-image check-post: jobs: @@ -38,6 +39,8 @@ gate: jobs: - otc-tox-pep8 + - otc-tox-py38 + - otc-tox-functional - apimon-upload-image release: jobs: diff --git a/Dockerfile b/Dockerfile index d5ff0ed..62e2225 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,7 +10,7 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. 
-FROM fedora:33 +FROM fedora:35 LABEL description="APImon (OpenStack API monitoring) container" LABEL maintainer="Open Telekom Cloud (ecosystem)" @@ -34,7 +34,7 @@ WORKDIR /usr/app COPY ./requirements.txt /usr/app/requirements.txt -RUN \ +RUN \ git clone https://github.com/opentelekomcloud/python-otcextensions && \ # git clone https://github.com/ansible/ansible --branch stable-2.10 && \ git clone https://review.opendev.org/openstack/openstacksdk diff --git a/apimon/epmon/server.py b/apimon/epmon/server.py index 97c9d35..cba70c4 100644 --- a/apimon/epmon/server.py +++ b/apimon/epmon/server.py @@ -15,14 +15,10 @@ import time import openstack -from keystoneauth1.exceptions import ClientException - -try: - from alertaclient.api import Client as alerta_client -except ImportError: - alerta_client = None +from keystoneauth1 import exceptions as keystone_exceptions from apimon.lib import commandsocket +from apimon.lib.alerta_client import AlertaClient from apimon.lib.statsd import get_statsd @@ -33,35 +29,29 @@ class EndpointMonitor(threading.Thread): """A thread that checks endpoints. 
""" log = logging.getLogger("apimon.EndpointMonitor") - def __init__(self, config, target_cloud, - zone: str = None, alerta: dict = None) -> None: + def __init__( + self, config, target_cloud, zone: str = None + ) -> None: threading.Thread.__init__(self) self.log.info('Starting watching %s cloud' % target_cloud) + self.config = config + self.target_cloud = target_cloud + self.zone = zone + self.daemon = True self.wake_event = threading.Event() self._stopped = False self._pause = False - self.config = config - self.alerta = alerta + self.alerta = AlertaClient(self.config) + self.alerta.connect() + self.conn = None self.service_override = None - self.interval = int(self.config.get_default( - 'epmon', 'interval', 5)) - - self.influx_cnf = self.config.get_default( - 'metrics', 'influxdb', {}).copy() - self.zone = zone - self.statsd_extra_keys = { - 'zone': self.zone - } - self.statsd = get_statsd( - self.config, - self.statsd_extra_keys) - self.target_cloud = target_cloud self.reload() def stop(self) -> None: + self.log.info(f"Stopping epmon for {self.target_cloud}") self._stopped = True self.wake_event.set() @@ -72,6 +62,22 @@ def resume(self) -> None: self._pause = False def reload(self) -> None: + # Force alerta connect (reseting counter) + self.alerta.connect(force=True) + self.connect_retries = 0 + + self.interval = int(self.config.get_default( + 'epmon', 'interval', 5)) + + self.influx_cnf = self.config.get_default( + 'metrics', 'influxdb', {}).copy() + + self.statsd_extra_keys = { + 'zone': self.zone + } + self.statsd = get_statsd(self.config, self.statsd_extra_keys) + + # Read list of clouds we need to monitor for cl in self.config.get_default('epmon', 'clouds', []): if isinstance(cl, dict): if len(cl.items()) != 1: @@ -87,97 +93,132 @@ def reload(self) -> None: self.interval = int(vals.get('interval', 5)) except Exception: self.interval = 5 - self.log.debug('Need to monitor cloud %s' % target_cloud) - - auth_part = None - - for cnf in 
self.config.config.get('clouds', []): - if cnf.get('name') == self.target_cloud: - auth_part = cnf.get('data') - if self.influx_cnf and 'additional_metric_tags' in auth_part: - self.influx_cnf['additional_metric_tags'] = \ - auth_part['additional_metric_tags'] - - if not auth_part: - raise RuntimeError('Requested cloud %s is not found' % - target_cloud) + self.log.debug(f"Need to monitor cloud {target_cloud}") override_measurement = self.config.get_default('epmon', 'measurement') if override_measurement and self.influx_cnf: self.influx_cnf['measurement'] = override_measurement - self.region = openstack.config.get_cloud_region( - load_yaml_config=False, - **auth_part) - if self.influx_cnf: - self.region._influxdb_config = self.influx_cnf - statsd_config = self.config.get_default('metrics', 'statsd') - if statsd_config: - # Inject statsd reporter - self.region._statsd_host = statsd_config.get('host', 'localhost') - self.region._statsd_port = int(statsd_config.get('port', 8125)) - self.region._statsd_prefix = ( - 'openstack.api.{environment}.{zone}' - .format( - environment=self.target_cloud, - zone=self.zone) - ) - - self._connect() + # Reset openstack conn + self._connect_cloud(force=True) def run(self) -> None: - self._connect() + """Main loop""" while True: if self._stopped: return if not self.conn: # Not sure whether it works if we loose connection - self._connect() + self._connect_cloud() try: if self._pause: # Do not send heartbeat as well to not to forget to resume continue - if self.conn: + # If we are connected try to perform query + if self.conn and self.conn.config.get_auth().get_auth_state(): self._execute() - if self.alerta: - try: - self.alerta.heartbeat( - origin='apimon.epmon.%s.%s' % ( - self.zone, self.target_cloud), - tags=['apimon', 'epmon'], - attributes={ - 'zone': self.zone, - 'cloud': self.target_cloud, - 'service': ['apimon', 'epmon'], - } - ) - except Exception: - self.log.exception('Error sending heartbeat') + self.alerta.heartbeat( + 
origin='apimon.epmon.%s.%s' % ( + self.zone, self.target_cloud), + tags=['apimon', 'epmon'], + attributes={ + 'zone': self.zone, + 'cloud': self.target_cloud, + 'service': ['apimon', 'epmon'], + } + ) + + # Have some rest time.sleep(self.interval) except Exception: self.log.exception("Exception checking endpoints:") - def _connect(self): + def _connect_cloud(self, force=False): + """Establish cloud connection""" + if force: + self.connect_retries = 0 + self.conn = None + if self.connect_retries > 1: + self.log.warning( + "Reached cloud reconnect limit. Not reconnecting anymore.") + self.conn = None + return + else: + self.connect_retries += 1 + + cloud_config = self.config.get_cloud('clouds', self.target_cloud) + + if not cloud_config: + raise RuntimeError( + f"Requested cloud {self.target_cloud} is not found") + + if self.influx_cnf and 'additional_metric_tags' in cloud_config: + self.influx_cnf['additional_metric_tags'] = \ + cloud_config['additional_metric_tags'] + + # Get cloud region config + self.region = openstack.config.get_cloud_region( + name=self.target_cloud, + load_yaml_config=False, + **cloud_config) + + if self.influx_cnf: + self.region._influxdb_config = self.influx_cnf + + statsd_config = self.config.get_default('metrics', 'statsd') + if statsd_config: + # Inject statsd reporter + self.region._statsd_host = statsd_config.get('host', 'localhost') + self.region._statsd_port = int(statsd_config.get('port', 8125)) + self.region._statsd_prefix = ( + f"openstack.api.{self.target_cloud}.{self.zone}" + ) + + # Try to connect try: self.conn = openstack.connection.Connection( config=self.region, ) - except AttributeError as e: - # NOTE(gtema): SDK chains attribute error when calling - # conn.authorize, but response is not present - self.log.error('Cannot establish connection: %s' % e.__context__) - self.send_alert('identity', e.__context__) + self.conn.authorize() + except openstack.exceptions.SDKException as ex: + self.conn = None + self.log.error(f"Cannot 
connect to the cloud: {str(ex)}") + self.send_alert('identity', 'ConnectionException', str(ex)) except Exception as ex: - self.log.exception('Cannot establish connection to cloud %s: %s' % - (self.target_cloud, ex)) + self.log.exception( + f"Cannot establish connection to cloud {self.target_cloud}: " + f"{str(ex)}") self.send_alert('identity', 'ConnectionException', str(ex)) def _execute(self): - eps = self.conn.config.get_service_catalog().get_endpoints().items() + """Perform the tests""" + eps = [] + try: + eps = self.conn.config.get_service_catalog( + ).get_endpoints().items() + except keystone_exceptions.http.Unauthorized as ex: + # If our auth expires and we can not even renew it try to trigger + # reconnect + self.log.error( + f"Got Unauthorized exception while listing endpoints:" + f"{str(ex)}" + ) + self.conn = None + return + except Exception as ex: + self.log.error( + f"Got unexpected exception while listing endpoints: {str(ex)}") + self.conn = None + return + for service, data in eps: + if self._stopped: + # stop faster (do not wait for remaining services to be + # checked) + return endpoint = data[0]['url'] - self.log.debug('Checking service %s' % service) + self.log.debug(f"Checking service {service}") srv = None sdk_srv = None try: @@ -195,18 +236,19 @@ def _execute(self): if urls: for url in urls: if isinstance(url, str): - self._query_endpoint(client, service, - endpoint, url) + self._query_endpoint( + client, service, endpoint, url) else: - self.log.error('Wrong configuration, ' - 'service_override must be list of ' - 'string urls') + self.log.error( + 'Wrong configuration, service_override must' + ' be list of string urls') else: - self.log.debug('Skipping querying service %s' % service) + self.log.debug(f"Skipping querying service {service}") else: self._query_endpoint(client, service, endpoint, endpoint) def _query_endpoint(self, client, service, endpoint, url): + """Query concrete endpoint""" response = None error = None try: @@ -214,13 +256,17 
@@ def _query_endpoint(self, client, service, endpoint, url): url, headers={'content-type': 'application/json'}, timeout=5) - except (openstack.exceptions.SDKException, ClientException) as ex: + except openstack.exceptions.HttpException as ex: + error = ex + self.log.error( + f"Got HTTP exception for endpoint {url}: {str(ex)}") + except openstack.exceptions.SDKException as ex: error = ex - self.log.error('Got exception for endpoint %s: %s' % (url, - ex)) + self.log.error( + f"Got openstack exception for endpoint {url}: {str(ex)}") except Exception: - self.log.exception('Got uncatched exception doing request to %s' % - url) + self.log.exception( + f"Got uncatched exception doing request to {url}") status_code = -1 if response is not None: @@ -232,11 +278,11 @@ def _query_endpoint(self, client, service, endpoint, url): else: query_url = url result = status_code if status_code != -1 else 'Timeout(5)' + # Construct a nice helper message for Alerta value = ( - 'curl -g -i -X GET %s -H ' + f'curl -g -i -X GET {query_url} -H ' '"X-Auth-Token: ${TOKEN}" ' - '-H "content-type: application/json" fails (%s)' % ( - query_url, result) + f'-H "content-type: application/json" fails ({result})' ) self.send_alert( resource=service, @@ -244,9 +290,11 @@ def _query_endpoint(self, client, service, endpoint, url): raw_data=str(error.message if error else response) ) - def send_alert(self, resource: str, value: str, - raw_data: str=None) -> None: - if self.alerta: + def send_alert( + self, resource: str, value: str, raw_data: str=None + ) -> None: + """Send alert to alerta""" + if self.alerta.client: self.alerta.send_alert( severity='critical', environment=self.target_cloud, @@ -259,8 +307,9 @@ def send_alert(self, resource: str, value: str, raw_data=raw_data ) else: - self.log.error('Got error from the endpoint check, but ' - 'cannot report it to alerta') + self.log.error( + 'Got error from the endpoint check, but ' + 'cannot report it to alerta') class EndpointMonitorServer: @@ 
-295,15 +344,6 @@ def __init__(self, config, zone: str = None): # self.accepting_work = False - def _connect_alerta(self) -> None: - if alerta_client: - alerta_ep = self.config.get_default('alerta', 'endpoint') - alerta_token = self.config.get_default('alerta', 'token') - if alerta_ep and alerta_token: - self.alerta = alerta_client( - endpoint=alerta_ep, - key=alerta_token) - def start(self): self._running = True self._command_running = True @@ -315,8 +355,6 @@ def start(self): self.command_thread.daemon = True self.command_thread.start() - self._connect_alerta() - for cl in self.config.get_default('epmon', 'clouds', []): if isinstance(cl, dict): if len(cl.items()) != 1: @@ -325,11 +363,10 @@ def start(self): target_cloud = list(cl.keys())[0] else: target_cloud = cl - self.log.debug('Need to monitor cloud %s' % target_cloud) + self.log.debug(f"Need to monitor cloud {target_cloud}") self._monitors[target_cloud] = EndpointMonitor( - self.config, target_cloud=target_cloud, - zone=self.zone, alerta=self.alerta) + self.config, target_cloud=target_cloud, zone=self.zone) self._monitors[target_cloud].start() def stop(self): diff --git a/apimon/executor/server.py b/apimon/executor/server.py index 2483aef..098c89e 100644 --- a/apimon/executor/server.py +++ b/apimon/executor/server.py @@ -30,16 +30,12 @@ import openstack -from apimon.lib.statsd import get_statsd - -try: - from alertaclient.api import Client as alerta_client -except ImportError: - alerta_client = None - from apimon.lib import commandsocket +from apimon.lib import alerta_client from apimon.lib.logutils import get_annotated_logger from apimon.lib.gearworker import GearWorker +from apimon.lib.statsd import get_statsd +from apimon.lib import utils from apimon.project import Project from apimon.executor import resultprocessor @@ -532,7 +528,7 @@ class ExecutorServer: log = logging.getLogger('apimon.ExecutorServer') - def __init__(self, config, zone: str = None): + def __init__(self, config, zone: str = None, 
standalone=False): self.log.info('Starting Executor server') self.config = config self._running = False @@ -562,7 +558,10 @@ def __init__(self, config, zone: str = None): } self.statsd = get_statsd(self.config, self.statsd_extra_keys) - self.result_processor = resultprocessor.ResultProcessor(self.config) + if not standalone: + # Be more tests friendly + self.result_processor = resultprocessor.ResultProcessor( + self.config) self._command_running = False self.accepting_work = False @@ -578,7 +577,7 @@ def __init__(self, config, zone: str = None): self._projects = {} self._config_version = None self._clouds_config = {} - self.alerta = None + self.alerta = alerta_client.AlertaClient(self.config) self.executor_jobs = { 'apimon:ansible': self.execute_ansible_job, @@ -604,7 +603,9 @@ def __init__(self, config, zone: str = None): server.get('ssl_ca'), keepalive=True, tcp_keepidle=60, tcp_keepintvl=30, tcp_keepcnt=5) - self.gear_client.waitForServer() + if not standalone: + # Be more tests friendly + self.gear_client.waitForServer() def start(self) -> None: self._running = True @@ -619,13 +620,7 @@ def start(self) -> None: self.result_processor.daemon = True self.result_processor.start() - if alerta_client: - alerta_ep = self.config.get_default('alerta', 'endpoint') - alerta_token = self.config.get_default('alerta', 'token') - if alerta_ep and alerta_token: - self.alerta = alerta_client( - endpoint=alerta_ep, - key=alerta_token) + self.alerta.connect() self.accepting_work = True @@ -711,18 +706,12 @@ def run_governor(self) -> None: self.manage_load() except Exception: self.log.exception("Exception in governor thread:") - self._send_alerta_heartbeat() - def _send_alerta_heartbeat(self): - if self.alerta: - try: - self.alerta.heartbeat( - origin='apimon.executor.%s' % self.hostname, - tags=['apimon', 'executor'], - timeout=300 - ) - except Exception: - self.log.exception('Error sending heartbeat') + self.alerta.heartbeat( + origin='apimon.executor.%s' % self.hostname, + 
tags=['apimon', 'executor'], + timeout=300 + ) def _create_logs_container(self, connection, container_name): container = connection.object_store.create_container( @@ -768,16 +757,15 @@ def _upload_log_file_to_swift(self, job_log_file, job_id) -> str: return self._get_logs_link(job_id, job_log_file.name) except openstack.exceptions.SDKException as e: self.log.exception('Error uploading log to Swift') - if self.alerta: - self.alerta.send_alert( - severity='major', - environment=self.config.alerta_env, - origin=self.config.alerta_origin, - service=['apimon', 'task_executor'], - resource='task', - event='LogUpload', - value=str(e) - ) + self.alerta.send_alert( + severity='major', + environment='apimon', + origin=('executor@%s' % self.zone), + service=['apimon', 'task_executor'], + resource='task', + event='LogUpload', + value=str(e) + ) return '' @@ -976,4 +964,5 @@ def _get_clouds_config(self, version=None) -> None: if d: data = json.loads(d) self._config_version = data.pop('_version') - self._clouds_config = data + self._clouds_config = utils.expand_dict_vars( + data, vault_client=self.config.vault_client) diff --git a/apimon/lib/alerta_client.py b/apimon/lib/alerta_client.py new file mode 100644 index 0000000..5873fcc --- /dev/null +++ b/apimon/lib/alerta_client.py @@ -0,0 +1,91 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import logging + +try: + from alertaclient.api import Client as alerta_client + from alertaclient import exceptions as alerta_exceptions +except ImportError: + alerta_client = None + + +class AlertaClient: + log = logging.getLogger('apimon.AlertaClient') + + def __init__(self, config): + self.config = config + self.retries = 0 + self.client = None + + def connect(self, force=False): + if alerta_client: + if force: + self.retries = 0 + if self.retries > 2: + self.log.warning( + "Reached alerta reconnect limit. Not reconnecting anymore." + ) + self.client = None + return + else: + self.retries += 1 + + alerta_ep = self.config.get_default('alerta', 'endpoint') + alerta_token = self.config.get_default('alerta', 'token') + if alerta_ep and alerta_token: + self.client = alerta_client( + endpoint=alerta_ep, + token=alerta_token) + + def send_alert( + self, severity: str, environment: str, resource: str, service: list, + value: str, origin: str, event: str='Failure', raw_data: str=None + ) -> None: + if not self.client: + self.log.warning('Sending alert is skipped') + return + try: + self.client.send_alert( + severity=severity, + environment=environment, + service=service, + origin=origin, + resource=resource, + event=event, + value=value, + raw_data=raw_data + ) + except alerta_exceptions.AlertaException as ex: + if ex.message == 'Token is invalid': + self.connect() + self.log.error( + f"Failed to communicate with alerta: {str(ex)}" + ) + + def heartbeat( + self, origin: str, tags: list, attributes: dict + ): + if not self.client: + return + try: + self.client.heartbeat( + origin=origin, + tags=tags, + attributes=attributes + ) + except alerta_exceptions.AlertaException as ex: + if ex.message == 'Token is invalid': + self.connect() + self.log.error( + f"Failed to communicate with alerta: {str(ex)}" + ) diff --git a/apimon/lib/config.py b/apimon/lib/config.py index 84e0a68..0509cd8 100644 --- a/apimon/lib/config.py +++ b/apimon/lib/config.py @@ -13,33 +13,16 @@ import 
os import yaml +import hvac -def merge_nested_dicts(a, b): - """Naive merge of nested dictinaries - """ - result = dict() - if isinstance(a, str) and isinstance(b, str): - return b - elif isinstance(a, list) and isinstance(b, list): - return [a, b] - elif isinstance(a, dict) and isinstance(b, dict): - for k in a.keys() | b.keys(): - if k not in a and k in b: - result[k] = b[k] - elif k in a and k not in b: - result[k] = a[k] - else: - result[k] = merge_nested_dicts(a[k], b[k]) - else: - raise ValueError('Cannot merge different types') - - return result +from apimon.lib import utils class Config(object): def __init__(self): self._fp = None - self.config = None + self.config = {} + self.vault_client = None def _find_config(self, path=None): if not path and self._fp: @@ -56,6 +39,19 @@ def _find_config(self, path=None): return self._fp raise Exception("Unable to locate config file in %s" % locations) + def _connect_to_vault(self, url, **kwargs): + self.vault_client = hvac.Client( + url=url, + timeout=int(kwargs.get('timeout', 10)) + ) + if 'role_id' in kwargs and 'secret_id' in kwargs: + self.vault_client.auth.approle.login( + role_id=kwargs['role_id'], + secret_id=kwargs['secret_id'], + ) + elif 'token' in kwargs: + self.vault_client.token = kwargs['token'] + def read(self, path=None): fp = self._find_config(path) @@ -67,18 +63,23 @@ def read(self, path=None): with open(secure, 'r') as f: secure_config = yaml.load(f, Loader=yaml.SafeLoader) # Merge secure_config into the main config - self.config = merge_nested_dicts(self.config, secure_config) -# self.config.update(secure_config) -# self.config = {k: dict(self.config.get(k, {}), -# **secure_config.get(k, {})) for k in -# self.config.keys() | secure_config.keys()} + self.config = utils.merge_nested_dicts(self.config, secure_config) + if 'vault' in self.config: + vault_config = self.config['vault'] + self._connect_to_vault( + vault_config['addr'], + **vault_config + ) return self def get_section(self, section): return 
self.config.get(section, {}) - def get_default(self, section, option, default=None, expand_user=False): + def get_default( + self, section, option, default=None, + expand_user=False, expand_vault=True + ): if not section or not option: raise RuntimeError('get_default without section/option is not ' 'possible') @@ -97,4 +98,20 @@ def get_default(self, section, option, default=None, expand_user=False): value = default if expand_user and value: return os.path.expanduser(value) + if expand_vault and value: + return utils.expand_vars(value, self.vault_client) return value + + def get_cloud(self, section, name): + for cloud in self.config.get(section, []): + if cloud.get('name') == name: + return utils.expand_dict_vars( + cloud.get('data'), self.vault_client) + + def get_clouds(self, section, expand_vault=False): + for cloud in self.config.get(section, []): + if expand_vault: + yield cloud.get('name'), utils.expand_dict_vars( + cloud.get('data'), self.vault_client) + else: + yield cloud.get('name'), cloud.get('data') diff --git a/apimon/lib/utils.py b/apimon/lib/utils.py new file mode 100644 index 0000000..1e8fff7 --- /dev/null +++ b/apimon/lib/utils.py @@ -0,0 +1,73 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import logging + +from hvac import exceptions as hvac_exceptions + + +def merge_nested_dicts(a, b): + """Naive merge of nested dictinaries + """ + result = dict() + if isinstance(a, str) and isinstance(b, str): + return b + elif isinstance(a, list) and isinstance(b, list): + return [a, b] + elif isinstance(a, dict) and isinstance(b, dict): + for k in a.keys() | b.keys(): + if k not in a and k in b: + result[k] = b[k] + elif k in a and k not in b: + result[k] = a[k] + else: + result[k] = merge_nested_dicts(a[k], b[k]) + else: + raise ValueError('Cannot merge different types') + + return result + + +def expand_dict_vars(value, vault_client=None): + """Expand config dict with values from vault""" + res = dict() + for k, v in value.items(): + if isinstance(v, dict): + res[k] = expand_dict_vars(v, vault_client) + else: + res[k] = expand_vars(v, vault_client) + return res + + +def expand_vars(value, vault_client=None): + """Expand value following reference to vault""" + if not isinstance(value, str): + return value + if not value.startswith('vault|') or not vault_client: + return value + try: + pairs = value.split('|') + dt = {k: v for k, v in [x.split('=') for x in pairs[1:]]} + if dt['engine'] == 'secret': + data = vault_client.secrets.kv.v2.read_secret( + path=dt['path'] + )['data']['data'] + if dt['attr'] in data: + return data.get(dt['attr']) + else: + logging.error( + f"Attribute {dt['attr']} is not present on " + f"{dt['path']}" + ) + except hvac_exceptions.InvalidPath: + logging.error(f"Cannot find secret {dt['path']} in vault") + return None diff --git a/apimon/scheduler/scheduler.py b/apimon/scheduler/scheduler.py old mode 100755 new mode 100644 index c8c9180..f106978 --- a/apimon/scheduler/scheduler.py +++ b/apimon/scheduler/scheduler.py @@ -19,16 +19,12 @@ import openstack -try: - from alertaclient.api import Client as alerta_client -except ImportError: - alerta_client = None - from apimon.lib import queue as _queue +from apimon.lib import alerta_client 
from apimon.lib.gearworker import GearWorker from apimon.lib.statsd import get_statsd from apimon.project import Project -from apimon.model import TestEnvironment, Cloud +from apimon.model import TestEnvironment from apimon.lib import commandsocket @@ -159,8 +155,8 @@ def run(self): project.refresh_git_repo() self.scheduler._git_updated(project) except Exception: - self.log.exception('Exception during updating git ' - 'repo') + self.log.exception( + 'Exception during updating git repo') class ProjectCleanup(threading.Thread): @@ -220,25 +216,25 @@ def run(self): self._project_cleanup(cloud) def _get_cloud_connect(self, cloud): - auth_part = {} - for cnf in self.config.config.get('clouds', []): - if cnf.get('name') == cloud: - auth_part = cnf.get('data') - if not auth_part: + cloud_config = self.config.get_cloud('clouds', cloud) + if not cloud_config: self.log.error('Cannot determine cloud configuration for the ' 'project cleanup of %s' % cloud) return None region = openstack.config.get_cloud_region( load_yaml_config=False, - **auth_part) + **cloud_config) try: conn = openstack.connection.Connection( config=region, ) + conn.authorize() return conn - except Exception: - self.log.exception('Cannot establish connection to cloud %s' % - cloud) + except openstack.exceptions.SDKException as ex: + self.conn = None + self.log.exception( + f"Cannot connect to the cloud {cloud}: {str(ex)}") + self.send_alert('identity', 'ConnectionException', str(ex)) def _project_cleanup(self, target_cloud): conn = self._get_cloud_connect(target_cloud) @@ -300,7 +296,7 @@ def __init__(self, config, zone: str = None): } self.statsd = get_statsd(self.config, statsd_extra_keys) - self._alerta = None + self._alerta = alerta_client.AlertaClient(self.config) self._projects = {} self._environments = {} self._clouds = {} @@ -369,23 +365,14 @@ def _load_environments(self) -> None: ) self._environments[env.name] = env - def _load_clouds(self) -> None: - """Load cloud connections""" - 
self._clouds.clear() - for item in self.config.get_section('clouds'): - cl = Cloud( - name=item.get('name'), - data=item.get('data') - ) - self._clouds[cl.name] = cl - def _load_clouds_config(self) -> None: """Load clouds configuration from the config""" clouds = {} conf = {} - for item in self.config.get_section('clouds'): - clouds[item.get('name')] = item.get('data') - conf['clouds'] = clouds + for name, data in self.config.get_clouds('clouds'): + clouds[name] = data + conf['clouds'] = {x[0]: x[1] for x in + self.config.get_clouds('clouds')} metrics_config = self.config.get_section('metrics') if metrics_config: @@ -400,7 +387,7 @@ def start(self) -> None: self._stopped = False self._load_projects() - self._load_clouds() + self._load_clouds_config() self._load_environments() self._git_refresh_thread.start() @@ -410,8 +397,8 @@ def start(self) -> None: self._socket_running = True self._command_socket.start() - self.__socket_thread = threading.Thread(target=self.socket_run, - name='command') + self.__socket_thread = threading.Thread( + target=self.socket_run, name='command') self.__socket_thread.daemon = True self.__socket_thread.start() @@ -544,6 +531,8 @@ def process_operational_queue(self) -> None: self.operational_event_queue.task_done() def _reconfig(self, event) -> None: + self.alerta.connect(force=True) + self.__executor_client.pause_scheduling() self._git_refresh_thread.stop() @@ -553,14 +542,6 @@ def _reconfig(self, event) -> None: self.config = event.config - if alerta_client: - alerta_ep = self.config.get_default('alerta', 'endpoint') - alerta_token = self.config.get_default('alerta', 'token') - if alerta_ep and alerta_token: - self.alerta = alerta_client( - endpoint=alerta_ep, - key=alerta_token) - self._config_version += 1 self._load_clouds_config() self._load_projects() diff --git a/apimon/tests/functional/lib/test_config.py b/apimon/tests/functional/lib/test_config.py new file mode 100644 index 0000000..a0cb1c6 --- /dev/null +++ 
b/apimon/tests/functional/lib/test_config.py @@ -0,0 +1,65 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import TestCase + +import hvac + +from apimon.lib import config + + +class TestConfig(TestCase): + def setUp(self): + self.vault_client = hvac.Client( + url='http://localhost:8200', + token='root' + ) + + def test_get_cloud_with_vault(self): + cfg = config.Config() + cfg._connect_to_vault('http://localhost:8200', token='root') + vault_connected = False + try: + vault_connected = cfg.vault_client.is_authenticated() + except Exception: + pass + finally: + if not vault_connected: + self.skipTest('Vault not available for test') + + self.vault_client.secrets.kv.v2.create_or_update_secret( + path='usr1', + secret=dict( + username='foo', + password='bar' + ) + ) + cfg.config = dict(clouds=[]) + cfg.config['clouds'] = [ + dict( + name='vault_cloud', + data=dict( + auth=dict( + username="vault|engine=secret|path=usr1|attr=username", + password="vault|engine=secret|path=usr1|attr=password" + ) + ) + ) + ] + + cloud = cfg.get_cloud('clouds', 'vault_cloud') + self.assertDictEqual({ + 'auth': { + 'username': 'foo', + 'password': 'bar' + }, + }, cloud) diff --git a/apimon/tests/functional/lib/test_utils.py b/apimon/tests/functional/lib/test_utils.py new file mode 100644 index 0000000..6de3c83 --- /dev/null +++ b/apimon/tests/functional/lib/test_utils.py @@ -0,0 +1,69 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this 
file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import TestCase + +import hvac + +from apimon.lib import utils + + +class TestUtils(TestCase): + def test_expand_vars_vault(self): + vault_client = hvac.Client( + url='http://localhost:8200', + token='root' + ) + vault_client.secrets.kv.v2.create_or_update_secret( + path='usr1', + secret=dict( + username='foo', + password='bar' + ) + ) + self.assertEqual( + 'bar', + utils.expand_vars( + 'vault|engine=secret|path=usr1|attr=password', + vault_client) + ) + + def test_expand_dict(self): + vault_client = hvac.Client( + url='http://localhost:8200', + token='root' + ) + vault_client.secrets.kv.v2.create_or_update_secret( + path='fake', + secret=dict( + foo='bar', + ) + ) + vault_client.secrets.kv.v2.create_or_update_secret( + path='fake2', + secret=dict( + foo='bar2', + ) + ) + struct = { + 'foo': 'vault|engine=secret|path=fake|attr=foo', + 'inline': { + 'foo2': 'vault|engine=secret|path=fake2|attr=foo' + } + } + + self.assertDictEqual( + {'foo': 'bar', 'inline': {'foo2': 'bar2'}}, + utils.expand_dict_vars( + struct, + vault_client) + ) diff --git a/apimon/tests/unit/executor/test_server.py b/apimon/tests/unit/executor/test_server.py index 1f14efc..9de2425 100644 --- a/apimon/tests/unit/executor/test_server.py +++ b/apimon/tests/unit/executor/test_server.py @@ -11,25 +11,26 @@ # under the License. 
import configparser -import mock -import unittest -import uuid -import tempfile +import json import os import shutil import socket import subprocess +import tempfile import time +from unittest import TestCase, mock +import uuid from pathlib import Path from apimon import project as _project from apimon.lib import config as _config +from apimon.lib.gearworker import GearWorker from apimon.executor import server from apimon.executor import message -class TestBase(unittest.TestCase): +class TestBase(TestCase): def setUp(self): super(TestBase, self).setUp() self.config = _config.Config() @@ -47,6 +48,7 @@ def setUp(self): self.executor_server.zone = 'fake_zone' self.executor_server.config = self.config self.executor_server.result_processor = mock.Mock() + self.executor_server._upload_log_file_to_swift = mock.Mock() _projects = { 'fake_proj': self.project } @@ -88,6 +90,12 @@ def test_run(self, sp_mock): env_cmp['APIMON_PROFILER_MESSAGE_SOCKET'] = Path( self.base_job.job_work_dir, '.comm_socket').resolve().as_posix() + prc = mock.Mock() + prc.wait = mock.Mock(return_value=2) + sp_mock.return_value = prc + self.executor_server._upload_log_file_to_swift.return_value = \ + 'fake_swift_url' + self.base_job.run() self.base_job.wait() sp_mock.assert_called_with( @@ -99,6 +107,24 @@ def test_run(self, sp_mock): cwd=self.base_job.job_work_dir, restore_signals=False ) + self.executor_server._upload_log_file_to_swift.assert_called_with( + Path(self.base_job.job_work_dir, 'job-output.txt'), + self.base_job.job_id + ) + self.executor_server.result_processor.add_job_entry.assert_called_with( + job={ + 'job_id': self.base_job.job_id, + 'name': 'fake_task', + 'result': 2, + 'duration': 0, + 'environment': 'env_name', + 'zone': 'fake_zone', + 'log_url': 'fake_swift_url' + } + ) + self.executor_server.finish_job.assert_called_with( + self.job.unique + ) @mock.patch('subprocess.Popen', auto_spec=True) def test_execute(self, sp_mock): @@ -277,3 +303,52 @@ def test_execute(self, sp_mock): 
cwd=self.job.job_work_dir, restore_signals=False ) + + +class TestServer(TestBase): + def _start_gear(self): + import gear + return gear.Server(4730, 'localhost', keepalive=False) + + def __get_cloud_config(self, job): + job.sendWorkComplete( + json.dumps({ + '_version': 0, + 'clouds': { + 'a': 'vault|engine=secret|path=fake|attr=foo' + } + }) + ) + + def test_get_logs_link(self): + executor = server.ExecutorServer(self.config, standalone=True) + executor._logs_container_name = 'fake_container' + executor._logs_cloud = mock.Mock(auto_spec=True) + executor._logs_cloud.object_store.get_endpoint.return_value = \ + 'http://fake_url' + res = executor._get_logs_link('job', 'res.txt') + self.assertEqual('http://fake_url/fake_container/job/res.txt', res) + + def test_get_clouds_config(self): + """Verify we get cloud config from scheduler and expand vars from + vault + """ + self._start_gear() + gear_worker = GearWorker( + 'Fake', 'log', 'thread', self.config, + {'apimon:get_cloud_config': self.__get_cloud_config}) + gear_worker.start() + executor = server.ExecutorServer(self.config) + with mock.patch( + 'hvac.api.secrets_engines.kv_v2.KvV2.read_secret' + ) as mock_vault: + mock_vault.return_value = {'data': {'data': {'foo': 'bar'}}} + executor._get_clouds_config(0) + self.assertDictEqual( + { + 'clouds': { + 'a': 'bar' + }, + }, + executor._clouds_config + ) diff --git a/apimon/tests/unit/lib/test_alerta_client.py b/apimon/tests/unit/lib/test_alerta_client.py new file mode 100644 index 0000000..1670160 --- /dev/null +++ b/apimon/tests/unit/lib/test_alerta_client.py @@ -0,0 +1,84 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import TestCase + +import requests_mock + +from apimon.lib import alerta_client +from apimon.lib import config + + +class TestAlertaClient(TestCase): + def setUp(self): + cfg = config.Config() + cfg.config['alerta'] = dict( + endpoint='http://alerta', + token='foo' + ) + self.alerta_client = alerta_client.AlertaClient(cfg) + self.adapter = requests_mock.Adapter() + + def test_connect(self): + self.alerta_client.connect() + + def test_alert(self): + self.alerta_client.connect() + with requests_mock.Mocker() as rmock: + rmock.post('http://alerta/alert', json={}) + self.alerta_client.send_alert( + severity='sev', + environment='env', + resource='res', + service='srv', + value='val', + origin='origin' + ) + self.assertEqual(1, len(rmock.request_history)) + + def test_alert_skip(self): + with requests_mock.Mocker() as rmock: + rmock.post('http://alerta/alert', json={}) + self.alerta_client.send_alert( + severity='sev', + environment='env', + resource='res', + service='srv', + value='val', + origin='origin' + ) + self.assertEqual(0, len(rmock.request_history)) + + def test_heartbeat(self): + self.alerta_client.connect() + with requests_mock.Mocker() as rmock: + rmock.post( + 'http://alerta/heartbeat', + json={'heartbeat': {}}) + self.alerta_client.heartbeat( + origin='origin', + tags=[], + attributes={} + ) + self.assertEqual(1, len(rmock.request_history)) + + def test_heartbeat_skip(self): + with requests_mock.Mocker() as rmock: + rmock.post( + 'http://alerta/heartbeat', + json={'heartbeat': {}}) + self.alerta_client.heartbeat( + 
origin='origin', + tags=[], + attributes={} + ) + self.assertEqual(0, len(rmock.request_history)) diff --git a/apimon/tests/unit/lib/test_utils.py b/apimon/tests/unit/lib/test_utils.py new file mode 100644 index 0000000..43ed906 --- /dev/null +++ b/apimon/tests/unit/lib/test_utils.py @@ -0,0 +1,52 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from unittest import TestCase +from unittest import mock + + +import hvac + +from apimon.lib import utils + + +class TestUtils(TestCase): + + def test_expand_vars(self): + self.assertEqual( + 'abc', + utils.expand_vars('abc') + ) + + def test_expand_vars_no_vault(self): + self.assertEqual( + 'vault|fake', + utils.expand_vars('vault|fake') + ) + + def test_expand_vars_vault(self): + with mock.patch( + 'hvac.api.secrets_engines.kv_v2.KvV2.read_secret' + ) as mock_vault: + mock_vault.return_value = {'data': {'data': {'attr_name': 'bar'}}} + + fake_client = hvac.Client( + url='http://fake.com:8200', + token='fake_token', + timeout=1 + ) + self.assertEqual( + 'bar', + utils.expand_vars( + 'vault|engine=secret|path=fake_path|attr=attr_name', + fake_client) + ) diff --git a/apimon/tests/unit/scheduler/test_scheduler.py b/apimon/tests/unit/scheduler/test_scheduler.py index bcebf18..cfff148 100644 --- a/apimon/tests/unit/scheduler/test_scheduler.py +++ b/apimon/tests/unit/scheduler/test_scheduler.py @@ -60,3 +60,18 @@ def setUp(self): def test_basic(self): self.assertTrue(True) + + def test_load_clouds_config(self): + 
self.assertDictEqual({}, self.scheduler._clouds_config) + self.scheduler._load_clouds_config() + expected = { + 'clouds': {x['name']: x['data'] for x in + self.config.get_section('clouds')} + } + expected.update( + metrics=self.config.get_section('metrics'), + _version=self.scheduler._config_version + ) + self.assertDictEqual( + expected, + self.scheduler._clouds_config) diff --git a/apimon/tests/unit/test_config.py b/apimon/tests/unit/test_config.py index 5b6e2d3..b8d3924 100644 --- a/apimon/tests/unit/test_config.py +++ b/apimon/tests/unit/test_config.py @@ -1,3 +1,14 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
from unittest import TestCase from unittest import mock diff --git a/etc/apimon.yaml b/etc/apimon.yaml index 40199c4..0516ff6 100644 --- a/etc/apimon.yaml +++ b/etc/apimon.yaml @@ -37,12 +37,11 @@ test_environments: clouds: - name: prod data: - profile: profile auth: auth_url: https://my_url project_name: test_proj user_domain_name: TEST_DOMAIN - username: user + username: "vault|engine=secrets|path=path|attr=attr" password: password additional_metric_tags: environment: production @@ -54,7 +53,7 @@ clouds: auth_url: https://my_other_url project_name: test_proj user_domain_name: OTC - username: user + username: username password: password additional_metric_tags: environment: preprod @@ -88,8 +87,3 @@ metrics: username: foobar password: barfoo measurement: openstack_api -epmonitor: - socket: /tmp/epmon.socket - clouds: - - prod - - preprod diff --git a/etc/logging.conf b/etc/logging.conf index fc0a959..52dceca 100644 --- a/etc/logging.conf +++ b/etc/logging.conf @@ -2,7 +2,7 @@ keys=root,apimon,gear [handlers] -keys=console,debug,normal +keys=console [formatters] keys=simple @@ -13,13 +13,13 @@ handlers=console [logger_apimon] level=DEBUG -handlers=console,debug,normal +handlers=console qualname=apimon propagate=0 [logger_gear] level=WARNING -handlers=normal +handlers=console qualname=gear [handler_console] diff --git a/etc/secure.yaml b/etc/secure.yaml index 92a5a99..91a13f9 100644 --- a/etc/secure.yaml +++ b/etc/secure.yaml @@ -1,4 +1,7 @@ --- +vault: + addr: http://localhost:8200 + token: "root" additional_vars: foo: bar statsd: diff --git a/requirements.txt b/requirements.txt index a60b707..8fa6a98 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ stevedore==3.3.0 #dogpile.cache==0.9.2 sqlalchemy psycopg2 +hvac==0.11.2 diff --git a/test-requirements.txt b/test-requirements.txt index 9feb387..c3e1f7f 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -2,5 +2,6 @@ stestr>=1.0.0 # Apache-2.0 testrepository>=0.0.18 # Apache-2.0/BSD 
testscenarios>=0.4 # Apache-2.0/BSD +testtools>=2.2.0 # MIT +requests-mock # Apache-2.0 mock flake8 diff --git a/tox.ini b/tox.ini index 8d311f6..5daaf58 100644 --- a/tox.ini +++ b/tox.ini @@ -1,7 +1,7 @@ [tox] -minversion = 3.6 +minversion = 3.8 skipsdist = True -envlist = pep8,py37 +envlist = pep8,py39 ignore_basepython_conflict = True [testenv] @@ -18,6 +18,12 @@ deps = commands = stestr run {posargs} stestr slowest +[testenv:functional] +setenv = + {[testenv]setenv} +commands = stestr --test-path ./apimon/tests/functional/ run --serial {posargs} + stestr slowest + [testenv:pep8] install_command = pip install {opts} {packages} commands =