Feat/add ai analysis notify #8202
Conversation
|
hanouyang seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
Codecov Report❌ Patch coverage is
❌ Your patch status has failed because the patch coverage (29.94%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## master #8202 +/- ##
==========================================
- Coverage 58.31% 58.16% -0.15%
==========================================
Files 677 678 +1
Lines 37036 37203 +167
==========================================
+ Hits 21597 21639 +42
- Misses 15439 15564 +125
... and 2 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
gcloud/template_base/models.py
Outdated
| notify_receivers = models.TextField(_("流程事件通知人"), default="{}") | ||
| time_out = models.IntegerField(_("流程超时时间(分钟)"), default=20) | ||
| is_deleted = models.BooleanField(_("是否删除"), default=False) | ||
| ai_analysis_notify_person = models.TextField(_("AI分析通知个人通知方式"), default="{}") |
There was a problem hiding this comment.
参考之前的风格,命名为ai_notify_type比较合适
There was a problem hiding this comment.
✅ 此问题似乎已被最近的更改解决。命名已统一为 ai_notify_type,与项目现有风格保持一致。
gcloud/template_base/models.py
Outdated
| time_out = models.IntegerField(_("流程超时时间(分钟)"), default=20) | ||
| is_deleted = models.BooleanField(_("是否删除"), default=False) | ||
| ai_analysis_notify_person = models.TextField(_("AI分析通知个人通知方式"), default="{}") | ||
| ai_analysis_notify_group = models.TextField(_("AI分析群聊通知"), default="{}") |
There was a problem hiding this comment.
同上,另外,这两个字段是否可以考虑用JsonField
| operations = [ | ||
| migrations.RemoveField( | ||
| model_name="tasktemplate", | ||
| name="ai_analysis_notify_group_chat_id", |
There was a problem hiding this comment.
这里上面加了一个字段,下面有remove掉,这种无用功,还会造成DB负担,不应该提交上来,这种应该你本地消化掉的
There was a problem hiding this comment.
✅ 此问题似乎已被最近的更改解决。无关的 migrations(0011、0021)均已从本次 PR 中移除。
gcloud/taskflow3/models.py
Outdated
| return notify_type if isinstance(notify_type, dict) else {"success": notify_type, "fail": notify_type} | ||
|
|
||
| # 获取AI分析报告通知方式 个人通知 | ||
| def get_ai_analysis_notify_person(self): |
There was a problem hiding this comment.
下面的models使用JsonField就不需要这样每次读取都进行json操作了
| try: | ||
| taskflow = TaskFlowInstance.objects.get(pipeline_instance__instance_id=pipeline_id) | ||
|
|
||
| bk_biz_id = taskflow.project.bk_biz_id |
There was a problem hiding this comment.
这一块的功能最好整体都用一个开关来进行控制,包括这个开关需要给到前端,可以参考context_processor文件,前端通过这个开关来隐藏掉相关功能
There was a problem hiding this comment.
✅ 此问题似乎已被最近的更改解决。已添加 ENABLE_AI_NOTIFICATION 开关(env.py),并在 signals/handlers.py 中通过该开关控制 AI 通知任务的触发,同时通过 context_processors.py 将开关状态暴露给前端。
| @@ -0,0 +1,35 @@ | |||
| # Generated by Django 3.2.25 on 2026-02-09 07:14 | |||
There was a problem hiding this comment.
跟功能无关的migrations尽量别提交上来,可以指定某个模块来生成migrations的
There was a problem hiding this comment.
✅ 此问题似乎已被最近的更改解决。无关的 migrations(0011、0021)均已从本次 PR 中移除。
api/ai_sops_agent.py
Outdated
|
|
||
| class BKSopsAgentClient: | ||
| def __init__(self): | ||
| self.app_code = env.BK_SOPS_APP_CODE |
There was a problem hiding this comment.
当前项目自己的app_code,是可以通过settings.APP_CODE来获取的,这里另外用一个变量是否有别的考虑
There was a problem hiding this comment.
这里我看开发者中心项目的app_code是bksops, 然后智能体网关授权的app_code也是bksops。 当前项目的APP_CODE好像是bk_sops
api/ai_sops_agent.py
Outdated
|
|
||
| def _make_request(self, method, params=None, data=None, timeout=120): | ||
| try: | ||
| url = f"{self.host}/{self.apigw_environment}/invoke/1.0.0assistant/" |
There was a problem hiding this comment.
✅ 此问题似乎已被最近的更改解决。版本号已改为通过 env.AGENT_VERSION 环境变量动态获取,不再硬编码。
| logger = logging.getLogger("root") | ||
|
|
||
|
|
||
| class BKSopsAgentClient: |
There was a problem hiding this comment.
chia那边也有需要调用到agent api的需求,确认下这里的逻辑是否可以复用吧
There was a problem hiding this comment.
可以将host作为实例初始化参数传入,修改后可以创建对应不同智能体的网关实例
8baf40f to
2a85644
Compare
There was a problem hiding this comment.
🔍 代码审查
已复核先前的评论,部分问题已在最新提交中修复。以下是针对当前变更的新反馈:
🚨 严重问题
api/ai_sops_agent.py:68 - 🔒 敏感信息泄露风险
使用 settings.SECRET_KEY 作为 app_secret 不安全。SECRET_KEY 是 Django 的密钥,不应用于外部 API 认证。
gcloud/taskflow3/celery/tasks.py:474 -
crypto.decrypt() 可能抛出异常但未捕获,会导致群聊通知任务失败。建议在 try-except 块中处理。
gcloud/taskflow3/celery/tasks.py:342 - ⚡ 阻塞操作
AI 分析调用可能耗时较长,应设置合理的超时时间并在异常时优雅降级。
⚠️ 需要关注
gcloud/taskflow3/celery/tasks.py:360-361 - 📝 重复调用
任务失败时会发送两次相同的 task_summary,确认是否为预期行为。
gcloud/taskflow3/celery/tasks.py:421 - 🔍 空值检查不完整
检查了 chat_id、url、mentioned_str 但未检查 group_notify_config 本身是否为空。
api/ai_sops_agent.py:96 - ⚡ 默认超时过长
默认 300 秒超时可能导致 Celery 任务堆积,建议根据实际场景调整。
gcloud/taskflow3/signals/handlers.py:107 - 🔍 排除周期任务的原因
代码排除了周期任务的 AI 通知,但未注释说明原因,建议添加。
✨ 改进建议
- 考虑添加 AI 通知功能开关的日志,便于排查问题
- 建议对 AI Agent 响应进行格式验证
mentioned_str构建时应处理空列表情况
总结:功能实现完整,但在安全性和异常处理方面需要加强。优先修复密钥使用和解密异常处理问题。
2a85644 to
043f7ee
Compare
There was a problem hiding this comment.
🔍 代码审查总结
本次PR添加了AI分析通知功能。发现以下高优先级问题需要处理:
🚨 严重问题
- 密钥解密失败处理不足 -
chat_id解密失败时返回None可能导致后续逻辑错误 - 网络请求缺少错误处理 - 群聊通知中的
requests.post可能因超时或网络问题导致任务失败 - 日志记录不完整 - 多处
except Exception仅记录了异常但未保留完整堆栈信息
⚡ 性能与资源
- 重复 HTTP 请求 - 失败场景下对同一 webhook 连续发送两次请求,应考虑合并
- 超时配置过长 - Agent API 默认超时 300 秒可能导致 Celery 任务长时间挂起
⚠️ 代码质量
- 变量命名拼写错误 -
annalyze_task_error应为analyze_task_error - 硬编码字符串 - 多处标题和消息模板应提取为常量
✅ 建议
- 添加单元测试覆盖 AI 通知场景
- 考虑添加熔断机制防止 Agent API 不可用时影响主流程
- 补充异常场景的降级处理逻辑
严重程度统计: 🚨 3个严重 | ⚡ 2个性能 |
|
|
||
| else: | ||
| return None, None, None | ||
|
|
There was a problem hiding this comment.
🔒 密钥解密未验证返回值
如果 crypto.decrypt() 失败返回空,后续会导致请求失败。建议添加校验
| # 消息发送 | ||
| if msg_type == ATOM_FAILED: | ||
| content = "{}\n{}\n".format(str(task_summary), mentioned_str) | ||
| resp = requests.post( |
There was a problem hiding this comment.
🚨 网络请求缺少异常处理
requests.post 可能抛出异常(超时、连接错误),应添加 try-except 包裹
|
|
||
| if msg_type == ATOM_FAILED and task_error_analysis and task_summary: | ||
|
|
||
| notify_type = ai_analysis_notify_types.get("fail", []) |
There was a problem hiding this comment.
建议使用 logger.exception(f"AIAnalysisNotify error: {e}") 以保留完整堆栈
|
|
||
| # 消息发送 | ||
| if msg_type == ATOM_FAILED: | ||
| content = "{}\n{}\n".format(str(task_summary), mentioned_str) |
There was a problem hiding this comment.
⚡ 避免重复请求
失败时连续发两次 POST,考虑合并为一次请求发送两段内容
| """ | ||
|
|
||
| # 默认超时时间(秒) | ||
| DEFAULT_TIMEOUT = 300 |
There was a problem hiding this comment.
⚡ 超时时间过长
DEFAULT_TIMEOUT=300s 可能导致 Celery 任务长时间阻塞,建议降至 30-60s
| user_input = f"使用summarize_task_execution这个工具,帮我总结一下我的任务执行情况 ,业务ID是{bk_biz_id},任务ID是 {task_id}" | ||
| agent_output = self.call_agent_apigw(user_input=user_input) | ||
| output = agent_output.get("outputs", {}).get("output", {}) | ||
| if not output: |
There was a problem hiding this comment.
annalyze_task_error 应修正为 analyze_task_error
|
|
||
| # 发送任务总结报告 | ||
| title = "标准运维AI任务执行情况总结" | ||
| CmsiSender().send( |
There was a problem hiding this comment.
✨ 标题应提取为常量
硬编码的标题字符串建议提取到模块顶部或配置文件
043f7ee to
5f6414b
Compare
There was a problem hiding this comment.
🔍 代码审查
已复核先前评论,部分问题已在最新提交中修复:
✅ 已解决的问题
- 字段类型已改为 JSONField(先前建议使用 JsonField)
- 已添加 AI 通知功能开关配置
- 无关的 migrations 文件已移除
🚨 仍需关注的问题
api/ai_sops_agent.py:68
🔒 使用 settings.SECRET_KEY 作为 app_secret 存在风险。Django 的 SECRET_KEY 不应用于外部 API 认证,建议使用专用的 app secret。
api/ai_sops_agent.py:25
env.AGENT_VERSION,但右侧 value 仍包含字面字符串拼接,确认是否正确。
gcloud/taskflow3/celery/tasks.py:476
🔒 crypto.decrypt() 可能返回 None,但未校验直接使用,会导致 chatid 为空的请求失败。
gcloud/taskflow3/celery/tasks.py:399-400
⚡ 失败场景下连续发送两次独立的 POST 请求,建议合并为一次请求或添加批量接口。
api/ai_sops_agent.py:65
⚡ 默认超时 300 秒过长,可能阻塞 Celery worker,建议调整为 30-60 秒。
gcloud/taskflow3/celery/tasks.py:350
logger.exception() 保留堆栈信息,便于问题排查。
📝 其他建议
- 群聊通知中未对
requests.post添加异常处理(网络超时、连接失败等场景) - 考虑为 AI 分析结果添加缓存机制,避免重复调用
- 排除周期任务的逻辑建议添加注释说明原因
优先级:先修复密钥使用和解密校验问题,其他为改进建议。
gcloud/taskflow3/celery/tasks.py
Outdated
| notify_type = ai_analysis_notify_types.get("fail", []) | ||
|
|
||
| # 发送任务总结报告 | ||
| title = "标准运维AI任务执行情况总结" |
There was a problem hiding this comment.
两个title需要提取到环境变量里,默认值可以用你现在这个
There was a problem hiding this comment.
✅ 此问题似乎已被最近的更改解决。通知标题已提取到环境变量 AI_SUMMARY_TASK_NOTIFY_TITLE 和 AI_ANALYSIS_FAIL_TASK_NOTIFY_TITLE,并设置了合理的默认值。
| task_summary, task_error_analysis = get_ai_analysis_report(bk_biz_id, task_id, msg_type) | ||
|
|
||
| # 消息发送 | ||
| if msg_type == ATOM_FAILED: |
There was a problem hiding this comment.
这里要确认下,如果流程有重试,是每一次重试失败都会通知,还是只有最后一次失败才通知,理论上用户只需要接受到一次通知才比较合理
5f6414b to
e8c8533
Compare
There was a problem hiding this comment.
代码审查总结
✅ 已解决的历史问题
- 拼写错误
annalyze_task_error已修正 - 功能开关
ENABLE_AI_NOTIFICATION已添加 - 通知标题已提取到环境变量
- 无关的 migrations 已移除
- 命名规范已按建议调整
🚨 仍需修复的关键问题
1. 🔒 settings.SECRET_KEY 用于外部 API 认证(api/ai_sops_agent.py:69)
Django 的 SECRET_KEY 是框架内部签名密钥,不应作为 app_secret 传递给外部 API 网关。建议使用专用的应用密钥配置项(如 settings.APP_SECRET 或环境变量)。
2. 🚨 agent_output 为 None 时直接调用 .get() 会抛出 AttributeError(api/ai_sops_agent.py:120, 128)
call_agent_apigw 在失败时返回 None,但 summarize_task_execution 和 analyze_task_error 直接在其返回值上调用 .get(),缺少空值检查,会导致未捕获的异常。
3. AIAnalysisNotifyGroupChat 中 requests.post 缺少异常处理(gcloud/taskflow3/celery/tasks.py:400)
网络超时、连接拒绝等异常未被捕获,会导致 Celery 任务崩溃。整个 requests.post 调用应包含在 try-except 块中。
4. ATOM_FAILED 场景第一次 requests.post 的结果被丢弃(gcloud/taskflow3/celery/tasks.py:400-411)
发送任务总结后 resp 被第二次赋值,如果第一次请求失败,错误会被静默忽略,末尾的 if not resp.ok 只检查了第二次请求的状态。
|
|
||
| def __init__(self, agent_host, request_type=AgentRequestType.PLUGIN, username=""): | ||
| self.app_code = settings.APP_CODE | ||
| self.app_secret = settings.SECRET_KEY |
There was a problem hiding this comment.
🔒 settings.SECRET_KEY 是 Django 框架内部签名密钥,不应作为外部 API 网关的 app_secret 使用。建议改用专用配置项(如独立的环境变量 APP_SECRET),防止密钥用途混淆造成安全风险。
| def summarize_task_execution(self, bk_biz_id, task_id): | ||
| user_input = f"使用summarize_task_execution这个工具,帮我总结一下我的任务执行情况 ,业务ID是{bk_biz_id},任务ID是 {task_id}" | ||
| agent_output = self.call_agent_apigw(user_input=user_input) | ||
| output = agent_output.get("outputs", {}).get("output", {}) |
There was a problem hiding this comment.
🚨 call_agent_apigw 失败时返回 None,此处直接对其调用 .get() 会抛出 AttributeError。需要先检查 agent_output is not None 再调用,否则 AI 调用失败时会引发未预期的异常而非优雅降级。
| # 消息发送 | ||
| if msg_type == ATOM_FAILED: | ||
| content = "{}\n{}\n".format(str(task_summary), mentioned_str) | ||
| resp = requests.post( |
There was a problem hiding this comment.
requests.post 未包含异常处理,网络超时或连接失败会导致整个 Celery 任务崩溃(外层 try-except 虽然存在,但仅对 get_ai_analysis_notify_group_config 有效)。建议将 requests.post 调用放入独立的 try-except 块中。
| timeout=5, | ||
| ) | ||
| content = "{}\n{}\n".format(str(task_error_analysis), mentioned_str) | ||
| resp = requests.post( |
There was a problem hiding this comment.
ATOM_FAILED 场景下,第一次 requests.post(发送任务总结)的 resp 结果在此被覆盖,导致其发送失败时错误被静默忽略。末尾的 if not resp.ok 只检查了第二次请求的状态,第一次请求的失败将无法感知。
增加AI分析报告企业微信推送功能