|
| 1 | +from threading import Thread |
| 2 | +from datetime import datetime, timedelta |
| 3 | +from eventlet import greenthread |
| 4 | +import Queue |
| 5 | +import socket |
| 6 | +import time |
| 7 | +import pytz |
| 8 | +import pika |
| 9 | +import redis |
| 10 | +import json |
| 11 | +import copy |
| 12 | +import os |
| 13 | + |
| 14 | + |
# Fixed absolute path: /opt/crystal/workload_metrics
SRC_METRIC_PATH = os.path.join(
    "/opt",
    "crystal",
    "workload_metrics",
)
# "<directory part of this module's absolute path>/metrics"
DST_METRIC_PATH = '/'.join(
    (os.path.abspath(__file__).rsplit('/', 1)[0], 'metrics')
)
| 17 | + |
| 18 | + |
class Singleton(type):
    """Metaclass that hands out one shared instance per class.

    The first call to a class using this metaclass constructs the instance
    with the given arguments; every later call returns that same instance
    and ignores its arguments.
    """

    # Maps each class to the single instance created for it.
    _instances = {}

    def __call__(cls, *args, **kwargs):  # @NoSelf
        try:
            return cls._instances[cls]
        except KeyError:
            # First call for this class: build the instance and remember it.
            instance = super(Singleton, cls).__call__(*args, **kwargs)
            cls._instances[cls] = instance
            return instance
| 26 | + |
| 27 | + |
class CrystalMetricControl(object):
    """Facade over the metric background threads.

    Uses the ``Singleton`` metaclass, so constructing it again returns the
    instance that already exists. Construction starts the node status
    thread right away. The control and publish threads are only created
    (as daemon threads) and are not started here; ``threads_started`` is
    initialised to ``False`` to reflect that.
    """
    __metaclass__ = Singleton

    def __init__(self, conf, log):
        """Create the three worker threads and start the node status one.

        :param conf: configuration mapping handed to every thread.
        :param log: logger handed to every thread.
        """
        self.logger = log
        self.conf = conf

        # Node status reporting begins as soon as the controller exists.
        status_worker = NodeStatusThread(conf, log)
        status_worker.daemon = True
        self.status_thread = status_worker
        status_worker.start()

        # The remaining workers are prepared but deliberately left idle.
        control_worker = ControlThread(conf, log)
        control_worker.daemon = True
        self.control_thread = control_worker

        publish_worker = PublishThread(conf, log)
        publish_worker.daemon = True
        self.publish_thread = publish_worker

        self.threads_started = False

    def get_metrics(self):
        """Return the metric list currently held by the control thread."""
        control_worker = self.control_thread
        return control_worker.metric_list

    def publish_stateful_metric(self, routing_key, data):
        """Forward a stateful sample to the publish thread."""
        publisher = self.publish_thread
        publisher.publish_statefull(routing_key, data)

    def publish_stateless_metric(self, routing_key, data):
        """Forward a stateless sample to the publish thread."""
        publisher = self.publish_thread
        publisher.publish_stateless(routing_key, data)

    def force_publish_metric(self, routing_key, data):
        """Ask the publish thread to queue ``data`` as a message directly."""
        publisher = self.publish_thread
        publisher.force_publish_metric(routing_key, data)
