-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathetsi_api.py
More file actions
104 lines (70 loc) · 2.72 KB
/
etsi_api.py
File metadata and controls
104 lines (70 loc) · 2.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# etsi_api.py (FINAL - SYNC + CLEAN)
from fastapi import APIRouter, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from config import AUTH_ENABLED, AUTH_TOKEN, SYSTEM_MODE
security = HTTPBearer()
# =================================================
# AUTH
# =================================================
def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
if not AUTH_ENABLED:
return True
if credentials.scheme != "Bearer":
raise HTTPException(status_code=401, detail="Invalid scheme")
if credentials.credentials != AUTH_TOKEN:
raise HTTPException(status_code=401, detail="Invalid token")
return True
# =================================================
# ROUTER
# =================================================
def create_etsi_router(buffer, audit):
router = APIRouter()
# -------------------------------------------------
# STATUS
# -------------------------------------------------
@router.get("/etsi/v2/status")
def status(auth: bool = Depends(verify_token)):
audit.api("/etsi/v2/status")
stats = buffer.stats()
return {
"service": "ETSI-KMS",
"status": "RUNNING",
"mode": SYSTEM_MODE,
"available_keys": stats["ready_keys"],
"total_keys": stats["total_keys"],
"sync_index": stats.get("sync_index", 0)
}
# -------------------------------------------------
# GET NEXT KEY
# -------------------------------------------------
@router.post("/etsi/v2/keys")
def get_key(auth: bool = Depends(verify_token)):
audit.api("/etsi/v2/keys")
key = buffer.get_next_key()
if not key:
audit.error("No keys available")
raise HTTPException(status_code=404, detail="No keys available")
# log usage
audit.key_served(key.key_id)
return {
"key_id": key.key_id,
"key": key.key_value,
"size": key.key_size,
"origin": key.origin_node
}
# -------------------------------------------------
# GET KEY BY ID (CRITICAL FOR SYNC)
# -------------------------------------------------
@router.get("/etsi/v2/keys/{key_id}")
def get_key_by_id(key_id: str, auth: bool = Depends(verify_token)):
audit.api(f"/etsi/v2/keys/{key_id}")
key = buffer.get_key_by_id(key_id)
if not key:
audit.error(f"Key not found: {key_id}")
raise HTTPException(status_code=404, detail="Key not found")
audit.key_served(key_id)
return {
"key_id": key.key_id,
"key": key.key_value
}
return router