diff --git a/apps/memos-local-plugin/adapters/hermes/__init__.py b/apps/memos-local-plugin/adapters/hermes/__init__.py index fcb8ad58e..59e12c7c8 100644 --- a/apps/memos-local-plugin/adapters/hermes/__init__.py +++ b/apps/memos-local-plugin/adapters/hermes/__init__.py @@ -8,22 +8,25 @@ from __future__ import annotations +import contextlib import json import logging -import os import re import sys import threading + from pathlib import Path -from typing import Any, Dict, List +from typing import Any + # Add this directory to sys.path so sibling modules (config, bridge_client, …) resolve _PLUGIN_DIR = Path(__file__).resolve().parent if str(_PLUGIN_DIR) not in sys.path: sys.path.insert(0, str(_PLUGIN_DIR)) -from agent.memory_provider import MemoryProvider -from tools.registry import tool_error +from agent.memory_provider import MemoryProvider # noqa: E402 +from tools.registry import tool_error # noqa: E402 + logger = logging.getLogger(__name__) @@ -51,10 +54,10 @@ re.compile(r'^\s*\{["\']?ok["\']?\s*:\s*true\s*\}\s*$', re.IGNORECASE), re.compile(r'^\s*\{["\']?success["\']?\s*:\s*true\s*\}\s*$', re.IGNORECASE), re.compile(r'^\s*\{["\']?status["\']?\s*:\s*["\']?ok["\']?\s*\}\s*$', re.IGNORECASE), - re.compile(r'^Operation interrupted:', re.IGNORECASE), - re.compile(r'^Error:', re.IGNORECASE), - re.compile(r'waiting for model response.*elapsed', re.IGNORECASE), - re.compile(r'^\s*$'), + re.compile(r"^Operation interrupted:", re.IGNORECASE), + re.compile(r"^Error:", re.IGNORECASE), + re.compile(r"waiting for model response.*elapsed", re.IGNORECASE), + re.compile(r"^\s*$"), ] _MIN_CONTENT_LENGTH = 6 @@ -74,7 +77,10 @@ def _is_trivial(text: str) -> bool: keys = {k.lower() for k in obj} if keys <= {"ok", "success", "status", "result", "error", "message"}: vals = list(obj.values()) - if all(isinstance(v, (bool, type(None))) or (isinstance(v, str) and len(v) < 20) for v in vals): + if all( + isinstance(v, bool | type(None)) or (isinstance(v, str) and len(v) < 20) + for v in vals + ): return True except (json.JSONDecodeError, TypeError): pass @@ -100,6 +106,7 @@ def name(self) -> str: def is_available(self) -> bool: try: from config import find_bridge_script + find_bridge_script() return True except Exception: @@ -109,6 +116,7 @@ def initialize(self, session_id: str, **kwargs) -> None: self._session_id = session_id from daemon_manager import ensure_daemon + try: info = ensure_daemon() logger.info( @@ -121,6 +129,7 @@ def initialize(self, session_id: str, **kwargs) -> None: logger.warning("Failed to start MemTensor daemon: %s", e) from bridge_client import MemosCoreBridge + try: self._bridge = MemosCoreBridge() logger.info("MemTensor bridge connected") @@ -175,6 +184,7 @@ def _run(): if pending and self._bridge: try: from config import OWNER + user_content, assistant_content, sid = pending messages = [] if user_content: @@ -201,9 +211,7 @@ def _do_recall(self, query: str) -> str: parts: list[str] = [] try: - search_resp = self._bridge.search( - query, max_results=5, min_score=0.4, owner=OWNER - ) + search_resp = self._bridge.search(query, max_results=5, min_score=0.4, owner=OWNER) hits = search_resp.get("hits") or search_resp.get("memories") or [] for h in hits: text = h.get("original_excerpt") or h.get("content") or h.get("summary", "") @@ -225,9 +233,7 @@ def _do_recall(self, query: str) -> str: return "\n".join(parts) - def sync_turn( - self, user_content: str, assistant_content: str, *, session_id: str = "" - ) -> None: + def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None: """Queue turn data for deferred ingest. Hermes calls sync_all() BEFORE queue_prefetch_all(), so ingesting @@ -239,8 +245,11 @@ def sync_turn( if not self._bridge: return if _is_trivial(user_content) and _is_trivial(assistant_content): - logger.debug("sync_turn: skipping trivial turn (user=%r, assistant=%r)", - user_content[:80] if user_content else "", assistant_content[:80] if assistant_content else "") + logger.debug( + "sync_turn: skipping trivial turn (user=%r, assistant=%r)", + user_content[:80] if user_content else "", + assistant_content[:80] if assistant_content else "", + ) return if _is_trivial(user_content): user_content = "" @@ -249,10 +258,10 @@ def sync_turn( sid = session_id or self._session_id or "default" self._pending_ingest = (user_content, assistant_content, sid) - def get_tool_schemas(self) -> List[Dict[str, Any]]: + def get_tool_schemas(self) -> list[dict[str, Any]]: return [MEMORY_SEARCH_SCHEMA] - def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str: + def handle_tool_call(self, tool_name: str, args: dict[str, Any], **kwargs) -> str: if tool_name != "memory_search": return tool_error(f"Unknown tool: {tool_name}") @@ -265,6 +274,7 @@ def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> st try: from config import OWNER + resp = self._bridge.search(query, max_results=8, owner=OWNER) hits = resp.get("hits") or resp.get("memories") or [] if not hits: @@ -285,6 +295,7 @@ def on_memory_write(self, action: str, target: str, content: str) -> None: return from config import OWNER + label = "user_profile" if target == "user" else "memory" messages = [ {"role": "system", "content": f"[{label}] {content}"}, @@ -298,14 +309,16 @@ def _write(): owner=OWNER, ) self._bridge.flush() - logger.info("MemTensor on_memory_write: %s %s (%d chars)", action, target, len(content)) + logger.info( + "MemTensor on_memory_write: %s %s (%d chars)", action, target, len(content) + ) except Exception as e: logger.warning("MemTensor on_memory_write failed: %s", e) t = threading.Thread(target=_write, daemon=True, name="memtensor-memory-write") t.start() - def on_session_end(self, messages: List[Dict[str, Any]]) -> None: + def on_session_end(self, messages: list[dict[str, Any]]) -> None: if not self._bridge: return # Flush any deferred ingest that hasn't been picked up by queue_prefetch @@ -314,6 +327,7 @@ def on_session_end(self, messages: List[Dict[str, Any]]) -> None: if pending: try: from config import OWNER + user_content, assistant_content, sid = pending msgs = [] if user_content: @@ -334,10 +348,8 @@ def shutdown(self) -> None: if t and t.is_alive(): t.join(timeout=5.0) if self._bridge: - try: + with contextlib.suppress(Exception): self._bridge.shutdown() - except Exception: - pass self._bridge = None diff --git a/apps/memos-local-plugin/adapters/hermes/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/bridge_client.py index 47ca0b016..426f40c86 100644 --- a/apps/memos-local-plugin/adapters/hermes/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/bridge_client.py @@ -7,15 +7,18 @@ from __future__ import annotations +import contextlib import json import logging import os import socket import subprocess import threading + from typing import Any -from config import find_bridge_script, get_bridge_config, get_daemon_port, OWNER, _get_plugin_root +from config import OWNER, _get_plugin_root, find_bridge_script, get_bridge_config, get_daemon_port + logger = logging.getLogger(__name__) @@ -45,10 +48,8 @@ def send(self, data: str) -> str: def close(self) -> None: if self._sock: - try: + with contextlib.suppress(OSError): self._sock.close() - except OSError: - pass self._sock = None @@ -133,20 +134,27 @@ def call(self, method: str, params: dict[str, Any] | None = None) -> Any: raise RuntimeError(f"Bridge error: {resp['error']}") return resp.get("result", {}) - def search(self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = OWNER) -> dict: - return self.call("search", { - "query": query, - "maxResults": max_results, - "minScore": min_score, - "owner": owner, - }) + def search( + self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = OWNER + ) -> dict: + return self.call( + "search", + { + "query": query, + "maxResults": max_results, + "minScore": min_score, + "owner": owner, + }, + ) def ingest(self, messages: list[dict], session_id: str = "default", owner: str = OWNER) -> None: params: dict[str, Any] = {"messages": messages, "sessionId": session_id, "owner": owner} self.call("ingest", params) def build_prompt(self, query: str, max_results: int = 6, owner: str = OWNER) -> dict: - return self.call("build_prompt", {"query": query, "maxResults": max_results, "owner": owner}) + return self.call( + "build_prompt", {"query": query, "maxResults": max_results, "owner": owner} + ) def flush(self) -> None: self.call("flush") diff --git a/apps/memos-local-plugin/adapters/hermes/config.py b/apps/memos-local-plugin/adapters/hermes/config.py index 5613111c9..f1a2a8b4f 100644 --- a/apps/memos-local-plugin/adapters/hermes/config.py +++ b/apps/memos-local-plugin/adapters/hermes/config.py @@ -4,8 +4,10 @@ import json import os + from pathlib import Path + DAEMON_PORT = 18992 VIEWER_PORT = 18901 OWNER = "hermes" @@ -139,6 +141,7 @@ def _resolve_tsx(plugin_root: Path) -> str: if local_tsx.exists(): return str(local_tsx) import shutil + global_tsx = shutil.which("tsx") if global_tsx: return global_tsx @@ -170,7 +173,7 @@ def find_bridge_script() -> list[str]: return ["node", str(candidate)] tsx = _resolve_tsx(candidate.parent) if " " in tsx: - return tsx.split() + [str(candidate)] + return [*tsx.split(), str(candidate)] return [tsx, str(candidate)] raise FileNotFoundError( diff --git a/apps/memos-local-plugin/adapters/hermes/daemon_manager.py b/apps/memos-local-plugin/adapters/hermes/daemon_manager.py index c922c5d32..1ac11da52 100644 --- a/apps/memos-local-plugin/adapters/hermes/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/hermes/daemon_manager.py @@ -6,6 +6,7 @@ from __future__ import annotations +import contextlib import json import logging import os @@ -13,18 +14,20 @@ import socket import subprocess import time + from typing import Any from config import ( DAEMON_PORT, VIEWER_PORT, + _get_plugin_root, find_bridge_script, get_bridge_config, get_daemon_dir, get_daemon_port, - _get_plugin_root, ) + logger = logging.getLogger(__name__) @@ -83,10 +86,8 @@ def _cleanup_pid_files() -> None: for name in ("bridge.pid", "bridge.port", "viewer.url"): f = daemon_dir / name if f.exists(): - try: + with contextlib.suppress(OSError): f.unlink() - except OSError: - pass def start_daemon( @@ -105,10 +106,8 @@ def start_daemon( pid = 0 pf = daemon_dir / "bridge.pid" if pf.exists(): - try: + with contextlib.suppress(ValueError, OSError): pid = int(pf.read_text().strip()) - except (ValueError, OSError): - pass logger.info("Daemon already responsive on port %d (shared)", port) return { "daemonPort": port, @@ -125,57 +124,55 @@ def start_daemon( env["OPENCLAW_STATE_DIR"] = str(get_daemon_dir().parent) log_dir = get_daemon_dir() - log_file = open(log_dir / "bridge.log", "a") logger.info("Starting daemon: %s", " ".join(bridge_cmd)) - proc = subprocess.Popen( - bridge_cmd, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=log_file, - env=env, - cwd=str(_get_plugin_root()), - start_new_session=True, - ) - - deadline = time.monotonic() + timeout - info: dict[str, Any] = {} - - import select - while time.monotonic() < deadline: - if proc.poll() is not None: - log_file.close() - stderr_out = "" - try: - stderr_out = (log_dir / "bridge.log").read_text()[-2000:] - except OSError: - pass - raise RuntimeError( - f"Daemon exited immediately with code {proc.returncode}.\nlog: {stderr_out}" - ) - - if proc.stdout and select.select([proc.stdout], [], [], 1.0)[0]: - line = proc.stdout.readline().decode("utf-8").strip() - if line: - try: - info = json.loads(line) - break - except json.JSONDecodeError: - logger.debug("Non-JSON stdout line from daemon: %s", line) - - if not info: - log_file.close() - raise RuntimeError("Daemon did not produce startup info within timeout") + with open(log_dir / "bridge.log", "a") as log_file: + proc = subprocess.Popen( + bridge_cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=log_file, + env=env, + cwd=str(_get_plugin_root()), + start_new_session=True, + ) + + deadline = time.monotonic() + timeout + info: dict[str, Any] = {} + + import select + + while time.monotonic() < deadline: + if proc.poll() is not None: + stderr_out = "" + with contextlib.suppress(OSError): + stderr_out = (log_dir / "bridge.log").read_text()[-2000:] + raise RuntimeError( + f"Daemon exited immediately with code {proc.returncode}.\nlog: {stderr_out}" + ) + + if proc.stdout and select.select([proc.stdout], [], [], 1.0)[0]: + line = proc.stdout.readline().decode("utf-8").strip() + if line: + try: + info = json.loads(line) + break + except json.JSONDecodeError: + logger.debug("Non-JSON stdout line from daemon: %s", line) + + if not info: + raise RuntimeError("Daemon did not produce startup info within timeout") if proc.stdout: proc.stdout.close() - log_file.close() info["already_running"] = False logger.info( "Daemon started: pid=%s, port=%s, viewer=%s", - info.get("pid"), info.get("daemonPort"), info.get("viewerUrl"), + info.get("pid"), + info.get("daemonPort"), + info.get("viewerUrl"), ) return info diff --git a/apps/memos-local-plugin/adapters/openharness/scripts/bridge_client.py b/apps/memos-local-plugin/adapters/openharness/scripts/bridge_client.py index ce2b81655..d620606f4 100644 --- a/apps/memos-local-plugin/adapters/openharness/scripts/bridge_client.py +++ b/apps/memos-local-plugin/adapters/openharness/scripts/bridge_client.py @@ -7,15 +7,18 @@ from __future__ import annotations +import contextlib import json import logging import os import socket import subprocess import threading + from typing import Any -from config import find_bridge_script, get_bridge_config, get_daemon_port, _get_plugin_root +from config import _get_plugin_root, find_bridge_script, get_bridge_config, get_daemon_port + logger = logging.getLogger(__name__) @@ -47,10 +50,8 @@ def send(self, data: str) -> str: def close(self) -> None: if self._sock: - try: + with contextlib.suppress(OSError): self._sock.close() - except OSError: - pass self._sock = None @@ -139,22 +140,31 @@ def call(self, method: str, params: dict[str, Any] | None = None) -> Any: raise RuntimeError(f"Bridge error: {resp['error']}") return resp.get("result", {}) - def search(self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = "openharness") -> dict: - return self.call("search", { - "query": query, - "maxResults": max_results, - "minScore": min_score, - "owner": owner, - }) + def search( + self, query: str, max_results: int = 6, min_score: float = 0.45, owner: str = "openharness" + ) -> dict: + return self.call( + "search", + { + "query": query, + "maxResults": max_results, + "minScore": min_score, + "owner": owner, + }, + ) - def ingest(self, messages: list[dict], session_id: str = "default", owner: str | None = None) -> None: + def ingest( + self, messages: list[dict], session_id: str = "default", owner: str | None = None + ) -> None: params: dict[str, Any] = {"messages": messages, "sessionId": session_id} if owner: params["owner"] = owner self.call("ingest", params) def build_prompt(self, query: str, max_results: int = 6, owner: str = "openharness") -> dict: - return self.call("build_prompt", {"query": query, "maxResults": max_results, "owner": owner}) + return self.call( + "build_prompt", {"query": query, "maxResults": max_results, "owner": owner} + ) def flush(self) -> None: self.call("flush") diff --git a/apps/memos-local-plugin/adapters/openharness/scripts/capture.py b/apps/memos-local-plugin/adapters/openharness/scripts/capture.py index e73ba9e31..599584d95 100644 --- a/apps/memos-local-plugin/adapters/openharness/scripts/capture.py +++ b/apps/memos-local-plugin/adapters/openharness/scripts/capture.py @@ -17,13 +17,16 @@ import logging import os import sys + from pathlib import Path + sys.path.insert(0, str(Path(__file__).parent)) from bridge_client import MemosCoreBridge from config import get_project_session_dir + logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") logger = logging.getLogger("memos-capture") @@ -85,10 +88,12 @@ def extract_messages_from_payload() -> tuple[list[dict], str]: if isinstance(raw_messages, list): for msg in raw_messages: if isinstance(msg, dict) and "role" in msg and "content" in msg: - messages.append({ - "role": msg["role"], - "content": msg["content"], - }) + messages.append( + { + "role": msg["role"], + "content": msg["content"], + } + ) return messages, session_id diff --git a/apps/memos-local-plugin/adapters/openharness/scripts/config.py b/apps/memos-local-plugin/adapters/openharness/scripts/config.py index 69476c1bd..e12a53e68 100644 --- a/apps/memos-local-plugin/adapters/openharness/scripts/config.py +++ b/apps/memos-local-plugin/adapters/openharness/scripts/config.py @@ -4,9 +4,11 @@ import json import os + from hashlib import sha1 from pathlib import Path + # Default ports DAEMON_PORT = 18990 VIEWER_PORT = 18899 @@ -165,6 +167,7 @@ def _resolve_tsx(plugin_root: Path) -> str: if local_tsx.exists(): return str(local_tsx) import shutil + global_tsx = shutil.which("tsx") if global_tsx: return global_tsx @@ -197,7 +200,7 @@ def find_bridge_script() -> list[str]: return ["node", str(candidate)] tsx = _resolve_tsx(candidate.parent) if " " in tsx: - return tsx.split() + [str(candidate)] + return [*tsx.split(), str(candidate)] return [tsx, str(candidate)] raise FileNotFoundError( diff --git a/apps/memos-local-plugin/adapters/openharness/scripts/daemon_manager.py b/apps/memos-local-plugin/adapters/openharness/scripts/daemon_manager.py index 7b3c55ba3..dae0e6751 100644 --- a/apps/memos-local-plugin/adapters/openharness/scripts/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/openharness/scripts/daemon_manager.py @@ -7,6 +7,7 @@ from __future__ import annotations +import contextlib import json import logging import os @@ -14,19 +15,20 @@ import socket import subprocess import time -from pathlib import Path + from typing import Any from config import ( DAEMON_PORT, VIEWER_PORT, + _get_plugin_root, find_bridge_script, get_bridge_config, get_daemon_dir, get_daemon_port, - _get_plugin_root, ) + logger = logging.getLogger(__name__) @@ -84,10 +86,8 @@ def _cleanup_pid_files() -> None: for name in ("bridge.pid", "bridge.port", "viewer.url"): f = daemon_dir / name if f.exists(): - try: + with contextlib.suppress(OSError): f.unlink() - except OSError: - pass def start_daemon( @@ -109,10 +109,8 @@ def start_daemon( pid = 0 pf = daemon_dir / "bridge.pid" if pf.exists(): - try: + with contextlib.suppress(ValueError, OSError): pid = int(pf.read_text().strip()) - except (ValueError, OSError): - pass return { "daemonPort": port, "viewerUrl": viewer_url, @@ -128,62 +126,56 @@ def start_daemon( # Isolate viewer: prevent migration scan from showing OpenClaw data env["OPENCLAW_STATE_DIR"] = str(get_daemon_dir().parent) - # Redirect stderr to a log file so closing the pipe doesn't EPIPE the daemon log_dir = get_daemon_dir() - log_file = open(log_dir / "bridge.log", "a") logger.info("Starting daemon: %s", " ".join(bridge_cmd)) - proc = subprocess.Popen( - bridge_cmd, - stdin=subprocess.DEVNULL, - stdout=subprocess.PIPE, - stderr=log_file, - env=env, - cwd=str(_get_plugin_root()), - start_new_session=True, - ) + with open(log_dir / "bridge.log", "a") as log_file: + proc = subprocess.Popen( + bridge_cmd, + stdin=subprocess.DEVNULL, + stdout=subprocess.PIPE, + stderr=log_file, + env=env, + cwd=str(_get_plugin_root()), + start_new_session=True, + ) + + deadline = time.monotonic() + timeout + info: dict[str, Any] = {} + + import select + + while time.monotonic() < deadline: + if proc.poll() is not None: + stderr_out = "" + with contextlib.suppress(OSError): + stderr_out = (log_dir / "bridge.log").read_text()[-2000:] + raise RuntimeError( + f"Daemon exited immediately with code {proc.returncode}.\nlog: {stderr_out}" + ) + + if proc.stdout and select.select([proc.stdout], [], [], 1.0)[0]: + line = proc.stdout.readline().decode("utf-8").strip() + if line: + try: + info = json.loads(line) + break + except json.JSONDecodeError: + logger.debug("Non-JSON stdout line from daemon: %s", line) + + if not info: + raise RuntimeError("Daemon did not produce startup info within timeout") - # Wait for the daemon to print its startup JSON line to stdout - deadline = time.monotonic() + timeout - info: dict[str, Any] = {} - - import select - while time.monotonic() < deadline: - if proc.poll() is not None: - log_file.close() - # Read the log for error context - stderr_out = "" - try: - stderr_out = (log_dir / "bridge.log").read_text()[-2000:] - except OSError: - pass - raise RuntimeError( - f"Daemon exited immediately with code {proc.returncode}.\nlog: {stderr_out}" - ) - - if proc.stdout and select.select([proc.stdout], [], [], 1.0)[0]: - line = proc.stdout.readline().decode("utf-8").strip() - if line: - try: - info = json.loads(line) - break - except json.JSONDecodeError: - logger.debug("Non-JSON stdout line from daemon: %s", line) - - if not info: - log_file.close() - raise RuntimeError("Daemon did not produce startup info within timeout") - - # Close our handle to stdout; stderr goes to the log file which stays open if proc.stdout: proc.stdout.close() - log_file.close() info["already_running"] = False logger.info( "Daemon started: pid=%s, port=%s, viewer=%s", - info.get("pid"), info.get("daemonPort"), info.get("viewerUrl"), + info.get("pid"), + info.get("daemonPort"), + info.get("viewerUrl"), ) return info diff --git a/apps/memos-local-plugin/adapters/openharness/scripts/recall.py b/apps/memos-local-plugin/adapters/openharness/scripts/recall.py index 8e8f94b18..df442a73f 100644 --- a/apps/memos-local-plugin/adapters/openharness/scripts/recall.py +++ b/apps/memos-local-plugin/adapters/openharness/scripts/recall.py @@ -16,16 +16,18 @@ from __future__ import annotations import logging -import os import sys + from pathlib import Path + sys.path.insert(0, str(Path(__file__).parent)) from bridge_client import MemosCoreBridge from config import get_project_memory_dir from daemon_manager import ensure_daemon + logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s") logger = logging.getLogger("memos-recall") diff --git a/pyproject.toml b/pyproject.toml index e7fca38ff..b04319f7d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ ############################################################################## name = "MemoryOS" -version = "2.0.12" +version = "2.0.13" description = "Intelligence Begins with Memory" license = {text = "Apache-2.0"} readme = "README.md" diff --git a/src/memos/__init__.py b/src/memos/__init__.py index 8687b9b9a..89e86a9df 100644 --- a/src/memos/__init__.py +++ b/src/memos/__init__.py @@ -1,4 +1,4 @@ -__version__ = "2.0.12" +__version__ = "2.0.13" from memos.configs.mem_cube import GeneralMemCubeConfig from memos.configs.mem_os import MOSConfig diff --git a/src/memos/mem_scheduler/base_mixins/queue_ops.py b/src/memos/mem_scheduler/base_mixins/queue_ops.py index 590189c24..13de79b3d 100644 --- a/src/memos/mem_scheduler/base_mixins/queue_ops.py +++ b/src/memos/mem_scheduler/base_mixins/queue_ops.py @@ -65,7 +65,13 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt logger.warning("status_tracker.task_submitted failed", exc_info=True) if self.disabled_handlers and msg.label in self.disabled_handlers: - logger.info("Skipping disabled handler: %s - %s", msg.label, msg.content) + logger.debug( + "Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s", + msg.label, + msg.item_id, + msg.user_id, + msg.mem_cube_id, + ) continue task_priority = self.orchestrator.get_task_priority(task_label=msg.label) @@ -74,6 +80,14 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt else: queued_msgs.append(msg) + logger.info( + "Submit scheduler messages summary. total=%s immediate=%s queued=%s queue_backend=%s", + len(messages), + len(immediate_msgs), + len(queued_msgs), + "redis_queue" if self.use_redis_queue else "local_queue", + ) + if immediate_msgs: for m in immediate_msgs: emit_monitor_event( @@ -199,6 +213,15 @@ def _message_consumer(self) -> None: if messages: self.dispatcher.on_messages_enqueued(messages) + if len(messages) >= self.consume_batch: + unique_labels = sorted({msg.label for msg in messages}) + logger.debug( + "Consumer dequeued batch. batch_size=%s consume_batch=%s unique_labels=%s queue_backend=%s", + len(messages), + self.consume_batch, + unique_labels, + "redis_queue" if self.use_redis_queue else "local_queue", + ) self.dispatcher.dispatch(messages) except Exception as e: logger.error("Error dispatching messages: %s", e) diff --git a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py index 74ab15209..690c8d123 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py +++ b/src/memos/mem_scheduler/task_schedule_modules/dispatcher.py @@ -226,7 +226,7 @@ def wrapped_handler(messages: list[ScheduleMessageItem]): if task_item.item_id in self._running_tasks: task_item.mark_completed(result) del self._running_tasks[task_item.item_id] - logger.info(f"Task completed: {task_item.get_execution_info()}") + logger.debug(f"Task completed: {task_item.get_execution_info()}") return result except Exception as e: @@ -630,12 +630,12 @@ def execute_task( with self._task_lock: self._futures.add(future) future.add_done_callback(self._handle_future_result) - logger.info( + logger.debug( f"Dispatch {len(msgs)} message(s) to {task_label} handler for user {user_id} and mem_cube {mem_cube_id}." ) else: # For synchronous execution, the wrapper will run and remove the task upon completion - logger.info( + logger.debug( f"Execute {len(msgs)} message(s) synchronously for {task_label} for user {user_id} and mem_cube {mem_cube_id}." ) wrapped_handler(msgs) @@ -653,6 +653,12 @@ def dispatch(self, msg_list: list[ScheduleMessageItem]): # Group messages by user_id and mem_cube_id first user_cube_groups = group_messages_by_user_and_mem_cube(msg_list) + logger.info( + "Dispatcher received batch. total_messages=%s user_groups=%s unique_labels=%s", + len(msg_list), + len(user_cube_groups), + sorted({msg.label for msg in msg_list}), + ) # Process each user and mem_cube combination for user_id, cube_groups in user_cube_groups.items(): diff --git a/src/memos/mem_scheduler/task_schedule_modules/local_queue.py b/src/memos/mem_scheduler/task_schedule_modules/local_queue.py index 791cedf41..33d007313 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/local_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/local_queue.py @@ -95,8 +95,12 @@ def put( try: self.queue_streams[stream_key].put(item=message, block=block, timeout=timeout) - logger.info( - f"Message successfully put into queue '{stream_key}'. Current size: {self.queue_streams[stream_key].qsize()}" + logger.debug( + "Local queue enqueued. stream=%s size=%s label=%s item_id=%s", + stream_key, + self.queue_streams[stream_key].qsize(), + message.label, + message.item_id, ) except Exception as e: logger.error(f"Failed to put message into queue '{stream_key}': {e}", exc_info=True) @@ -117,7 +121,7 @@ def get( # Return empty list if queue does not exist if stream_key not in self.queue_streams: - logger.error(f"Stream {stream_key} does not exist when trying to get messages.") + logger.debug("Stream %s does not exist when trying to get messages", stream_key) return [] # Ensure we always request a batch so we get a list back @@ -174,6 +178,14 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]: fetched = self.get_nowait(stream_key=stream_key, batch_size=needed) messages.extend(fetched) + if messages and len(messages) >= batch_size: + logger.debug( + "Local queue dequeued batch. batch_size=%s requested_batch_size=%s active_streams=%s", + len(messages), + batch_size, + len(stream_keys), + ) + return messages def qsize(self) -> dict: @@ -196,9 +208,11 @@ def clear(self, stream_key: str | None = None) -> None: if stream_key: if stream_key in self.queue_streams: self.queue_streams[stream_key].clear() + logger.info("Cleared local queue stream: %s", stream_key) else: for queue in self.queue_streams.values(): queue.clear() + logger.info("Cleared all local queue streams. stream_count=%s", len(self.queue_streams)) @property def unfinished_tasks(self) -> int: diff --git a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py index dc7d86752..1277c5465 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/redis_queue.py @@ -384,7 +384,16 @@ def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]: if len(self.message_pack_cache) == 0: return [] else: - return self.message_pack_cache.popleft() + batch = self.message_pack_cache.popleft() + if len(batch) >= batch_size: + logger.debug( + "[REDIS_QUEUE] Dequeued batch. batch_size=%s requested_batch_size=%s cache_packs_remaining=%s stream_count=%s", + len(batch), + batch_size, + len(self.message_pack_cache), + len(self.get_stream_keys()), + ) + return batch def _ensure_consumer_group(self, stream_key) -> None: """Ensure the consumer group exists for the stream.""" @@ -449,9 +458,13 @@ def put( message_id = self._redis_conn.xadd( stream_key, message_data, maxlen=self.max_len, approximate=True ) - - logger.info( - f"Added message {message_id} to Redis stream: {message.label} - {message.content[:100]}..." + logger.debug( + "[REDIS_QUEUE] Enqueued message. message_id=%s stream=%s label=%s item_id=%s stream_cache_size=%s", + message_id, + stream_key, + message.label, + message.item_id, + len(self._stream_keys_cache), ) except Exception as e: @@ -494,7 +507,11 @@ def ack_message( # Optionally delete the message from the stream to keep it clean try: self._redis_conn.xdel(stream_key, redis_message_id) - logger.info(f"Successfully delete acknowledged message {redis_message_id}") + logger.debug( + "[REDIS_QUEUE] Ack/delete message. redis_message_id=%s stream=%s", + redis_message_id, + stream_key, + ) except Exception as e: logger.warning(f"Failed to delete acknowledged message {redis_message_id}: {e}") @@ -989,7 +1006,7 @@ def show_task_status(self, stream_key_prefix: str | None = None) -> dict[str, di ) stream_keys = self.get_stream_keys(stream_key_prefix=effective_prefix) if not stream_keys: - logger.info(f"No Redis streams found for the configured prefix: {effective_prefix}") + logger.debug(f"No Redis streams found for the configured prefix: {effective_prefix}") return {} grouped: dict[str, dict[str, int]] = {} @@ -1157,7 +1174,7 @@ def connect(self) -> None: self._redis_conn.ping() self._is_connected = True self._check_xautoclaim_support() - logger.debug("Redis connection established successfully") + logger.info("Redis connection established successfully") # Start stream keys refresher when connected self._start_stream_keys_refresh_thread() except Exception as e: @@ -1174,7 +1191,7 @@ def disconnect(self) -> None: self._stop_stream_keys_refresh_thread() if self._is_listening: self.stop_listening() - logger.debug("Disconnected from Redis") + logger.info("Disconnected from Redis") def __enter__(self): """Context manager entry.""" @@ -1379,7 +1396,7 @@ def _update_stream_cache_with_log( self._stream_keys_cache = active_stream_keys self._stream_keys_last_refresh = time.time() cache_count = len(self._stream_keys_cache) - logger.info( + logger.debug( f"Refreshed stream keys cache: {cache_count} active keys, " f"{deleted_count} deleted, {len(candidate_keys)} candidates examined." ) diff --git a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py index b49db2b36..1f2e81bef 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/task_queue.py +++ b/src/memos/mem_scheduler/task_schedule_modules/task_queue.py @@ -93,6 +93,9 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt """Submit messages to the message queue (either local queue or Redis).""" if isinstance(messages, ScheduleMessageItem): messages = [messages] + if len(messages) < 1: + logger.error("submit_messages called with empty payload") + return current_trace_id = get_current_trace_id() @@ -104,18 +107,25 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt user_id=msg.user_id, mem_cube_id=msg.mem_cube_id, task_label=msg.label ) - if len(messages) < 1: - logger.error("Submit empty") - elif len(messages) == 1: + if len(messages) == 1: if getattr(messages[0], "timestamp", None) is None: messages[0].timestamp = get_utc_now() - enqueue_ts = to_iso(getattr(messages[0], "timestamp", None)) - emit_monitor_event( - "enqueue", - messages[0], - {"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0}, - ) - self.memos_message_queue.put(messages[0]) + if self.disabled_handlers and messages[0].label in self.disabled_handlers: + logger.debug( + "Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s", + messages[0].label, + messages[0].item_id, + messages[0].user_id, + messages[0].mem_cube_id, + ) + else: + enqueue_ts = to_iso(getattr(messages[0], "timestamp", None)) + emit_monitor_event( + "enqueue", + messages[0], + {"enqueue_ts": enqueue_ts, "event_duration_ms": 0, "total_duration_ms": 0}, + ) + self.memos_message_queue.put(messages[0]) else: user_cube_groups = group_messages_by_user_and_mem_cube(messages) @@ -132,8 +142,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt message.timestamp = get_utc_now() if self.disabled_handlers and message.label in self.disabled_handlers: - logger.info( - f"Skipping disabled handler: {message.label} - {message.content}" + logger.debug( + "Skip disabled handler. label=%s item_id=%s user_id=%s mem_cube_id=%s", + message.label, + message.item_id, + message.user_id, + message.mem_cube_id, ) continue @@ -148,9 +162,12 @@ def submit_messages(self, messages: ScheduleMessageItem | list[ScheduleMessageIt }, ) self.memos_message_queue.put(message) - logger.info( - f"Submitted message to local queue: {message.label} - {message.content}" - ) + + logger.info( + "Queue submit completed. backend=%s total=%s", + "redis_queue" if self.use_redis_queue else "local_queue", + len(messages), + ) def get_messages(self, batch_size: int) -> list[ScheduleMessageItem]: return self.memos_message_queue.get_messages(batch_size=batch_size) diff --git a/src/memos/multi_mem_cube/single_cube.py b/src/memos/multi_mem_cube/single_cube.py index 6a91f436f..355cb8cee 100644 --- a/src/memos/multi_mem_cube/single_cube.py +++ b/src/memos/multi_mem_cube/single_cube.py @@ -22,7 +22,7 @@ ) from memos.memories.textual.item import TextualMemoryItem from memos.multi_mem_cube.views import MemCubeView -from memos.search import search_text_memories +from memos.search import resolve_filter_for_cube, search_text_memories from memos.templates.mem_reader_prompts import PROMPT_MAPPING from memos.types.general_types import ( FINE_STRATEGY, @@ -91,6 +91,13 @@ def search_memories(self, search_req: APISearchRequest) -> dict[str, Any]: Unified memory search handling (text + preference memories). Preference memories are now searched through the same _search_text flow. """ + cube_filter = resolve_filter_for_cube(search_req.filter, self.cube_id) + if cube_filter is not search_req.filter: + import copy + + search_req = copy.copy(search_req) + search_req.filter = cube_filter + # Create UserContext object user_context = UserContext( user_id=search_req.user_id, diff --git a/src/memos/search/__init__.py b/src/memos/search/__init__.py index 71388c62b..d2c197403 100644 --- a/src/memos/search/__init__.py +++ b/src/memos/search/__init__.py @@ -1,4 +1,14 @@ -from .search_service import SearchContext, build_search_context, search_text_memories +from .search_service import ( + SearchContext, + build_search_context, + resolve_filter_for_cube, + search_text_memories, +) -__all__ = ["SearchContext", "build_search_context", "search_text_memories"] +__all__ = [ + "SearchContext", + "build_search_context", + "resolve_filter_for_cube", + "search_text_memories", +] diff --git a/src/memos/search/search_service.py b/src/memos/search/search_service.py index fa713a7d1..f4092d168 100644 --- a/src/memos/search/search_service.py +++ b/src/memos/search/search_service.py @@ -36,6 +36,35 @@ def build_search_context( ) +def resolve_filter_for_cube( + raw_filter: dict[str, Any] | None, cube_id: str +) -> dict[str, Any] | None: + """Resolve a multi-cube filter dict into the sub-filter for a single cube. + + Supported forms: + - None → None (no filter) + - {"and": [...]} / {"or": [...]} → returned as-is (unified, all cubes share) + - {"cube_A": {...}, "cube_B": {...}} → return raw_filter[cube_id] or None + Mixed top-level (and/or + cube keys) is rejected. + """ + if raw_filter is None: + return None + + has_logic_key = "and" in raw_filter or "or" in raw_filter + other_keys = {k for k in raw_filter if k not in ("and", "or")} + + if has_logic_key and other_keys: + raise ValueError( + "Invalid filter: top-level 'and'/'or' cannot coexist with per-cube keys " + f"{other_keys}. Use either a unified filter or per-cube filter, not both." + ) + + if has_logic_key: + return raw_filter + + return raw_filter.get(cube_id) + + def search_text_memories( text_mem: Any, search_req: APISearchRequest, diff --git a/tests/search/test_resolve_filter_for_cube.py b/tests/search/test_resolve_filter_for_cube.py new file mode 100644 index 000000000..ab6be71a3 --- /dev/null +++ b/tests/search/test_resolve_filter_for_cube.py @@ -0,0 +1,78 @@ +import pytest + +from memos.search.search_service import resolve_filter_for_cube + + +class TestResolveFilterForCube: + """Tests for resolve_filter_for_cube — multi-cube filter routing.""" + + # ── None passthrough ── + + def test_none_returns_none(self): + assert resolve_filter_for_cube(None, "cube_001") is None + + # ── Unified filter (filter2): top-level and/or ── + + def test_unified_and_returns_same_for_any_cube(self): + f = {"and": [{"tags": {"contains": "阅读"}}, {"created_at": {"gte": "2025-01-01"}}]} + assert resolve_filter_for_cube(f, "cube_001") is f + assert resolve_filter_for_cube(f, "cube_999") is f + + def test_unified_or_returns_same_for_any_cube(self): + f = {"or": [{"tags": {"contains": "A"}}, {"tags": {"contains": "B"}}]} + assert resolve_filter_for_cube(f, "cube_001") is f + + # ── Per-cube filter (filter1 / filter4) ── + + def test_per_cube_returns_matching_sub_filter(self): + sub_a = {"and": [{"tags": {"contains": "阅读"}}]} + sub_b = {"and": [{"tags": {"contains": "工作"}}]} + f = {"cube_A": sub_a, "cube_B": sub_b} + + assert resolve_filter_for_cube(f, "cube_A") is sub_a + assert resolve_filter_for_cube(f, "cube_B") is sub_b + + def test_per_cube_missing_key_returns_none(self): + f = { + "cube_A": {"and": [{"tags": {"contains": "阅读"}}]}, + "cube_B": {"and": [{"tags": {"contains": "工作"}}]}, + } + assert resolve_filter_for_cube(f, "cube_C") is None + + def test_per_cube_single_key(self): + sub = {"and": [{"created_at": {"gte": "2025-01-01"}}]} + f = {"cube_only": sub} + assert resolve_filter_for_cube(f, "cube_only") is sub + assert resolve_filter_for_cube(f, "other") is None + + # ── Mixed (filter3): illegal ── + + def test_mixed_and_with_cube_key_raises(self): + f = { + "and": [{"tags": {"contains": "阅读"}}], + "cube_A": {"and": [{"tags": {"contains": "工作"}}]}, + } + with pytest.raises(ValueError, match="cannot coexist"): + resolve_filter_for_cube(f, "cube_A") + + def test_mixed_or_with_cube_key_raises(self): + f = { + "or": [{"tags": {"contains": "阅读"}}], + "cube_B": {"and": [{"tags": {"contains": "工作"}}]}, + } + with pytest.raises(ValueError, match="cannot coexist"): + resolve_filter_for_cube(f, "cube_B") + + # ── Edge cases ── + + def test_empty_dict_returns_none(self): + assert resolve_filter_for_cube({}, "cube_001") is None + + def test_per_cube_with_empty_sub_filter(self): + f = {"cube_A": {}} + result = resolve_filter_for_cube(f, "cube_A") + assert result == {} + + def test_unified_and_empty_list(self): + f = {"and": []} + assert resolve_filter_for_cube(f, "any") is f