| 58 | + |
| 59 | + |
class PublishThread(Thread):
    """Aggregate metric samples in memory and publish them to RabbitMQ.

    Producers call :meth:`publish_stateless`, :meth:`publish_statefull` or
    :meth:`force_publish_metric`. :meth:`run` wakes up every ``interval``
    seconds, turns the accumulated values into messages and sends them to
    the configured exchange.

    Accumulated values are kept in ``{metric_name: {key: number}}``
    dictionaries, where ``key`` is ``str()`` of the sample's labels (every
    entry of the sample except ``'value'``). The labels must contain a
    ``'method'`` entry because it is part of the routing key.
    """

    # Consecutive zero-valued ticks after which an idle series is dropped.
    ZERO_VALUE_TICKS = 5
    # One-second resolution used to detect two ticks within the same second.
    _TICK_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self, conf, logger):
        """Read the publishing and RabbitMQ settings from ``conf``.

        :param conf: mapping providing ``rabbit_host``, ``rabbit_port``,
                     ``rabbit_username``, ``rabbit_password`` and optionally
                     ``publish_interval`` and ``exchange``.
        :param logger: logger used to report publishing problems.
        """
        Thread.__init__(self)

        self.logger = logger
        self.monitoring_statefull_data = dict()
        self.monitoring_stateless_data = dict()
        self.messages_to_send = Queue.Queue()

        # Tick state; run() resets it, but defining it here keeps the
        # helper methods usable on an instance that is not running yet.
        self.last_stateless_data = None
        self.last_statefull_data = None
        self.zero_value_timeout = dict()
        self.rabbit = None
        self.channel = None

        # Configuration values may arrive as strings (rabbit_port is
        # converted below for the same reason), so coerce the interval.
        self.interval = float(conf.get('publish_interval', 0.995))
        self.host_name = socket.gethostname()
        self.exchange = conf.get('exchange', 'amq.topic')

        rabbit_host = conf.get('rabbit_host')
        rabbit_port = int(conf.get('rabbit_port'))
        rabbit_user = conf.get('rabbit_username')
        rabbit_pass = conf.get('rabbit_password')

        credentials = pika.PlainCredentials(rabbit_user, rabbit_pass)
        self.parameters = pika.ConnectionParameters(host=rabbit_host,
                                                    port=rabbit_port,
                                                    credentials=credentials)

    # ------------------------------------------------------------------
    # Producer side
    # ------------------------------------------------------------------

    def publish_statefull(self, metric_name, data):
        """Add ``data['value']`` to the stateful series described by ``data``.

        :param metric_name: name of the metric.
        :param data: dict with a ``'value'`` entry plus the series labels.
                     It is not modified.
        :raises KeyError: if ``data`` has no ``'value'`` entry.
        """
        self._accumulate(self.monitoring_statefull_data, metric_name, data)

    def publish_stateless(self, metric_name, data):
        """Add ``data['value']`` to the stateless series described by ``data``.

        :param metric_name: name of the metric.
        :param data: dict with a ``'value'`` entry plus the series labels.
                     It is not modified.
        :raises KeyError: if ``data`` has no ``'value'`` entry.
        """
        self._accumulate(self.monitoring_stateless_data, metric_name, data)

    def force_publish_metric(self, metric_name, data):
        """Queue ``data`` as a message without any aggregation.

        A copy of ``data`` is stamped with the host name, the metric name
        and the current timestamp; the caller's dict is left untouched.

        :raises KeyError: if ``data`` has no ``'method'`` entry.
        """
        payload = dict(data)
        payload['host'] = self.host_name
        payload['metric_name'] = metric_name
        payload['@timestamp'] = str(self._now().isoformat())

        routing_key = 'metric.'+payload['method'].lower()+'_'+metric_name
        self.messages_to_send.put({routing_key: payload})

    def _accumulate(self, store, metric_name, data):
        """Add ``data['value']`` to ``store[metric_name][<labels key>]``."""
        series = store.setdefault(metric_name, dict())

        value = data['value']
        # Build the key from a copy so the caller's dict keeps its 'value'.
        labels = dict((k, v) for k, v in data.items() if k != 'value')
        key = str(labels)

        series.setdefault(key, 0)
        try:
            series[key] += value
        except Exception as e:
            # Best effort: a bad sample must not break the caller.
            self.logger.error("Unable to accumulate a value for metric " +
                              str(metric_name) + ": " + str(e))

    # ------------------------------------------------------------------
    # Message generation
    # ------------------------------------------------------------------

    @staticmethod
    def _now():
        """Return the current time, timezone-aware.

        Uses the local zone abbreviation reported by ``time.tzname`` and
        falls back to UTC when pytz does not know that abbreviation.
        """
        try:
            zone = pytz.timezone(time.tzname[0])
        except pytz.UnknownTimeZoneError:
            zone = pytz.utc
        return datetime.now(zone)

    @staticmethod
    def _labels_from_key(key):
        """Turn a series key (``str()`` of a label dict) back into a dict."""
        # literal_eval only accepts Python literals, unlike eval().
        import ast
        return ast.literal_eval(key)

    @staticmethod
    def _evict(store, metric_name, key):
        """Remove ``key`` from ``store[metric_name]``; drop empty metrics."""
        if not store or metric_name not in store:
            return
        store[metric_name].pop(key, None)
        if not store[metric_name]:
            del store[metric_name]

    def _enqueue(self, metric_name, labels, timestamp, value):
        """Queue one message for the series ``labels`` of ``metric_name``."""
        payload = dict(labels)
        payload['host'] = self.host_name
        payload['metric_name'] = metric_name
        payload['@timestamp'] = str(timestamp.isoformat())
        payload['value'] = value

        routing_key = 'metric.'+payload['method'].lower()+'_'+metric_name
        self.messages_to_send.put({routing_key: payload})

    def _emit_snapshot(self, snapshot, previous, live_store, date, as_delta):
        """Queue one message per series in ``snapshot``; evict idle series.

        :param snapshot: private copy of the accumulated values.
        :param previous: snapshot kept from the preceding tick, or ``None``.
        :param live_store: the accumulator the snapshot was copied from.
        :param date: timestamp of this tick.
        :param as_delta: report ``snapshot - previous`` instead of the
                         accumulated value for already known series.
        :returns: ``snapshot`` without the series evicted during this tick,
                  to be kept as the next tick's ``previous``.
        """
        evicted = []
        for metric_name, series in snapshot.items():
            for key, current in series.items():
                # Looked up per key: evicting the last key of a metric
                # removes the metric's counter dict as well.
                counters = self.zero_value_timeout.setdefault(metric_name,
                                                              dict())
                counters.setdefault(key, 0)
                labels = self._labels_from_key(key)

                known = bool(previous) and \
                    key in previous.get(metric_name, ())
                if known:
                    if as_delta:
                        value = current - previous[metric_name][key]
                    else:
                        value = current
                else:
                    # send value = 0 for second-1 for pretty printing
                    # into Kibana
                    self._enqueue(metric_name, labels,
                                  date - timedelta(seconds=1), 0)
                    value = current

                if value == 0:
                    counters[key] += 1
                    if counters[key] == self.ZERO_VALUE_TICKS:
                        self._evict(live_store, metric_name, key)
                        self._evict(self.zero_value_timeout, metric_name, key)
                        evicted.append((metric_name, key))
                else:
                    counters[key] = 0

                self._enqueue(metric_name, labels, date, value)

        # Forget evicted series completely; otherwise a series that comes
        # back before the next tick would be compared with its stale total.
        for metric_name, key in evicted:
            self._evict(snapshot, metric_name, key)
        return snapshot

    def _generate_messages_from_stateless_data(self, date, last_date):
        """Queue the per-tick increase of every stateless series.

        :param date: timestamp of this tick.
        :param last_date: previous tick formatted as ``%Y-%m-%d %H:%M:%S``
                          (or ``None``). When it equals this tick's second,
                          nothing is queued and only the baseline is updated.
        """
        # example: {"{'project': 'crystal', 'container': 'crystal/data_1', 'method': 'GET'}": 52,
        #           "{'project': 'crystal', 'container': 'crystal/data_2', 'method': 'PUT'}": 31}
        snapshot = copy.deepcopy(self.monitoring_stateless_data)

        if last_date == date.strftime(self._TICK_FORMAT):
            self.last_stateless_data = snapshot
            return

        self.last_stateless_data = self._emit_snapshot(
            snapshot, self.last_stateless_data,
            self.monitoring_stateless_data, date, True)

    def _generate_messages_from_statefull_data(self, date, last_date):
        """Queue the current value of every stateful series.

        :param date: timestamp of this tick.
        :param last_date: previous tick formatted as ``%Y-%m-%d %H:%M:%S``
                          (or ``None``). When it equals this tick's second,
                          nothing is done.
        """
        if last_date == date.strftime(self._TICK_FORMAT):
            return

        snapshot = copy.deepcopy(self.monitoring_statefull_data)
        self.last_statefull_data = self._emit_snapshot(
            snapshot, self.last_statefull_data,
            self.monitoring_statefull_data, date, False)

    # ------------------------------------------------------------------
    # Consumer side
    # ------------------------------------------------------------------

    def _connect(self):
        """(Re)open the RabbitMQ connection; return ``True`` on success."""
        try:
            self.rabbit = pika.BlockingConnection(self.parameters)
            self.channel = self.rabbit.channel()
        except Exception as e:
            self.rabbit = None
            self.channel = None
            self.logger.error("Unable to connect to RabbitMQ: " + str(e))
            return False
        return True

    def _flush_messages(self):
        """Send every queued message; stop at the first publishing error.

        A message that cannot be sent is put back in the queue and the
        connection is re-opened, so the next tick retries it. A payload
        that cannot be serialised to JSON is dropped, since retrying it
        could never succeed.
        """
        if self.channel is None and not self._connect():
            return

        while not self.messages_to_send.empty():
            message = self.messages_to_send.get()
            for routing_key in message:
                try:
                    body = json.dumps(message[routing_key])
                except (TypeError, ValueError) as e:
                    self.logger.error("Dropping unserializable metric " +
                                      str(routing_key) + ": " + str(e))
                    continue

                try:
                    self.channel.basic_publish(exchange=self.exchange,
                                               routing_key=routing_key,
                                               body=body)
                except Exception as e:
                    self.messages_to_send.put(message)
                    self.logger.error("Unable to publish metric " +
                                      str(routing_key) + ": " + str(e))
                    self._connect()
                    return

    def run(self):
        """Generate and publish messages every ``interval`` seconds, forever."""
        last_date = None
        self.last_stateless_data = None
        self.last_statefull_data = None
        self.zero_value_timeout = dict()

        self._connect()

        while True:
            greenthread.sleep(self.interval)
            try:
                date = self._now()
                self._generate_messages_from_stateless_data(date, last_date)
                self._generate_messages_from_statefull_data(date, last_date)
                last_date = date.strftime(self._TICK_FORMAT)
            except Exception as e:
                # Keep the thread alive; the next tick tries again.
                self.logger.error("Unable to generate metric messages: " +
                                  str(e))

            self._flush_messages()
