-
Notifications
You must be signed in to change notification settings - Fork 145
/
Copy pathwebsocket_test.py
executable file
·75 lines (64 loc) · 2.16 KB
/
websocket_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#!/usr/bin/env python3
# Example of streaming sensor values from OpenBMC using the /subscribe API.
# Requires the "websockets" package to be installed.
import argparse
import asyncio
import base64
import json
import ssl
import websockets
# Command-line options. Only --host is mandatory; the credentials fall back
# to the defaults given below, and TLS is on unless --no-ssl is passed
# (BooleanOptionalAction generates the --no-ssl negation automatically).
parser = argparse.ArgumentParser()
parser.add_argument(
    "--host",
    required=True,
    help="Host to connect to",
)
parser.add_argument(
    "--username",
    default="root",
    help="Username to connect with",
)
parser.add_argument(
    "--password",
    default="0penBmc",
    help="Password to use",
)
parser.add_argument(
    "--ssl",
    action=argparse.BooleanOptionalAction,
    default=True,
)
# Parsed once at start-up; the rest of the script reads this namespace.
args = parser.parse_args()
# Unit label printed next to a reading, looked up by the sensor-type
# component of the sensor's object path (the second-to-last path segment).
# Types that are not listed here are printed without a unit.
sensor_type_map = dict(
    voltage="Volts",
    power="Watts",
    fan="RPM",
    fan_tach="RPM",
    temperature="Degrees C",
    altitude="Meters",
    current="Amps",
    energy="Joules",
    cfm="CFM",
)
async def hello():
    """Subscribe to sensor updates on the host and print each reading.

    Connects to ``/subscribe`` on ``args.host`` (``wss`` when ``args.ssl`` is
    set, ``ws`` otherwise) using HTTP Basic authentication built from
    ``args.username`` and ``args.password``, sends a subscription request for
    the ``xyz.openbmc_project.Sensor.Value`` interface under
    ``/xyz/openbmc_project/sensors``, and then prints one line per received
    message that carries a ``Value`` property: sensor name, value, and unit.

    The receive loop has no exit condition; the coroutine ends only when an
    exception propagates out of it.

    NOTE(review): the ``additional_headers`` keyword is the one accepted by
    the current ``websockets.connect`` (websockets >= 14); older releases
    named it ``extra_headers`` -- confirm the minimum supported version.
    """
    protocol = "wss" if args.ssl else "ws"
    uri = f"{protocol}://{args.host}/subscribe"

    authbytes = f"{args.username}:{args.password}".encode("ascii")
    auth = "Basic {}".format(base64.b64encode(authbytes).decode("ascii"))

    # Use the same header keyword for both the TLS and the plain connection;
    # mixing two different spellings makes one of the branches fail with a
    # TypeError regardless of which websockets version is installed.
    connect_kwargs = {"additional_headers": {"Authorization": auth}}
    if args.ssl:
        # Certificate and hostname checks are disabled explicitly, matching
        # what a bare ssl.SSLContext() did, without relying on the deprecated
        # protocol-less constructor. check_hostname must be cleared before
        # verify_mode can be set to CERT_NONE.
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        connect_kwargs["ssl"] = ssl_context

    async with websockets.connect(uri, **connect_kwargs) as websocket:
        request = json.dumps(
            {
                "paths": ["/xyz/openbmc_project/sensors"],
                "interfaces": ["xyz.openbmc_project.Sensor.Value"],
            }
        )
        await websocket.send(request)

        while True:
            payload = await websocket.recv()
            j = json.loads(payload)

            # The last path segment is the sensor name and the one before it
            # is the sensor type used to look up the unit label.
            segments = j.get("path", "unknown/unknown").split("/")
            name = segments[-1]
            sensor_type = segments[-2] if len(segments) > 1 else ""
            units = sensor_type_map.get(sensor_type, "")

            # Default to a dict (not a list) so that a message without
            # "properties" is skipped instead of raising AttributeError.
            properties = j.get("properties", {})
            value = properties.get("Value")
            if value is None:
                continue

            print(f"{name:<20} {value:4.02f} {units}")
# asyncio.run() creates a fresh event loop, runs the coroutine to completion
# and closes the loop afterwards. Calling asyncio.get_event_loop() with no
# running loop is deprecated and fails on recent Python versions.
asyncio.run(hello())