Skip to content

Commit 55595d9

Browse files
committedOct 29, 2017
Towards replication metrics
1 parent 4990710 commit 55595d9

File tree

5 files changed

+236
-14
lines changed

5 files changed

+236
-14
lines changed
 

‎crystal_metric_middleware/metric_handler.py

+13-10
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,12 @@ def __init__(self, app, conf):
5252
def is_crystal_metric_request(self):
5353
try:
5454
valid = True
55-
self._extract_vaco()
55+
if self.request.method != 'REPLICATE':
56+
self._extract_vaco()
5657
except:
5758
valid = False
5859

59-
return self.request.method in ('PUT', 'GET') and valid
60+
return self.request.method in ('PUT', 'GET', 'REPLICATE') and valid
6061

6162
def _extract_vaco(self):
6263
"""
@@ -103,13 +104,14 @@ def _import_metric(self, metric):
103104
it_metrics_q = self.metric_publisher.instant_metrics_q
104105

105106
metric_class = m_class(self.logger, sl_metrics_q, sf_metrics_q,
106-
it_metrics_q, modulename, self._account,
107-
self.exec_server, self.request, self.response)
107+
it_metrics_q, modulename, self.exec_server,
108+
self.request, self.response)
108109
return metric_class
109110

110111
@wsgify
111112
def __call__(self, req):
112113
self.request = req
114+
self.response = None
113115
if not self.is_crystal_metric_request:
114116
return self.request.get_response(self.app)
115117

@@ -118,19 +120,19 @@ def __call__(self, req):
118120
try:
119121
metrics = self.metrics_getter.get_metrics()
120122

121-
if metrics and self.request.method == 'PUT':
123+
if metrics and self.request.method in ('PUT', 'REPLICATE'):
122124
for metric_key in metrics:
123125
metric = metrics[metric_key]
124-
if metric['in_flow'] == 'True':
126+
if metric['put'] == 'True' or metric['replicate'] == 'True':
125127
metric_class = self._import_metric(metric)
126128
self.request = metric_class.execute()
127129

128130
if hasattr(self.request.environ['wsgi.input'], 'metrics'):
129131
metric_list = list()
130132
for metric in self.request.environ['wsgi.input'].metrics:
131133
metric_list.append(metric.metric_name)
132-
self.logger.info('Go to execute metrics on PUT request: ' +
133-
str(metric_list))
134+
self.logger.info('Go to execute metrics on %s request: %s',
135+
self.request.method, str(metric_list))
134136

135137
return self.request.get_response(self.app)
136138

@@ -140,7 +142,7 @@ def __call__(self, req):
140142
if self.response.is_success:
141143
for metric_key in metrics:
142144
metric = metrics[metric_key]
143-
if metric['out_flow'] == 'True':
145+
if metric['get'] == 'True':
144146
metric_class = self._import_metric(metric)
145147
self.response = metric_class.execute()
146148

@@ -155,7 +157,8 @@ def __call__(self, req):
155157
else:
156158
self.logger.info('No metrics to execute')
157159
return req.get_response(self.app)
158-
except:
160+
except Exception as e:
161+
print e
159162
self.logger.debug('%s call in %s-server: Bypassing middleware' %
160163
(req.method, self.exec_server))
161164
return self.request.get_response(self.app)

‎crystal_metric_middleware/metrics/abstract_metric.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ class AbstractMetric(object):
1414
type = 'stateless'
1515

1616
def __init__(self, logger, stateless_metrics_queue, statefull_metrics_queue,
17-
instant_metrics_queue, metric_name, project_id,
18-
server, request, response):
17+
instant_metrics_queue, metric_name, server, request, response):
1918
self.logger = logger
2019
self.request = request
2120
self.response = response
@@ -32,7 +31,7 @@ def __init__(self, logger, stateless_metrics_queue, statefull_metrics_queue,
3231
self._parse_vaco()
3332

3433
self.project_name = str(self.request.headers['X-Project-Name'])
35-
self.project_id = project_id.split('_')[1]
34+
self.project_id = self.account_id.split('_')[1]
3635
self.data = {}
3736
# self.data['storage_policy'] = self._get_storage_policy_id()
3837
self.data['project'] = self.project_name
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
from datetime import datetime
2+
from eventlet import Timeout
3+
import pytz
4+
import time
5+
6+
CHUNK_SIZE = 64 * 1024
7+
8+
9+
class AbstractReplicationMetric(object):
10+
11+
type = 'stateless'
12+
13+
def __init__(self, logger, stateless_metrics_queue, statefull_metrics_queue,
14+
instant_metrics_queue, metric_name, server, request, response):
15+
self.logger = logger
16+
self.request = request
17+
self.response = response
18+
19+
self.stateless_metrics = stateless_metrics_queue
20+
self.statefull_metrics = statefull_metrics_queue
21+
self.instant_metrics = instant_metrics_queue
22+
23+
self.metric_name = metric_name
24+
self.current_server = server
25+
self.method = self.request.method
26+
self.read_timeout = 30 # seconds
27+
28+
self.data = {}
29+
self.data['method'] = self.method
30+
self.data['server_type'] = self.current_server
31+
self.data['source_node'] = request.environ['REMOTE_ADDR']
32+
self.data['metric_name'] = self.metric_name
33+
34+
def register_metric(self, value):
35+
"""
36+
Send data to publish thread
37+
"""
38+
if self.type == 'stateful':
39+
self.statefull_metrics.put((self.data, value))
40+
elif self.type == 'stateless':
41+
self.stateless_metrics.put((self.data, value))
42+
elif self.type == 'force':
43+
date = datetime.now(pytz.timezone(time.tzname[0]))
44+
self.instant_metrics.put((self.data, date, value))
45+
46+
def _is_ssync_already_intercepted(self):
47+
return isinstance(self.request.environ['wsgi.input'], IterReplication)
48+
49+
def _get_applied_metrics_on_ssync(self):
50+
if hasattr(self.request.environ['wsgi.input'], 'metrics'):
51+
metrics = self.request.environ['wsgi.input'].metrics
52+
self.request.environ['wsgi.input'].metrics = list()
53+
return metrics
54+
else:
55+
return list()
56+
57+
def _get_reader(self):
58+
if not self._is_ssync_already_intercepted():
59+
reader = self.request.environ['wsgi.input']
60+
else:
61+
reader = self.request.environ['wsgi.input'].obj_data
62+
63+
return reader
64+
65+
def _intercept_ssync(self):
66+
reader = self._get_reader()
67+
metrics = self._get_applied_metrics_on_ssync()
68+
metrics.append(self)
69+
70+
self.request.environ['wsgi.input'] = IterReplication(reader, metrics, self.read_timeout)
71+
72+
def execute(self):
73+
self._intercept_ssync()
74+
self.on_start()
75+
return self.request
76+
77+
def on_start(self):
78+
pass
79+
80+
def on_read(self, chunk):
81+
pass
82+
83+
def on_finish(self):
84+
pass
85+
86+
87+
class IterReplication(object):
88+
89+
def __init__(self, obj_data, metrics, timeout):
90+
self.closed = False
91+
self.obj_data = obj_data
92+
self.timeout = timeout
93+
self.metrics = metrics
94+
self.buf = b''
95+
96+
def __iter__(self):
97+
return self
98+
99+
def _apply_metrics_on_read(self, chunk):
100+
for metric in self.metrics:
101+
metric.on_read(chunk)
102+
103+
def _apply_metrics_on_finish(self):
104+
for metric in self.metrics:
105+
metric.on_finish()
106+
107+
def read_with_timeout(self, size):
108+
try:
109+
with Timeout(self.timeout):
110+
chunk = self.obj_data.read(size)
111+
self._apply_metrics_on_read(chunk)
112+
except Timeout:
113+
self.close()
114+
raise
115+
except Exception:
116+
self.close()
117+
raise
118+
119+
return chunk
120+
121+
def next(self, size=CHUNK_SIZE):
122+
if len(self.buf) < size:
123+
self.buf += self.read_with_timeout(size - len(self.buf))
124+
if self.buf == b'':
125+
self.close()
126+
raise StopIteration('Stopped iterator ex')
127+
128+
if len(self.buf) > size:
129+
data = self.buf[:size]
130+
self.buf = self.buf[size:]
131+
else:
132+
data = self.buf
133+
self.buf = b''
134+
return data
135+
136+
def _close_check(self):
137+
if self.closed:
138+
raise ValueError('I/O operation on closed file')
139+
140+
def read(self, size=CHUNK_SIZE):
141+
self._close_check()
142+
return self.next(size)
143+
144+
def readline(self, size=-1):
145+
self._close_check()
146+
147+
# read data into self.buf if there is not enough data
148+
while b'\n' not in self.buf and \
149+
(size < 0 or len(self.buf) < size):
150+
if size < 0:
151+
chunk = self.read()
152+
else:
153+
chunk = self.read(size - len(self.buf))
154+
if not chunk:
155+
break
156+
self.buf += chunk
157+
158+
# Retrieve one line from buf
159+
data, sep, rest = self.buf.partition(b'\n')
160+
data += sep
161+
self.buf = rest
162+
163+
# cut out size from retrieved line
164+
if size >= 0 and len(data) > size:
165+
self.buf = data[size:] + self.buf
166+
data = data[:size]
167+
168+
return data
169+
170+
def readlines(self, sizehint=-1):
171+
self._close_check()
172+
lines = []
173+
try:
174+
while True:
175+
line = self.readline(sizehint)
176+
if not line:
177+
break
178+
lines.append(line)
179+
if sizehint >= 0:
180+
sizehint -= len(line)
181+
if sizehint <= 0:
182+
break
183+
except StopIteration:
184+
pass
185+
return lines
186+
187+
def close(self):
188+
if self.closed:
189+
return
190+
self._apply_metrics_on_finish()
191+
try:
192+
self.obj_data.close()
193+
except AttributeError:
194+
pass
195+
self.closed = True
196+
197+
def __del__(self):
198+
self.close()

‎crystal_metric_middleware/metrics/get_metrics.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def __init__(self, conf, logger):
1111
self.conf = conf
1212
self.logger = logger
1313
self.server = self.conf.get('execution_server')
14-
self.interval = self.conf.get('control_interval', 10)
14+
self.interval = self.conf.get('control_interval', 1)
1515
self.redis_host = self.conf.get('redis_host')
1616
self.redis_port = self.conf.get('redis_port')
1717
self.redis_db = self.conf.get('redis_db')
+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from crystal_metric_middleware.metrics.abstract_replication_metric import AbstractReplicationMetric
2+
import eventlet
3+
4+
5+
class ReplicationBandwidth(AbstractReplicationMetric):
6+
7+
type = 'stateless'
8+
9+
def on_start(self):
10+
self.mbytes = 0
11+
self.sender = eventlet.spawn(self.send_metric)
12+
13+
def on_read(self, chunk):
14+
self.mbytes += (len(chunk)/1024.0)/1024
15+
16+
def send_metric(self):
17+
while True:
18+
eventlet.sleep(0.1)
19+
if self.mbytes == 0:
20+
break
21+
self.register_metric(self.mbytes)
22+
self.mbytes = 0

0 commit comments

Comments
 (0)
Please sign in to comment.