-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
151 lines (120 loc) · 4.75 KB
/
main.py
File metadata and controls
151 lines (120 loc) · 4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import asyncio
from datetime import datetime, timezone
from typing import List, Dict

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

from helpers.middleware import CheckRequest
from helpers.wsobjs import WSObjects
# copycat from altamino/api, maybe we should shrink them
from objects.errors import Errors
from helpers.database.mongo import Database
from helpers.constants import WS_TYPE_PING, WS_TYPE_MARK_READ
# ASGI application object; the HTTP and WebSocket route decorators in this
# module register their handlers on it.
app = FastAPI()
class ConnectionManager:
    """Track accepted WebSocket connections per user id and fan messages out.

    ``active_connections`` maps a user id to the list of sockets currently
    registered for that user; one user may hold several connections at once.
    """

    def __init__(self):
        self.active_connections: Dict[str, List["WebSocket"]] = {}

    async def connect(self, websocket: "WebSocket", uid: str):
        """Accept *websocket* and register it under *uid*."""
        await websocket.accept()
        self.active_connections.setdefault(uid, []).append(websocket)

    def disconnect(self, websocket: "WebSocket", uid: str):
        """Unregister *websocket* from *uid*; forget the uid once it has no sockets.

        Calling this for a uid or socket that is not registered is a no-op, so
        cleanup paths can call it unconditionally.
        """
        connections = self.active_connections.get(uid)
        if connections is None:
            return
        if websocket in connections:
            connections.remove(websocket)
        if not connections:
            del self.active_connections[uid]

    async def answer(self, message: dict, websocket: "WebSocket"):
        """Send *message* as JSON to a single *websocket*."""
        await websocket.send_json(message)

    async def _deliver(self, message: dict, connections: List["WebSocket"]) -> int:
        """Send *message* to every socket in *connections* concurrently.

        Returns the number of sends that completed without raising. A failing
        socket does not prevent delivery to the others.
        """
        if not connections:
            return 0
        results = await asyncio.gather(
            *(connection.send_json(message) for connection in connections),
            return_exceptions=True,
        )
        failures = [result for result in results if isinstance(result, BaseException)]
        if failures:
            print(f"WebSocket broadcast: {len(failures)} of {len(results)} sends failed")
        return len(results) - len(failures)

    async def selective_broadcast(self, message: dict, uids: List[str]):
        """Send *message* to every connection of the users listed in *uids*.

        Unknown user ids are skipped. Returns the number of connections the
        message was successfully sent to.
        """
        targets = []
        for uid in uids:
            targets.extend(self.active_connections.get(uid, ()))
        return await self._deliver(message, targets)

    async def broadcast(self, message: dict):
        """Send *message* to every registered connection.

        Returns the number of connections the message was successfully sent to.
        """
        # Snapshot the recipients first: the registry can change while the
        # sends are awaited, and mutating a dict during iteration raises.
        targets = []
        for connections in self.active_connections.values():
            targets.extend(connections)
        return await self._deliver(message, targets)
# Single module-level connection registry; the WebSocket endpoint registers
# non-admin clients on it and uses it to answer and broadcast messages.
manager = ConnectionManager()
@app.get("/")
async def index():
    """Return whatever ``Errors.InvalidRequest()`` produces for HTTP GETs on the root path."""
    invalid_request = Errors.InvalidRequest()
    return invalid_request
@app.get("/health")
async def health():
    """Liveness probe: always responds with ``{"alive": True}``."""
    status = {"alive": True}
    return status
def _is_scalar_id(value):
    """Return True when *value* is present and is not a JSON object or array.

    Client-supplied ids end up in a database name and a query filter; a dict
    there would be read by MongoDB as a query-operator document (for example
    ``{"$ne": None}``) instead of a literal id.
    """
    return value is not None and not isinstance(value, (dict, list))


