diff --git a/cmreslogging/client_es.py b/cmreslogging/client_es.py new file mode 100644 index 0000000..6b98cc4 --- /dev/null +++ b/cmreslogging/client_es.py @@ -0,0 +1,125 @@ +from elasticsearch import Elasticsearch, RequestsHttpConnection + +MSG_KERBEROS = "Kerberos module not available. Please install \"requests-kerberos\"" +MSG_AWS = "AWS4Auth not available. Please install \"requests-aws4auth\"" +try: + from requests_kerberos import HTTPKerberosAuth, DISABLED + CMR_KERBEROS_SUPPORTED = True +except ImportError: + CMR_KERBEROS_SUPPORTED = False + +try: + from requests_aws4auth import AWS4Auth + AWS4AUTH_SUPPORTED = True +except ImportError: + AWS4AUTH_SUPPORTED = False + +NO_AUTH = 0 +BASIC_AUTH = 1 +KERBEROS_AUTH = 2 +AWS_SIGNED_AUTH = 3 + + +class ClientElasticSearch: + + def __init__(self, cmrs_handler): + self._cmrs_handler = cmrs_handler + + @staticmethod + def _validation_environment_error(value, msg_error): + if not value: + raise EnvironmentError(msg_error) + + +class ClientNotAuth(ClientElasticSearch): + TYPE_CLIENT = NO_AUTH + + def __init__(self, cmrs_handler): + ClientElasticSearch.__init__(self, cmrs_handler) + + def get(self): + if self._cmrs_handler._client is None: + self._cmrs_handler._client = Elasticsearch(hosts=self._cmrs_handler.hosts, + use_ssl=self._cmrs_handler.use_ssl, + verify_certs=self._cmrs_handler.verify_certs, + connection_class=RequestsHttpConnection, + serializer=self._cmrs_handler.serializer) + + return self._cmrs_handler._client + + +class ClientBasicAuth(ClientElasticSearch): + TYPE_CLIENT = BASIC_AUTH + + def __init__(self, cmrs_handler): + ClientElasticSearch.__init__(self, cmrs_handler) + + def get(self): + + if self._cmrs_handler._client is None: + self._cmrs_handler._client = Elasticsearch(hosts=self._cmrs_handler.hosts, + http_auth=self._cmrs_handler.auth_details, + use_ssl=self._cmrs_handler.use_ssl, + verify_certs=self._cmrs_handler.verify_certs, + connection_class=RequestsHttpConnection, + serializer=self._cmrs_handler.serializer) + + return self._cmrs_handler._client + + +class ClientKerberos(ClientElasticSearch): + TYPE_CLIENT = KERBEROS_AUTH + + def __init__(self, cmrs_handler): + ClientElasticSearch.__init__(self, cmrs_handler) + + def get(self): + self._validation_environment_error(CMR_KERBEROS_SUPPORTED, MSG_KERBEROS) + + # For kerberos we return a new client each time to make sure the tokens are up to date + return Elasticsearch(hosts=self._cmrs_handler.hosts, + use_ssl=self._cmrs_handler.use_ssl, + verify_certs=self._cmrs_handler.verify_certs, + connection_class=RequestsHttpConnection, + http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED), + serializer=self._cmrs_handler.serializer) + + +class ClientAmazon(ClientElasticSearch): + TYPE_CLIENT = AWS_SIGNED_AUTH + + def __init__(self, cmrs_handler): + ClientElasticSearch.__init__(self, cmrs_handler) + + def get(self): + + self._validation_environment_error(AWS4AUTH_SUPPORTED, MSG_AWS) + if self._cmrs_handler._client is None: + awsauth = AWS4Auth(self._cmrs_handler.aws_access_key, + self._cmrs_handler.aws_secret_key, + self._cmrs_handler.aws_region, 'es') + self._cmrs_handler._client = Elasticsearch( + hosts=self._cmrs_handler.hosts, + http_auth=awsauth, + use_ssl=self._cmrs_handler.use_ssl, + verify_certs=True, + connection_class=RequestsHttpConnection, + serializer=self._cmrs_handler.serializer) + + return self._cmrs_handler._client + + +class FactoryClientES: + + CLIENTS = {NO_AUTH: ClientNotAuth, + AWS_SIGNED_AUTH: ClientAmazon, + BASIC_AUTH: ClientBasicAuth, + KERBEROS_AUTH: ClientKerberos} + + @staticmethod + def get_client(cmrs_handler): + klass = FactoryClientES.CLIENTS.get(cmrs_handler.auth_type.value) + if not klass: + raise ValueError("Authentication method not supported") + + return klass(cmrs_handler) diff --git a/cmreslogging/handlers.py b/cmreslogging/handlers.py index 52e250a..4600ead 100644 --- a/cmreslogging/handlers.py +++ b/cmreslogging/handlers.py @@ -1,28 +1,19 @@ """ Elasticsearch logging handler """ -import logging import datetime +import logging import socket -from threading import Timer, Lock from enum import Enum -from elasticsearch import helpers as eshelpers -from elasticsearch import Elasticsearch, RequestsHttpConnection - -try: - from requests_kerberos import HTTPKerberosAuth, DISABLED - CMR_KERBEROS_SUPPORTED = True -except ImportError: - CMR_KERBEROS_SUPPORTED = False +from threading import Lock, Timer -try: - from requests_aws4auth import AWS4Auth - AWS4AUTH_SUPPORTED = True -except ImportError: - AWS4AUTH_SUPPORTED = False +from elasticsearch import helpers as eshelpers from cmreslogging.serializers import CMRESSerializer +from .client_es import (AWS_SIGNED_AUTH, BASIC_AUTH, KERBEROS_AUTH, NO_AUTH, + FactoryClientES) + class CMRESHandler(logging.Handler): """ Elasticsearch log handler @@ -39,10 +30,10 @@ class AuthType(Enum): - Basic authentication - Kerberos or SSO authentication (on windows and linux) """ - NO_AUTH = 0 - BASIC_AUTH = 1 - KERBEROS_AUTH = 2 - AWS_SIGNED_AUTH = 3 + NO_AUTH = NO_AUTH + BASIC_AUTH = BASIC_AUTH + KERBEROS_AUTH = KERBEROS_AUTH + AWS_SIGNED_AUTH = AWS_SIGNED_AUTH class IndexNameFrequency(Enum): """ Index type supported @@ -209,52 +200,8 @@ def __schedule_flush(self): self._timer.start() def __get_es_client(self): - if self.auth_type == CMRESHandler.AuthType.NO_AUTH: - if self._client is None: - self._client = Elasticsearch(hosts=self.hosts, - use_ssl=self.use_ssl, - verify_certs=self.verify_certs, - connection_class=RequestsHttpConnection, - serializer=self.serializer) - return self._client - - if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH: - if self._client is None: - return Elasticsearch(hosts=self.hosts, - http_auth=self.auth_details, - use_ssl=self.use_ssl, - verify_certs=self.verify_certs, - connection_class=RequestsHttpConnection, - serializer=self.serializer) - return self._client - - if self.auth_type == CMRESHandler.AuthType.KERBEROS_AUTH: - if not CMR_KERBEROS_SUPPORTED: - raise EnvironmentError("Kerberos module not available. Please install \"requests-kerberos\"") - # For kerberos we return a new client each time to make sure the tokens are up to date - return Elasticsearch(hosts=self.hosts, - use_ssl=self.use_ssl, - verify_certs=self.verify_certs, - connection_class=RequestsHttpConnection, - http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED), - serializer=self.serializer) - - if self.auth_type == CMRESHandler.AuthType.AWS_SIGNED_AUTH: - if not AWS4AUTH_SUPPORTED: - raise EnvironmentError("AWS4Auth not available. Please install \"requests-aws4auth\"") - if self._client is None: - awsauth = AWS4Auth(self.aws_access_key, self.aws_secret_key, self.aws_region, 'es') - self._client = Elasticsearch( - hosts=self.hosts, - http_auth=awsauth, - use_ssl=self.use_ssl, - verify_certs=True, - connection_class=RequestsHttpConnection, - serializer=self.serializer - ) - return self._client - raise ValueError("Authentication method not supported") + return FactoryClientES.get_client(cmrs_handler=self).get() def test_es_source(self): """ Returns True if the handler can ping the Elasticsearch servers diff --git a/tests/test_client_es.py b/tests/test_client_es.py new file mode 100644 index 0000000..ade99dd --- /dev/null +++ b/tests/test_client_es.py @@ -0,0 +1,53 @@ +import os +import sys +import unittest +from enum import Enum + +from cmreslogging.client_es import (ClientAmazon, ClientBasicAuth, + ClientKerberos, ClientNotAuth, + FactoryClientES) +from cmreslogging.handlers import CMRESHandler + +sys.path.insert(0, os.path.abspath('.')) + + +class AuthTypeFake(Enum): + NO_AUTH = 10 + + +class ClientESTestCase(unittest.TestCase): + + def crms_handler(self, type_client_es): + return CMRESHandler(hosts=[{'host': 'localhost', 'port': 9200}], + auth_type=type_client_es, + es_index_name="my_python_index", + es_additional_fields={'App': 'MyAppName', 'Environment': 'Dev'}) + + def test_get_client_no_auth(self): + cmr_handler = self.crms_handler(CMRESHandler.AuthType.NO_AUTH) + client = FactoryClientES.get_client(cmr_handler) + self.assertIsInstance(client, ClientNotAuth) + + def test_get_client_auth_basic(self): + cmr_handler = self.crms_handler(CMRESHandler.AuthType.BASIC_AUTH) + client = FactoryClientES.get_client(cmr_handler) + self.assertIsInstance(client, ClientBasicAuth) + + def test_get_client_amazon(self): + cmr_handler = self.crms_handler(CMRESHandler.AuthType.AWS_SIGNED_AUTH) + client = FactoryClientES.get_client(cmr_handler) + self.assertIsInstance(client, ClientAmazon) + + def test_get_client_kerberos(self): + cmr_handler = self.crms_handler(CMRESHandler.AuthType.KERBEROS_AUTH) + client = FactoryClientES.get_client(cmr_handler) + self.assertIsInstance(client, ClientKerberos) + + def test_execption_value_error(self): + cmr_handler = self.crms_handler(AuthTypeFake.NO_AUTH) + with self.assertRaises(ValueError): + FactoryClientES.get_client(cmr_handler) + + +if __name__ == '__main__': + unittest.main() diff --git a/tox.ini b/tox.ini index a0d2466..5188acd 100644 --- a/tox.ini +++ b/tox.ini @@ -22,6 +22,7 @@ commands = # pylint ./cmreslogging -r n --files-output=y '--msg-template="\{path\}:\{line\}: [\{msg_id\}(\{symbol\}), \{obj\}] \{msg\}"' coverage erase coverage run -a --source=./cmreslogging --branch tests/test_cmreshandler.py + coverage run -a --source=./cmreslogging --branch tests/test_client_es.py coverage run -a --source=./cmreslogging --branch tests/test_cmresserializer.py coverage xml -i coverage html