-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi.py
84 lines (69 loc) · 2.63 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import json
from hardware import server, battery, accelerometer, light, walker, display
from adafruit_httpserver import Request, JSONResponse, POST, GET
from tests import leg_test
from routines import RoutinesRegistry, BaseRoutine
from utils import run_callable_async_or_not
from flash_storage import storage
from logging_ import log, get_unsent_loglines
@RoutinesRegistry.register()
class HttpCommandsRoutine(BaseRoutine):
    """Routine that executes commands queued by the HTTP API handlers.

    Handlers enqueue ``(callable, kwargs)`` pairs through ``add_task``;
    ``tick`` later drains that queue in first-in, first-out order.
    """

    def __init__(self) -> None:
        super().__init__()
        # Pending (callable, kwargs) pairs, oldest first.
        self.tasks = []

    async def tick(self):
        """Run queued tasks one by one until the queue is empty.

        The queue is re-checked after each task, so anything enqueued while
        a task is being awaited is executed in the same call.
        """
        while self.tasks:
            callback, callback_kwargs = self.tasks.pop(0)
            await run_callable_async_or_not(callback, **callback_kwargs)

    def add_task(self, task, task_kwargs):
        """Log and enqueue ``task`` to be called with ``task_kwargs``.

        The pair is appended to the task list of the object returned by
        ``get_instance()``, not necessarily to ``self``.
        """
        log(f"Adding task: {task}")
        queue = self.get_instance().tasks
        queue.append((task, task_kwargs))
@server.route("/api/stats", GET)
def stats(request: Request):
    """Respond with a JSON snapshot of battery, sensor, light and storage state."""
    payload = {}
    payload["battery_percentage"] = battery.get_battery_percentage()
    payload["battery_voltage"] = battery.get_battery_voltage()
    payload["accelerometer_xyz"] = accelerometer.xyz
    # NOTE(review): the key spelling ("accelerometr") is kept as-is; it is
    # part of the response format and clients may rely on it.
    payload["accelerometr_angles"] = accelerometer.angles
    payload["light_brightness"] = light.get_brightness()
    # Storage is pre-serialized, so this field is a JSON string nested
    # inside the JSON response rather than a nested object.
    payload["storage_content"] = json.dumps(storage._data)
    return JSONResponse(request, payload)
@server.route("/api/command", POST)
def command(request: Request):
    """Queue a command described by the JSON body of a POST request.

    The body is expected to be a JSON object with an ``"action"`` key naming
    an entry in the module-level ``commands`` table and an optional
    ``"params"`` object that is passed to the command as keyword arguments.
    The command is not executed here; it is handed to
    ``HttpCommandsRoutine.add_task`` and runs when that routine ticks.

    Returns a JSON response: ``{"message": "Command received"}`` when the
    command was queued, otherwise ``{"error": <reason>}``.
    """
    try:
        request_data = request.json()
    except ValueError:
        # A body that is not valid JSON is a client mistake; report it
        # instead of letting the exception escape the handler.
        return JSONResponse(request, {"error": "Invalid JSON"})
    if not request_data:
        return JSONResponse(request, {"error": "No data received"})
    if not isinstance(request_data, dict):
        # Lists, strings, numbers, ... have no "action"/"params" keys.
        return JSONResponse(request, {"error": "Expected a JSON object"})

    action = request_data.get("action")
    # Only string actions can name a command; this also avoids hashing an
    # unhashable value (e.g. a list) against the dispatch table.
    action_function = commands.get(action) if isinstance(action, str) else None
    if action_function is None:
        log(f"❌ Command not found: {action}")
        # Tell the client the command was rejected rather than claiming
        # it was received.
        return JSONResponse(request, {"error": f"Command not found: {action}"})

    # "params" may be absent or null; both mean "no arguments".
    kwargs = request_data.get("params") or {}
    if not isinstance(kwargs, dict):
        # The queued task is later called with **kwargs, which requires a
        # mapping; reject anything else now instead of failing at run time.
        return JSONResponse(request, {"error": "params must be a JSON object"})

    HttpCommandsRoutine.get_instance().add_task(action_function, kwargs)
    return JSONResponse(request, {"message": "Command received"})
@server.route("/api/logs", GET)
def logs(request: Request):
    """Respond with the result of ``get_unsent_loglines('http')`` as JSON.

    NOTE(review): 'http' presumably identifies this endpoint as the log
    consumer so each line is delivered once - confirm in ``logging_``.
    """
    return JSONResponse(request, get_unsent_loglines('http'))
@server.route("/api/servos-angles", GET)
def angles(request: Request):
    """Respond with the value of ``walker.get_servos_angles()`` as JSON."""
    servo_angles = walker.get_servos_angles()
    return JSONResponse(request, servo_angles)
@server.route("/api/saved-positions", GET)
def saved_positions(request: Request):
    """Respond with the value of ``walker.get_saved_positions()`` as JSON."""
    positions = walker.get_saved_positions()
    return JSONResponse(request, positions)
# Dispatch table for POST /api/command: maps the "action" string sent by the
# client to the callable that gets queued on HttpCommandsRoutine. The request's
# "params" object is passed to that callable as keyword arguments.
# It is defined after the route handlers, which is fine because the handler
# only looks the name up when a request arrives.
commands = {
    # Light
    "set-light-brightness": light.set_brightness,
    # Movement / diagnostics
    "wiggle": walker.wiggle,
    "leg-test": leg_test,
    "to-zero": walker.to_zero,
    # Accelerometer
    "calibrate-zero": accelerometer.calibrate_zero,
    # Display
    "write-oled": display.write_text,
    "display-mode": display.set_mode,
    # Servo angles and stored positions
    "set-servos": walker.set_servos_angles,
    "save-position": walker.save_position,
    "load-position": walker.load_position,
}