-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
135 lines (110 loc) · 4.11 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import json
import logging
import sys
import time
from typing import Any

import flask.cli
from flask import Flask, send_file
from flask_sock import Sock
from networktables import NetworkTables
# Keep the console quiet: mute werkzeug's logger and replace Flask's startup
# banner printer with a no-op.
logging.getLogger("werkzeug").disabled = True
flask.cli.show_server_banner = lambda *_: None

app = Flask("HauntedHouse")
sock = Sock(app)

# Don't cache files: a max age of 0 is applied to everything sent with send_file.
app.config.update(SEND_FILE_MAX_AGE_DEFAULT=0)
# Load the server settings; "teamNumber" is the only key read in this section.
with open("config.json", "r", encoding="utf-8") as file:
    Config = json.load(file)


def _robot_ip_address(team_number: int) -> str:
    """Return the robot's 10.TE.AM.2 address for *team_number*.

    TE is the team number divided by 100 and AM is the remainder, both written
    without zero padding (254 -> 10.2.54.2, 1203 -> 10.12.3.2). Padded octets
    such as "02" or "08" are avoided on purpose: address parsers may reject
    them or read them as octal.

    Raises:
        TypeError: if *team_number* is not an integer.
        ValueError: if *team_number* is outside 1..9999.
    """
    if isinstance(team_number, bool) or not isinstance(team_number, int):
        raise TypeError(
            f"teamNumber must be an integer, got {team_number!r}"
        )
    # Five-digit team numbers are still not supported here, as before.
    if not 0 < team_number < 10_000:
        raise ValueError(
            f"teamNumber must be between 1 and 9999, got {team_number}"
        )
    te, am = divmod(team_number, 100)
    return f"10.{te}.{am}.2"


BOTBRAIN_IP_ADDRESS = _robot_ip_address(Config["teamNumber"])
print(f"Connecting to {BOTBRAIN_IP_ADDRESS}...")
NetworkTables.initialize(server=BOTBRAIN_IP_ADDRESS)

# How long to wait for the robot to answer before declaring it dead.
ROBOT_CONNECT_TIMEOUT_S = 5.0


def _wait_for_robot(timeout_s: float, poll_s: float = 0.1) -> bool:
    """Poll NetworkTables until it reports a connection or *timeout_s* elapses.

    Per the pynetworktables documentation, initialize() connects in the
    background, so asking isConnected() straight away would report "not
    connected" even for a healthy robot.

    Returns:
        True once connected, False if the timeout expired first.
    """
    deadline = time.monotonic() + timeout_s
    while not NetworkTables.isConnected():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_s)
    return True


# "--dead" forces dead mode without waiting for the robot at all.
DEAD_SOCKET = "--dead" in sys.argv or not _wait_for_robot(ROBOT_CONNECT_TIMEOUT_S)

if DEAD_SOCKET:
    print("DEAD ROBOT")
else:
    for _ in range(5):
        print("LIVE ROBOT! LIVE ROBOT! LIVE ROBOT!")
class SocketResponses:
    """Canned reply payloads for websocket commands."""

    # Reply for a command that was carried out.
    OK = dict(success=True)
    # Reply for a command that was rejected.
    ERROR = dict(success=False)
class NetTableTables:
    """NetworkTables table handles, looked up once when the class is defined."""

    # The "limelight" table.
    limelight = NetworkTables.getTable("limelight")

    # The "evil_manipulation" table.
    command_and_control = NetworkTables.getTable("evil_manipulation")
class NetTableEntries:
    """Individual NetworkTables entries, looked up once when the class is defined."""

    # "botpose" in the limelight table.
    pose = NetTableTables.limelight.getEntry("botpose")

    # "badump_token" in the command-and-control table.
    heartbeat = NetTableTables.command_and_control.getEntry("badump_token")

    # "drive_vector" in the command-and-control table.
    drive_vector = NetTableTables.command_and_control.getEntry("drive_vector")
