From 914ae4e10ce4f2191ea9280a30df8ef99f252252 Mon Sep 17 00:00:00 2001 From: fancy Date: Thu, 9 Apr 2026 16:12:30 +0800 Subject: [PATCH 1/5] feat(copaw-plugin): add MemOS Cloud memory backend for CoPaw Implements CoPaw's BaseMemoryManager interface, delegating long-term memory search/add to MemOS Cloud API while handling context compaction locally. Follows the same pattern as existing OpenClaw plugins. Closes #1440 --- apps/memos-copaw-plugin/memos_client.py | 185 ++++++++ .../memos_memory_manager.py | 429 ++++++++++++++++++ apps/memos-copaw-plugin/plugin.json | 16 + apps/memos-copaw-plugin/plugin.py | 53 +++ 4 files changed, 683 insertions(+) create mode 100644 apps/memos-copaw-plugin/memos_client.py create mode 100644 apps/memos-copaw-plugin/memos_memory_manager.py create mode 100644 apps/memos-copaw-plugin/plugin.json create mode 100644 apps/memos-copaw-plugin/plugin.py diff --git a/apps/memos-copaw-plugin/memos_client.py b/apps/memos-copaw-plugin/memos_client.py new file mode 100644 index 00000000..15cbde15 --- /dev/null +++ b/apps/memos-copaw-plugin/memos_client.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +"""Async HTTP client for MemOS Cloud API.""" +import asyncio +import logging +import time +from typing import Any, Dict, List, Optional + +import aiohttp + +logger = logging.getLogger(__name__) + +_DEFAULT_BASE_URL = "https://memos.memtensor.cn/api/openmem/v1" +_DEFAULT_TIMEOUT = 8.0 +_DEFAULT_RETRIES = 1 + + +class MemOSClient: + """Async client for MemOS Cloud API. + + Handles authentication, retries, and graceful degradation for + the two core endpoints: /search/memory and /add/message. + """ + + def __init__( + self, + base_url: str = _DEFAULT_BASE_URL, + api_key: str = "", + timeout: float = _DEFAULT_TIMEOUT, + retries: int = _DEFAULT_RETRIES, + ): + self.base_url = base_url.rstrip("/") + self.api_key = api_key + self.timeout = timeout + self.retries = retries + self._session: Optional[aiohttp.ClientSession] = None + + async def _ensure_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + self._session = aiohttp.ClientSession( + headers={ + "Authorization": f"Token {self.api_key}", + "Content-Type": "application/json", + }, + timeout=aiohttp.ClientTimeout(total=self.timeout), + ) + return self._session + + async def close(self) -> None: + if self._session and not self._session.closed: + await self._session.close() + self._session = None + + async def _post( + self, + path: str, + payload: Dict[str, Any], + ) -> Optional[Dict[str, Any]]: + """POST with retry. Returns parsed JSON or None on failure.""" + url = f"{self.base_url}{path}" + last_err: Optional[Exception] = None + + for attempt in range(1 + self.retries): + try: + session = await self._ensure_session() + async with session.post(url, json=payload) as resp: + if resp.status == 200: + return await resp.json() + body = await resp.text() + logger.warning( + "MemOS API %s returned %s: %s", + path, + resp.status, + body[:300], + ) + except (aiohttp.ClientError, asyncio.TimeoutError) as exc: + last_err = exc + logger.warning( + "MemOS API %s attempt %d failed: %s", + path, + attempt + 1, + exc, + ) + if attempt < self.retries: + await asyncio.sleep(0.5 * (attempt + 1)) + + logger.error("MemOS API %s exhausted retries: %s", path, last_err) + return None + + # ------------------------------------------------------------------ # + # Search / Recall + # ------------------------------------------------------------------ # + + async def search_memory( + self, + user_id: str, + query: str, + *, + source: str = "copaw", + conversation_id: str = "", + memory_limit_number: int = 9, + include_preference: bool = True, + preference_limit_number: int = 6, + include_tool_memory: bool = False, + tool_memory_limit_number: int = 6, + relativity: float = 0.45, + knowledgebase_ids: Optional[List[str]] = None, + filter_obj: Optional[Dict[str, Any]] = None, + ) -> Optional[Dict[str, Any]]: + """Call POST /search/memory. + + Returns the ``data`` dict from MemOS response, or *None* on failure. + """ + payload: Dict[str, Any] = { + "user_id": user_id, + "query": query, + "source": source, + "memory_limit_number": memory_limit_number, + "include_preference": include_preference, + "preference_limit_number": preference_limit_number, + "include_tool_memory": include_tool_memory, + "tool_memory_limit_number": tool_memory_limit_number, + "relativity": relativity, + } + if conversation_id: + payload["conversation_id"] = conversation_id + if knowledgebase_ids: + payload["knowledgebase_ids"] = knowledgebase_ids + if filter_obj: + payload["filter"] = filter_obj + + result = await self._post("/search/memory", payload) + if result is None: + return None + return result.get("data", result) + + # ------------------------------------------------------------------ # + # Add / Store + # ------------------------------------------------------------------ # + + async def add_message( + self, + user_id: str, + messages: List[Dict[str, str]], + *, + conversation_id: str = "", + source: str = "copaw", + agent_id: str = "", + async_mode: bool = True, + tags: Optional[List[str]] = None, + ) -> bool: + """Call POST /add/message. + + Returns True on success, False on failure. + """ + payload: Dict[str, Any] = { + "user_id": user_id, + "messages": messages, + "source": source, + "async_mode": async_mode, + "tags": tags or ["copaw"], + } + if conversation_id: + payload["conversation_id"] = conversation_id + if agent_id: + payload["agent_id"] = agent_id + + result = await self._post("/add/message", payload) + return result is not None + + # ------------------------------------------------------------------ # + # Health check + # ------------------------------------------------------------------ # + + async def ping(self) -> bool: + """Lightweight connectivity check via a minimal search call.""" + try: + session = await self._ensure_session() + async with session.post( + f"{self.base_url}/search/memory", + json={"user_id": "_ping", "query": "ping"}, + timeout=aiohttp.ClientTimeout(total=5), + ) as resp: + return resp.status < 500 + except Exception: + return False diff --git a/apps/memos-copaw-plugin/memos_memory_manager.py b/apps/memos-copaw-plugin/memos_memory_manager.py new file mode 100644 index 00000000..7122d1f4 --- /dev/null +++ b/apps/memos-copaw-plugin/memos_memory_manager.py @@ -0,0 +1,429 @@ +# -*- coding: utf-8 -*- +"""MemOS Cloud memory manager for CoPaw agents. + +Implements CoPaw's BaseMemoryManager interface, delegating long-term +memory search/add to MemOS Cloud while handling context compaction locally. +""" +import asyncio +import datetime +import logging +import os +from typing import TYPE_CHECKING, List, Optional + +from agentscope.memory import InMemoryMemory +from agentscope.message import Msg, TextBlock +from agentscope.tool import ToolResponse + +from copaw.agents.memory.base_memory_manager import BaseMemoryManager +from copaw.constant import EnvVarLoader + +if TYPE_CHECKING: + from copaw.config.config import AgentProfileConfig + +logger = logging.getLogger(__name__) + +# ------------------------------------------------------------------ # +# Thin InMemoryMemory subclass with _long_term_memory attribute +# required by CoPawAgent.reply() for force_memory_search injection. +# ------------------------------------------------------------------ # + + +class MemOSInMemoryMemory(InMemoryMemory): + """InMemoryMemory wrapper that carries a _long_term_memory slot.""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._long_term_memory: str = "" + + +# ------------------------------------------------------------------ # +# Helper: format MemOS search results into readable text +# ------------------------------------------------------------------ # + + +def _format_search_results(data: dict) -> str: + """Convert MemOS /search/memory response data into plain-text blocks.""" + parts: list[str] = [] + + for item in data.get("memory_detail_list", []): + ts = item.get("update_time") or item.get("create_time") + date_str = "" + if ts: + try: + date_str = datetime.datetime.fromtimestamp( + ts, + tz=datetime.timezone.utc, + ).strftime("[%Y-%m-%d %H:%M] ") + except (OSError, ValueError): + pass + value = (item.get("memory_value") or "")[:8000] + rel = item.get("relativity", 0) + parts.append(f"{date_str}{value} (score={rel:.2f})") + + for item in data.get("preference_detail_list", []): + ptype = item.get("preference_type", "Preference") + pref = item.get("preference", "") + parts.append(f"[{ptype}] {pref}") + + return "\n---\n".join(parts) if parts else "" + + +# ------------------------------------------------------------------ # +# MemOSMemoryManager +# ------------------------------------------------------------------ # + + +class MemOSMemoryManager(BaseMemoryManager): + """Memory manager backed by MemOS Cloud. + + Cloud-side operations: + - memory_search → POST /search/memory + - add_message → POST /add/message (via summary_memory) + + Local operations (no external dependency): + - compact_memory / compact_tool_result / check_context + use the agent's chat model for in-process context management. + + Configuration is read from agent config ``running.memos_config`` + with environment-variable fallbacks: + - MEMOS_API_KEY, MEMOS_BASE_URL, MEMOS_USER_ID + """ + + def __init__(self, working_dir: str, agent_id: str): + super().__init__(working_dir=working_dir, agent_id=agent_id) + self._client = None # lazily created in start() + self._in_memory: Optional[MemOSInMemoryMemory] = None + self._pending_messages: list[dict] = [] + self._config_cache: Optional[dict] = None + + # ------------------------------------------------------------------ # + # Config resolution + # ------------------------------------------------------------------ # + + def _load_memos_config(self) -> dict: + """Resolve MemOS config: agent config > env var > default.""" + if self._config_cache is not None: + return self._config_cache + + # Try loading from agent config + cfg = {} + try: + from copaw.config.config import load_agent_config + + ac = load_agent_config(self.agent_id) + if hasattr(ac.running, "memos_config"): + mc = ac.running.memos_config + cfg = { + "base_url": mc.base_url, + "api_key": mc.api_key, + "user_id": mc.user_id, + "memory_limit_number": mc.memory_limit_number, + "include_preference": mc.include_preference, + "preference_limit_number": mc.preference_limit_number, + "relativity": mc.relativity, + "timeout": mc.timeout, + "conversation_id": mc.conversation_id, + "knowledgebase_ids": mc.knowledgebase_ids, + "async_mode": mc.async_mode, + } + except Exception as e: + logger.debug("Could not load memos_config from agent config: %s", e) + + # Env-var fallbacks + result = { + "base_url": cfg.get("base_url") + or EnvVarLoader.get_str( + "MEMOS_BASE_URL", + "https://memos.memtensor.cn/api/openmem/v1", + ), + "api_key": cfg.get("api_key") + or EnvVarLoader.get_str("MEMOS_API_KEY", ""), + "user_id": cfg.get("user_id") + or EnvVarLoader.get_str("MEMOS_USER_ID", "copaw-user"), + "memory_limit_number": cfg.get("memory_limit_number", 9), + "include_preference": cfg.get("include_preference", True), + "preference_limit_number": cfg.get("preference_limit_number", 6), + "relativity": cfg.get("relativity", 0.45), + "timeout": cfg.get("timeout", 8.0), + "conversation_id": cfg.get("conversation_id", ""), + "knowledgebase_ids": cfg.get("knowledgebase_ids", []), + "async_mode": cfg.get("async_mode", True), + } + self._config_cache = result + return result + + # ------------------------------------------------------------------ # + # Lifecycle + # ------------------------------------------------------------------ # + + async def start(self) -> None: + """Initialise the MemOS HTTP client and verify connectivity.""" + # Lazy import to avoid top-level dependency on aiohttp + # when the plugin is merely discovered but not activated. + import importlib.util + _spec = importlib.util.spec_from_file_location( + "memos_client", + os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "memos_client.py", + ), + ) + _mod = importlib.util.module_from_spec(_spec) + _spec.loader.exec_module(_mod) + MemOSClient = _mod.MemOSClient + + cfg = self._load_memos_config() + if not cfg["api_key"]: + logger.warning( + "MemOS API key not configured. Set MEMOS_API_KEY env var " + "or add memos_config.api_key to agent config." + ) + + self._client = MemOSClient( + base_url=cfg["base_url"], + api_key=cfg["api_key"], + timeout=cfg["timeout"], + ) + + masked_key = ( + cfg["api_key"][:5] + "***" + if len(cfg["api_key"]) > 5 + else "***" + ) + ok = await self._client.ping() + if ok: + logger.info( + "MemOS Cloud connected: %s (key=%s)", + cfg["base_url"], + masked_key, + ) + else: + logger.warning( + "MemOS Cloud unreachable at %s — memory search will " + "degrade gracefully.", + cfg["base_url"], + ) + + async def close(self) -> bool: + """Flush pending messages and close the HTTP session.""" + if self._pending_messages and self._client: + cfg = self._load_memos_config() + await self._client.add_message( + user_id=cfg["user_id"], + messages=self._pending_messages, + conversation_id=cfg["conversation_id"], + agent_id=self.agent_id, + async_mode=cfg["async_mode"], + ) + self._pending_messages.clear() + + if self._client: + await self._client.close() + return True + + # ------------------------------------------------------------------ # + # Memory search (core MemOS feature) + # ------------------------------------------------------------------ # + + async def memory_search( + self, + query: str, + max_results: int = 5, + min_score: float = 0.1, + ) -> ToolResponse: + """Search MemOS Cloud for relevant memories.""" + if not self._client: + return ToolResponse( + content=[ + TextBlock( + type="text", + text="MemOS client not initialized. " + "Check API key and base URL configuration.", + ), + ], + ) + + cfg = self._load_memos_config() + data = await self._client.search_memory( + user_id=cfg["user_id"], + query=query, + memory_limit_number=max_results, + include_preference=cfg["include_preference"], + preference_limit_number=cfg["preference_limit_number"], + relativity=max(min_score, cfg["relativity"]), + conversation_id=cfg["conversation_id"], + knowledgebase_ids=cfg["knowledgebase_ids"] or None, + ) + + if data is None: + return ToolResponse( + content=[ + TextBlock( + type="text", + text="MemOS Cloud search returned no results " + "(API may be unreachable).", + ), + ], + ) + + text = _format_search_results(data) + if not text: + return ToolResponse( + content=[ + TextBlock( + type="text", + text="No relevant memories found.", + ), + ], + ) + + return ToolResponse( + content=[TextBlock(type="text", text=text)], + ) + + # ------------------------------------------------------------------ # + # Context compaction (local, no cloud dependency) + # ------------------------------------------------------------------ # + + async def compact_tool_result(self, **kwargs) -> None: + """Truncate oversized tool outputs in-place.""" + messages: list = kwargs.get("messages", []) + max_chars: int = kwargs.get("max_chars", 20000) + for msg in messages: + if hasattr(msg, "content") and isinstance(msg.content, str): + if len(msg.content) > max_chars: + msg.content = ( + msg.content[:max_chars] + + f"\n... [truncated, original {len(msg.content)} chars]" + ) + + async def check_context(self, **kwargs) -> tuple: + """Simple context-size check based on character count. + + Returns (messages_to_compact, remaining, is_valid). + """ + messages: list = kwargs.get("messages", []) + max_chars: int = kwargs.get("max_input_length", 120000) + compact_ratio: float = kwargs.get("compact_ratio", 0.5) + + total = sum( + len(getattr(m, "content", "") or "") for m in messages + ) + if total <= max_chars: + return [], messages, True + + # Keep the most recent messages within budget + cut = max(1, int(len(messages) * compact_ratio)) + return messages[:cut], messages[cut:], False + + async def compact_memory( + self, + messages: list[Msg], + previous_summary: str = "", + extra_instruction: str = "", + **kwargs, + ) -> str: + """Summarise old messages using the agent's chat model. + + Also queues the summary for async upload to MemOS Cloud. + """ + if not messages: + return previous_summary + + self._prepare_model_formatter() + if self.chat_model is None: + # Fallback: concatenate content + lines = [ + f"{m.role}: {getattr(m, 'content', '')[:500]}" + for m in messages[-20:] + ] + summary = "\n".join(lines) + else: + transcript = "\n".join( + f"[{m.role}] {getattr(m, 'content', '')[:2000]}" + for m in messages + ) + prompt = ( + "Condense the following conversation into a concise summary " + "that preserves all key facts, decisions, and action items. " + "Keep it under 800 words.\n\n" + ) + if previous_summary: + prompt += f"Previous summary:\n{previous_summary}\n\n" + if extra_instruction: + prompt += f"Additional instruction: {extra_instruction}\n\n" + prompt += f"Conversation:\n{transcript}" + + try: + response = self.chat_model( + Msg(role="user", content=prompt), + ) + summary = ( + response.content + if hasattr(response, "content") + else str(response) + ) + except Exception as e: + logger.error("compact_memory LLM call failed: %s", e) + summary = previous_summary or "" + + # Queue the summary for async upload to MemOS Cloud + if summary and self._client: + self._pending_messages.append( + {"role": "assistant", "content": f"[summary] {summary}"} + ) + + return summary + + async def summary_memory(self, messages: list[Msg], **kwargs) -> str: + """Summarise messages and upload to MemOS Cloud.""" + summary = await self.compact_memory(messages, **kwargs) + + # Immediately upload conversation + summary + if self._client and messages: + cfg = self._load_memos_config() + conv_msgs = [] + for m in messages[-30:]: + role = getattr(m, "role", "user") + content = getattr(m, "content", "") + if isinstance(content, str) and content.strip(): + conv_msgs.append( + {"role": role, "content": content[:20000]} + ) + + if conv_msgs: + await self._client.add_message( + user_id=cfg["user_id"], + messages=conv_msgs, + conversation_id=cfg["conversation_id"], + agent_id=self.agent_id, + async_mode=cfg["async_mode"], + ) + + return summary + + # ------------------------------------------------------------------ # + # InMemoryMemory bridge + # ------------------------------------------------------------------ # + + def get_in_memory_memory(self, **kwargs) -> MemOSInMemoryMemory: + """Return an InMemoryMemory with _long_term_memory support.""" + if self._in_memory is None: + self._in_memory = MemOSInMemoryMemory() + return self._in_memory + + # ------------------------------------------------------------------ # + # Internal helpers + # ------------------------------------------------------------------ # + + def _prepare_model_formatter(self) -> None: + """Lazily initialize chat_model and formatter if not set.""" + if self.chat_model is None or self.formatter is None: + try: + from copaw.agents.model_factory import create_model_and_formatter + + self.chat_model, self.formatter = create_model_and_formatter( + self.agent_id, + ) + except Exception as e: + logger.warning("Failed to init chat model: %s", e) diff --git a/apps/memos-copaw-plugin/plugin.json b/apps/memos-copaw-plugin/plugin.json new file mode 100644 index 00000000..8adc9b4d --- /dev/null +++ b/apps/memos-copaw-plugin/plugin.json @@ -0,0 +1,16 @@ +{ + "id": "memos-copaw-plugin", + "name": "MemOS Cloud Memory", + "version": "0.1.0", + "description": "MemOS Cloud memory backend for CoPaw — provides cloud-based long-term memory via MemOS API (search + add).", + "author": "MemTensor", + "entry_point": "plugin.py", + "dependencies": [ + "aiohttp>=3.9.0" + ], + "min_copaw_version": "0.1.0", + "meta": { + "api_key_url": "https://memos.memtensor.cn", + "api_key_hint": "Get your API key from MemOS Cloud dashboard" + } +} diff --git a/apps/memos-copaw-plugin/plugin.py b/apps/memos-copaw-plugin/plugin.py new file mode 100644 index 00000000..fa1b0eb5 --- /dev/null +++ b/apps/memos-copaw-plugin/plugin.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +"""MemOS Cloud memory plugin for CoPaw. + +Registers MemOSMemoryManager as a pluggable memory backend so that +CoPaw agents can use MemOS Cloud for long-term memory. + +Installation: + copaw plugin install + +Then set ``memory_manager_backend: "memos"`` in agent config and +provide MEMOS_API_KEY (env var or config). +""" +import importlib.util +import logging +import os + +logger = logging.getLogger(__name__) + +# Load sibling module without mutating sys.path +_plugin_dir = os.path.dirname(os.path.abspath(__file__)) +_spec = importlib.util.spec_from_file_location( + "memos_memory_manager", + os.path.join(_plugin_dir, "memos_memory_manager.py"), +) +_mod = importlib.util.module_from_spec(_spec) +_spec.loader.exec_module(_mod) +MemOSMemoryManager = _mod.MemOSMemoryManager + + +class _MemOSPlugin: + """Plugin definition following CoPaw's plugin contract.""" + + def register(self, api): + """Register the MemOS memory manager backend with CoPaw.""" + logger.info("MemOS Cloud plugin registering...") + + api.register_memory_manager( + backend_id="memos", + manager_class=MemOSMemoryManager, + ) + logger.info("Registered MemOS memory manager backend 'memos'") + + api.register_startup_hook( + hook_name="memos_cloud_init", + callback=lambda: logger.info( + "MemOS Cloud plugin ready. " + "Set memory_manager_backend='memos' to activate." + ), + priority=90, + ) + + +plugin = _MemOSPlugin() From c1d2588295c55f8128f28b02c52fc6a2b97e54f9 Mon Sep 17 00:00:00 2001 From: fancy Date: Thu, 9 Apr 2026 16:17:11 +0800 Subject: [PATCH 2/5] refactor(copaw-plugin): inherit ReMeLightMemoryManager for full feature parity Replace the standalone BaseMemoryManager implementation with inheritance from ReMeLightMemoryManager. This ensures all local operations (token-aware compaction, check_context, compact_tool_result, get_in_memory_memory) use CoPaw's battle-tested ReMeLight code. Only memory_search and summary_memory are overridden for MemOS Cloud. --- .../memos_memory_manager.py | 348 +++++------------- 1 file changed, 97 insertions(+), 251 deletions(-) diff --git a/apps/memos-copaw-plugin/memos_memory_manager.py b/apps/memos-copaw-plugin/memos_memory_manager.py index 7122d1f4..036985d7 100644 --- a/apps/memos-copaw-plugin/memos_memory_manager.py +++ b/apps/memos-copaw-plugin/memos_memory_manager.py @@ -1,40 +1,31 @@ # -*- coding: utf-8 -*- """MemOS Cloud memory manager for CoPaw agents. -Implements CoPaw's BaseMemoryManager interface, delegating long-term -memory search/add to MemOS Cloud while handling context compaction locally. +Extends ReMeLightMemoryManager — all local operations (context compaction, +token counting, tool result truncation, in-memory memory) are delegated +to the parent class unchanged. Only two methods are overridden: + + - memory_search → queries MemOS Cloud instead of local vector index + - summary_memory → uploads conversation to MemOS Cloud after local summary + +This ensures full compatibility with CoPaw's MemoryCompactionHook and +force_memory_search auto-recall mechanism. """ -import asyncio import datetime import logging import os -from typing import TYPE_CHECKING, List, Optional +from typing import Optional -from agentscope.memory import InMemoryMemory from agentscope.message import Msg, TextBlock from agentscope.tool import ToolResponse -from copaw.agents.memory.base_memory_manager import BaseMemoryManager +from copaw.agents.memory.reme_light_memory_manager import ( + ReMeLightMemoryManager, +) from copaw.constant import EnvVarLoader -if TYPE_CHECKING: - from copaw.config.config import AgentProfileConfig - logger = logging.getLogger(__name__) -# ------------------------------------------------------------------ # -# Thin InMemoryMemory subclass with _long_term_memory attribute -# required by CoPawAgent.reply() for force_memory_search injection. -# ------------------------------------------------------------------ # - - -class MemOSInMemoryMemory(InMemoryMemory): - """InMemoryMemory wrapper that carries a _long_term_memory slot.""" - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self._long_term_memory: str = "" - # ------------------------------------------------------------------ # # Helper: format MemOS search results into readable text @@ -42,7 +33,7 @@ def __init__(self, **kwargs): def _format_search_results(data: dict) -> str: - """Convert MemOS /search/memory response data into plain-text blocks.""" + """Convert MemOS /search/memory response into plain-text blocks.""" parts: list[str] = [] for item in data.get("memory_detail_list", []): @@ -73,28 +64,26 @@ def _format_search_results(data: dict) -> str: # ------------------------------------------------------------------ # -class MemOSMemoryManager(BaseMemoryManager): - """Memory manager backed by MemOS Cloud. +class MemOSMemoryManager(ReMeLightMemoryManager): + """Memory manager that combines ReMeLight local ops with MemOS Cloud. - Cloud-side operations: - - memory_search → POST /search/memory - - add_message → POST /add/message (via summary_memory) + Inherits all ReMeLight capabilities: + - compact_memory / compact_tool_result / check_context (token-aware) + - get_in_memory_memory (with as_token_counter support) + - summary_memory (file-based with toolkit) - Local operations (no external dependency): - - compact_memory / compact_tool_result / check_context - use the agent's chat model for in-process context management. + Overrides cloud-bound operations: + - memory_search → POST /search/memory to MemOS Cloud + - summary_memory → parent summary + POST /add/message to MemOS Cloud - Configuration is read from agent config ``running.memos_config`` - with environment-variable fallbacks: - - MEMOS_API_KEY, MEMOS_BASE_URL, MEMOS_USER_ID + Configuration is read from ``running.memos_config`` in agent config, + with env-var fallbacks: MEMOS_API_KEY, MEMOS_BASE_URL, MEMOS_USER_ID. """ def __init__(self, working_dir: str, agent_id: str): super().__init__(working_dir=working_dir, agent_id=agent_id) - self._client = None # lazily created in start() - self._in_memory: Optional[MemOSInMemoryMemory] = None - self._pending_messages: list[dict] = [] - self._config_cache: Optional[dict] = None + self._memos_client = None + self._memos_cfg: Optional[dict] = None # ------------------------------------------------------------------ # # Config resolution @@ -102,11 +91,10 @@ def __init__(self, working_dir: str, agent_id: str): def _load_memos_config(self) -> dict: """Resolve MemOS config: agent config > env var > default.""" - if self._config_cache is not None: - return self._config_cache + if self._memos_cfg is not None: + return self._memos_cfg - # Try loading from agent config - cfg = {} + cfg: dict = {} try: from copaw.config.config import load_agent_config @@ -127,9 +115,10 @@ def _load_memos_config(self) -> dict: "async_mode": mc.async_mode, } except Exception as e: - logger.debug("Could not load memos_config from agent config: %s", e) + logger.debug( + "Could not load memos_config from agent config: %s", e, + ) - # Env-var fallbacks result = { "base_url": cfg.get("base_url") or EnvVarLoader.get_str( @@ -149,18 +138,21 @@ def _load_memos_config(self) -> dict: "knowledgebase_ids": cfg.get("knowledgebase_ids", []), "async_mode": cfg.get("async_mode", True), } - self._config_cache = result + self._memos_cfg = result return result # ------------------------------------------------------------------ # - # Lifecycle + # Lifecycle (extend parent) # ------------------------------------------------------------------ # async def start(self) -> None: - """Initialise the MemOS HTTP client and verify connectivity.""" - # Lazy import to avoid top-level dependency on aiohttp - # when the plugin is merely discovered but not activated. + """Start ReMeLight, then initialise the MemOS HTTP client.""" + # ReMeLight local memory first + await super().start() + + # MemOS Cloud client import importlib.util + _spec = importlib.util.spec_from_file_location( "memos_client", os.path.join( @@ -170,59 +162,42 @@ async def start(self) -> None: ) _mod = importlib.util.module_from_spec(_spec) _spec.loader.exec_module(_mod) - MemOSClient = _mod.MemOSClient - cfg = self._load_memos_config() - if not cfg["api_key"]: + mc = self._load_memos_config() + if not mc["api_key"]: logger.warning( "MemOS API key not configured. Set MEMOS_API_KEY env var " - "or add memos_config.api_key to agent config." + "or add memos_config.api_key to agent config.", ) - self._client = MemOSClient( - base_url=cfg["base_url"], - api_key=cfg["api_key"], - timeout=cfg["timeout"], + self._memos_client = _mod.MemOSClient( + base_url=mc["base_url"], + api_key=mc["api_key"], + timeout=mc["timeout"], ) - masked_key = ( - cfg["api_key"][:5] + "***" - if len(cfg["api_key"]) > 5 - else "***" - ) - ok = await self._client.ping() + masked = mc["api_key"][:5] + "***" if len(mc["api_key"]) > 5 else "***" + ok = await self._memos_client.ping() if ok: logger.info( - "MemOS Cloud connected: %s (key=%s)", - cfg["base_url"], - masked_key, + "MemOS Cloud connected: %s (key=%s)", mc["base_url"], masked, ) else: logger.warning( - "MemOS Cloud unreachable at %s — memory search will " - "degrade gracefully.", - cfg["base_url"], + "MemOS Cloud unreachable at %s — will fall back to " + "local ReMeLight search.", + mc["base_url"], ) async def close(self) -> bool: - """Flush pending messages and close the HTTP session.""" - if self._pending_messages and self._client: - cfg = self._load_memos_config() - await self._client.add_message( - user_id=cfg["user_id"], - messages=self._pending_messages, - conversation_id=cfg["conversation_id"], - agent_id=self.agent_id, - async_mode=cfg["async_mode"], - ) - self._pending_messages.clear() - - if self._client: - await self._client.close() - return True + """Close MemOS client, then close ReMeLight.""" + if self._memos_client: + await self._memos_client.close() + self._memos_client = None + return await super().close() # ------------------------------------------------------------------ # - # Memory search (core MemOS feature) + # memory_search → MemOS Cloud (fallback: ReMeLight local) # ------------------------------------------------------------------ # async def memory_search( @@ -231,199 +206,70 @@ async def memory_search( max_results: int = 5, min_score: float = 0.1, ) -> ToolResponse: - """Search MemOS Cloud for relevant memories.""" - if not self._client: - return ToolResponse( - content=[ - TextBlock( - type="text", - text="MemOS client not initialized. " - "Check API key and base URL configuration.", - ), - ], - ) + """Search MemOS Cloud; fall back to local ReMeLight on failure.""" + if not self._memos_client: + return await super().memory_search(query, max_results, min_score) - cfg = self._load_memos_config() - data = await self._client.search_memory( - user_id=cfg["user_id"], + mc = self._load_memos_config() + data = await self._memos_client.search_memory( + user_id=mc["user_id"], query=query, memory_limit_number=max_results, - include_preference=cfg["include_preference"], - preference_limit_number=cfg["preference_limit_number"], - relativity=max(min_score, cfg["relativity"]), - conversation_id=cfg["conversation_id"], - knowledgebase_ids=cfg["knowledgebase_ids"] or None, + include_preference=mc["include_preference"], + preference_limit_number=mc["preference_limit_number"], + relativity=max(min_score, mc["relativity"]), + conversation_id=mc["conversation_id"], + knowledgebase_ids=mc["knowledgebase_ids"] or None, ) + # Fallback to local search if cloud is unreachable if data is None: - return ToolResponse( - content=[ - TextBlock( - type="text", - text="MemOS Cloud search returned no results " - "(API may be unreachable).", - ), - ], + logger.warning( + "MemOS Cloud search failed, falling back to local search.", ) + return await super().memory_search(query, max_results, min_score) text = _format_search_results(data) if not text: - return ToolResponse( - content=[ - TextBlock( - type="text", - text="No relevant memories found.", - ), - ], - ) + # Cloud returned empty — try local as supplement + return await super().memory_search(query, max_results, min_score) return ToolResponse( content=[TextBlock(type="text", text=text)], ) # ------------------------------------------------------------------ # - # Context compaction (local, no cloud dependency) + # summary_memory → ReMeLight summary + upload to MemOS Cloud # ------------------------------------------------------------------ # - async def compact_tool_result(self, **kwargs) -> None: - """Truncate oversized tool outputs in-place.""" - messages: list = kwargs.get("messages", []) - max_chars: int = kwargs.get("max_chars", 20000) - for msg in messages: - if hasattr(msg, "content") and isinstance(msg.content, str): - if len(msg.content) > max_chars: - msg.content = ( - msg.content[:max_chars] - + f"\n... [truncated, original {len(msg.content)} chars]" - ) - - async def check_context(self, **kwargs) -> tuple: - """Simple context-size check based on character count. - - Returns (messages_to_compact, remaining, is_valid). - """ - messages: list = kwargs.get("messages", []) - max_chars: int = kwargs.get("max_input_length", 120000) - compact_ratio: float = kwargs.get("compact_ratio", 0.5) - - total = sum( - len(getattr(m, "content", "") or "") for m in messages - ) - if total <= max_chars: - return [], messages, True - - # Keep the most recent messages within budget - cut = max(1, int(len(messages) * compact_ratio)) - return messages[:cut], messages[cut:], False - - async def compact_memory( - self, - messages: list[Msg], - previous_summary: str = "", - extra_instruction: str = "", - **kwargs, - ) -> str: - """Summarise old messages using the agent's chat model. - - Also queues the summary for async upload to MemOS Cloud. - """ - if not messages: - return previous_summary - - self._prepare_model_formatter() - if self.chat_model is None: - # Fallback: concatenate content - lines = [ - f"{m.role}: {getattr(m, 'content', '')[:500]}" - for m in messages[-20:] - ] - summary = "\n".join(lines) - else: - transcript = "\n".join( - f"[{m.role}] {getattr(m, 'content', '')[:2000]}" - for m in messages - ) - prompt = ( - "Condense the following conversation into a concise summary " - "that preserves all key facts, decisions, and action items. " - "Keep it under 800 words.\n\n" - ) - if previous_summary: - prompt += f"Previous summary:\n{previous_summary}\n\n" - if extra_instruction: - prompt += f"Additional instruction: {extra_instruction}\n\n" - prompt += f"Conversation:\n{transcript}" - - try: - response = self.chat_model( - Msg(role="user", content=prompt), - ) - summary = ( - response.content - if hasattr(response, "content") - else str(response) - ) - except Exception as e: - logger.error("compact_memory LLM call failed: %s", e) - summary = previous_summary or "" - - # Queue the summary for async upload to MemOS Cloud - if summary and self._client: - self._pending_messages.append( - {"role": "assistant", "content": f"[summary] {summary}"} - ) - - return summary - async def summary_memory(self, messages: list[Msg], **kwargs) -> str: - """Summarise messages and upload to MemOS Cloud.""" - summary = await self.compact_memory(messages, **kwargs) + """Run ReMeLight summary, then upload conversation to MemOS Cloud.""" + # Delegate to parent for the actual summarisation + summary = await super().summary_memory(messages, **kwargs) - # Immediately upload conversation + summary - if self._client and messages: - cfg = self._load_memos_config() + # Upload conversation to MemOS Cloud (best-effort) + if self._memos_client and messages: + mc = self._load_memos_config() conv_msgs = [] for m in messages[-30:]: role = getattr(m, "role", "user") content = getattr(m, "content", "") if isinstance(content, str) and content.strip(): conv_msgs.append( - {"role": role, "content": content[:20000]} + {"role": role, "content": content[:20000]}, ) - if conv_msgs: - await self._client.add_message( - user_id=cfg["user_id"], - messages=conv_msgs, - conversation_id=cfg["conversation_id"], - agent_id=self.agent_id, - async_mode=cfg["async_mode"], - ) + try: + await self._memos_client.add_message( + user_id=mc["user_id"], + messages=conv_msgs, + conversation_id=mc["conversation_id"], + agent_id=self.agent_id, + async_mode=mc["async_mode"], + ) + except Exception as e: + logger.warning( + "Failed to upload summary to MemOS Cloud: %s", e, + ) return summary - - # ------------------------------------------------------------------ # - # InMemoryMemory bridge - # ------------------------------------------------------------------ # - - def get_in_memory_memory(self, **kwargs) -> MemOSInMemoryMemory: - """Return an InMemoryMemory with _long_term_memory support.""" - if self._in_memory is None: - self._in_memory = MemOSInMemoryMemory() - return self._in_memory - - # ------------------------------------------------------------------ # - # Internal helpers - # ------------------------------------------------------------------ # - - def _prepare_model_formatter(self) -> None: - """Lazily initialize chat_model and formatter if not set.""" - if self.chat_model is None or self.formatter is None: - try: - from copaw.agents.model_factory import create_model_and_formatter - - self.chat_model, self.formatter = create_model_and_formatter( - self.agent_id, - ) - except Exception as e: - logger.warning("Failed to init chat model: %s", e) From ba8a461933d107867c545fdeabd3669a22669658 Mon Sep 17 00:00:00 2001 From: fancy Date: Thu, 9 Apr 2026 16:35:26 +0800 Subject: [PATCH 3/5] fix(copaw-plugin): remove summary_memory override, search-only for v1 summary_memory only fires during context compaction, not every turn. Putting MemOS Cloud add logic there was both unreliable (too infrequent) and architecturally wrong (mixing local summary with cloud upload). v1 scope: MemOS Cloud provides recall (memory_search) only. All local operations (compaction, summary, persistence) stay with ReMeLight unchanged. Cloud-side add requires CoPaw to expose a post-turn hook first. --- .../memos_memory_manager.py | 51 +++++-------------- 1 file changed, 13 insertions(+), 38 deletions(-) diff --git a/apps/memos-copaw-plugin/memos_memory_manager.py b/apps/memos-copaw-plugin/memos_memory_manager.py index 036985d7..547f310f 100644 --- a/apps/memos-copaw-plugin/memos_memory_manager.py +++ b/apps/memos-copaw-plugin/memos_memory_manager.py @@ -72,9 +72,9 @@ class MemOSMemoryManager(ReMeLightMemoryManager): - get_in_memory_memory (with as_token_counter support) - summary_memory (file-based with toolkit) - Overrides cloud-bound operations: - - memory_search → POST /search/memory to MemOS Cloud - - summary_memory → parent summary + POST /add/message to MemOS Cloud + Overrides one method: + - memory_search → POST /search/memory to MemOS Cloud, + with automatic fallback to local ReMeLight search on failure. Configuration is read from ``running.memos_config`` in agent config, with env-var fallbacks: MEMOS_API_KEY, MEMOS_BASE_URL, MEMOS_USER_ID. @@ -238,38 +238,13 @@ async def memory_search( content=[TextBlock(type="text", text=text)], ) - # ------------------------------------------------------------------ # - # summary_memory → ReMeLight summary + upload to MemOS Cloud - # ------------------------------------------------------------------ # - - async def summary_memory(self, messages: list[Msg], **kwargs) -> str: - """Run ReMeLight summary, then upload conversation to MemOS Cloud.""" - # Delegate to parent for the actual summarisation - summary = await super().summary_memory(messages, **kwargs) - - # Upload conversation to MemOS Cloud (best-effort) - if self._memos_client and messages: - mc = self._load_memos_config() - conv_msgs = [] - for m in messages[-30:]: - role = getattr(m, "role", "user") - content = getattr(m, "content", "") - if isinstance(content, str) and content.strip(): - conv_msgs.append( - {"role": role, "content": content[:20000]}, - ) - if conv_msgs: - try: - await self._memos_client.add_message( - user_id=mc["user_id"], - messages=conv_msgs, - conversation_id=mc["conversation_id"], - agent_id=self.agent_id, - async_mode=mc["async_mode"], - ) - except Exception as e: - logger.warning( - "Failed to upload summary to MemOS Cloud: %s", e, - ) - - return summary + # summary_memory / compact_memory / compact_tool_result / check_context + # → fully inherited from ReMeLightMemoryManager, no override needed. + # + # MemOS Cloud "add" (uploading conversations) is intentionally NOT done + # here. CoPaw's memory manager interface has no per-turn callback; + # summary_memory only fires during context compaction, which is too + # infrequent for reliable cloud sync. A proper add flow requires + # CoPaw to expose a post-turn hook (similar to OpenClaw's agent_end). + # Until then, users can populate MemOS Cloud via the MemOS dashboard + # or other clients, and this plugin provides cloud-based recall. From 32a89dbf34b5f9507a484b3e6a1a584addac8d52 Mon Sep 17 00:00:00 2001 From: fancy Date: Thu, 9 Apr 2026 17:14:34 +0800 Subject: [PATCH 4/5] fix(copaw-plugin): address CI lint and AI review feedback - SIM105: use contextlib.suppress instead of try-except-pass - F401: remove unused imports (Msg, time) - ping(): return True only for 2xx, not all < 500 - plugin.json: remove "add" claim from description - docstring: remove stale summary_memory override reference - wire memory_limit_number config into search request - ruff format fixes --- apps/memos-copaw-plugin/memos_client.py | 37 +++++++++++-------- .../memos_memory_manager.py | 26 ++++++------- apps/memos-copaw-plugin/plugin.json | 2 +- apps/memos-copaw-plugin/plugin.py | 2 +- 4 files changed, 34 insertions(+), 33 deletions(-) diff --git a/apps/memos-copaw-plugin/memos_client.py b/apps/memos-copaw-plugin/memos_client.py index 15cbde15..20cc88e2 100644 --- a/apps/memos-copaw-plugin/memos_client.py +++ b/apps/memos-copaw-plugin/memos_client.py @@ -1,12 +1,12 @@ -# -*- coding: utf-8 -*- """Async HTTP client for MemOS Cloud API.""" import asyncio import logging -import time -from typing import Any, Dict, List, Optional + +from typing import Any import aiohttp + logger = logging.getLogger(__name__) _DEFAULT_BASE_URL = "https://memos.memtensor.cn/api/openmem/v1" @@ -32,7 +32,7 @@ def __init__( self.api_key = api_key self.timeout = timeout self.retries = retries - self._session: Optional[aiohttp.ClientSession] = None + self._session: aiohttp.ClientSession | None = None async def _ensure_session(self) -> aiohttp.ClientSession: if self._session is None or self._session.closed: @@ -53,11 +53,11 @@ async def close(self) -> None: async def _post( self, path: str, - payload: Dict[str, Any], - ) -> Optional[Dict[str, Any]]: + payload: dict[str, Any], + ) -> dict[str, Any] | None: """POST with retry. Returns parsed JSON or None on failure.""" url = f"{self.base_url}{path}" - last_err: Optional[Exception] = None + last_err: Exception | None = None for attempt in range(1 + self.retries): try: @@ -103,14 +103,14 @@ async def search_memory( include_tool_memory: bool = False, tool_memory_limit_number: int = 6, relativity: float = 0.45, - knowledgebase_ids: Optional[List[str]] = None, - filter_obj: Optional[Dict[str, Any]] = None, - ) -> Optional[Dict[str, Any]]: + knowledgebase_ids: list[str] | None = None, + filter_obj: dict[str, Any] | None = None, + ) -> dict[str, Any] | None: """Call POST /search/memory. Returns the ``data`` dict from MemOS response, or *None* on failure. """ - payload: Dict[str, Any] = { + payload: dict[str, Any] = { "user_id": user_id, "query": query, "source": source, @@ -140,19 +140,19 @@ async def search_memory( async def add_message( self, user_id: str, - messages: List[Dict[str, str]], + messages: list[dict[str, str]], *, conversation_id: str = "", source: str = "copaw", agent_id: str = "", async_mode: bool = True, - tags: Optional[List[str]] = None, + tags: list[str] | None = None, ) -> bool: """Call POST /add/message. Returns True on success, False on failure. """ - payload: Dict[str, Any] = { + payload: dict[str, Any] = { "user_id": user_id, "messages": messages, "source": source, @@ -172,7 +172,12 @@ async def add_message( # ------------------------------------------------------------------ # async def ping(self) -> bool: - """Lightweight connectivity check via a minimal search call.""" + """Lightweight connectivity check via a minimal search call. + + Returns True only for 2xx responses. 401/403 (bad key) and + other client errors are treated as failures so that ``start()`` + does not falsely report a healthy connection. + """ try: session = await self._ensure_session() async with session.post( @@ -180,6 +185,6 @@ async def ping(self) -> bool: json={"user_id": "_ping", "query": "ping"}, timeout=aiohttp.ClientTimeout(total=5), ) as resp: - return resp.status < 500 + return 200 <= resp.status < 300 except Exception: return False diff --git a/apps/memos-copaw-plugin/memos_memory_manager.py b/apps/memos-copaw-plugin/memos_memory_manager.py index 547f310f..6ea2af47 100644 --- a/apps/memos-copaw-plugin/memos_memory_manager.py +++ b/apps/memos-copaw-plugin/memos_memory_manager.py @@ -1,29 +1,26 @@ -# -*- coding: utf-8 -*- """MemOS Cloud memory manager for CoPaw agents. Extends ReMeLightMemoryManager — all local operations (context compaction, token counting, tool result truncation, in-memory memory) are delegated -to the parent class unchanged. Only two methods are overridden: - - - memory_search → queries MemOS Cloud instead of local vector index - - summary_memory → uploads conversation to MemOS Cloud after local summary +to the parent class unchanged. Only ``memory_search`` is overridden to +query MemOS Cloud instead of the local vector index. This ensures full compatibility with CoPaw's MemoryCompactionHook and force_memory_search auto-recall mechanism. """ +import contextlib import datetime import logging import os -from typing import Optional -from agentscope.message import Msg, TextBlock +from agentscope.message import TextBlock from agentscope.tool import ToolResponse - from copaw.agents.memory.reme_light_memory_manager import ( ReMeLightMemoryManager, ) from copaw.constant import EnvVarLoader + logger = logging.getLogger(__name__) @@ -40,13 +37,11 @@ def _format_search_results(data: dict) -> str: ts = item.get("update_time") or item.get("create_time") date_str = "" if ts: - try: + with contextlib.suppress(OSError, ValueError): date_str = datetime.datetime.fromtimestamp( ts, tz=datetime.timezone.utc, ).strftime("[%Y-%m-%d %H:%M] ") - except (OSError, ValueError): - pass value = (item.get("memory_value") or "")[:8000] rel = item.get("relativity", 0) parts.append(f"{date_str}{value} (score={rel:.2f})") @@ -83,7 +78,7 @@ class MemOSMemoryManager(ReMeLightMemoryManager): def __init__(self, working_dir: str, agent_id: str): super().__init__(working_dir=working_dir, agent_id=agent_id) self._memos_client = None - self._memos_cfg: Optional[dict] = None + self._memos_cfg: dict | None = None # ------------------------------------------------------------------ # # Config resolution @@ -112,7 +107,7 @@ def _load_memos_config(self) -> dict: "timeout": mc.timeout, "conversation_id": mc.conversation_id, "knowledgebase_ids": mc.knowledgebase_ids, - "async_mode": mc.async_mode, + # async_mode reserved for future add flow } except Exception as e: logger.debug( @@ -136,7 +131,6 @@ def _load_memos_config(self) -> dict: "timeout": cfg.get("timeout", 8.0), "conversation_id": cfg.get("conversation_id", ""), "knowledgebase_ids": cfg.get("knowledgebase_ids", []), - "async_mode": cfg.get("async_mode", True), } self._memos_cfg = result return result @@ -211,10 +205,12 @@ async def memory_search( return await super().memory_search(query, max_results, min_score) mc = self._load_memos_config() + # Use config default when caller doesn't override + limit = max(max_results, mc["memory_limit_number"]) data = await self._memos_client.search_memory( user_id=mc["user_id"], query=query, - memory_limit_number=max_results, + memory_limit_number=limit, include_preference=mc["include_preference"], preference_limit_number=mc["preference_limit_number"], relativity=max(min_score, mc["relativity"]), diff --git a/apps/memos-copaw-plugin/plugin.json b/apps/memos-copaw-plugin/plugin.json index 8adc9b4d..934468ca 100644 --- a/apps/memos-copaw-plugin/plugin.json +++ b/apps/memos-copaw-plugin/plugin.json @@ -2,7 +2,7 @@ "id": "memos-copaw-plugin", "name": "MemOS Cloud Memory", "version": "0.1.0", - "description": "MemOS Cloud memory backend for CoPaw — provides cloud-based long-term memory via MemOS API (search + add).", + "description": "MemOS Cloud memory backend for CoPaw — cloud-based long-term memory recall via MemOS search API.", "author": "MemTensor", "entry_point": "plugin.py", "dependencies": [ diff --git a/apps/memos-copaw-plugin/plugin.py b/apps/memos-copaw-plugin/plugin.py index fa1b0eb5..ccf3de04 100644 --- a/apps/memos-copaw-plugin/plugin.py +++ b/apps/memos-copaw-plugin/plugin.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- """MemOS Cloud memory plugin for CoPaw. Registers MemOSMemoryManager as a pluggable memory backend so that @@ -14,6 +13,7 @@ import logging import os + logger = logging.getLogger(__name__) # Load sibling module without mutating sys.path From 837a901def47a87741fb75c6b1695693b7ad7c63 Mon Sep 17 00:00:00 2001 From: fancy Date: Thu, 9 Apr 2026 17:21:46 +0800 Subject: [PATCH 5/5] style(copaw-plugin): apply ruff format --- apps/memos-copaw-plugin/memos_client.py | 1 + apps/memos-copaw-plugin/memos_memory_manager.py | 17 +++++++++-------- apps/memos-copaw-plugin/plugin.py | 4 ++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/apps/memos-copaw-plugin/memos_client.py b/apps/memos-copaw-plugin/memos_client.py index 20cc88e2..5936b0ef 100644 --- a/apps/memos-copaw-plugin/memos_client.py +++ b/apps/memos-copaw-plugin/memos_client.py @@ -1,4 +1,5 @@ """Async HTTP client for MemOS Cloud API.""" + import asyncio import logging diff --git a/apps/memos-copaw-plugin/memos_memory_manager.py b/apps/memos-copaw-plugin/memos_memory_manager.py index 6ea2af47..b1ab4281 100644 --- a/apps/memos-copaw-plugin/memos_memory_manager.py +++ b/apps/memos-copaw-plugin/memos_memory_manager.py @@ -8,6 +8,7 @@ This ensures full compatibility with CoPaw's MemoryCompactionHook and force_memory_search auto-recall mechanism. """ + import contextlib import datetime import logging @@ -111,7 +112,8 @@ def _load_memos_config(self) -> dict: } except Exception as e: logger.debug( - "Could not load memos_config from agent config: %s", e, + "Could not load memos_config from agent config: %s", + e, ) result = { @@ -120,10 +122,8 @@ def _load_memos_config(self) -> dict: "MEMOS_BASE_URL", "https://memos.memtensor.cn/api/openmem/v1", ), - "api_key": cfg.get("api_key") - or EnvVarLoader.get_str("MEMOS_API_KEY", ""), - "user_id": cfg.get("user_id") - or EnvVarLoader.get_str("MEMOS_USER_ID", "copaw-user"), + "api_key": cfg.get("api_key") or EnvVarLoader.get_str("MEMOS_API_KEY", ""), + "user_id": cfg.get("user_id") or EnvVarLoader.get_str("MEMOS_USER_ID", "copaw-user"), "memory_limit_number": cfg.get("memory_limit_number", 9), "include_preference": cfg.get("include_preference", True), "preference_limit_number": cfg.get("preference_limit_number", 6), @@ -174,12 +174,13 @@ async def start(self) -> None: ok = await self._memos_client.ping() if ok: logger.info( - "MemOS Cloud connected: %s (key=%s)", mc["base_url"], masked, + "MemOS Cloud connected: %s (key=%s)", + mc["base_url"], + masked, ) else: logger.warning( - "MemOS Cloud unreachable at %s — will fall back to " - "local ReMeLight search.", + "MemOS Cloud unreachable at %s — will fall back to local ReMeLight search.", mc["base_url"], ) diff --git a/apps/memos-copaw-plugin/plugin.py b/apps/memos-copaw-plugin/plugin.py index ccf3de04..061a392c 100644 --- a/apps/memos-copaw-plugin/plugin.py +++ b/apps/memos-copaw-plugin/plugin.py @@ -9,6 +9,7 @@ Then set ``memory_manager_backend: "memos"`` in agent config and provide MEMOS_API_KEY (env var or config). """ + import importlib.util import logging import os @@ -43,8 +44,7 @@ def register(self, api): api.register_startup_hook( hook_name="memos_cloud_init", callback=lambda: logger.info( - "MemOS Cloud plugin ready. " - "Set memory_manager_backend='memos' to activate." + "MemOS Cloud plugin ready. Set memory_manager_backend='memos' to activate." ), priority=90, )