| 286 | + |
| 287 | + |
class ControlThread(Thread):
    """Keep ``metric_list`` in sync with the workload metrics stored in redis.

    Every ``interval`` seconds the thread reloads the ``workload_metric:*``
    hashes from redis and keeps those that are enabled for this execution
    server.
    """

    def __init__(self, conf, logger):
        """Read the redis settings from ``conf`` and create the client.

        :param conf: mapping providing ``execution_server``, ``redis_host``,
                     ``redis_port``, ``redis_db`` and optionally
                     ``control_interval``.
        :param logger: logger used to report refresh failures.
        """
        Thread.__init__(self)

        self.conf = conf
        self.logger = logger
        self.server = self.conf.get('execution_server')
        # Configuration values may arrive as strings; the sleep call needs
        # a number.
        self.interval = float(self.conf.get('control_interval', 10))
        self.redis_host = self.conf.get('redis_host')
        self.redis_port = self.conf.get('redis_port')
        self.redis_db = self.conf.get('redis_db')

        self.redis = redis.StrictRedis(self.redis_host,
                                       self.redis_port,
                                       self.redis_db)

        self.metric_list = {}

    def _get_workload_metrics(self):
        """
        This method connects to redis to download the metrics and the
        information introduced via the dashboard.

        :returns: ``{redis_key: metric_hash}`` for every ``workload_metric:*``
                  hash whose ``execution_server`` equals this server and
                  whose ``enabled`` field is the string ``'True'``. Hashes
                  lacking either field are skipped.
        """
        metric_list = dict()
        for key in self.redis.keys("workload_metric:*"):
            metric = self.redis.hgetall(key)
            # .get(): an incomplete hash must not abort the whole refresh.
            if metric.get('execution_server') == self.server and \
                    metric.get('enabled') == 'True':
                metric_list[key] = metric

        return metric_list

    def run(self):
        """Refresh ``metric_list`` every ``interval`` seconds, forever.

        When a refresh fails the previous ``metric_list`` is kept and the
        error is logged.
        """
        while True:
            try:
                self.metric_list = self._get_workload_metrics()
            except Exception as e:
                # str(): redis_host may be None, and a failure inside the
                # handler would terminate the thread.
                self.logger.error("Unable to connect to " +
                                  str(self.redis_host) +
                                  " for getting the workload metrics: " +
                                  str(e))
            greenthread.sleep(self.interval)
