-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
42 lines (32 loc) · 1.06 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import asyncio
import logging
from datetime import datetime, timedelta
from threading import Event, Thread
from src import config
from src.core import Core
from src.rss.feedparser import FeedParser
from src.telegram.bot import TelegramBot
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Shutdown flag shared between the main thread and ParseJob: ParseJob.run
# loops until it is set and sleeps on it between runs, and the __main__ block
# sets it once the Telegram bot has returned.
exit_event = Event()
class ParseJob(Thread):
    """Worker thread that runs the feed parser repeatedly.

    Every cycle executes ``FeedParser.run()`` via ``asyncio.run``, logs the
    time the next cycle is due, and then waits on the module-level
    ``exit_event`` for ``config.PARSE_INTERVAL`` seconds. Setting the event
    ends the wait early and stops the loop.
    """

    # Not read anywhere inside this class; kept because external code may use it.
    run_job = True

    def run(self) -> None:  # noqa: PLR6301
        """Loop over parser cycles until ``exit_event`` is set."""
        try:
            while True:
                if exit_event.is_set():
                    break

                # A failing cycle is logged and must not kill the thread.
                try:
                    asyncio.run(FeedParser.run())
                except Exception:
                    logger.exception("Error while parsing / sending deals")

                next_run = datetime.now(tz=config.TIMEZONE) + timedelta(seconds=config.PARSE_INTERVAL)
                logger.info("Next feedparser-run: %s", next_run)

                # Waiting on the event (instead of sleeping) lets a shutdown
                # request cut the pause short.
                exit_event.wait(config.PARSE_INTERVAL)
        except (KeyboardInterrupt, SystemExit):
            # Interpreter is going down: leave the thread without a traceback.
            pass
if __name__ == "__main__":
    # Script entry point: call Core.init(), start the ParseJob thread, then
    # run the Telegram bot in the main thread until it returns.
    Core.init()

    thread = ParseJob()
    thread.start()

    try:
        asyncio.run(TelegramBot.run_bot())
    finally:
        # Always signal the parser thread, including when the bot raised or
        # was interrupted. ParseJob is a non-daemon thread, so without this
        # the interpreter would wait on its endless loop and never exit.
        exit_event.set()