|
5 | 5 | import uuid |
6 | 6 | import logging |
7 | 7 | import json |
| 8 | +import ssl |
| 9 | +import time |
| 10 | +import random |
| 11 | +import aiohttp |
8 | 12 | import requests |
| 13 | + |
9 | 14 | from asgiref.sync import async_to_sync |
10 | 15 | from django.db.models import Q |
11 | 16 | from django.core.cache import cache |
|
25 | 30 | from rest_framework.viewsets import GenericViewSet |
26 | 31 | from rest_framework.exceptions import PermissionDenied |
27 | 32 |
|
28 | | -from api import monitor, models, permissions, serializers, viewsets, authentication |
| 33 | +from api import monitor, models, permissions, serializers, viewsets, authentication, __version__ |
29 | 34 | from api.tasks import scale_app, restart_app, mount_app, downstream_model_owner, \ |
30 | 35 | delete_pod |
31 | 36 | from api.exceptions import AlreadyExists, ServiceUnavailable, DryccException |
@@ -1242,3 +1247,72 @@ def metric(self, request, **kwargs): |
1242 | 1247 | return StreamingHttpResponse( |
1243 | 1248 | streaming_content=monitor.last_metrics(app_id) |
1244 | 1249 | ) |
| 1250 | + |
| 1251 | + |
class ProxyMetricsView(View):
    """Stream a node's proxied metrics, keeping only the configured series.

    The upstream text is fetched from
    ``{SCHEDULER_URL}/api/v1/nodes/<node>/proxy/metrics[/<metrics>]`` and
    filtered line by line:

    * ``# HELP`` / ``# TYPE`` lines are kept when the metric name is a key of
      ``settings.DRYCC_METRICS_CONFIG``.
    * Sample lines are kept when the metric is configured, carries a
      ``namespace`` label and that namespace is the id of an existing app.
      Only the labels listed for the metric in the config are retained, and
      ``vm_project_id`` / ``vm_account_id`` labels are appended.
    """

    # app_id -> (owner_id, expiry timestamp). Defined on the class, so the
    # mapping is shared by every instance of the view.
    cache = {}
    match_meta = staticmethod(
        re.compile(r'^(?:# (?:HELP|TYPE) )([a-zA-Z_][a-zA-Z0-9_:.-]*)').match)
    match_data = staticmethod(
        re.compile(r'^([a-zA-Z_][a-zA-Z0-9_:]*)(?:\{([^}]*)\})?\s+(\S+)').match)
    # One `name="value"` label pair; the value may contain backslash escapes
    # (including an escaped quote) and commas.
    _iter_labels = staticmethod(
        re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*"((?:[^"\\]|\\.)*)"').finditer)
    # Expiry of -1 is always in the past, so an unknown app is always looked up.
    default_cache_value = (-1, -1)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if settings.K8S_API_VERIFY_TLS:
            ssl_option = ssl.create_default_context(
                cafile='/var/run/secrets/kubernetes.io/serviceaccount/ca.crt')
        else:
            # A default context would still verify against the system trust
            # store; aiohttp disables certificate verification with ssl=False.
            ssl_option = False
        # `ssl_context=` is a deprecated keyword; `ssl=` accepts both forms.
        self.connector = aiohttp.TCPConnector(ssl=ssl_option)

    async def sample(self, name, labels_str, value):
        """Rewrite one sample line, or return ``None`` when it must be dropped.

        :param name: metric name captured by ``match_data``.
        :param labels_str: text between the braces of the sample (may be
            ``None`` or empty when the sample has no labels).
        :param value: sample value, forwarded verbatim.
        :returns: the rewritten line terminated by a newline, or ``None``.
        """
        config = settings.DRYCC_METRICS_CONFIG
        if not labels_str or name not in config:
            return None
        fields = set(config[name])
        # Malformed pairs (e.g. the empty one left by a trailing comma) simply
        # do not match and are skipped instead of raising. Values are kept in
        # their escaped form so they can be re-emitted inside quotes as-is.
        labels = {
            pair.group(1): pair.group(2)
            for pair in self._iter_labels(labels_str)
            if pair.group(1) in fields
        }
        app_id = labels.get("namespace")
        if not app_id:
            return None
        owner_id, expires_at = self.cache.get(app_id, self.default_cache_value)
        if time.time() > expires_at:
            app = await models.app.App.objects.filter(id=app_id).afirst()
            # -1 records "no such app" so the miss is cached as well.
            owner_id = app.owner_id if app else -1
            # Randomised lifetime (600-1200 s) so entries do not all expire
            # at the same moment.
            self.cache[app_id] = (owner_id, time.time() + random.randint(600, 1200))
        if owner_id < 0:
            return None
        labels.update({'vm_project_id': app_id, 'vm_account_id': owner_id})
        rendered = ",".join(f'{k}="{v}"' for k, v in labels.items())
        return "%s{%s} %s\n" % (name, rendered, value)

    async def get(self, request, node, metrics=None):
        """Return a streaming response with the filtered metrics of ``node``.

        :param request: incoming request; its ``Authorization`` header is
            forwarded to the upstream call (empty string when absent).
        :param node: node name inserted into the upstream URL.
        :param metrics: optional sub-path appended after ``/proxy/metrics``.
        """
        url = f"{settings.SCHEDULER_URL}/api/v1/nodes/{node}/proxy/metrics"
        if metrics:
            url = f"{url}/{metrics}"
        headers = {"Authorization": request.META.get("HTTP_AUTHORIZATION", "")}

        async def stream_response():
            async with aiohttp.ClientSession(connector=self.connector) as session:
                async with session.get(url, headers=headers) as resp:
                    async for line_bytes in resp.content:
                        line = line_bytes.decode('utf-8', errors='ignore').strip(' \n')
                        if line.startswith('#') and (match := self.match_meta(line)):
                            if match.group(1) in settings.DRYCC_METRICS_CONFIG:
                                yield f"{line}\n"
                            continue
                        match = self.match_data(line)
                        if not match:
                            continue
                        name, labels_str, value = match.groups()
                        sample = await self.sample(name, labels_str, value)
                        if sample:
                            yield sample

        content_type = f"text/plain; version={__version__}"
        return StreamingHttpResponse(stream_response(), content_type=content_type)
0 commit comments