async def _mark_thread_read(uid, payload):
    """Store the current UTC time as *uid*'s last-read marker for a chat.

    Reads ``ndcId`` and ``threadId`` from *payload*. Returns False without
    touching the database when either id is missing or is not a scalar,
    True once the update has been issued.
    """
    ndc_id = payload.get("ndcId")
    thread_id = payload.get("threadId")
    if not (_is_scalar_id(ndc_id) and _is_scalar_id(thread_id)):
        return False
    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # formatted string is identical.
    read_timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    db = await Database().get(f"x{ndc_id}")
    await db["Chats"].update_one(
        {"id": thread_id},
        {"$set": {f"lastReadedList.{uid}": read_timestamp}},
    )
    return True


async def _run_admin_command(ws, command):
    """Broadcast an admin payload and report the outcome back on *ws*.

    *command* is the value of the ``ADMIN-SAYS`` key: ``VICTIMS`` is either the
    string ``"ALL"`` or a list of user ids, ``WEAPON`` is the message to send.
    Any failure is reported to the admin as ``{"status": "error", ...}``
    instead of tearing the admin connection down.
    """
    try:
        users = command["VICTIMS"]
        payload = command["WEAPON"]
        if users == "ALL":
            delivered = await manager.broadcast(payload)
            clients = delivered
        else:
            delivered = await manager.selective_broadcast(payload, users)
            clients = len(users)
        await manager.answer(
            {
                "status": "ok",
                "clients": clients,
                "probably_got": delivered,
            },
            ws,
        )
    except Exception as e:
        await manager.answer({"status": "error", "reason": str(e)}, ws)


@app.websocket("/")
async def websocket_endpoint(ws: WebSocket):
    """Serve one WebSocket client until it disconnects.

    ``CheckRequest`` yields ``(admin, uid, error)``. On error the socket is
    closed with the supplied code/reason. Admin sockets are accepted but not
    registered with the manager; every other socket is registered under its
    uid and unregistered again when the handler exits.

    Messages handled in the receive loop:
      * ``{"t": WS_TYPE_PING, "o": ...}`` is answered with ``WSObjects.Pong()``.
      * ``{"t": WS_TYPE_MARK_READ, "o": {"id", "markHasRead", "ndcId",
        "threadId"}}`` records a last-read timestamp for the user.
      * ``{"ADMIN-SAYS": {...}}`` from an admin socket triggers a broadcast.
    Anything else is ignored.
    """
    admin, uid, error = await CheckRequest(ws)
    if error:
        reason = error.get("message", "Unauthorized")
        print(f"WebSocket connection rejected: {reason}")
        await ws.close(code=error.get("code", 1008), reason=reason)
        return

    if admin:
        await ws.accept()
    else:
        await manager.connect(ws, uid)

    try:
        while True:
            data = await ws.receive_json()
            # Valid JSON that is not an object (list, string, number) carries
            # no message; ignore it like any other unrecognised message.
            if not isinstance(data, dict):
                continue

            msg_type = data.get("t")
            body = data.get("o")
            if msg_type and body:
                if msg_type == WS_TYPE_PING:
                    await manager.answer(WSObjects.Pong(), ws)
                    continue

                # Every non-ping request must carry an id inside an object body.
                if not isinstance(body, dict) or not body.get("id"):
                    await manager.answer(WSObjects.WSError(1, "No ID of request"), ws)
                    continue

                if (
                    msg_type == WS_TYPE_MARK_READ
                    and body.get("markHasRead") is not None
                ):
                    if body["markHasRead"] is True:
                        recorded = await _mark_thread_read(uid, body)
                        if not recorded:
                            # NOTE(review): reuses error code 1, the only code
                            # visible in this module - confirm the intended code.
                            await manager.answer(
                                WSObjects.WSError(1, "Invalid ndcId or threadId"), ws
                            )
                    continue

            if data.get("ADMIN-SAYS") and admin:
                await _run_admin_command(ws, data["ADMIN-SAYS"])
                continue
    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"WebSocket error: {e}")
        try:
            await ws.close(code=1011, reason="Internal Server Error")
        except RuntimeError:
            # The socket was already closed; nothing left to tell the client.
            pass
    finally:
        if not admin:
            manager.disconnect(ws, uid)