diff --git a/cmreslogging/handlers.py b/cmreslogging/handlers.py
index 52e250a..4935f44 100644
--- a/cmreslogging/handlers.py
+++ b/cmreslogging/handlers.py
@@ -11,12 +11,14 @@
 try:
     from requests_kerberos import HTTPKerberosAuth, DISABLED
+
     CMR_KERBEROS_SUPPORTED = True
 except ImportError:
     CMR_KERBEROS_SUPPORTED = False
 
 try:
     from requests_aws4auth import AWS4Auth
+
     AWS4AUTH_SUPPORTED = True
 except ImportError:
     AWS4AUTH_SUPPORTED = False
 
@@ -25,45 +27,47 @@
 class CMRESHandler(logging.Handler):
-    """ Elasticsearch log handler
+    """Elasticsearch log handler
 
     Allows to log to elasticsearch into json format.
     All LogRecord fields are serialised and inserted
     """
 
     class AuthType(Enum):
-        """ Authentication types supported
+        """Authentication types supported
 
         The handler supports
         - No authentication
         - Basic authentication
         - Kerberos or SSO authentication (on windows and linux)
         """
+
         NO_AUTH = 0
         BASIC_AUTH = 1
         KERBEROS_AUTH = 2
         AWS_SIGNED_AUTH = 3
 
     class IndexNameFrequency(Enum):
-        """ Index type supported
+        """Index type supported
 
         the handler supports
         - Daily indices
         - Weekly indices
         - Monthly indices
         - Year indices
         """
+
         DAILY = 0
         WEEKLY = 1
         MONTHLY = 2
         YEARLY = 3
 
     # Defaults for the class
-    __DEFAULT_ELASTICSEARCH_HOST = [{'host': 'localhost', 'port': 9200}]
-    __DEFAULT_AUTH_USER = ''
-    __DEFAULT_AUTH_PASSWD = ''
-    __DEFAULT_AWS_ACCESS_KEY = ''
-    __DEFAULT_AWS_SECRET_KEY = ''
-    __DEFAULT_AWS_REGION = ''
+    __DEFAULT_ELASTICSEARCH_HOST = [{"host": "localhost", "port": 9200}]
+    __DEFAULT_AUTH_USER = ""
+    __DEFAULT_AUTH_PASSWD = ""
+    __DEFAULT_AWS_ACCESS_KEY = ""
+    __DEFAULT_AWS_SECRET_KEY = ""
+    __DEFAULT_AWS_REGION = ""
     __DEFAULT_USE_SSL = False
     __DEFAULT_VERIFY_SSL = True
     __DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
@@ -71,75 +75,85 @@ class IndexNameFrequency(Enum):
     __DEFAULT_BUFFER_SIZE = 1000
     __DEFAULT_FLUSH_FREQ_INSEC = 1
     __DEFAULT_ADDITIONAL_FIELDS = {}
-    __DEFAULT_ES_INDEX_NAME = 'python_logger'
-    __DEFAULT_ES_DOC_TYPE = 'python_log'
+    __DEFAULT_ES_INDEX_NAME = "python_logger"
+    __DEFAULT_ES_DOC_TYPE = "python_log"
     __DEFAULT_RAISE_ON_EXCEPTION = False
     __DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"
 
-    __LOGGING_FILTER_FIELDS = ['msecs',
-                               'relativeCreated',
-                               'levelno',
-                               'created']
+    __LOGGING_FILTER_FIELDS = ["msecs", "relativeCreated", "levelno", "created"]
 
     @staticmethod
     def _get_daily_index_name(es_index_name):
-        """ Returns elasticearch index name
+        """Returns elasticsearch index name
         :param: index_name the prefix to be used in the index
         :return: A srting containing the elasticsearch indexname used which should include the date.
         """
-        return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m.%d'))
+        return "{0!s}-{1!s}".format(
+            es_index_name, datetime.datetime.now().strftime("%Y.%m.%d")
+        )
 
     @staticmethod
     def _get_weekly_index_name(es_index_name):
-        """ Return elasticsearch index name
+        """Return elasticsearch index name
        :param: index_name the prefix to be used in the index
         :return: A srting containing the elasticsearch indexname used which should include the date and specific week
         """
         current_date = datetime.datetime.now()
-        start_of_the_week = current_date - datetime.timedelta(days=current_date.weekday())
-        return "{0!s}-{1!s}".format(es_index_name, start_of_the_week.strftime('%Y.%m.%d'))
+        start_of_the_week = current_date - datetime.timedelta(
+            days=current_date.weekday()
+        )
+        return "{0!s}-{1!s}".format(
+            es_index_name, start_of_the_week.strftime("%Y.%m.%d")
+        )
 
     @staticmethod
     def _get_monthly_index_name(es_index_name):
-        """ Return elasticsearch index name
+        """Return elasticsearch index name
         :param: index_name the prefix to be used in the index
         :return: A srting containing the elasticsearch indexname used which should include the date and specific moth
         """
-        return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m'))
+        return "{0!s}-{1!s}".format(
+            es_index_name, datetime.datetime.now().strftime("%Y.%m")
+        )
 
     @staticmethod
     def _get_yearly_index_name(es_index_name):
-        """ Return elasticsearch index name
+        """Return elasticsearch index name
         :param: index_name the prefix to be used in the index
         :return: A srting containing the elasticsearch indexname used which should include the date and specific year
         """
-        return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y'))
+        return "{0!s}-{1!s}".format(
+            es_index_name, datetime.datetime.now().strftime("%Y")
+        )
 
     _INDEX_FREQUENCY_FUNCION_DICT = {
         IndexNameFrequency.DAILY: _get_daily_index_name,
         IndexNameFrequency.WEEKLY: _get_weekly_index_name,
         IndexNameFrequency.MONTHLY: _get_monthly_index_name,
-        IndexNameFrequency.YEARLY: _get_yearly_index_name
+        IndexNameFrequency.YEARLY: _get_yearly_index_name,
     }
 
-    def __init__(self,
-                 hosts=__DEFAULT_ELASTICSEARCH_HOST,
-                 auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
-                 aws_access_key=__DEFAULT_AWS_ACCESS_KEY,
-                 aws_secret_key=__DEFAULT_AWS_SECRET_KEY,
-                 aws_region=__DEFAULT_AWS_REGION,
-                 auth_type=__DEFAULT_AUTH_TYPE,
-                 use_ssl=__DEFAULT_USE_SSL,
-                 verify_ssl=__DEFAULT_VERIFY_SSL,
-                 buffer_size=__DEFAULT_BUFFER_SIZE,
-                 flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC,
-                 es_index_name=__DEFAULT_ES_INDEX_NAME,
-                 index_name_frequency=__DEFAULT_INDEX_FREQUENCY,
-                 es_doc_type=__DEFAULT_ES_DOC_TYPE,
-                 es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
-                 raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
-                 default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME):
-        """ Handler constructor
+    def __init__(
+        self,
+        hosts=__DEFAULT_ELASTICSEARCH_HOST,
+        auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
+        aws_access_key=__DEFAULT_AWS_ACCESS_KEY,
+        aws_secret_key=__DEFAULT_AWS_SECRET_KEY,
+        aws_region=__DEFAULT_AWS_REGION,
+        auth_type=__DEFAULT_AUTH_TYPE,
+        use_ssl=__DEFAULT_USE_SSL,
+        verify_ssl=__DEFAULT_VERIFY_SSL,
+        buffer_size=__DEFAULT_BUFFER_SIZE,
+        flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC,
+        es_index_name=__DEFAULT_ES_INDEX_NAME,
+        index_name_frequency=__DEFAULT_INDEX_FREQUENCY,
+        es_doc_type=__DEFAULT_ES_DOC_TYPE,
+        es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
+        raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
+        default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME,
+        disabled_fields: tuple = (),
+    ):
+        """Handler constructor
 
         :param hosts: The list of hosts that elasticsearch clients will connect. The list can be provided
                     in the format ```[{'host':'host1','port':9200}, {'host':'host2','port':9200}]``` to
@@ -172,6 +186,7 @@ def __init__(self,
                     to the logs, such the application, environment, etc.
         :param raise_on_indexing_exceptions: A boolean, True only for debugging purposes to raise exceptions
                     caused when
+        :param disabled_fields: A tuple of record field names, empty by default; every field listed here is removed from each log record before it is indexed
         :return: A ready to be used CMRESHandler.
         """
         logging.Handler.__init__(self)
@@ -190,8 +205,12 @@ def __init__(self,
         self.index_name_frequency = index_name_frequency
         self.es_doc_type = es_doc_type
         self.es_additional_fields = es_additional_fields.copy()
-        self.es_additional_fields.update({'host': socket.gethostname(),
-                                          'host_ip': socket.gethostbyname(socket.gethostname())})
+        self.disabled_fields = disabled_fields
+
+        # Intentionally disabled: stop adding host/host_ip to every indexed record
+        # self.es_additional_fields.update({'host': socket.gethostname(),
+        #                                   'host_ip': socket.gethostbyname(socket.gethostname())})
+
         self.raise_on_indexing_exceptions = raise_on_indexing_exceptions
         self.default_timestamp_field_name = default_timestamp_field_name
 
@@ -199,7 +218,9 @@ def __init__(self,
         self._buffer = []
         self._buffer_lock = Lock()
         self._timer = None
-        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[self.index_name_frequency]
+        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[
+            self.index_name_frequency
+        ]
         self.serializer = CMRESSerializer()
 
     def __schedule_flush(self):
@@ -211,53 +232,65 @@
     def __get_es_client(self):
         if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
             if self._client is None:
-                self._client = Elasticsearch(hosts=self.hosts,
-                                             use_ssl=self.use_ssl,
-                                             verify_certs=self.verify_certs,
-                                             connection_class=RequestsHttpConnection,
-                                             serializer=self.serializer)
+                self._client = Elasticsearch(
+                    hosts=self.hosts,
+                    use_ssl=self.use_ssl,
+                    verify_certs=self.verify_certs,
+                    connection_class=RequestsHttpConnection,
+                    serializer=self.serializer,
+                )
             return self._client
 
         if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
             if self._client is None:
-                return Elasticsearch(hosts=self.hosts,
-                                     http_auth=self.auth_details,
-                                     use_ssl=self.use_ssl,
-                                     verify_certs=self.verify_certs,
-                                     connection_class=RequestsHttpConnection,
-                                     serializer=self.serializer)
+                return Elasticsearch(
+                    hosts=self.hosts,
+                    http_auth=self.auth_details,
+                    use_ssl=self.use_ssl,
+                    verify_certs=self.verify_certs,
+                    connection_class=RequestsHttpConnection,
+                    serializer=self.serializer,
+                )
             return self._client
 
         if self.auth_type == CMRESHandler.AuthType.KERBEROS_AUTH:
             if not CMR_KERBEROS_SUPPORTED:
-                raise EnvironmentError("Kerberos module not available. Please install \"requests-kerberos\"")
+                raise EnvironmentError(
+                    'Kerberos module not available. Please install "requests-kerberos"'
+                )
             # For kerberos we return a new client each time to make sure the tokens are up to date
-            return Elasticsearch(hosts=self.hosts,
-                                 use_ssl=self.use_ssl,
-                                 verify_certs=self.verify_certs,
-                                 connection_class=RequestsHttpConnection,
-                                 http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED),
-                                 serializer=self.serializer)
+            return Elasticsearch(
+                hosts=self.hosts,
+                use_ssl=self.use_ssl,
+                verify_certs=self.verify_certs,
+                connection_class=RequestsHttpConnection,
+                http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED),
+                serializer=self.serializer,
+            )
 
         if self.auth_type == CMRESHandler.AuthType.AWS_SIGNED_AUTH:
             if not AWS4AUTH_SUPPORTED:
-                raise EnvironmentError("AWS4Auth not available. Please install \"requests-aws4auth\"")
+                raise EnvironmentError(
+                    'AWS4Auth not available. Please install "requests-aws4auth"'
+                )
             if self._client is None:
-                awsauth = AWS4Auth(self.aws_access_key, self.aws_secret_key, self.aws_region, 'es')
+                awsauth = AWS4Auth(
+                    self.aws_access_key, self.aws_secret_key, self.aws_region, "es"
+                )
                 self._client = Elasticsearch(
                     hosts=self.hosts,
                     http_auth=awsauth,
                     use_ssl=self.use_ssl,
                     verify_certs=True,
                     connection_class=RequestsHttpConnection,
-                    serializer=self.serializer
+                    serializer=self.serializer,
                 )
             return self._client
 
         raise ValueError("Authentication method not supported")
 
     def test_es_source(self):
-        """ Returns True if the handler can ping the Elasticsearch servers
+        """Returns True if the handler can ping the Elasticsearch servers
 
         Can be used to confirm the setup of a handler has been properly done and confirm
         that things like the authentication is working properly
@@ -268,16 +301,26 @@
     @staticmethod
     def __get_es_datetime_str(timestamp):
-        """ Returns elasticsearch utc formatted time for an epoch timestamp
+        """Returns elasticsearch utc formatted time for an epoch timestamp
 
         :param timestamp: epoch, including milliseconds
         :return: A string valid for elasticsearch time record
         """
         current_date = datetime.datetime.utcfromtimestamp(timestamp)
-        return "{0!s}.{1:03d}Z".format(current_date.strftime('%Y-%m-%dT%H:%M:%S'), int(current_date.microsecond / 1000))
+        return "{0!s}.{1:03d}Z".format(
+            current_date.strftime("%Y-%m-%dT%H:%M:%S"),
+            int(current_date.microsecond / 1000),
+        )
+
+    def del_all(self, mapping):
+        """Remove the disabled fields from a record mapping.
+        :return: the mapping stripped of those fields"""
+        for key in self.disabled_fields:
+            mapping.pop(key, None)  # pop() so a missing key does not raise KeyError
+        return mapping
 
     def flush(self):
-        """ Flushes the buffer into ES
+        """Flushes the buffer into ES
 
         :return: None
         """
         if self._timer is not None and self._timer.is_alive():
@@ -287,27 +330,29 @@
         if self._buffer:
             try:
                 with self._buffer_lock:
-                    logs_buffer = self._buffer
+                    if not self.disabled_fields:
+                        logs_buffer = self._buffer
+                    else:
+                        logs_buffer = [self.del_all(i) for i in self._buffer]
+
                     self._buffer = []
                 actions = (
                     {
-                        '_index': self._index_name_func.__func__(self.es_index_name),
-                        '_type': self.es_doc_type,
-                        '_source': log_record
+                        "_index": self._index_name_func.__func__(self.es_index_name),
+                        "_type": self.es_doc_type,
+                        "_source": log_record,
                     }
                     for log_record in logs_buffer
                 )
                 eshelpers.bulk(
-                    client=self.__get_es_client(),
-                    actions=actions,
-                    stats_only=True
+                    client=self.__get_es_client(), actions=actions, stats_only=True
                 )
             except Exception as exception:
                 if self.raise_on_indexing_exceptions:
                     raise exception
 
     def close(self):
-        """ Flushes the buffer and release any outstanding resource
+        """Flushes the buffer and release any outstanding resource
 
         :return: None
         """
@@ -316,7 +361,7 @@ def close(self):
         self._timer = None
 
     def emit(self, record):
-        """ Emit overrides the abstract logging.Handler logRecord emit method
+        """Emit overrides the abstract logging.Handler logRecord emit method
 
         Format and records the log
 
@@ -331,7 +376,9 @@ def emit(self, record):
             if key == "args":
                 value = tuple(str(arg) for arg in value)
             rec[key] = "" if value is None else value
-        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)
+        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(
+            record.created
+        )
         with self._buffer_lock:
             self._buffer.append(rec)
 
diff --git a/setup.py b/setup.py
index 08baacc..bb0c350 100644
--- a/setup.py
+++ b/setup.py
@@ -34,7 +34,7 @@
     # Versions should comply with PEP440. For a discussion on single-sourcing
     # the version across setup.py and the project code, see
     # https://packaging.python.org/en/latest/single_source_version.html
-    version='1.0.0',
+    version='1.0.1',
     description='Elasticsearch Log handler for the logging library',
     long_description=long_description,
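Usage note, not part of the patch: a minimal sketch of the new disabled_fields option introduced above. The index name and the two field names dropped here are illustrative choices, not defaults; any standard LogRecord attribute name can be listed, and listed names missing from a record are simply skipped.

import logging

from cmreslogging.handlers import CMRESHandler

# Every field named in disabled_fields is removed from each buffered
# record by del_all() before the buffer is flushed to Elasticsearch.
handler = CMRESHandler(
    hosts=[{"host": "localhost", "port": 9200}],
    auth_type=CMRESHandler.AuthType.NO_AUTH,
    es_index_name="my_python_index",
    disabled_fields=("filename", "funcName"),
)

log = logging.getLogger("PythonTest")
log.setLevel(logging.INFO)
log.addHandler(handler)
log.info("this record is indexed without 'filename' or 'funcName'")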