5
5
from contextlib import suppress
6
6
from typing import Any , Dict , Generator , List , Mapping , Optional
7
7
8
+ from aiohttp import ClientSession
8
9
from cachetools import TTLCache
9
10
from cachetools .keys import hashkey
10
- from nonebot import get_bot , require
11
+ from nonebot import get_bot as _get_bot
12
+ from nonebot import require
11
13
from nonebot .adapters .onebot .v11 import Bot
12
14
from nonebot .log import logger
13
15
19
21
from nonebot_plugin_guild_patch .permission import GUILD_ADMIN , GUILD_OWNER # noqa
20
22
21
23
from .config import config
24
+ from .parsing .utils import get_proxy
25
+
26
+ bot_offline = False
22
27
23
28
24
29
def get_http_caching_headers (
@@ -113,7 +118,11 @@ def get_torrent_b16_hash(content: bytes) -> str:
113
118
return str (b16_hash , "utf-8" )
114
119
115
120
116
- async def send_message_to_admin (message : str , bot : Bot ) -> None :
121
+ async def send_message_to_admin (message : str , bot : Optional [Bot ] = None ) -> None :
122
+ if bot is None :
123
+ bot : Bot = await get_bot () # type: ignore
124
+ if bot is None :
125
+ return
117
126
await bot .send_private_msg (user_id = int (list (config .superusers )[0 ]), message = message )
118
127
119
128
@@ -129,7 +138,9 @@ async def send_msg(
129
138
130
139
发送消息到私聊或群聊
131
140
"""
132
- bot : Bot = get_bot () # type: ignore
141
+ bot : Bot = await get_bot () # type: ignore
142
+ if bot is None :
143
+ raise ValueError ("There are not bots to get." )
133
144
msg_id = []
134
145
if group_ids :
135
146
for group_id in group_ids :
@@ -208,3 +219,32 @@ def partition_list(
208
219
) -> Generator [List [Any ], None , None ]:
209
220
for i in range (0 , len (input_list ), partition_size ):
210
221
yield input_list [i : i + partition_size ]
222
+
223
+
224
+ async def send_message_to_telegram_admin (message : str ) -> None :
225
+ try :
226
+ async with ClientSession (raise_for_status = True ) as session :
227
+ await session .post (
228
+ f"https://api.telegram.org/bot{ config .telegram_bot_token } /sendMessage" ,
229
+ json = {
230
+ "chat_id" : config .telegram_admin_ids [0 ],
231
+ "text" : message ,
232
+ },
233
+ proxy = get_proxy (),
234
+ )
235
+ except Exception as e :
236
+ logger .error (f"发送到 Telegram 失败:\n { e } " )
237
+
238
+
239
+ async def get_bot () -> Optional [Bot ]:
240
+ global bot_offline
241
+ bot : Optional [Bot ] = None
242
+ try :
243
+ bot = _get_bot () # type: ignore
244
+ bot_offline = False
245
+ except ValueError :
246
+ if not bot_offline and config .telegram_admin_id and config .telegram_bot_token :
247
+ await send_message_to_telegram_admin ("QQ Bot 已离线!" )
248
+ logger .warning ("Bot 已离线!" )
249
+ bot_offline = True
250
+ return bot
0 commit comments