-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathscript.py
52 lines (42 loc) · 1.34 KB
/
script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import asyncio
import functools
import os
import python_weather
import enapter
async def main():
async with python_weather.Client() as client:
device_factory = functools.partial(
WttrIn,
client=client,
location=os.environ["WTTR_IN_LOCATION"],
)
await enapter.vucm.run(device_factory)
class WttrIn(enapter.vucm.Device):
def __init__(self, client, location, **kwargs):
super().__init__(**kwargs)
self.client = client
self.location = location
async def task_properties_sender(self):
while True:
await self.send_properties(
{
"location": self.location,
}
)
await asyncio.sleep(10)
async def task_telemetry_sender(self):
while True:
try:
weather = await self.client.get(self.location)
await self.send_telemetry(
{
"temperature": weather.current.temperature,
}
)
self.alerts.clear()
except Exception as e:
self.alerts.add("wttr_in_error")
await self.log.error(f"failed to get weather: {e}")
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())