def error(*args) -> None:
    """Print *args* to stdout, space-separated, behind an "[error]" tag."""
    print("[error]", *args)
@app.route("/")
def index():
    """Serve the page for the site root."""
    page_path = "static/index.html"
    return send_file(page_path)
@sock.route("/socket_bocket_locket_rocket")
def socket(ws):
    """Serve robot commands for one websocket client until it disconnects.

    Robot commands are sent via websockets to avoid latency and scary HTTP
    nightmares like caching.

    Each incoming message must be a JSON object with "cmd" and "key" fields
    ("set" commands also carry "value"). Every reply echoes "cmd" and "key"
    so the client can tell what it is receiving. A message that cannot be
    parsed, lacks those fields, or makes its handler raise is answered with
    an error reply; the connection stays open, because it also carries the
    heartbeat.

    NOTE: This route will be instanced in a new thread for each client.
    """
    if DEAD_SOCKET:
        # No robot to talk to: return immediately without serving commands.
        return

    while True:
        raw = ws.receive()

        try:
            incoming = json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            error(f"[socket] Unparseable message: {raw!r}")
            ws.send(json.dumps(SocketResponses.ERROR))
            continue

        if (
            not isinstance(incoming, dict)
            or "cmd" not in incoming
            or "key" not in incoming
        ):
            error(f"[socket] Message needs 'cmd' and 'key': {incoming!r}")
            ws.send(json.dumps(SocketResponses.ERROR))
            continue

        try:
            # The handlers receive the whole message, "cmd" included.
            out = execute_command(incoming["cmd"], incoming)
        except Exception as exc:
            # Per-connection boundary: report the failure and keep serving
            # rather than letting one bad command tear down the socket.
            error(f"[socket] Command failed for {incoming!r}: {exc!r}")
            out = SocketResponses.ERROR

        # Tell the client what on Earth it's receiving.
        out = {**out, "cmd": incoming["cmd"], "key": incoming["key"]}
        ws.send(json.dumps(out))
def execute_command(cmd: str, dat: dict) -> Any:
    """Dispatch one client command and return its reply payload.

    Args:
        cmd: The command name; "set" and "get" are recognised.
        dat: The full incoming message. "key" is read for both commands and
            "value" additionally for "set".

    Returns:
        The result of execute_set_command for "set"; a dict holding the
        looked-up value under "result" for "get"; SocketResponses.ERROR for
        anything else.
    """
    if cmd == "set":
        return execute_set_command(dat["key"], dat["value"])

    if cmd == "get":
        # NOTE(review): a failed lookup is wrapped under "result" as well.
        fetched = execute_get_command(dat["key"])
        return {"result": fetched}

    error(f"[execute_command] Bad command '{cmd}'")
    return SocketResponses.ERROR
def execute_set_command(key: str, value: Any) -> dict:
# TODO: Better type annotation for SocketResponses
match key:
case "drive_vector":
NetTableEntries.drive_vector.setDoubleArray(value)
case "badump":
# Here we recieve a random float (0-1) token and mirror it to the bot. The bot will be
# watchdogging around waiting for us to stop sending this without warning. If we do
# that, bad things are afoot, and it will DIE!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
assert isinstance(value, float)
NetTableEntries.heartbeat.setDouble(value)
case _:
error(f"[set_command] Bad key '{key}'")
return SocketResponses.ERROR
return SocketResponses.OK
def execute_get_command(key: str) -> Any:
match key:
case "pose":
return NetTableEntries.pose.getDoubleArray([0, 0, 0, 0, 0, 0])
case _:
error(f"[get_command] Bad key '{key}'")
return SocketResponses.ERROR
error(f"[get_command] Didn't return anything for '{key}'")
return SocketResponses.ERROR
if __name__ == "__main__":
    # Announce the URL, then hand control to Flask's built-in server.
    print("Ready! :^) @ http://localhost:8080")
    app.run(host="localhost", port=8080)