-
Notifications
You must be signed in to change notification settings - Fork 114
/
Copy pathhandlers.py
341 lines (300 loc) · 14.9 KB
/
handlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
""" Elasticsearch logging handler
"""
import logging
import datetime
import socket
from threading import Timer, Lock
from enum import Enum
from elasticsearch import helpers as eshelpers
from elasticsearch import Elasticsearch, RequestsHttpConnection
try:
from requests_kerberos import HTTPKerberosAuth, DISABLED
CMR_KERBEROS_SUPPORTED = True
except ImportError:
CMR_KERBEROS_SUPPORTED = False
try:
from requests_aws4auth import AWS4Auth
AWS4AUTH_SUPPORTED = True
except ImportError:
AWS4AUTH_SUPPORTED = False
from cmreslogging.serializers import CMRESSerializer
class CMRESHandler(logging.Handler):
    """ Elasticsearch log handler.

    Sends log records to Elasticsearch as JSON documents. Records are
    buffered in memory and flushed either when the buffer reaches
    ``buffer_size`` entries or when ``flush_frequency_in_sec`` seconds
    elapse, whichever happens first. All LogRecord fields are serialised
    and inserted into a date-suffixed index.
    """

    class AuthType(Enum):
        """ Authentication types supported.

        The handler supports:
        - No authentication
        - Basic authentication
        - Kerberos or SSO authentication (on Windows and Linux)
        - AWS signed requests (IAM credentials via requests-aws4auth)
        """
        NO_AUTH = 0
        BASIC_AUTH = 1
        KERBEROS_AUTH = 2
        AWS_SIGNED_AUTH = 3

    class IndexNameFrequency(Enum):
        """ Index rotation frequencies supported.

        The handler supports:
        - Daily indices
        - Weekly indices
        - Monthly indices
        - Yearly indices
        """
        DAILY = 0
        WEEKLY = 1
        MONTHLY = 2
        YEARLY = 3

    # Defaults for the class
    __DEFAULT_ELASTICSEARCH_HOST = [{'host': 'localhost', 'port': 9200}]
    __DEFAULT_AUTH_USER = ''
    __DEFAULT_AUTH_PASSWD = ''
    __DEFAULT_AWS_ACCESS_KEY = ''
    __DEFAULT_AWS_SECRET_KEY = ''
    __DEFAULT_AWS_REGION = ''
    __DEFAULT_USE_SSL = False
    __DEFAULT_VERIFY_SSL = True
    __DEFAULT_AUTH_TYPE = AuthType.NO_AUTH
    __DEFAULT_INDEX_FREQUENCY = IndexNameFrequency.DAILY
    __DEFAULT_BUFFER_SIZE = 1000
    __DEFAULT_FLUSH_FREQ_INSEC = 1
    __DEFAULT_ADDITIONAL_FIELDS = {}
    __DEFAULT_ES_INDEX_NAME = 'python_logger'
    __DEFAULT_ES_DOC_TYPE = 'python_log'
    __DEFAULT_RAISE_ON_EXCEPTION = False
    __DEFAULT_TIMESTAMP_FIELD_NAME = "timestamp"

    # LogRecord attributes that duplicate information already captured
    # elsewhere (timestamps/levels) and are therefore not indexed.
    __LOGGING_FILTER_FIELDS = ['msecs',
                               'relativeCreated',
                               'levelno',
                               'created']

    @staticmethod
    def _get_daily_index_name(es_index_name):
        """ Return the Elasticsearch index name for a daily index.

        :param es_index_name: the prefix to be used in the index name
        :return: A string containing the Elasticsearch index name, suffixed with today's date (YYYY.MM.dd)
        """
        return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m.%d'))

    @staticmethod
    def _get_weekly_index_name(es_index_name):
        """ Return the Elasticsearch index name for a weekly index.

        :param es_index_name: the prefix to be used in the index name
        :return: A string containing the Elasticsearch index name, suffixed with the date of the
                 Monday starting the current week (YYYY.MM.dd)
        """
        current_date = datetime.datetime.now()
        start_of_the_week = current_date - datetime.timedelta(days=current_date.weekday())
        return "{0!s}-{1!s}".format(es_index_name, start_of_the_week.strftime('%Y.%m.%d'))

    @staticmethod
    def _get_monthly_index_name(es_index_name):
        """ Return the Elasticsearch index name for a monthly index.

        :param es_index_name: the prefix to be used in the index name
        :return: A string containing the Elasticsearch index name, suffixed with the current month (YYYY.MM)
        """
        return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y.%m'))

    @staticmethod
    def _get_yearly_index_name(es_index_name):
        """ Return the Elasticsearch index name for a yearly index.

        :param es_index_name: the prefix to be used in the index name
        :return: A string containing the Elasticsearch index name, suffixed with the current year (YYYY)
        """
        return "{0!s}-{1!s}".format(es_index_name, datetime.datetime.now().strftime('%Y'))

    # Dispatch table from rotation frequency to index-name builder.
    # (Name kept as-is, including its historical spelling, for backward compatibility.)
    _INDEX_FREQUENCY_FUNCION_DICT = {
        IndexNameFrequency.DAILY: _get_daily_index_name,
        IndexNameFrequency.WEEKLY: _get_weekly_index_name,
        IndexNameFrequency.MONTHLY: _get_monthly_index_name,
        IndexNameFrequency.YEARLY: _get_yearly_index_name
    }

    def __init__(self,
                 hosts=__DEFAULT_ELASTICSEARCH_HOST,
                 auth_details=(__DEFAULT_AUTH_USER, __DEFAULT_AUTH_PASSWD),
                 aws_access_key=__DEFAULT_AWS_ACCESS_KEY,
                 aws_secret_key=__DEFAULT_AWS_SECRET_KEY,
                 aws_region=__DEFAULT_AWS_REGION,
                 auth_type=__DEFAULT_AUTH_TYPE,
                 use_ssl=__DEFAULT_USE_SSL,
                 verify_ssl=__DEFAULT_VERIFY_SSL,
                 buffer_size=__DEFAULT_BUFFER_SIZE,
                 flush_frequency_in_sec=__DEFAULT_FLUSH_FREQ_INSEC,
                 es_index_name=__DEFAULT_ES_INDEX_NAME,
                 index_name_frequency=__DEFAULT_INDEX_FREQUENCY,
                 es_doc_type=__DEFAULT_ES_DOC_TYPE,
                 es_additional_fields=__DEFAULT_ADDITIONAL_FIELDS,
                 raise_on_indexing_exceptions=__DEFAULT_RAISE_ON_EXCEPTION,
                 default_timestamp_field_name=__DEFAULT_TIMESTAMP_FIELD_NAME):
        """ Handler constructor

        :param hosts: The list of hosts that Elasticsearch clients will connect to. The list can be provided
                    in the format ```[{'host':'host1','port':9200}, {'host':'host2','port':9200}]``` to
                    make sure the client supports failover of one of the insertion nodes
        :param auth_details: When ```CMRESHandler.AuthType.BASIC_AUTH``` is used this argument must contain
                    a tuple of string with the user and password that will be used to authenticate against
                    the Elasticsearch servers, for example```('User','Password')
        :param aws_access_key: When ```CMRESHandler.AuthType.AWS_SIGNED_AUTH``` is used this argument must contain
                    the AWS key id of the AWS IAM user
        :param aws_secret_key: When ```CMRESHandler.AuthType.AWS_SIGNED_AUTH``` is used this argument must contain
                    the AWS secret key of the AWS IAM user
        :param aws_region: When ```CMRESHandler.AuthType.AWS_SIGNED_AUTH``` is used this argument must contain
                    the AWS region of the AWS Elasticsearch servers, for example```'us-east'
        :param auth_type: The authentication type to be used in the connection ```CMRESHandler.AuthType```
                    Currently, NO_AUTH, BASIC_AUTH, KERBEROS_AUTH and AWS_SIGNED_AUTH are supported
        :param use_ssl: A boolean that defines if the communications should use SSL encrypted communication
        :param verify_ssl: A boolean that defines if the SSL certificates are validated or not
        :param buffer_size: An int, once this size is reached on the internal buffer results are flushed into ES
        :param flush_frequency_in_sec: A float representing how often and when the buffer will be flushed, even
                    if the buffer_size has not been reached yet
        :param es_index_name: A string with the prefix of the Elasticsearch index that will be created. Note a
                    date suffix (e.g. YYYY.MM.dd) is appended; ```python_logger``` used by default
        :param index_name_frequency: Defines what the date used in the postfix of the name would be. Available values
                    are selected from the IndexNameFrequency class (IndexNameFrequency.DAILY,
                    IndexNameFrequency.WEEKLY, IndexNameFrequency.MONTHLY, IndexNameFrequency.YEARLY). By default
                    it uses daily indices.
        :param es_doc_type: A string with the name of the document type that will be used; ```python_log``` used
                    by default
        :param es_additional_fields: A dictionary with all the additional fields that you would like to add
                    to the logs, such as the application, environment, etc.
        :param raise_on_indexing_exceptions: A boolean, True only for debugging purposes: re-raise exceptions
                    caused by the bulk indexing instead of silently dropping them
        :param default_timestamp_field_name: A string, the document field name used for the record timestamp
        :return: A ready to be used CMRESHandler.
        """
        logging.Handler.__init__(self)

        self.hosts = hosts
        self.auth_details = auth_details
        self.aws_access_key = aws_access_key
        self.aws_secret_key = aws_secret_key
        self.aws_region = aws_region
        self.auth_type = auth_type
        self.use_ssl = use_ssl
        self.verify_certs = verify_ssl
        self.buffer_size = buffer_size
        self.flush_frequency_in_sec = flush_frequency_in_sec
        self.es_index_name = es_index_name
        self.index_name_frequency = index_name_frequency
        self.es_doc_type = es_doc_type
        # Copy so a caller-shared dict (or the class-level default) is never mutated.
        self.es_additional_fields = es_additional_fields.copy()
        self.es_additional_fields.update({'host': socket.gethostname(),
                                          'host_ip': socket.gethostbyname(socket.gethostname())})
        self.raise_on_indexing_exceptions = raise_on_indexing_exceptions
        self.default_timestamp_field_name = default_timestamp_field_name

        self._client = None
        self._buffer = []
        self._buffer_lock = Lock()
        self._timer = None
        self._index_name_func = CMRESHandler._INDEX_FREQUENCY_FUNCION_DICT[self.index_name_frequency]
        self.serializer = CMRESSerializer()

    def __schedule_flush(self):
        """ Start a daemon timer that flushes the buffer after the configured delay.

        Only one timer is live at a time; a no-op if one is already scheduled.
        """
        if self._timer is None:
            self._timer = Timer(self.flush_frequency_in_sec, self.flush)
            # Daemon so a pending flush never blocks interpreter shutdown.
            # (attribute form replaces the deprecated setDaemon())
            self._timer.daemon = True
            self._timer.start()

    def __get_es_client(self):
        """ Return an Elasticsearch client configured for the selected auth type.

        The client is created lazily and cached, except for Kerberos where a
        fresh client is returned on every call so auth tokens stay current.

        :raises EnvironmentError: if the optional auth dependency is not installed
        :raises ValueError: if ``self.auth_type`` is not a supported AuthType
        """
        if self.auth_type == CMRESHandler.AuthType.NO_AUTH:
            if self._client is None:
                self._client = Elasticsearch(hosts=self.hosts,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if self.auth_type == CMRESHandler.AuthType.BASIC_AUTH:
            # Fix: cache the client instead of building a new one per call
            # (the original returned without assigning self._client).
            if self._client is None:
                self._client = Elasticsearch(hosts=self.hosts,
                                             http_auth=self.auth_details,
                                             use_ssl=self.use_ssl,
                                             verify_certs=self.verify_certs,
                                             connection_class=RequestsHttpConnection,
                                             serializer=self.serializer)
            return self._client

        if self.auth_type == CMRESHandler.AuthType.KERBEROS_AUTH:
            if not CMR_KERBEROS_SUPPORTED:
                raise EnvironmentError("Kerberos module not available. Please install \"requests-kerberos\"")
            # For Kerberos we return a new client each time to make sure the tokens are up to date
            return Elasticsearch(hosts=self.hosts,
                                 use_ssl=self.use_ssl,
                                 verify_certs=self.verify_certs,
                                 connection_class=RequestsHttpConnection,
                                 http_auth=HTTPKerberosAuth(mutual_authentication=DISABLED),
                                 serializer=self.serializer)

        if self.auth_type == CMRESHandler.AuthType.AWS_SIGNED_AUTH:
            if not AWS4AUTH_SUPPORTED:
                raise EnvironmentError("AWS4Auth not available. Please install \"requests-aws4auth\"")
            if self._client is None:
                awsauth = AWS4Auth(self.aws_access_key, self.aws_secret_key, self.aws_region, 'es')
                self._client = Elasticsearch(
                    hosts=self.hosts,
                    http_auth=awsauth,
                    use_ssl=self.use_ssl,
                    # NOTE(review): certificate verification is hard-coded on for AWS,
                    # ignoring verify_ssl — kept as-is; confirm whether intentional.
                    verify_certs=True,
                    connection_class=RequestsHttpConnection,
                    serializer=self.serializer
                )
            return self._client

        raise ValueError("Authentication method not supported")

    def test_es_source(self):
        """ Returns True if the handler can ping the Elasticsearch servers

        Can be used to confirm the setup of a handler has been properly done and confirm
        that things like the authentication are working properly

        :return: A boolean, True if the connection against the Elasticsearch host was successful
        """
        return self.__get_es_client().ping()

    @staticmethod
    def __get_es_datetime_str(timestamp):
        """ Returns Elasticsearch UTC-formatted time for an epoch timestamp

        :param timestamp: epoch, including milliseconds
        :return: A string valid for an Elasticsearch time record, e.g. ``1970-01-01T00:00:00.000Z``
        """
        # tz-aware replacement for the deprecated utcfromtimestamp(); same output.
        current_date = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
        return "{0!s}.{1:03d}Z".format(current_date.strftime('%Y-%m-%dT%H:%M:%S'),
                                       int(current_date.microsecond / 1000))

    def flush(self):
        """ Flushes the buffer into ES

        :return: None
        """
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None

        # Swap the buffer out under the lock before inspecting it, so a
        # concurrent emit() can never slip records into a list we are
        # about to send (the original checked emptiness outside the lock).
        with self._buffer_lock:
            logs_buffer = self._buffer
            self._buffer = []

        if logs_buffer:
            try:
                actions = (
                    {
                        '_index': self._index_name_func.__func__(self.es_index_name),
                        # '_type' intentionally omitted: mapping types are deprecated
                        # in bulk requests (Issue #76).
                        '_source': log_record
                    }
                    for log_record in logs_buffer
                )
                eshelpers.bulk(
                    client=self.__get_es_client(),
                    actions=actions,
                    stats_only=True
                )
            except Exception as exception:
                if self.raise_on_indexing_exceptions:
                    raise exception

    def close(self):
        """ Flushes the buffer and releases any outstanding resource

        :return: None
        """
        if self._timer is not None:
            self.flush()
        self._timer = None

    def emit(self, record):
        """ Emit overrides the abstract logging.Handler logRecord emit method

        Formats and records the log

        :param record: A class of type ```logging.LogRecord```
        :return: None
        """
        self.format(record)

        rec = self.es_additional_fields.copy()
        for key, value in record.__dict__.items():
            if key in CMRESHandler.__LOGGING_FILTER_FIELDS:
                continue
            if key == "args" and value is not None:
                # Stringify args so arbitrary objects serialise cleanly.
                # Guarded against None, which is a legal value for record.args.
                value = tuple(str(arg) for arg in value)
            rec[key] = "" if value is None else value
        rec[self.default_timestamp_field_name] = self.__get_es_datetime_str(record.created)

        with self._buffer_lock:
            self._buffer.append(rec)

        if len(self._buffer) >= self.buffer_size:
            self.flush()
        else:
            self.__schedule_flush()