-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
✅ Update blockwords check and hook modules
- Loading branch information
1 parent
08d771e
commit cdeafd8
Showing
6 changed files
with
182 additions
and
79 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from typing import Any, List | ||
|
||
from nonebot.log import logger | ||
from nonebot.adapters import Event as BaseEvent | ||
|
||
from ..config import plugin_config | ||
from ..typings import Message, BotSendFilter | ||
from ..exception import StopSendMessage, PauseSendMessage, FinishSendMessage | ||
|
||
_bot_send_hooks: List[BotSendFilter] = [] | ||
|
||
|
||
def on_bot_send(func: BotSendFilter) -> BotSendFilter: | ||
"""注册一个 bot send hook | ||
```python | ||
@on_bot_send | ||
async def _(event: Event, message: Message) -> Message: | ||
# do something | ||
return message | ||
``` | ||
""" | ||
_bot_send_hooks.append(func) | ||
return func | ||
|
||
|
||
def clear_send_hook() -> None: | ||
"""清空所有 bot send hook""" | ||
_bot_send_hooks.clear() | ||
|
||
|
||
async def send_hook( | ||
send, | ||
event: BaseEvent, | ||
message: Message, | ||
**kwargs: Any, | ||
): | ||
if not plugin_config.blockwords_bot: | ||
return await send(event, message, **kwargs) | ||
for hook in _bot_send_hooks: | ||
try: | ||
message = await hook(event, message, **kwargs) | ||
except FinishSendMessage as err: | ||
return await send(event, err.message, **kwargs) | ||
except PauseSendMessage as err: | ||
await send(event, err.message, **kwargs) | ||
except StopSendMessage: | ||
break | ||
except Exception as err: | ||
logger.opt(colors=True, exception=err).error( | ||
"<r><bg #f8bbd0>Error in bot send hook</bg #f8bbd0></r>" | ||
) | ||
raise err | ||
else: | ||
return await send(event, message, **kwargs) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,40 +1,84 @@ | ||
import json | ||
from typing import List | ||
from pathlib import Path | ||
from typing import List, Optional | ||
|
||
from genericpath import isdir | ||
from nonebot.log import logger | ||
|
||
from .config import plugin_config | ||
|
||
global_words: Optional[List[str]] = None | ||
read_filed: List[Path] = [] | ||
|
||
def get_blockword() -> List[str]: | ||
|
||
def get_blockwords() -> List[str]: | ||
"""获取屏蔽词 | ||
在配置文件中没有配置blockwords_file时,会读取默认的屏蔽词文件夹中的所有文件 | ||
Returns: | ||
List[str]: 屏蔽词列表 | ||
""" | ||
words = plugin_config.blockwords.copy() | ||
if isinstance(plugin_config.blockwords_file, str): | ||
words.extend(read_words(Path(plugin_config.blockwords_file))) | ||
logger.success(f"读取屏蔽词文件 << {plugin_config.blockwords_file}") | ||
elif isinstance(plugin_config.blockwords_file, list): | ||
for file_path in plugin_config.blockwords_file: | ||
words.extend(read_words(Path(file_path))) | ||
logger.success(f"读取屏蔽词文件 << {file_path}") | ||
words = list(set(words)) # 去重 | ||
words.sort(key=len, reverse=True) # 按长度排序 | ||
return words | ||
|
||
|
||
def read_words(file_path: Path) -> List[str]: | ||
text = file_path.read_text(encoding="utf-8") | ||
try: | ||
words = json.loads(text) | ||
if isinstance(words, list): | ||
return words | ||
logger.error(f"{file_path} 屏蔽词文件格式并不是一个 list") | ||
except ValueError: | ||
return text.split("\n") | ||
global global_words | ||
if global_words is None: | ||
words = plugin_config.blockwords.copy() | ||
if isinstance(plugin_config.blockwords_file, str): | ||
words: List[str] = read_blockwords( | ||
Path(plugin_config.blockwords_file)) | ||
elif isinstance(plugin_config.blockwords_file, list): | ||
for file_path in plugin_config.blockwords_file: | ||
words.extend(read_blockwords(Path(file_path))) | ||
global_words = sorted(set(words), key=len, reverse=True) | ||
return global_words | ||
|
||
|
||
def read_file_blockwords(file_path: Path) -> List[str]: | ||
"""读取文件屏蔽词 | ||
如果文件是`.json`格式结尾就会进行`json`解析 | ||
如果文件是`.txt`格式结尾就会按行读取 | ||
Args: | ||
file_path (Path): 文件路径 | ||
Returns: | ||
List[str]: 屏蔽词内容 | ||
""" | ||
if file_path not in read_filed: | ||
read_filed.append(file_path) | ||
text = file_path.read_text(encoding="utf-8") | ||
logger.success(f"读取屏蔽词文件 << {file_path}") | ||
try: | ||
if file_path.suffix == ".json": | ||
if isinstance(words := json.loads(text), list): | ||
return words | ||
logger.error(f"{file_path} 屏蔽词文件格式并不是一个 list") | ||
else: | ||
return text.split("\n") | ||
except ValueError: | ||
logger.error(f"{file_path} 屏蔽词文件格式错误") | ||
else: | ||
logger.warning(f"{file_path} 屏蔽词文件已读取过") | ||
return [] | ||
|
||
|
||
def read_blockwords(file_path: Path) -> List[str]: | ||
"""读取屏蔽词 | ||
传入路径后回递归读取文件夹中的所有`.json`和`.txt`文件 | ||
Args: | ||
file_path (Path): 文件或文件夹路径 | ||
Returns: | ||
List[str]: 屏蔽词 | ||
""" | ||
if file_path.is_dir(): | ||
bk = [] | ||
for file in file_path.iterdir(): | ||
bk.extend(read_blockwords(file)) | ||
return bk | ||
elif file_path.suffix in [".json", ".txt"]: | ||
return read_file_blockwords(file_path) | ||
return [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
[tool.poetry] | ||
name = "nonebot-plugin-blockwords" | ||
version = "0.1.2" | ||
version = "0.2.0" | ||
description = "nonebot聊天屏蔽词插件" | ||
authors = ["MelodyKnit <[email protected]>"] | ||
license = "MIT" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
from pathlib import Path | ||
|
||
import pytest | ||
from nonebug import App | ||
|
||
|
||
@pytest.mark.asyncio() | ||
async def test_send(app: App): | ||
from nonebot_plugin_blockwords.utils import ( | ||
read_filed, | ||
get_blockwords, | ||
read_blockwords, | ||
) | ||
|
||
# 测试屏蔽词从长倒短排序 | ||
assert [len(i) for i in get_blockwords()] == [2, 2, 1, 1] | ||
|
||
# 测试屏蔽词读取 | ||
|
||
test_path = Path.cwd() | ||
|
||
# 测试读取txt | ||
test_txt_path = test_path / "test_read.txt" | ||
test_txt_path.write_text("鸡\n坤\n太美\n丁真", encoding="utf-8") | ||
assert read_blockwords(test_txt_path) == ["鸡", "坤", "太美", "丁真"] | ||
|
||
# 测试读取json | ||
|
||
test_json_path = test_path / "test_read.json" | ||
test_json_path.write_text('["哪里","贵","79元","眉笔"]', encoding="utf-8") | ||
assert read_blockwords(test_json_path) == ["哪里", "贵", "79元", "眉笔"] | ||
|
||
# 测试读取重复文件 | ||
assert read_blockwords(test_json_path) == [] | ||
|
||
# 测试读取全部文件 | ||
read_filed.clear() | ||
assert not bool(set(read_blockwords(test_path)).difference( | ||
{"哪里", "贵", "79元", "眉笔", "鸡", "坤", "太美", "丁真"} | ||
)) | ||
|
||
test_txt_path.unlink(missing_ok=True) | ||
test_json_path.unlink(missing_ok=True) |