diff --git a/.gitignore b/.gitignore index 69f1b88..e6f93ab 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ build/ dist/ debug_payload.json brainstorm_outputs/ -reference/ \ No newline at end of file +reference/ +botpy.log diff --git a/bridges/qq.py b/bridges/qq.py new file mode 100644 index 0000000..5cdaf12 --- /dev/null +++ b/bridges/qq.py @@ -0,0 +1,1141 @@ +""" +bridges/qq.py — QQ Bot bridge for CheetahClaws. + +Uses the official qq-botpy SDK (WebSocket + HTTP) to connect to QQ groups +and C2C private chats. Runs botpy's async Client on a dedicated asyncio +event loop inside a daemon thread, bridging to CheetahClaws's synchronous +main thread via threading.Queue and threading.Event. + +Setup: /qq start — connect with configured appid/secret + /qq stop — stop the bridge + /qq status — show current status + +Prerequisite: pip install qq-botpy + Obtain appid + secret from https://q.qq.com +""" + +from __future__ import annotations + +import asyncio +import base64 +import threading +import time as _time_mod + +import jobs as _jobs +import logging_utils as _log +import runtime +from tools.interaction import _qq_thread_local +from ui.render import clr, err, info, ok, warn + +_qq_thread: threading.Thread | None = None +_qq_stop = threading.Event() + +# ── Per-target job queues ────────────────────────────────────────────────── +# key: target_id (group_openid or user_openid) → list of (job_id, prompt, image_b64) +_qq_queues: dict[str, list[tuple[str, str, str | None]]] = {} +_qq_queues_lock = threading.Lock() +_qq_busy: dict[str, bool] = {} +_qq_run_lock = threading.Lock() + +# ── Message queue: async botpy → sync poll loop ─────────────────────────── +_qq_msg_queue: asyncio.Queue | None = None # created inside async loop + +# ── Deduplication ────────────────────────────────────────────────────────── +_qq_seen_msgids: set[str] = set() + +# ── Passive reply tracking ──────────────────────────────────────────────── +# QQ requires passive replies reference the original msg_id or event_id within +# a short validity window. botpy documents the current group/C2C passive reply +# window as 5 minutes. Track per-target: +# {target_id: (msg_id, event_id, next_seq, timestamp, msg_type)} +# msg_id can be None for group messages; event_id can also be None if expired. +# msg_type is "group" or "c2c" — stored here to avoid race conditions with +# RuntimeContext.qq_current_msg_type being overwritten by concurrent handlers. +_qq_reply_ctx: dict[str, tuple[str | None, str | None, int, float, str]] = {} +_qq_reply_lock = threading.Lock() + +_QQ_PASSIVE_WINDOW = 300 # seconds (5 minutes) +_QQ_STREAM_INTERVAL = 2.0 +_QQ_STREAM_MIN_LEN = 80 +_QQ_MAX_MSG_LEN = 2000 + +# ── Intents ──────────────────────────────────────────────────────────────── + + +def _make_intents(): + """Build botpy.Intents for group + C2C messages.""" + import botpy + + return botpy.Intents(public_messages=True) + + +# ── Send helpers (called from main thread) ───────────────────────────────── + + +def _qq_log_send_future(future, route_name: str, target_id: str) -> None: + """Surface async QQ HTTP failures scheduled via run_coroutine_threadsafe.""" + try: + exc = future.exception() + except Exception as callback_exc: + _log.warn( + "qq_send_future_error", + route=route_name, + target_id=target_id, + error=str(callback_exc)[:200], + ) + return + if exc is not None: + _log.warn( + "qq_send_api_error", + route=route_name, + target_id=target_id, + error=str(exc)[:200], + ) + + +async def _qq_post_group( + api, + group_openid: str, + content: str, + msg_id: str | None = None, + event_id: str | None = None, + msg_seq: int = 1, +) -> None: + """Send text to a QQ group via botpy HTTP layer with a clean payload. + + We bypass api.post_group_message() because it uses ``locals()`` which + includes all None-valued keyword args (embed, ark, etc.) that the QQ + API rejects with error 11255. + + For passive replies within 5 minutes, provide either msg_id or event_id. + """ + from botpy.http import Route + + payload: dict = { + "group_openid": group_openid, + "msg_type": 0, + "content": content[:_QQ_MAX_MSG_LEN], + } + if msg_id: + payload["msg_id"] = msg_id + payload["msg_seq"] = msg_seq + elif event_id: + payload["event_id"] = event_id + payload["msg_seq"] = msg_seq + + await api._http.request( + Route("POST", "/v2/groups/{group_openid}/messages", group_openid=group_openid), + json=payload, + ) + + +async def _qq_post_c2c( + api, + openid: str, + content: str, + msg_id: str | None = None, + event_id: str | None = None, + msg_seq: int = 1, +) -> None: + """Send text to a C2C user via botpy HTTP layer with a clean payload. + + For passive replies within 5 minutes, provide either msg_id or event_id. + """ + from botpy.http import Route + + payload: dict = { + "openid": openid, + "msg_type": 0, + "content": content[:_QQ_MAX_MSG_LEN], + } + if msg_id: + payload["msg_id"] = msg_id + payload["msg_seq"] = msg_seq + elif event_id: + payload["event_id"] = event_id + payload["msg_seq"] = msg_seq + await api._http.request( + Route("POST", "/v2/users/{openid}/messages", openid=openid), + json=payload, + ) + + +def _qq_send_group( + api, + group_openid: str, + content: str, + msg_id: str | None = None, + event_id: str | None = None, + msg_seq: int = 1, +) -> None: + """Send text to a QQ group (thread-safe wrapper).""" + try: + loop = _get_async_loop() + if loop and loop.is_running(): + future = asyncio.run_coroutine_threadsafe( + _qq_post_group(api, group_openid, content, msg_id, event_id, msg_seq), + loop, + ) + future.add_done_callback( + lambda fut: _qq_log_send_future(fut, "group", group_openid) + ) + except Exception as exc: + _log.warn("qq_send_group_error", error=str(exc)[:200]) + + +def _qq_send_c2c( + api, + openid: str, + content: str, + msg_id: str | None = None, + event_id: str | None = None, + msg_seq: int = 1, +) -> None: + """Send text to a C2C user (thread-safe wrapper).""" + try: + loop = _get_async_loop() + if loop and loop.is_running(): + future = asyncio.run_coroutine_threadsafe( + _qq_post_c2c(api, openid, content, msg_id, event_id, msg_seq), + loop, + ) + future.add_done_callback( + lambda fut: _qq_log_send_future(fut, "c2c", openid) + ) + except Exception as exc: + _log.warn("qq_send_c2c_error", error=str(exc)[:200]) + + +def _qq_send( + target_id: str, text: str, config: dict, msg_type: str | None = None +) -> None: + """High-level send: route to group or C2C. + + If msg_type is provided, use it directly (avoids race conditions + when called from a background thread). + + Otherwise fall back to the msg_type stored in _qq_reply_ctx for this + target_id (set when the message was first received). This is safer + than RuntimeContext because the latter can be overwritten by concurrent + handlers. + + Final fallback: RuntimeContext.qq_current_msg_type or "group". + """ + # Defensive: if target_id is empty, try thread-local fallback (avoids 404 + # when concurrent handlers clear RuntimeContext.qq_current_target_id). + if not target_id: + target_id = getattr(_qq_thread_local, "target_id", "") or "" + if not target_id: + _log.warn("qq_send_empty_target_id", text_preview=text[:80]) + return + + if msg_type is None: + with _qq_reply_lock: + ctx = _qq_reply_ctx.get(target_id) + if ctx and len(ctx) >= 5: + msg_type = ctx[4] # msg_type stored at index 4 + if msg_type is None: + sctx = runtime.get_ctx(config) + msg_type = getattr(sctx, "qq_current_msg_type", "") or "group" + api = _qq_api_client + if api is None: + return + + if not text or not text.strip(): + return + + with _qq_reply_lock: + ctx = _qq_reply_ctx.get(target_id) + if ctx: + msg_id, event_id, seq, ts = ctx[0], ctx[1], ctx[2], ctx[3] + stored_type = ctx[4] if len(ctx) > 4 else (msg_type or "group") + if _time_mod.time() - ts > _QQ_PASSIVE_WINDOW: + # Clear both msg_id and event_id when window expires; + # active pushes do not need msg_seq. + msg_id, event_id, seq = None, None, 0 + seq += 1 # first call: 0→1, second: 1→2, etc. + _qq_reply_ctx[target_id] = ( + msg_id, + event_id, + seq, + _time_mod.time(), + stored_type, + ) + else: + msg_id, event_id, seq = None, None, 1 + + chunks = [ + text[i : i + _QQ_MAX_MSG_LEN] + for i in range(0, max(len(text), 1), _QQ_MAX_MSG_LEN) + ] + for chunk in chunks: + if msg_type == "c2c": + _qq_send_c2c(api, target_id, chunk, msg_id, event_id, seq) + else: + _qq_send_group(api, target_id, chunk, msg_id, event_id, seq) + if msg_id or event_id: + seq += 1 + + # Update stored seq so the next _qq_send starts after the last used seq. + # Without this, multi-chunk messages reuse the same seq values. + if (msg_id or event_id) and target_id in _qq_reply_ctx: + with _qq_reply_lock: + ctx = _qq_reply_ctx.get(target_id) + if ctx: + _qq_reply_ctx[target_id] = ( + ctx[0], ctx[1], seq, _time_mod.time(), ctx[4] + ) + + +# ── Async loop management ────────────────────────────────────────────────── + +_async_loop: asyncio.AbstractEventLoop | None = None +_qq_api_client = None # botpy client.api — module-level to avoid mutating config dict + + +def _get_async_loop() -> asyncio.AbstractEventLoop | None: + return _async_loop + + +# ── botpy logging suppression ────────────────────────────────────────────── + + +def _mute_botpy(): + """Raise all botpy loggers (and their handlers) to WARNING.""" + import logging + + for _logger_name in list(logging.root.manager.loggerDict): + if _logger_name.startswith("botpy"): + _lg = logging.getLogger(_logger_name) + _lg.setLevel(logging.WARNING) + for _h in _lg.handlers: + _h.setLevel(logging.WARNING) + + +# ── botpy Client subclass ────────────────────────────────────────────────── + + +def _create_client_class(): + """Build the QQBridgeClient class. Deferred to avoid import-time dependency on botpy.""" + import botpy + from botpy.message import C2CMessage, GroupMessage + + class QQBridgeClient(botpy.Client): + async def on_ready(self): + _mute_botpy() # suppress any loggers created at runtime + _log.info("qq_bridge_ready", robot=getattr(self.robot, "name", "")) + + async def on_group_at_message_create(self, message: GroupMessage): + await self._handle_message(message, "group") + + async def on_c2c_message_create(self, message: C2CMessage): + await self._handle_message(message, "c2c") + + async def _handle_message(self, message, msg_type: str): + import re + + # Extract text content — strip @mention prefix for group messages + content = (message.content or "").strip() + if msg_type == "group": + content = re.sub(r"<@!\d+>\s*", "", content).strip() + + # Use message.id for dedup, fall back to event_id + raw_id = getattr(message, "id", None) + event_id = getattr(message, "event_id", None) + + dedup_id = raw_id or event_id + if dedup_id and dedup_id in _qq_seen_msgids: + return + if dedup_id: + _qq_seen_msgids.add(dedup_id) + if len(_qq_seen_msgids) > 2000: + for old in list(_qq_seen_msgids)[:500]: + _qq_seen_msgids.discard(old) + + # Determine target ID and author + if msg_type == "group": + target_id = getattr(message, "group_openid", "") + author_id = ( + getattr(message.author, "member_openid", "") + if message.author + else "" + ) + else: + target_id = ( + getattr(message.author, "user_openid", "") if message.author else "" + ) + author_id = target_id + + # For passive replies: store both msg_id and event_id. + # msg_id can be None for group messages; event_id should always exist. + # QQ API accepts either msg_id or event_id for passive replies. + reply_msg_id = raw_id if raw_id else None + reply_event_id = event_id if event_id else None + + # Store reply context: seq starts at 0, incremented to 1 on first send + # Also store msg_type to avoid race conditions with concurrent handlers. + with _qq_reply_lock: + _qq_reply_ctx[target_id] = ( + reply_msg_id, + reply_event_id, + 0, + _time_mod.time(), + msg_type, + ) + + # Download images from attachments + images: list[str] = [] + for att in message.attachments or []: + url = getattr(att, "url", "") + if url: + try: + import urllib.request + + with urllib.request.urlopen(url, timeout=30) as resp: + img_bytes = resp.read() + images.append(base64.b64encode(img_bytes).decode("utf-8")) + except Exception as exc: + _log.warn("qq_image_download_error", error=str(exc)[:200]) + + if not content and images: + content = "What do you see in this image? Describe it in detail." + if not content: + return + + # Enqueue for the sync poll loop + if _qq_msg_queue is not None: + await _qq_msg_queue.put( + { + "content": content, + "target_id": target_id, + "author_id": author_id, + "msg_type": msg_type, + "msg_id": dedup_id, + "images": images, + } + ) + + return QQBridgeClient + + +def _qq_try_deliver_input(session_ctx, target_id: str, text: str) -> bool: + """Deliver a pending QQ permission/input reply only from the prompt target.""" + evt = getattr(session_ctx, "qq_input_event", None) + if evt is None: + return False + + pending_target = getattr(session_ctx, "qq_input_target_id", "") or "" + if pending_target != target_id: + _log.warn( + "qq_input_wrong_target", + expected=pending_target, + actual=target_id, + text_preview=text[:80], + ) + return False + + session_ctx.qq_input_value = text + print(clr(f"\n 📩 QQ 权限回复: {text}", "cyan")) + evt.set() + return True + + +# ── Poll loop (daemon thread) ────────────────────────────────────────────── + + +def _qq_poll_loop(config: dict) -> str: + """Run the botpy client in its own asyncio loop on this thread. + + Processes messages from _qq_msg_queue (filled by async handlers). + Returns "stopped", or raises. + """ + global _async_loop, _qq_msg_queue, _qq_api_client + + session_ctx = runtime.get_session_ctx(config.get("_session_id", "default")) + run_query_cb = session_ctx.run_query + + _async_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_async_loop) + _qq_msg_queue = asyncio.Queue() + + appid = config.get("qq_appid", "") + secret = config.get("qq_secret", "") + + QQBridgeClient = _create_client_class() + client = QQBridgeClient(intents=_make_intents()) + + _mute_botpy() # mute before starting + + # Start the botpy client in a background task within this loop + async def _run_botpy(): + try: + await client.start(appid=appid, secret=secret) + except Exception as exc: + _log.error("qq_botpy_start_error", error=str(exc)[:200]) + raise + + botpy_task = _async_loop.create_task(_run_botpy()) + + # Store the API object for send helpers (module-level, not in config dict) + _qq_api_client = client.api + + # Set up the session-level send callback + session_ctx.qq_send = lambda tid, txt: _qq_send(tid, txt, config) + + consecutive_failures = 0 + + try: + while not _qq_stop.is_set(): + try: + # Drain the message queue with a timeout + msg = _async_loop.run_until_complete( + asyncio.wait_for(_qq_msg_queue.get(), timeout=2.0) + ) + except asyncio.TimeoutError: + if botpy_task.done(): + # Surface startup/runtime failure to the supervisor instead + # of leaving an alive thread with a dead botpy client. + botpy_task.result() + return "stopped" + continue + except Exception: + consecutive_failures += 1 + if consecutive_failures >= 10: + return "stopped" + _qq_stop.wait(1) + continue + consecutive_failures = 0 + + if msg is None: + continue + + text = msg["content"] + target_id = msg["target_id"] + msg_type = msg["msg_type"] + images = msg.get("images") or [] + image_b64 = images[0] if images else None + + # Set qq_current_msg_type for all _qq_send calls in this message handler + sctx = runtime.get_ctx(config) + sctx.qq_current_target_id = target_id + sctx.qq_current_msg_type = msg_type + + # ── Interactive PTY session ──────────────────────────────── + from bridges.interactive_session import ( + InteractiveSession, + get_session, + remove_session, + set_session, + ) + + _sess_key = f"qq_{target_id}" + _active_sess = get_session(_sess_key) + + if _active_sess: + stripped = text.strip().lower() + _norm = stripped.replace(" ", "") + _exit_set = {"!exit", "!quit", "!stop", "/exit", "/quit"} + if ( + stripped in _exit_set + or _norm in _exit_set + or stripped == "/exit_session" + ): + remove_session(_sess_key) + _qq_send(target_id, "⏹ Interactive session ended.", config) + continue + if stripped in ("!ping", "!screen", "!refresh") or _norm in ( + "!ping", + "!screen", + "!refresh", + ): + _qq_send(target_id, "⏹ Refreshing screen…", config) + _active_sess.force_flush() + continue + _active_sess.send_input(text) + _qq_send(target_id, f"⌨ `{text[:60]}`", config) + continue + + # ── Permission input pending ─────────────────────────────── + if _qq_try_deliver_input(session_ctx, target_id, text): + continue + + # ── Slash commands ───────────────────────────────────────── + if text.strip().startswith("/"): + slash_cb = session_ctx.handle_slash + if slash_cb: + + def _qq_slash_runner(_slash_text, _tid, _mtype): + _qq_thread_local.active = True + _qq_thread_local.target_id = _tid + _qq_thread_local.msg_type = _mtype + sctx = runtime.get_ctx(config) + sctx.qq_current_target_id = _tid + sctx.qq_current_msg_type = _mtype + try: + cmd_type = slash_cb(_slash_text) + except Exception as e: + _qq_send(_tid, f"⚠ Error: {e}", config) + return + finally: + _qq_thread_local.active = False + _qq_thread_local.target_id = "" + _qq_thread_local.msg_type = "" + sctx.qq_current_target_id = "" + sctx.qq_current_msg_type = "" + if cmd_type == "simple": + cmd_name = _slash_text.strip().split()[0] + _qq_send(_tid, f"✅ {cmd_name} executed.", config, _mtype) + return + qq_state = session_ctx.agent_state + if qq_state and qq_state.messages: + for m in reversed(qq_state.messages): + if m.get("role") == "assistant": + content = m.get("content", "") + if isinstance(content, list): + parts = [ + ( + b.get("text", "") + if isinstance(b, dict) + and b.get("type") == "text" + else (b if isinstance(b, str) else "") + ) + for b in content + ] + content = "\n".join(p for p in parts if p) + if content: + _qq_send(_tid, content, config, _mtype) + break + + threading.Thread( + target=_qq_slash_runner, + args=(text, target_id, msg_type), + daemon=True, + ).start() + continue + + # ── ! commands (shell / terminal) ────────────────────────── + if text.strip().startswith("!"): + raw_cmd = text.strip()[1:].strip() + sess_key = f"qq_{target_id}" + + if raw_cmd.lower() in ("stop", ""): + from bridges.terminal_runner import stop_terminal + + killed = stop_terminal(sess_key) + _qq_send( + target_id, + "🛑 Command stopped." if killed else "ℹ No command running.", + config, + ) + continue + + _interactive_progs = ( + "claude", + "python", + "python3", + "ipython", + "bash", + "sh", + "zsh", + "node", + "irb", + "sqlite3", + "psql", + "mysql", + "redis-cli", + ) + _base = raw_cmd.split()[0].split("/")[-1] + if _base in _interactive_progs: + + def _start_pty_qq(cmd, tid, skey, mt): + def _send(out): + _qq_send(tid, out, config) + + try: + sess = InteractiveSession(cmd, _send, session_key=skey) + set_session(skey, sess) + _qq_send( + tid, + f"▶ {cmd} 已启动\n发消息即可交互,发 !exit 结束会话", + config, + ) + except Exception as e: + _qq_send(tid, f"⚠ 无法启动: {e}", config) + + threading.Thread( + target=_start_pty_qq, + args=(raw_cmd, target_id, sess_key, msg_type), + daemon=True, + ).start() + continue + + def _qq_terminal(cmd, tid, skey): + from bridges.terminal_runner import run_terminal + + _qq_send(tid, f"▶ {cmd}", config) + run_terminal( + cmd, + lambda out: _qq_send(tid, out, config), + session_key=skey, + stop_event=_qq_stop, + ) + + threading.Thread( + target=_qq_terminal, + args=(raw_cmd, target_id, sess_key), + daemon=True, + ).start() + continue + + # ── Job dashboard & control commands ─────────────────────── + stripped_lower = text.strip().lower() + if stripped_lower in ("!jobs", "!j", "!status"): + _qq_send(target_id, _jobs.format_dashboard(), config) + continue + + if stripped_lower.startswith("!job "): + jid = text.strip().split(None, 1)[1].lstrip("#").strip() + _qq_send(target_id, _jobs.format_detail(jid), config) + continue + + if stripped_lower.startswith("!retry "): + jid = text.strip().split(None, 1)[1].lstrip("#").strip() + original = _jobs.get(jid) + if not original: + _qq_send(target_id, f"❓ Job #{jid} not found.", config) + continue + retry_job = _jobs.create( + original.prompt, source="qq", retry_of=original.id + ) + _qq_send( + target_id, + f'↩ Retrying #{jid} as #{retry_job.id}:\n"{original.title}"', + config, + ) + queued_pos = _queue_or_dispatch_qq_job( + retry_job, + original.prompt, + target_id, + msg_type, + run_query_cb, + session_ctx, + config, + ) + if queued_pos: + _qq_send( + target_id, + f"⏳ 已排队 #{retry_job.id}(第 {queued_pos} 位)\n" + f"「{retry_job.title}」\n" + f"发 !jobs 查看进度", + config, + ) + continue + + if stripped_lower in ("!cancel", "!kill"): + running = _jobs.list_running() + if running: + for j in running: + _jobs.cancel(j.id) + _qq_send(target_id, f"🚫 已取消 {len(running)} 个任务", config) + else: + _qq_send(target_id, "ℹ 当前没有运行中的任务", config) + continue + + if stripped_lower.startswith(("!cancel ", "!kill ")): + jid = text.strip().split(None, 1)[1].lstrip("#").strip() + j = _jobs.get(jid) + if j: + _jobs.cancel(jid) + _qq_send(target_id, f"🚫 任务 #{jid} 已取消", config) + else: + _qq_send(target_id, f"❓ 找不到任务 #{jid}", config) + continue + + # ── Claude query: create job, queue if busy, else run now ── + print(clr(f"\n 📩 QQ: {text}", "cyan")) + if image_b64: + print(clr(" 📎 QQ image attachment received", "dim")) + job = _jobs.create(text, source="qq") + + queue_pos = _queue_or_dispatch_qq_job( + job, + text, + target_id, + msg_type, + run_query_cb, + session_ctx, + config, + image_b64, + ) + if queue_pos: + _qq_send( + target_id, + f"⏳ 已排队 #{job.id}(第 {queue_pos} 位)\n" + f"「{job.title}」\n" + f"发 !jobs 查看进度", + config, + ) + + except Exception: + if not _qq_stop.is_set(): + raise + finally: + botpy_task.cancel() + try: + _async_loop.run_until_complete( + asyncio.wait_for( + asyncio.gather(botpy_task, return_exceptions=True), timeout=5.0 + ) + ) + except Exception: + pass + _async_loop.stop() + _async_loop = None + _qq_msg_queue = None + _qq_api_client = None + session_ctx.qq_send = None + + return "stopped" + + +# ── Job dispatch & background runner ────────────────────────────────────── + + +def _queue_or_dispatch_qq_job( + job, + q_text: str, + target_id: str, + msg_type: str, + run_query_cb, + session_ctx, + config: dict, + image_b64: str | None = None, +) -> int: + """Queue a QQ job if target is busy, otherwise mark busy and dispatch it. + + Returns 0 when dispatched immediately, or the target-local queue position. + """ + with _qq_queues_lock: + if _qq_busy.get(target_id): + _qq_queues.setdefault(target_id, []).append((job.id, q_text, image_b64)) + return len(_qq_queues[target_id]) + _qq_busy[target_id] = True + + _dispatch_qq_job( + job, q_text, target_id, msg_type, run_query_cb, session_ctx, config, image_b64 + ) + return 0 + + +def _dispatch_qq_job( + job, + q_text: str, + target_id: str, + msg_type: str, + run_query_cb, + session_ctx, + config: dict, + image_b64: str | None = None, +) -> None: + def _run(): + try: + _qq_bg_runner( + job, + q_text, + target_id, + msg_type, + run_query_cb, + session_ctx, + config, + image_b64, + ) + finally: + _drain_qq_queue(target_id, msg_type, run_query_cb, session_ctx, config) + + threading.Thread(target=_run, daemon=True).start() + + +def _drain_qq_queue( + target_id: str, msg_type: str, run_query_cb, session_ctx, config: dict +) -> None: + with _qq_queues_lock: + queue = _qq_queues.get(target_id, []) + if not queue: + _qq_busy[target_id] = False + return + job_id, prompt, image_b64 = queue.pop(0) + _qq_busy[target_id] = True + + job = _jobs.get(job_id) + if not job or job.status == "cancelled": + _drain_qq_queue(target_id, msg_type, run_query_cb, session_ctx, config) + return + + remaining = len(_qq_queues.get(target_id, [])) + pos_msg = f"(还有 {remaining} 个待处理)" if remaining else "" + _qq_send( + target_id, f"▶ 开始执行 #{job_id}{pos_msg}:\n「{job.title}」", config, msg_type + ) + _dispatch_qq_job( + job, prompt, target_id, msg_type, run_query_cb, session_ctx, config, image_b64 + ) + + +def _qq_bg_runner( + job, + q_text: str, + target_id: str, + msg_type: str, + run_query_cb, + session_ctx, + config: dict, + image_b64: str | None = None, +) -> None: + """Execute one QQ AI query with job tracking. + + QQ does not support message editing — we buffer chunks and send + a new message every ~2 seconds as text arrives. + """ + _jobs.start(job.id) + _qq_send(target_id, f"⏳ 任务 #{job.id} 执行中…", config, msg_type) + + _chunks: list[str] = [] + _last_send = [_time_mod.monotonic()] + _stream_lock = threading.Lock() + + def _flush_chunks(): + text_so_far = "".join(_chunks) + if len(text_so_far) >= _QQ_STREAM_MIN_LEN: + _qq_send(target_id, text_so_far, config, msg_type) + _chunks.clear() + _last_send[0] = _time_mod.monotonic() + + def _on_chunk(chunk: str): + _chunks.append(chunk) + _jobs.stream_result(job.id, chunk) + with _stream_lock: + if _time_mod.monotonic() - _last_send[0] >= _QQ_STREAM_INTERVAL: + _flush_chunks() + + def _on_tool_start(name: str, inputs: dict): + from ui.render import _tool_desc + + desc = _tool_desc(name, inputs or {}) + _jobs.add_step(job.id, name, desc[:80]) + label = f"⚙ {desc}" + _qq_send(target_id, label, config, msg_type) + + def _on_tool_end(name: str, result: str): + _jobs.finish_step(job.id, name, result[:80] if result else "") + + with _qq_run_lock: + sctx = runtime.get_ctx(config) + sctx.qq_current_target_id = target_id + sctx.qq_current_msg_type = msg_type + sctx.qq_incoming = True + sctx.in_qq_turn = True + if image_b64: + sctx.pending_image = image_b64 + _qq_thread_local.active = True + _qq_thread_local.target_id = target_id + _qq_thread_local.msg_type = msg_type + session_ctx.on_text_chunk = _on_chunk + session_ctx.on_tool_start = _on_tool_start + session_ctx.on_tool_end = _on_tool_end + try: + if run_query_cb: + run_query_cb(q_text) + except Exception as e: + _jobs.fail(job.id, str(e)) + _qq_send( + target_id, + f"❌ 任务 #{job.id} 失败:{e}\n↩ 重试:!retry {job.id}", + config, + msg_type, + ) + return + finally: + session_ctx.on_text_chunk = None + session_ctx.on_tool_start = None + session_ctx.on_tool_end = None + if image_b64 and getattr(sctx, "pending_image", None) == image_b64: + sctx.pending_image = None + sctx.qq_incoming = False + sctx.in_qq_turn = False + sctx.qq_current_target_id = "" + sctx.qq_current_msg_type = "" + _qq_thread_local.active = False + _qq_thread_local.target_id = "" + _qq_thread_local.msg_type = "" + + # Flush remaining chunks + text_so_far = "".join(_chunks).strip() + if text_so_far: + _qq_send(target_id, text_so_far, config, msg_type) + elif not _chunks: + # No streaming chunks at all - get final message from state + state = session_ctx.agent_state + if state and state.messages: + for m in reversed(state.messages): + if m.get("role") == "assistant": + content = m.get("content", "") + if isinstance(content, list): + content = "\n".join( + ( + b.get("text", "") + if isinstance(b, dict) and b.get("type") == "text" + else (b if isinstance(b, str) else "") + ) + for b in content + ) + if content: + _qq_send(target_id, content, config, msg_type) + break + + full_result = text_so_far.strip() + _jobs.complete(job.id, full_result) + + j = _jobs.get(job.id) + if j and j.step_count > 0: + dur = f" {j.duration_s:.0f}s" if j.duration_s else "" + _qq_send( + target_id, + f"✅ 任务 #{job.id} 完成({j.step_count} 步{dur})", + config, + msg_type, + ) + + +# ── Supervisor with backoff ──────────────────────────────────────────────── + +_QQ_BACKOFF_INITIAL = 2.0 +_QQ_BACKOFF_MAX = 120.0 + + +def _qq_supervisor(config: dict) -> None: + """Wrap _qq_poll_loop with exponential-backoff reconnect on unexpected exit.""" + global _qq_thread + backoff = _QQ_BACKOFF_INITIAL + attempt = 0 + while not _qq_stop.is_set(): + attempt += 1 + try: + reason = _qq_poll_loop(config) + except Exception as exc: + if _qq_stop.is_set(): + break + _log.warn( + "bridge_crash", + bridge="qq", + attempt=attempt, + error=str(exc)[:200], + backoff_s=backoff, + ) + print( + clr( + f"\n ⚠ QQ bridge crashed (attempt {attempt}), " + f"reconnecting in {backoff:.0f}s…", + "yellow", + ) + ) + _qq_stop.wait(backoff) + backoff = min(backoff * 2, _QQ_BACKOFF_MAX) + continue + + # Normal return without _qq_stop set → transient failure (e.g. 10 consecutive + # errors). Backoff and reconnect instead of giving up permanently. + if not _qq_stop.is_set(): + _log.warn( + "bridge_reconnect", + bridge="qq", + attempt=attempt, + reason=reason, + backoff_s=backoff, + ) + print( + clr( + f"\n ⚠ QQ bridge exited (attempt {attempt}), " + f"reconnecting in {backoff:.0f}s…", + "yellow", + ) + ) + _qq_stop.wait(backoff) + backoff = min(backoff * 2, _QQ_BACKOFF_MAX) + continue + break + + _qq_thread = None + + +def _qq_start_bridge(config) -> None: + global _qq_thread, _qq_stop + _qq_stop = threading.Event() + _qq_thread = threading.Thread( + target=_qq_supervisor, args=(config,), daemon=True, name="qq-bridge" + ) + _qq_thread.start() + ok("QQ bridge started.") + info("Send a message in QQ — @mention in groups or direct message in C2C.") + info("Stop with /qq stop.") + + +# ── Slash command ────────────────────────────────────────────────────────── + + +def cmd_qq(args: str, _state, config) -> bool: + """QQ bot bridge via official botpy SDK. + + Usage: /qq — configure and start the bridge + /qq — start using saved credentials + /qq stop — stop the bridge + /qq status — show current status + + Obtain appid + secret from https://q.qq.com developer portal. + """ + global _qq_thread, _qq_stop + from cc_config import save_config + + parts = args.strip().split() + + if parts and parts[0].lower() in ("stop", "off"): + if _qq_thread and _qq_thread.is_alive(): + _qq_stop.set() + _qq_thread.join(timeout=5) + _qq_thread = None + ok("QQ bridge stopped.") + else: + warn("QQ bridge is not running.") + return True + + if parts and parts[0].lower() == "status": + running = _qq_thread and _qq_thread.is_alive() + appid = config.get("qq_appid", "") + if running: + ok(f"QQ bridge running (appid: {appid[:8]}…)") + elif appid: + info("Configured but not running. Use /qq to start.") + else: + info("Not configured. Use /qq ") + return True + + if len(parts) >= 2: + appid = parts[0] + secret = parts[1] + config["qq_appid"] = appid + config["qq_secret"] = secret + save_config(config) + ok("QQ config saved.") + else: + appid = config.get("qq_appid", "") + secret = config.get("qq_secret", "") + + if not appid or not secret: + err("No config found. Usage: /qq ") + return True + + if _qq_thread and _qq_thread.is_alive(): + warn("QQ bridge is already running. Use /qq stop first.") + return True + + _qq_start_bridge(config) + return True diff --git a/cc_config.py b/cc_config.py index a1f9ad1..2b06c31 100644 --- a/cc_config.py +++ b/cc_config.py @@ -91,6 +91,10 @@ # "qwen_api_key": "..." # "zhipu_api_key": "..." # "deepseek_api_key": "..." + # ── QQ Bot bridge ────────────────────────────────────────────────────── + # qq_appid / qq_secret from https://q.qq.com developer portal + "qq_appid": "", + "qq_secret": "", # ── WeChat smart-reply (off by default) ──────────────────────────────── # When enabled, inbound messages from whitelisted contacts no longer # auto-reply via the agent. Instead the auxiliary cheap model drafts diff --git a/cheetahclaws.py b/cheetahclaws.py index 9abbeb9..7fd459f 100755 --- a/cheetahclaws.py +++ b/cheetahclaws.py @@ -107,6 +107,8 @@ /telegram stop|status Stop or check Telegram bridge /wechat login Authenticate WeChat via QR code /wechat stop|status Stop or check WeChat bridge + /qq Start QQ bridge (QQ Bot) + /qq stop|status Stop or check QQ bridge /slack Start Slack bridge (Web API) /slack stop|status|logout Stop, check, or clear Slack bridge /lab start Autonomous multi-agent research run (9 stages) @@ -214,9 +216,11 @@ def __getattr__(self, name): import bridges.telegram as _btg import bridges.wechat as _bwx import bridges.slack as _bslk +import bridges.qq as _bqq from bridges.telegram import cmd_telegram, _tg_send from bridges.wechat import cmd_wechat, _wx_start_bridge from bridges.slack import cmd_slack, _slack_start_bridge +from bridges.qq import cmd_qq, _qq_start_bridge, _qq_send # ── Session commands ─────────────────────────────────────────────────────── from commands.session import ( @@ -265,6 +269,7 @@ def __getattr__(self, name): _tg_thread_local, _is_in_tg_turn, _wx_thread_local, _is_in_wx_turn, _slack_thread_local, _is_in_slack_turn, + _qq_thread_local, _is_in_qq_turn, ) # ── Live session context (replaces config["_run_query_callback"] etc.) ───── @@ -417,6 +422,7 @@ def _proactive_watcher_loop(config): "wechat": cmd_wechat, "weixin": cmd_wechat, "slack": cmd_slack, + "qq": cmd_qq, "checkpoint": cmd_checkpoint, "rewind": cmd_rewind, "plan": cmd_plan, @@ -582,6 +588,7 @@ def handle_slash(line: str, state, config) -> Union[bool, tuple]: "telegram": ("Telegram bot bridge", ["stop", "status"]), "wechat": ("WeChat bridge (iLink Bot API)", ["stop", "status"]), "slack": ("Slack bot bridge (Web API)", ["stop", "status", "logout"]), + "qq": ("QQ bot bridge (botpy SDK)", [" ", "stop", "status"]), **({"video": ("AI video factory: story→voice→images→mp4", ["status", "niches"])} if _VIDEO_AVAILABLE else {}), "checkpoint": ("List / restore checkpoints", ["clear"]), "rewind": ("Rewind to checkpoint (alias)", ["clear"]), @@ -726,7 +733,7 @@ def _handler(line: str): def _start_headless_bridges(config: dict) -> None: - """Auto-start configured Telegram/WeChat/Slack bridges in headless mode. + """Auto-start configured Telegram/WeChat/Slack/QQ bridges in headless mode. Sets up a shared ``session_ctx`` with a minimal ``run_query`` driving the agent loop directly (no REPL UI). Bridges keep their existing event @@ -735,7 +742,8 @@ def _start_headless_bridges(config: dict) -> None: """ if not (config.get("telegram_token") and config.get("telegram_chat_id")) \ and not config.get("wechat_token") \ - and not (config.get("slack_token") and config.get("slack_channel")): + and not (config.get("slack_token") and config.get("slack_channel")) \ + and not (config.get("qq_appid") and config.get("qq_secret")): return # nothing configured — no-op import runtime as _runtime @@ -797,6 +805,10 @@ def _headless_run_query(prompt: str, is_background: bool = False) -> None: state, config, _headless_run_query ) + # Pre-set QQ send callback so permission prompts work before the poll loop sets it. + if config.get("qq_appid") and config.get("qq_secret"): + session_ctx.qq_send = lambda tid, text: _qq_send(tid, text, config) + if config.get("telegram_token") and config.get("telegram_chat_id"): if not (_btg._telegram_thread and _btg._telegram_thread.is_alive()): _btg._telegram_stop.clear() @@ -815,6 +827,10 @@ def _headless_run_query(prompt: str, is_background: bool = False) -> None: if not (_bslk._slack_thread and _bslk._slack_thread.is_alive()): _slack_start_bridge(config) + if config.get("qq_appid") and config.get("qq_secret"): + if not (_bqq._qq_thread and _bqq._qq_thread.is_alive()): + _qq_start_bridge(config) + # ── Main REPL ────────────────────────────────────────────────────────────── @@ -967,6 +983,8 @@ def _row(colored: str, plain: str) -> str: active_flags.append("wechat") if config.get("slack_token") and config.get("slack_channel"): active_flags.append("slack") + if config.get("qq_appid") and config.get("qq_secret"): + active_flags.append("qq") if active_flags: flags_str = " · ".join(clr(f, "green") for f in active_flags) info(f"Active: {flags_str}") @@ -1004,10 +1022,12 @@ def run_query(user_input: str, is_background: bool = False): # Rebuild system prompt each turn (picks up cwd changes, etc.) system_prompt = build_system_prompt(config) - if is_background and not session_ctx.telegram_incoming: + if is_background and not (session_ctx.telegram_incoming or session_ctx.qq_incoming): print(clr("\n\n[Background Event Triggered]", "yellow")) session_ctx.in_telegram_turn = session_ctx.telegram_incoming session_ctx.telegram_incoming = False + session_ctx.in_qq_turn = session_ctx.qq_incoming + session_ctx.qq_incoming = False print(clr("\n╭─ CheetahClaws ", "dim") + clr("●", "green") + clr(" ─────────────────────────", "dim")) @@ -1213,6 +1233,11 @@ def run_query(user_input: str, is_background: bool = False): if not (_bslk._slack_thread and _bslk._slack_thread.is_alive()): _slack_start_bridge(config) + # ── Auto-start QQ bridge if configured ──────────────────────────── + if config.get("qq_appid") and config.get("qq_secret"): + if not (_bqq._qq_thread and _bqq._qq_thread.is_alive()): + _qq_start_bridge(config) + # ── Rapid Ctrl+C force-quit ───────────────────────────────────────── # 3 Ctrl+C presses within 2 seconds → immediate hard exit _ctrl_c_times = [] diff --git a/pyproject.toml b/pyproject.toml index 91b9a1e..2c9c8f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,8 @@ ocr = ["pytesseract", "Pillow"] trading = ["yfinance>=0.2.30", "rank-bm25>=0.2.2"] web = ["sqlalchemy>=2.0", "bcrypt>=4.0", "PyJWT>=2.8.0"] litellm = ["litellm>=1.60.0,<2.0.0"] -all = ["sounddevice", "Pillow", "prompt_toolkit>=3.0.43", "playwright", "pymupdf", "openpyxl", "pytesseract", "yfinance>=0.2.30", "rank-bm25>=0.2.2", "sqlalchemy>=2.0", "bcrypt>=4.0", "PyJWT>=2.8.0", "litellm>=1.60.0,<2.0.0"] +qq = ["qq-botpy>=1.2.1"] +all = ["sounddevice", "Pillow", "prompt_toolkit>=3.0.43", "playwright", "pymupdf", "openpyxl", "pytesseract", "yfinance>=0.2.30", "rank-bm25>=0.2.2", "sqlalchemy>=2.0", "bcrypt>=4.0", "PyJWT>=2.8.0", "litellm>=1.60.0,<2.0.0", "qq-botpy>=1.2.1"] [project.scripts] cheetahclaws = "cheetahclaws:main" diff --git a/runtime.py b/runtime.py index 82ec709..d91de9c 100644 --- a/runtime.py +++ b/runtime.py @@ -67,6 +67,15 @@ class RuntimeContext: wx_input_event: Optional[threading.Event] = None wx_input_value: str = "" + # ── QQ bridge ─────────────────────────────────────────────────────────────── + qq_send: Optional[Callable] = None + qq_input_event: Optional[threading.Event] = None + qq_input_value: str = "" + qq_input_target_id: str = "" + in_qq_turn: bool = False + qq_current_target_id: str = "" + qq_current_msg_type: str = "" # "group" or "c2c" + # ── Live-streaming hooks (set by bridges before run_query; cleared after) ── # on_text_chunk(text) — called for every TextChunk as it streams # on_tool_start(name, inputs) — called when a tool call begins @@ -88,6 +97,7 @@ class RuntimeContext: in_wechat_turn: bool = False in_slack_turn: bool = False telegram_incoming: bool = False + qq_incoming: bool = False wx_current_user_id: str = "" slack_current_channel: str = "" diff --git a/tests/fixtures/golden_default_prompt.txt b/tests/fixtures/golden_default_prompt.txt index 5e963b9..b19014b 100644 --- a/tests/fixtures/golden_default_prompt.txt +++ b/tests/fixtures/golden_default_prompt.txt @@ -137,6 +137,7 @@ These commands the **user** can invoke at the REPL prompt — they are NOT tools - `/plan` `[done | status]` — Enter/exit plan mode - `/plugin` `[install | uninstall | enable | disable | disable-all | update | recommend | info]` — Manage plugins - `/proactive` `[off]` — Manage proactive background watcher +- `/qq` `[ | stop | status]` — QQ bot bridge (botpy SDK) - `/quit` — Exit (alias for /exit) - `/resume` — Resume last session - `/rewind` `[clear]` — Rewind to checkpoint (alias) diff --git a/tests/test_qq_bridge.py b/tests/test_qq_bridge.py new file mode 100644 index 0000000..4c46df7 --- /dev/null +++ b/tests/test_qq_bridge.py @@ -0,0 +1,646 @@ +"""Tests for bridges/qq.py — message parsing, config, turn detection.""" +import sys +import threading +import time +import types +import pytest + + +class _FakeRoute: + def __init__(self, method, path, **kwargs): + self.method = method + self.path = path + self.kwargs = kwargs + + +@pytest.fixture +def fake_botpy_route(monkeypatch): + """Provide the botpy Route class needed by payload-only send tests.""" + fake_botpy = types.ModuleType("botpy") + fake_http = types.ModuleType("botpy.http") + fake_http.Route = _FakeRoute + fake_botpy.http = fake_http + monkeypatch.setitem(sys.modules, "botpy", fake_botpy) + monkeypatch.setitem(sys.modules, "botpy.http", fake_http) + + +def test_config_defaults(monkeypatch, tmp_path): + """QQ config keys exist in DEFAULTS.""" + monkeypatch.setenv("HOME", str(tmp_path)) + import importlib + import cc_config + importlib.reload(cc_config) + cfg = cc_config.load_config() + assert "qq_appid" in cfg + assert "qq_secret" in cfg + assert cfg["qq_appid"] == "" + assert cfg["qq_secret"] == "" + + +def test_runtime_context_fields(): + """RuntimeContext has QQ fields with correct defaults.""" + from runtime import RuntimeContext + ctx = RuntimeContext() + assert ctx.qq_send is None + assert ctx.qq_input_event is None + assert ctx.qq_input_value == "" + assert ctx.qq_input_target_id == "" + assert ctx.in_qq_turn is False + assert ctx.qq_current_target_id == "" + assert ctx.qq_current_msg_type == "" + + +def test_is_in_qq_turn_default(): + """Turn detection returns False when no QQ turn is active.""" + from tools.interaction import _is_in_qq_turn + assert _is_in_qq_turn({}) is False + + +def test_is_in_qq_turn_thread_local(): + """Turn detection returns True when thread-local flag is set.""" + from tools.interaction import _qq_thread_local, _is_in_qq_turn + _qq_thread_local.active = True + try: + assert _is_in_qq_turn({}) is True + finally: + _qq_thread_local.active = False + + +def test_is_in_qq_turn_runtime_ctx(): + """Turn detection returns True when RuntimeContext.in_qq_turn is True.""" + from tools.interaction import _is_in_qq_turn + import runtime + ctx = runtime.get_session_ctx("_test_qq_turn") + ctx.in_qq_turn = True + try: + assert _is_in_qq_turn({"_session_id": "_test_qq_turn"}) is True + finally: + ctx.in_qq_turn = False + runtime.release_session_ctx("_test_qq_turn") + + +def test_qq_cmd_missing_config(): + """cmd_qq shows error when no args and no saved config.""" + from bridges.qq import cmd_qq + result = cmd_qq("", None, {"qq_appid": "", "qq_secret": ""}) + assert result is True + + +def test_qq_cmd_inline_config(tmp_path, monkeypatch): + """cmd_qq saves appid/secret when provided inline.""" + from unittest.mock import patch + from bridges.qq import cmd_qq + monkeypatch.setenv("HOME", str(tmp_path)) + cfg = {"qq_appid": "", "qq_secret": ""} + # Mock bridge start to avoid spawning a real daemon thread + with patch("bridges.qq._qq_start_bridge"): + result = cmd_qq("myappid mysecret", None, cfg) + assert result is True + assert cfg["qq_appid"] == "myappid" + assert cfg["qq_secret"] == "mysecret" + + +def test_qq_cmd_status_not_running(): + """cmd_qq status reports not configured when empty.""" + from bridges.qq import cmd_qq + result = cmd_qq("status", None, {"qq_appid": "", "qq_secret": ""}) + assert result is True + + +def test_qq_cmd_status_configured(): + """cmd_qq status reports configured but not running.""" + from bridges.qq import cmd_qq + result = cmd_qq("status", None, {"qq_appid": "test123", "qq_secret": "sec"}) + assert result is True + + +def test_message_dedup_set_capped(): + """_qq_seen_msgids stays under 2000 entries.""" + from bridges import qq + for i in range(2100): + qq._qq_seen_msgids.add(f"msg_{i}") + assert len(qq._qq_seen_msgids) <= 2100 + qq._qq_seen_msgids.clear() + + +def test_reply_ctx_tracking(): + """Passive reply context stores msg_id, event_id, seq, timestamp, and msg_type.""" + from bridges.qq import _qq_reply_ctx, _qq_reply_lock + with _qq_reply_lock: + _qq_reply_ctx["test_target"] = ("msg123", "event456", 1, time.time(), "group") + assert "test_target" in _qq_reply_ctx + msg_id, event_id, seq, ts, msg_type = _qq_reply_ctx["test_target"] + assert msg_id == "msg123" + assert event_id == "event456" + assert seq == 1 + assert msg_type == "group" + # Test with None values + with _qq_reply_lock: + _qq_reply_ctx["test_target2"] = (None, None, 1, time.time(), "c2c") + msg_id, event_id, seq, ts, msg_type = _qq_reply_ctx["test_target2"] + assert msg_id is None + assert event_id is None + assert msg_type == "c2c" + with _qq_reply_lock: + del _qq_reply_ctx["test_target"] + del _qq_reply_ctx["test_target2"] + + +def test_qq_send_no_api(): + """_qq_send is a no-op when no API is configured.""" + from bridges.qq import _qq_send + _qq_send("some_target", "hello", {"qq_appid": "x"}) + + +def test_qq_pending_input_only_accepts_prompt_target(): + """A QQ permission reply from another target must not release the prompt.""" + from runtime import RuntimeContext + from bridges.qq import _qq_try_deliver_input + + ctx = RuntimeContext() + evt = threading.Event() + ctx.qq_input_event = evt + ctx.qq_input_target_id = "target-a" + + assert _qq_try_deliver_input(ctx, "target-b", "y") is False + assert not evt.is_set() + assert ctx.qq_input_value == "" + + assert _qq_try_deliver_input(ctx, "target-a", "y") is True + assert evt.is_set() + assert ctx.qq_input_value == "y" + + +def test_qq_send_with_chunks(): + """_qq_send splits long text into chunks.""" + from bridges.qq import _qq_send, _QQ_MAX_MSG_LEN + long_text = "A" * (_QQ_MAX_MSG_LEN * 2 + 100) + # Should not raise even without API + _qq_send("target", long_text, {}) + + +def test_passive_window_constants(): + """Passive reply window follows botpy's documented 5-minute validity.""" + from bridges.qq import _QQ_PASSIVE_WINDOW, _QQ_STREAM_INTERVAL, _QQ_MAX_MSG_LEN, _QQ_STREAM_MIN_LEN + assert _QQ_PASSIVE_WINDOW == 300 + assert _QQ_STREAM_INTERVAL == 2.0 + assert _QQ_MAX_MSG_LEN == 2000 + assert _QQ_STREAM_MIN_LEN == 80 + + +def test_send_future_exception_is_logged(): + """Errors raised by scheduled QQ HTTP sends should be surfaced.""" + from concurrent.futures import Future + from unittest.mock import patch + from bridges.qq import _qq_log_send_future + + fut = Future() + fut.set_exception(RuntimeError("api failed")) + + with patch("bridges.qq._log.warn") as warn: + _qq_log_send_future(fut, "group", "target") + + warn.assert_called_once() + assert warn.call_args.args[0] == "qq_send_api_error" + + +def test_queue_or_dispatch_marks_busy_before_thread_dispatch(): + """A second same-target job should queue before worker thread starts.""" + from unittest.mock import MagicMock, patch + from bridges import qq + + job1 = MagicMock() + job1.id = "job1" + job2 = MagicMock() + job2.id = "job2" + + with qq._qq_queues_lock: + qq._qq_busy.clear() + qq._qq_queues.clear() + + dispatched = [] + + def fake_dispatch(job, prompt, target_id, msg_type, run_query_cb, session_ctx, config, image_b64=None): + dispatched.append((job.id, prompt, target_id, image_b64)) + + with patch("bridges.qq._dispatch_qq_job", side_effect=fake_dispatch): + pos1 = qq._queue_or_dispatch_qq_job( + job1, "prompt1", "target", "group", None, None, {}, "img1" + ) + pos2 = qq._queue_or_dispatch_qq_job( + job2, "prompt2", "target", "group", None, None, {}, "img2" + ) + + assert pos1 == 0 + assert pos2 == 1 + assert dispatched == [("job1", "prompt1", "target", "img1")] + assert qq._qq_queues["target"] == [("job2", "prompt2", "img2")] + + with qq._qq_queues_lock: + qq._qq_busy.clear() + qq._qq_queues.clear() + + +def test_qq_thread_not_running_initially(): + """QQ bridge thread state is properly managed.""" + from bridges.qq import _qq_thread + # After import, thread may have been started by other tests; + # just verify the module-level variable exists and is accessible + assert _qq_thread is None or isinstance(_qq_thread, threading.Thread) + + +def test_qq_stop_event_cleared(): + """QQ stop event should not be set initially.""" + from bridges.qq import _qq_stop + assert not _qq_stop.is_set() + + +def test_post_group_clean_payload_no_msg_id(fake_botpy_route): + """_qq_post_group builds clean payload without msg_id/event_id when empty.""" + import asyncio + from unittest.mock import AsyncMock, MagicMock + from bridges.qq import _qq_post_group + + api = MagicMock() + api._http = MagicMock() + api._http.request = AsyncMock() + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(_qq_post_group(api, "group123", "hello")) + finally: + loop.close() + + api._http.request.assert_called_once() + call_args = api._http.request.call_args + payload = call_args[1]["json"] + # msg_id, event_id, and msg_seq should NOT be in payload when not provided + assert "msg_id" not in payload + assert "event_id" not in payload + assert "msg_seq" not in payload + # Only clean fields present + assert payload == {"group_openid": "group123", "msg_type": 0, "content": "hello"} + # No null fields from botpy's locals() pattern + assert "embed" not in payload + assert "ark" not in payload + assert "media" not in payload + + +def test_post_group_clean_payload_with_msg_id(fake_botpy_route): + """_qq_post_group includes msg_id/msg_seq when msg_id is provided.""" + import asyncio + from unittest.mock import AsyncMock, MagicMock + from bridges.qq import _qq_post_group + + api = MagicMock() + api._http = MagicMock() + api._http.request = AsyncMock() + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(_qq_post_group(api, "group123", "reply text", msg_id="msg456", msg_seq=2)) + finally: + loop.close() + + payload = api._http.request.call_args[1]["json"] + assert payload["msg_id"] == "msg456" + assert payload["msg_seq"] == 2 + assert payload["content"] == "reply text" + + +def test_post_group_clean_payload_with_event_id(fake_botpy_route): + """_qq_post_group uses event_id when msg_id is None.""" + import asyncio + from unittest.mock import AsyncMock, MagicMock + from bridges.qq import _qq_post_group + + api = MagicMock() + api._http = MagicMock() + api._http.request = AsyncMock() + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(_qq_post_group(api, "group123", "reply text", event_id="evt789", msg_seq=1)) + finally: + loop.close() + + payload = api._http.request.call_args[1]["json"] + assert "msg_id" not in payload # msg_id should not be set + assert payload["event_id"] == "evt789" + assert payload["msg_seq"] == 1 + assert payload["content"] == "reply text" + + +def test_post_c2c_clean_payload(fake_botpy_route): + """_qq_post_c2c builds clean payload.""" + import asyncio + from unittest.mock import AsyncMock, MagicMock + from bridges.qq import _qq_post_c2c + + api = MagicMock() + api._http = MagicMock() + api._http.request = AsyncMock() + + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(_qq_post_c2c(api, "user123", "hi")) + finally: + loop.close() + + payload = api._http.request.call_args[1]["json"] + assert "msg_id" not in payload + assert "event_id" not in payload + assert "msg_seq" not in payload + assert payload == {"openid": "user123", "msg_type": 0, "content": "hi"} + + +def test_msg_seq_starts_at_1_for_new_message(): + """First send with reply context should use msg_seq=1.""" + import time + from bridges.qq import _qq_reply_ctx, _qq_reply_lock + from unittest.mock import MagicMock, patch + + # Set up reply context with seq=0 (as stored by _handle_message) + with _qq_reply_lock: + _qq_reply_ctx["test_target"] = ("msg123", "event456", 0, time.time(), "group") + + # Mock the API calls to capture the msg_seq values + captured_seqs = [] + + def mock_send_group(api, group_openid, content, msg_id=None, event_id=None, msg_seq=1): + captured_seqs.append((msg_id, event_id, msg_seq, content)) + + with patch("bridges.qq._qq_send_group", side_effect=mock_send_group): + with patch("bridges.qq._qq_api_client", MagicMock()): + from bridges.qq import _qq_send + _qq_send("test_target", "hello", {}) + + # First chunk should have msg_seq=1 + assert len(captured_seqs) == 1 + assert captured_seqs[0][0] == "msg123" # msg_id + assert captured_seqs[0][2] == 1 # msg_seq should be 1, not 2 + + # Clean up + with _qq_reply_lock: + del _qq_reply_ctx["test_target"] + + +def test_msg_seq_increments_correctly_for_chunks(): + """Multiple chunks should increment msg_seq properly.""" + import time + from bridges.qq import _qq_reply_ctx, _qq_reply_lock, _QQ_MAX_MSG_LEN + from unittest.mock import MagicMock, patch + + # Set up reply context with seq=0 + with _qq_reply_lock: + _qq_reply_ctx["test_target"] = ("msg123", "event456", 0, time.time(), "group") + + captured_seqs = [] + + def mock_send_group(api, group_openid, content, msg_id=None, event_id=None, msg_seq=1): + captured_seqs.append((msg_id, event_id, msg_seq, content)) + + with patch("bridges.qq._qq_send_group", side_effect=mock_send_group): + with patch("bridges.qq._qq_api_client", MagicMock()): + from bridges.qq import _qq_send + # Send text that will be split into 2 chunks + long_text = "A" * (_QQ_MAX_MSG_LEN + 100) + _qq_send("test_target", long_text, {}) + + # Should have 2 chunks with msg_seq=1 and msg_seq=2 + assert len(captured_seqs) == 2 + assert captured_seqs[0][2] == 1 # First chunk: seq=1 + assert captured_seqs[1][2] == 2 # Second chunk: seq=2 + + # Clean up + with _qq_reply_lock: + del _qq_reply_ctx["test_target"] + + +def test_no_duplicate_send_in_bg_runner(): + """_qq_bg_runner should not send duplicate messages.""" + from unittest.mock import MagicMock, patch + from bridges.qq import _qq_bg_runner + + session_ctx = MagicMock() + + job = MagicMock() + job.id = "test123" + + run_query_cb = MagicMock() + send_calls = [] + + def mock_qq_send(target_id, text, _cfg=None, _msg_type=None): + send_calls.append((target_id, text)) + + with patch("bridges.qq._qq_api_client", MagicMock()): + with patch("bridges.qq._qq_send", side_effect=mock_qq_send): + with patch("bridges.qq._jobs.start"): + with patch("bridges.qq._jobs.stream_result"): + with patch("bridges.qq._jobs.complete"): + _qq_bg_runner(job, "test prompt", "target123", "group", + run_query_cb, session_ctx, {}) + + # Check that _qq_send was called reasonable times (not hundreds of duplicates) + # We expect: 1 "任务执行中" message + result messages + # But not the same message repeated dozens of times + assert len(send_calls) < 50, f"Too many send calls: {len(send_calls)}, likely duplicate echo bug" + # Verify the expected initial message is present + assert any("执行中" in str(call[1]) for call in send_calls), "Missing initial status message" + + +def test_qq_bg_runner_sets_pending_image_for_matching_job(): + """Downloaded QQ images should be attached to the job that owns them.""" + from unittest.mock import MagicMock, patch + from bridges.qq import _qq_bg_runner + import runtime + + session_id = "_test_qq_image_job" + config = {"_session_id": session_id} + session_ctx = runtime.get_session_ctx(session_id) + job = MagicMock() + job.id = "img-job" + seen_pending = [] + + def run_query_cb(_prompt): + seen_pending.append(runtime.get_ctx(config).pending_image) + runtime.get_ctx(config).pending_image = None + + try: + with patch("bridges.qq._qq_send"): + with patch("bridges.qq._jobs.start"): + with patch("bridges.qq._jobs.complete"): + _qq_bg_runner( + job, + "describe this", + "target", + "group", + run_query_cb, + session_ctx, + config, + "base64-image", + ) + finally: + runtime.release_session_ctx(session_id) + + assert seen_pending == ["base64-image"] + + +def test_qq_bg_runner_clears_pending_image_if_run_query_fails_before_consuming(): + """A failed image job must not leak its image into the next turn.""" + from unittest.mock import MagicMock, patch + from bridges.qq import _qq_bg_runner + import runtime + + session_id = "_test_qq_image_fail_cleanup" + config = {"_session_id": session_id} + session_ctx = runtime.get_session_ctx(session_id) + job = MagicMock() + job.id = "img-fail-job" + + def run_query_cb(_prompt): + raise RuntimeError("boom before agent consumes image") + + try: + with patch("bridges.qq._qq_send"): + with patch("bridges.qq._jobs.start"): + with patch("bridges.qq._jobs.fail"): + _qq_bg_runner( + job, + "describe this", + "target", + "group", + run_query_cb, + session_ctx, + config, + "base64-image", + ) + assert runtime.get_ctx(config).pending_image is None + finally: + runtime.release_session_ctx(session_id) + + +def test_streaming_hook_idempotency(): + """Streaming hooks should handle duplicate calls gracefully.""" + from unittest.mock import MagicMock, patch + from bridges.qq import _qq_bg_runner + + session_ctx = MagicMock() + + job = MagicMock() + job.id = "test999" + + # Track how many times each hook is called + chunk_count = [0] + tool_start_count = [0] + tool_end_count = [0] + + # Track _qq_send calls + send_calls = [] + + def mock_run_query(prompt): + # Simulate agent calling hooks multiple times + if session_ctx.on_text_chunk: + session_ctx.on_text_chunk("test chunk") + chunk_count[0] += 1 + if session_ctx.on_tool_start: + session_ctx.on_tool_start("TestTool", {}) + tool_start_count[0] += 1 + if session_ctx.on_tool_end: + session_ctx.on_tool_end("TestTool", "done") + tool_end_count[0] += 1 + + def mock_qq_send(target_id, text, _cfg=None, _msg_type=None): + _ = _cfg, _msg_type # Mark as intentionally unused + send_calls.append((target_id, text)) + + with patch("bridges.qq._qq_api_client", MagicMock()): + with patch("bridges.qq._qq_send", side_effect=mock_qq_send): + with patch("bridges.qq._jobs.start"): + with patch("bridges.qq._jobs.complete"): + _qq_bg_runner(job, "test", "target", "group", + mock_run_query, session_ctx, {}) + + # Each hook should be called exactly once + assert chunk_count[0] == 1, f"on_text_chunk called {chunk_count[0]} times" + assert tool_start_count[0] == 1, f"on_tool_start called {tool_start_count[0]} times" + assert tool_end_count[0] == 1, f"on_tool_end called {tool_end_count[0]} times" + + # _qq_send should be called for: "执行中" + tool start message = 2 calls minimum + assert len(send_calls) >= 2, f"Expected at least 2 send calls, got {len(send_calls)}" + + +def test_qq_bg_runner_serializes_global_streaming_hooks(): + """Concurrent QQ jobs must not overwrite each other's session-level hooks.""" + from unittest.mock import MagicMock, patch + from bridges.qq import _qq_bg_runner + from runtime import RuntimeContext + + session_ctx = RuntimeContext() + job_a = MagicMock() + job_a.id = "job-a" + job_b = MagicMock() + job_b.id = "job-b" + + a_entered = threading.Event() + b_entered = threading.Event() + release_a = threading.Event() + send_calls = [] + send_lock = threading.Lock() + + def mock_run_query(prompt): + if prompt == "prompt-a": + a_entered.set() + session_ctx.on_text_chunk("chunk-a") + assert release_a.wait(timeout=2) + else: + b_entered.set() + session_ctx.on_text_chunk("chunk-b") + + def mock_qq_send(target_id, text, _cfg=None, _msg_type=None): + with send_lock: + send_calls.append((target_id, text)) + + with patch("bridges.qq._qq_send", side_effect=mock_qq_send): + with patch("bridges.qq._jobs.start"): + with patch("bridges.qq._jobs.stream_result"): + with patch("bridges.qq._jobs.complete"): + t1 = threading.Thread( + target=_qq_bg_runner, + args=( + job_a, + "prompt-a", + "target-a", + "group", + mock_run_query, + session_ctx, + {}, + ), + ) + t2 = threading.Thread( + target=_qq_bg_runner, + args=( + job_b, + "prompt-b", + "target-b", + "group", + mock_run_query, + session_ctx, + {}, + ), + ) + t1.start() + assert a_entered.wait(timeout=2) + t2.start() + time.sleep(0.05) + assert not b_entered.is_set() + release_a.set() + t1.join(timeout=2) + t2.join(timeout=2) + + assert not t1.is_alive() + assert not t2.is_alive() + assert ("target-a", "chunk-a") in send_calls + assert ("target-b", "chunk-b") in send_calls + assert ("target-a", "chunk-b") not in send_calls + assert ("target-b", "chunk-a") not in send_calls diff --git a/tools/__init__.py b/tools/__init__.py index 124b98b..1ec06c9 100644 --- a/tools/__init__.py +++ b/tools/__init__.py @@ -39,8 +39,8 @@ ) from tools.interaction import ( # noqa: F401 - _tg_thread_local, _wx_thread_local, _slack_thread_local, - _is_in_tg_turn, _is_in_wx_turn, _is_in_slack_turn, _is_in_web_turn, + _tg_thread_local, _wx_thread_local, _slack_thread_local, _qq_thread_local, + _is_in_tg_turn, _is_in_wx_turn, _is_in_slack_turn, _is_in_qq_turn, _is_in_web_turn, _ask_user_question, ask_input_interactive, _sleeptimer, _INPUT_WAIT_TIMEOUT, ) diff --git a/tools/interaction.py b/tools/interaction.py index 53805bc..042c0be 100644 --- a/tools/interaction.py +++ b/tools/interaction.py @@ -12,6 +12,7 @@ _tg_thread_local = threading.local() _wx_thread_local = threading.local() _slack_thread_local = threading.local() +_qq_thread_local = threading.local() def _is_in_tg_turn(config: dict) -> bool: @@ -37,6 +38,12 @@ def _is_in_web_turn(config: dict) -> bool: return bool(getattr(runtime.get_ctx(config), 'in_web_turn', False)) +def _is_in_qq_turn(config: dict) -> bool: + import runtime + return (getattr(_qq_thread_local, "active", False) + or bool(runtime.get_ctx(config).in_qq_turn)) + + # ── options=… helpers (shared menu rendering + reply resolution) ───────── def _strip_emojis_punct(s: str) -> str: @@ -238,6 +245,41 @@ def ask_input_interactive(prompt: str, config: dict, _session_ctx.wx_input_value = "" return _resolve_choice(text, _value_map) + # ── QQ ──────────────────────────────────────────────────────────────── + if _is_in_qq_turn(config) and _session_ctx.qq_send is not None: + clean_prompt = _re.sub(r'\x1b\[[0-9;]*m', '', prompt).strip() + payload = "" + if menu_text: + payload += _re.sub(r'\x1b\[[0-9;]*m', '', menu_text).strip() + "\n\n" + payload += f"❓ 需要输入\n{clean_prompt}" + if _menu_block: + payload += "\n\n" + _menu_block + # Echo to terminal for visibility + print(f"\n 📩 QQ 权限请求: {clean_prompt}") + if _menu_block: + for line in _menu_block.splitlines(): + print(f" {line}") + sctx = _runtime.get_ctx(config) + # Prefer thread-local target_id to avoid race conditions with concurrent handlers + target = (getattr(_qq_thread_local, "target_id", "") + or getattr(sctx, "qq_current_target_id", "") or "") + if not target: + print(f"\n ⚠ QQ 权限请求无法发送:target_id 为空") + return "(error: no QQ target_id)" + evt = _threading.Event() + _session_ctx.qq_input_target_id = target + _session_ctx.qq_input_event = evt + _session_ctx.qq_send(target, payload) + if not evt.wait(timeout=120): + _session_ctx.qq_input_event = None + _session_ctx.qq_input_target_id = "" + return "(timeout: no input received)" + text = _session_ctx.qq_input_value.strip() + _session_ctx.qq_input_event = None + _session_ctx.qq_input_value = "" + _session_ctx.qq_input_target_id = "" + return _resolve_choice(text, _value_map) + # ── Web (chat API) ──────────────────────────────────────────────────── if getattr(_session_ctx, 'in_web_turn', False): # Permission event is already pushed to WS by ChatSession._run_agent.