Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions _conf_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,6 @@
"description": "图片传输使用 Base64 编码",
"default": true,
"hint": "启用后图片会被转为 Base64 编码传输。关闭后将直接传递文件路径或 URL 给 OneBot 实现端,适用于 go-cqhttp、Lagrange 等支持本地路径的实现,可降低内存占用。"
},
"enable_analysis_reply": {
"type": "bool",
"description": "发送文本/表情回复",
"default": false,
"hint": "开启后,/群分析 将通过发送文本消息提示进度;关闭(默认)则使用表情回应。"
}
}
},
Expand Down
10 changes: 3 additions & 7 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,14 +504,10 @@ async def analyze_group_daily(
)
TraceContext.set(trace_id)

# 表情回应 或 文本提示(二选一,由配置开关控制)
# 使用表情回应代替文本回复
adapter = self.bot_manager.get_adapter(platform_id)
orig_msg_id = getattr(event.message_obj, "message_id", None)
use_text_reply = self.config_manager.get_enable_analysis_reply()

if use_text_reply:
yield event.plain_result("🔍 正在启动分析引擎,正在拉取最近消息...")
elif adapter and orig_msg_id:
if adapter and orig_msg_id:
await adapter.set_reaction(event.get_group_id(), orig_msg_id, "🔍") # 🔍