| 330 | + |
| 331 | + |
class NodeStatusThread(Thread):
    """Periodically write this node's status into redis.

    Every ``interval`` seconds the thread stores a hash named
    ``<execution_server>_node:<hostname>`` holding the node type, name, IP,
    region, zone, the time of the update and the disk usage of the Swift
    devices.
    """

    def __init__(self, conf, logger):
        """Read the node and redis settings from ``conf``.

        :param conf: mapping providing ``execution_server``, ``region_id``,
                     ``zone_id``, ``devices``, ``redis_host``, ``redis_port``,
                     ``redis_db`` and optionally ``status_interval``.
        :param logger: logger used to report update failures.
        """
        Thread.__init__(self)

        self.conf = conf
        self.logger = logger
        self.server = self.conf.get('execution_server')
        self.region_id = self.conf.get('region_id')
        self.zone_id = self.conf.get('zone_id')
        # Configuration values may arrive as strings; the sleep call needs
        # a number.
        self.interval = float(self.conf.get('status_interval', 10))
        self.redis_host = self.conf.get('redis_host')
        self.redis_port = self.conf.get('redis_port')
        self.redis_db = self.conf.get('redis_db')

        self.host_name = socket.gethostname()
        self.host_ip = socket.gethostbyname(self.host_name)
        self.devices = self.conf.get('devices')

        self.redis = redis.StrictRedis(self.redis_host,
                                       self.redis_port,
                                       self.redis_db)

        self.metric_list = {}

    def _get_swift_disk_usage(self):
        """Return size and free space of the ``sd*`` entries under ``devices``.

        :returns: ``{disk: {'size': bytes, 'free': bytes}}``; empty unless
                  this is an ``'object'`` server whose ``devices`` directory
                  exists.
        """
        swift_devices = dict()
        if self.server != 'object':
            return swift_devices
        if not self.devices or not os.path.exists(self.devices):
            return swift_devices

        for disk in os.listdir(self.devices):
            if not disk.startswith('sd'):
                continue
            statvfs = os.statvfs(os.path.join(self.devices, disk))
            swift_devices[disk] = {
                'size': statvfs.f_frsize * statvfs.f_blocks,
                'free': statvfs.f_frsize * statvfs.f_bfree,
            }

        return swift_devices

    def run(self):
        """Publish the node status every ``interval`` seconds, forever."""
        while True:
            try:
                swift_usage = self._get_swift_disk_usage()
                self.redis.hmset(self.server+'_node:'+self.host_name,
                                 {'type': self.server,
                                  'name': self.host_name,
                                  'ip': self.host_ip,
                                  'region_id': self.region_id,
                                  'zone_id': self.zone_id,
                                  'last_ping': time.time(),
                                  'devices': json.dumps(swift_usage)})
            except Exception as e:
                # str(): redis_host may be None, and a failure inside the
                # handler would terminate the thread.
                self.logger.error("Unable to connect to " +
                                  str(self.redis_host) +
                                  " for publishing the node status: " +
                                  str(e))
            greenthread.sleep(self.interval)
0 commit comments