-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathlib.py
More file actions
133 lines (101 loc) · 3.19 KB
/
lib.py
File metadata and controls
133 lines (101 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import socket
import time
def manchester_encode(frame: int, invert: bool = False) -> int:
    """
    Expand the low 32 bits of ``frame`` into a 64 bit Manchester code.

    Each data bit, most significant first, becomes a two-bit symbol:
    a set bit maps to ``10`` and a clear bit to ``01``. Passing
    ``invert=True`` swaps the two symbols. Bits of ``frame`` above
    bit 31 are ignored.
    """
    # Pick the two-bit symbols emitted for a set / clear data bit.
    set_symbol, clear_symbol = (1, 2) if invert else (2, 1)

    encoded = 0
    # Walk the frame from bit 31 down to bit 0 so the MSB ends up first.
    for position in range(31, -1, -1):
        symbol = set_symbol if (frame >> position) & 1 else clear_symbol
        encoded = (encoded << 2) | symbol
    return encoded
def manchester_decode(mframe: int, invert: bool = False) -> int:
    """
    Collapse a 64 bit Manchester code back into a 32 bit frame.

    The code is read as 32 two-bit symbols, most significant first.
    Symbol ``10`` yields a 1 bit and ``01`` a 0 bit; ``invert=True``
    swaps those meanings. Symbols ``00`` and ``11`` are invalid: the
    offending bit index is printed and ValueError is raised.
    """
    # Data bit produced by the "10" symbol and by the "01" symbol.
    bit_for_10, bit_for_01 = (0, 1) if invert else (1, 0)

    decoded = 0
    for i in range(32):
        # Symbol i occupies bits (63 - 2*i) and (62 - 2*i) of mframe.
        symbol = (mframe >> (62 - 2 * i)) & 0b11
        if symbol == 0b10:
            bit = bit_for_10
        elif symbol == 0b01:
            bit = bit_for_01
        else:
            print(f"ERROR: Manchester decoding error at bit {i}")
            raise ValueError("Manchester decoding error")
        decoded = (decoded << 1) | bit
    return decoded
def frame_encode(msg_type: int, data_id: int, data_value: int) -> int:
    """
    Pack opentherm fields into a 32 bit frame with an even-parity bit.

    Layout: bit 31 parity, bits 30-28 ``msg_type``, bits 23-16
    ``data_id``, bits 15-0 ``data_value``. Each field is masked to its
    width, so out-of-range input is silently truncated.
    """
    packed = (
        ((msg_type & 0x07) << 28)
        | ((data_id & 0xff) << 16)
        | (data_value & 0xffff)
    )
    # Set the parity bit when needed so the total count of 1 bits is even.
    if bin(packed).count("1") % 2:
        return packed | 0x80000000
    return packed
def frame_decode(frame: int) -> tuple[int, int, int]:
    """
    Split a 32 bit frame into ``(msg_type, data_id, data_value)``.

    Raises ValueError (after printing the frame) when the frame has an
    odd number of 1 bits, i.e. the parity check fails. Non-zero spare
    bits only produce a warning via send_syslog; decoding continues.
    """
    ones = bin(frame).count("1")
    if ones % 2:
        print(f"ERROR: Parity bit error, frame: {hex(frame)}")
        raise ValueError("Parity bit error")

    # OT spec 4.2.3: spare bits (27-24) should always be 0
    spare_bits = (frame >> 24) & 0x0F
    if spare_bits:
        send_syslog(f"WARNING: Non-zero spare bits in frame: {hex(frame)}")

    return (frame >> 28) & 0x07, (frame >> 16) & 0xff, frame & 0xffff
def s8(x: int) -> int:
    """
    Reinterpret the low 8 bits of ``x`` as a signed two's complement value.
    """
    low_byte = x & 0xff
    # A set bit 7 marks a negative value; shift it down by 2**8.
    return low_byte - 0x100 if low_byte & 0x80 else low_byte
def s16(x: int) -> int:
    """
    Reinterpret the low 16 bits of ``x`` as a signed two's complement value.
    """
    low_word = x & 0xffff
    # A set bit 15 marks a negative value; shift it down by 2**16.
    return low_word - 0x10000 if low_word & 0x8000 else low_word
def f88(x: int) -> float:
    """
    Convert ``x`` to a float by taking its signed 16 bit value (via s16)
    and dividing by 256, i.e. treating the low byte as the fraction.
    """
    signed_raw = s16(x)
    return signed_raw / 256
def send_syslog(message, port=514, hostname="picotherm", appname="main", procid="-", msgid="-"):
    """
    Print ``message`` and broadcast it as a single UDP syslog datagram.

    The datagram is sent to 255.255.255.255 on ``port`` with priority 13
    (user.notice) and version 1, followed by ``hostname``, ``appname``,
    ``procid``, ``msgid``, a "-" placeholder and the message text.

    Sending is best-effort: any failure while creating, configuring or
    using the socket is printed and swallowed, so this helper never
    raises into its caller, and the socket is always closed if it was
    opened.
    """
    print(message)
    pri = 13  # user.notice
    version = 1
    sock = None
    try:
        # Omit timestamp entirely - device doesn't know the time
        # NOTE(review): RFC 5424 expects "-" (NILVALUE) in the timestamp
        # position rather than dropping the field; confirm the receiver
        # parses this header before changing the wire format.
        syslog_msg = f"<{pri}>{version} {hostname} {appname} {procid} {msgid} - {message}\r\n".encode('utf-8')
        # Socket creation and configuration live inside the try so that a
        # failure here is reported like a send failure and cannot leak
        # the socket.
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(syslog_msg, ('255.255.255.255', port))
    except Exception as ex:
        print(f"Syslog send failed: {ex}")
    finally:
        if sock is not None:
            sock.close()