try:
Expand All @@ -528,7 +524,7 @@ async def analyze_group_daily(
yield event.plain_result("❌ 分析失败,原因未知")
return

if not use_text_reply and adapter and orig_msg_id:
if adapter and orig_msg_id:
await adapter.set_reaction(
event.get_group_id(), orig_msg_id, "📊"
) # 📊
Expand Down
36 changes: 36 additions & 0 deletions src/domain/services/message_cleaner_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,39 @@ def clean_messages(
cleaned_list.append(new_msg)

return cleaned_list

@staticmethod
def sanitize_chat_text(text: str) -> str:
"""
Remove HTML tags, Base64 data URIs, and control characters from chat text
before LLM processing.

This prevents contaminated data from breaking JSON parsing in LLM responses
and causing layout issues in rendered HTML reports.

Args:
text: Raw chat message text

Returns:
Cleaned text with only plain content
"""
if not text:
return ""

# Remove Base64 data URIs first (longer pattern, should be removed before tags)
text = re.sub(r"data:[^;]+;base64,[A-Za-z0-9+/=]+", "", text)

# Remove HTML tags (including self-closing and attributes)
text = re.sub(r"<[^>]+>", "", text)

# Remove control characters (ASCII 0x00-0x1F, 0x7F-0x9F)
text = re.sub(r"[\x00-\x1f\x7f-\x9f]", "", text)

# Normalize Unicode quotes to ASCII equivalents
text = text.replace("\u201c", '"').replace("\u201d", '"') # " "
text = text.replace("\u2018", "'").replace("\u2019", "'") # ' '

# Clean up extra whitespace from removals
text = re.sub(r"\s+", " ", text).strip()

return text
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datetime import datetime

from ....domain.models.data_models import QualityDimension, QualityReview, TokenUsage
from ....domain.services.message_cleaner_service import MessageCleanerService
from ....utils.logger import logger
from ..utils import InfoUtils
from ..utils.json_utils import extract_quality_with_regex, parse_json_object_response
Expand Down Expand Up @@ -79,7 +80,9 @@ def build_prompt(self, data: list[dict]) -> str:

combined_text = "".join(text_parts).strip()
if combined_text and not combined_text.startswith("/"):
text_messages.append(f"[{msg_time}] [{nickname}]: {combined_text}")
sanitized_text = MessageCleanerService.sanitize_chat_text(combined_text)
if sanitized_text:
text_messages.append(f"[{msg_time}] [{nickname}]: {sanitized_text}")

messages_text = "\n".join(text_messages[:1000])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datetime import datetime

from ....domain.models.data_models import GoldenQuote, TokenUsage
from ....domain.services.message_cleaner_service import MessageCleanerService
from ....utils.logger import logger
from ..utils import InfoUtils
from ..utils.json_utils import extract_golden_quotes_with_regex
Expand Down Expand Up @@ -54,8 +55,12 @@ def build_prompt(self, data: list[dict]) -> str:
return ""

# 构建消息文本 (用 [user_id] 替代 nickname 以确保回填 100% 准确,避免 Emoji 等干扰)
# 应用 sanitize_chat_text 清洗 HTML 标签和 Base64 数据
messages_text = "\n".join(
[f"[{msg['time']}] [{msg['user_id']}]: {msg['content']}" for msg in data]
[
f"[{msg['time']}] [{msg['user_id']}]: {MessageCleanerService.sanitize_chat_text(msg['content'])}"
for msg in data
]
)

max_golden_quotes = self.get_max_count()
Expand Down
4 changes: 3 additions & 1 deletion src/infrastructure/analysis/analyzers/topic_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime

from ....domain.models.data_models import SummaryTopic, TokenUsage
from ....domain.services.message_cleaner_service import MessageCleanerService
from ....utils.logger import logger
from ..utils import InfoUtils
from ..utils.json_utils import extract_topics_with_regex
Expand Down Expand Up @@ -149,9 +150,10 @@ def build_prompt(self, data: list[dict]) -> str:

# 构建消息文本
# 使用用户提供的 ID-Only 格式: [HH:MM] [用户ID]: 消息内容
# 应用 sanitize_chat_text 清洗 HTML 标签和 Base64 数据
messages_text = "\n".join(
[
f"[{msg['time']}] [{msg['user_id']}]: {msg['content']}"
f"[{msg['time']}] [{msg['user_id']}]: {MessageCleanerService.sanitize_chat_text(msg['content'])}"
for msg in text_messages
]
)
Expand Down
12 changes: 8 additions & 4 deletions src/infrastructure/analysis/analyzers/user_title_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""

from ....domain.models.data_models import TokenUsage, UserTitle
from ....domain.services.message_cleaner_service import MessageCleanerService
from ....utils.logger import logger
from ..utils.json_utils import extract_user_titles_with_regex
from .base_analyzer import BaseAnalyzer
Expand Down Expand Up @@ -51,12 +52,15 @@ def build_prompt(self, data: dict) -> str:
return ""

# 构建用户数据文本
# 应用 sanitize_chat_text 清洗 HTML 标签和 Base64 数据
users_text = "\n".join(
[
f"- {user['name']} (ID:{user['user_id']}): "
f"发言{user['message_count']}条, 平均{user['avg_chars']}字, "
f"表情比例{user['emoji_ratio']}, 夜间发言比例{user['night_ratio']}, "
f"回复比例{user['reply_ratio']}"
(
f"- {MessageCleanerService.sanitize_chat_text(user['name'])} (ID:{user['user_id']}): "
f"发言{user['message_count']}条, 平均{user['avg_chars']}字, "
f"表情比例{user['emoji_ratio']}, 夜间发言比例{user['night_ratio']}, "
f"回复比例{user['reply_ratio']}"
)
for user in user_summaries
]
)
Expand Down
114 changes: 73 additions & 41 deletions src/infrastructure/analysis/utils/json_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,57 @@
from ....utils.logger import logger


def _extract_json_balanced(text: str, open_char: str, close_char: str) -> str | None:
"""
Extract the first complete JSON array or object from text, handling strings correctly.

This function properly handles:
- Strings containing ] or } characters
- Escaped characters within strings
- Nested structures

Args:
text: Text containing JSON
open_char: Opening bracket/brace ('[' or '{')
close_char: Closing bracket/brace (']' or '}')

Returns:
The extracted JSON string, or None if not found
"""
start = text.find(open_char)
if start == -1:
return None

depth = 0
in_string = False
escape_next = False

for i, char in enumerate(text[start:], start):
if escape_next:
escape_next = False
continue

if char == "\\":
escape_next = True
continue

if char == '"' and not escape_next:
in_string = not in_string
continue

if in_string:
continue

if char == open_char:
depth += 1
elif char == close_char:
depth -= 1
if depth == 0:
return text[start : i + 1]

return None


def fix_json(text: str) -> str:
"""
修复JSON格式问题,包括中文符号替换
Expand All @@ -28,49 +79,32 @@ def fix_json(text: str) -> str:
text = text.replace("\n", " ").replace("\r", " ")
text = re.sub(r"\s+", " ", text)

# 3. 替换中文符号为英文符号(修复)
# 中文引号 -> 英文引号
text = text.replace("“", '"').replace("”", '"')
text = text.replace("‘", "'").replace("’", "'")
# 中文逗号 -> 英文逗号
text = text.replace(",", ",")
# 中文冒号 -> 英文冒号
text = text.replace(":", ":")
# 中文括号 -> 英文括号
text = text.replace("(", "(").replace(")", ")")
text = text.replace("【", "[").replace("】", "]")

# 4. 处理字符串内容中的特殊字符
# 转义字符串内的双引号
def escape_quotes_in_strings(match):
content = match.group(1)
# 转义内部的双引号
content = content.replace('"', '\\"')
return f'"{content}"'

# 先处理字段值中的引号
text = re.sub(r'"([^"]*(?:"[^"]*)*)"', escape_quotes_in_strings, text)

# 5. 修复截断的JSON
# 3. 替换中文符号为英文符号
text = text.replace("\u201c", '"').replace("\u201d", '"')
text = text.replace("\u2018", "'").replace("\u2019", "'")
text = text.replace("\uff0c", ",")
text = text.replace("\uff1a", ":")
text = text.replace("\uff08", "(").replace("\uff09", ")")
text = text.replace("\u3010", "[").replace("\u3011", "]")

# 4. 修复截断的JSON
if not text.endswith("]"):
last_complete = text.rfind("}")
if last_complete > 0:
text = text[: last_complete + 1] + "]"

# 6. 修复常见的JSON格式问题
# 1. 修复缺失的逗号
# 5. 修复缺失的逗号
text = re.sub(r"}\s*{", "}, {", text)

# 2. 确保字段名有引号(仅在对象开始或逗号后,避免破坏字符串值)
# 6. 确保字段名有引号
def quote_field_names(match):
prefix = match.group(1)
key = match.group(2)
return f'{prefix}"{key}":'

# 只在 { 或 , 后面匹配字段名,避免在字符串值中误匹配
text = re.sub(r"([{,]\s*)([a-zA-Z_][a-zA-Z0-9_]*)\s*:", quote_field_names, text)

# 3. 移除多余的逗号
# 7. 移除多余的逗号
text = re.sub(r",\s*}", "}", text)
text = re.sub(r",\s*]", "]", text)

Expand All @@ -96,14 +130,13 @@ def parse_json_response(
"""
fixed_json_text = None
try:
# 1. 提取JSON部分
json_match = re.search(r"\[.*?\]", result_text, re.DOTALL)
if not json_match:
# 1. 提取JSON数组(使用balanced extraction处理字符串内的]字符)
json_text = _extract_json_balanced(result_text, "[", "]")
if not json_text:
error_msg = f"{data_type}响应中未找到JSON格式"
logger.warning(error_msg)
return False, None, error_msg

json_text = json_match.group()
logger.debug(f"{data_type}分析JSON原文: {json_text[:500]}...")

# 2. 尝试直接解析
Expand Down Expand Up @@ -162,17 +195,16 @@ def parse_json_object_response(
raw_text = re.sub(r"```\s*$", "", raw_text)
raw_text = raw_text.strip()

# 2. 提取 JSON 对象
json_match = re.search(r"\{.*\}", raw_text, re.DOTALL)
if not json_match:
# 2. 提取 JSON 对象(使用balanced extraction处理字符串内的}字符)
json_text = _extract_json_balanced(raw_text, "{", "}")
if not json_text:
error_msg = f"{data_type}响应中未找到JSON对象"
logger.warning(error_msg)
return False, None, error_msg

json_text = json_match.group()
logger.debug(f"{data_type}分析JSON原文: {json_text[:500]}...")

# 3. 尝试直接解析(保留原始文本,避免中文引号被破坏)
# 3. 尝试直接解析
try:
data = json.loads(json_text)
logger.info(f"{data_type}直接解析成功")
Expand All @@ -182,10 +214,10 @@ def parse_json_object_response(

# 4. 使用 fix_json 修复后重试
fixed_json = fix_json(json_text)
fixed_match = re.search(r"\{.*\}", fixed_json, re.DOTALL)
if fixed_match:
fixed_text = _extract_json_balanced(fixed_json, "{", "}")
if fixed_text:
try:
data = json.loads(fixed_match.group())
data = json.loads(fixed_text)
logger.info(f"{data_type}修复后解析成功")
return True, data, None
except json.JSONDecodeError as e:
Expand Down
9 changes: 0 additions & 9 deletions src/infrastructure/config/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,15 +466,6 @@ def get_enable_user_card(self) -> bool:
"""获取是否使用用户群名片"""
return self._get_group("basic").get("enable_user_card", False)

def get_enable_analysis_reply(self) -> bool:
"""获取是否在群分析完成后发送文本回复"""
return self._get_group("basic").get("enable_analysis_reply", False)

def set_enable_analysis_reply(self, enabled: bool):
"""设置是否在群分析完成后发送文本回复"""
self._ensure_group("basic")["enable_analysis_reply"] = enabled
self.config.save_config()

# ========== 群文件/群相册上传配置 ==========

def get_enable_group_file_upload(self) -> bool:
Expand Down
Loading