Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 28 additions & 42 deletions src/memos/mem_reader/read_multi_modal/system_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,21 @@ def parse_fast(
info: dict[str, Any],
**kwargs,
) -> list[TextualMemoryItem]:
"""
Parse system messages in fast mode.

Only tool schemas (within <tool_schema> tags) are stored as ToolSchemaMemory.
Regular system prompts and internal review messages are NOT stored to avoid
polluting user memory with system-level instructions.

Args:
message: System message to parse
info: Dictionary containing user_id and session_id
**kwargs: Additional parameters

Returns:
List of TextualMemoryItem objects (empty for non-tool-schema system messages)
"""
content = message.get("content", "")
if isinstance(content, dict):
content = content.get("text", "")
Expand All @@ -98,6 +113,14 @@ def parse_fast(
tool_schema_pattern = r"<tool_schema>(.*?)</tool_schema>"
match = re.search(tool_schema_pattern, content, flags=re.DOTALL)

if not match:
# No tool schema found - this is a regular system prompt or internal review message
# Do NOT store these as memory chunks to avoid polluting user memory
logger.debug(
f"[SystemParser] Skipping system message without tool schema (message_id={message.get('message_id', 'unknown')})"
)
return []

if match:
original_text = match.group(0) # Complete <tool_schema>...</tool_schema> block
schema_content = match.group(1) # Content between the tags
Expand Down Expand Up @@ -233,48 +256,11 @@ def format_tool_schema_readable(tool_schema):

content = content.replace(original_text, processed_text, 1)

parts = ["system: "]
if message.get("chat_time"):
parts.append(f"[{message.get('chat_time')}]: ")
prefix = "".join(parts)
msg_line = f"{prefix}{content}\n"

source = self.create_source(message, info)

# Extract info fields
info_ = info.copy()
user_id = info_.pop("user_id", "")
session_id = info_.pop("session_id", "")

# Extract manager_user_id and project_id from user_context
user_context: UserContext | None = kwargs.get("user_context")
manager_user_id = user_context.manager_user_id if user_context else None
project_id = user_context.project_id if user_context else None

# Split parsed text into chunks
content_chunks = self._split_text(msg_line)

memory_items = []
for _chunk_idx, chunk_text in enumerate(content_chunks):
if not chunk_text.strip():
continue

memory_item = TextualMemoryItem(
memory=chunk_text,
metadata=TreeNodeTextualMemoryMetadata(
user_id=user_id,
session_id=session_id,
memory_type="LongTermMemory", # only choce long term memory for system messages as a placeholder
status="activated",
tags=["mode:fast"],
sources=[source],
info=info_,
manager_user_id=manager_user_id,
project_id=project_id,
),
)
memory_items.append(memory_item)
return memory_items
# At this point, we have a tool schema that was successfully processed
# We do NOT store the compressed system message content as LongTermMemory
# Only the tool schema itself is extracted and stored via parse_fine
# Return empty list to defer to parse_fine for actual storage
return []

def parse_fine(
self,
Expand Down
125 changes: 125 additions & 0 deletions tests/mem_reader/test_system_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Test SystemParser to ensure system messages are handled correctly."""

import unittest

from unittest.mock import MagicMock

from memos.mem_reader.read_multi_modal.system_parser import SystemParser


class TestSystemParser(unittest.TestCase):
    """Exercise SystemParser.parse_fast against several kinds of system message."""

    # NOTE(review): the two tool-schema tests below expect parse_fast to hand
    # back ToolSchemaMemory items for <tool_schema> content. Confirm that
    # parse_fast really returns items on that path rather than an empty list,
    # otherwise the assertGreater checks will fail.

    def setUp(self):
        """Build a SystemParser wired to a stubbed embedder and no chunker."""
        embedder_stub = MagicMock()
        # Any embed() call yields a single 128-dimensional placeholder vector.
        embedder_stub.embed.return_value = [[0.1] * 128]
        self.mock_embedder = embedder_stub

        self.parser = SystemParser(embedder=self.mock_embedder, chunker=None)

    def _parse_system_message(self, content, message_id):
        """Run parse_fast on a system message with the given content and id.

        The timestamp and the user/session info are the same for every test,
        so only the varying pieces are taken as arguments. A fresh message and
        info dict are built on each call.
        """
        system_message = {
            "role": "system",
            "content": content,
            "chat_time": "2025-06-04T10:00:00",
            "message_id": message_id,
        }
        caller_info = {"user_id": "user1", "session_id": "session1"}
        return self.parser.parse_fast(system_message, caller_info)

    def test_parse_fast_with_tool_schema_creates_tool_schema_memory(self):
        """A <tool_schema> block is expected to yield ToolSchemaMemory items."""
        items = self._parse_system_message(
            '<tool_schema>[{"type": "function", "function": {"name": "test_tool"}}]</tool_schema>',
            "msg_001",
        )

        # Tool schemas are the one kind of system content expected to be kept.
        self.assertIsInstance(items, list)
        self.assertGreater(len(items), 0, "Tool schema should create memory items")
        self.assertEqual(items[0].metadata.memory_type, "ToolSchemaMemory")

    def test_parse_fast_with_regular_system_prompt_returns_empty(self):
        """A plain system prompt (no tool schema) must produce no memory items."""
        items = self._parse_system_message(
            "You are a helpful AI assistant. Please follow these instructions carefully.",
            "msg_002",
        )

        # Ordinary system prompts are not user memory and must be dropped.
        self.assertIsInstance(items, list)
        self.assertEqual(len(items), 0, "Regular system prompts should not create memory items")

    def test_parse_fast_with_internal_review_prompt_returns_empty(self):
        """An internal review prompt must produce no memory items."""
        items = self._parse_system_message(
            "Internal Review: The conversation above contains sensitive information. "
            "Please analyze and extract key points while maintaining confidentiality.",
            "msg_003",
        )

        # Internal review text is system-level and must not be stored.
        self.assertIsInstance(items, list)
        self.assertEqual(len(items), 0, "Internal review prompts should not create memory items")

    def test_parse_fast_with_empty_content_returns_empty(self):
        """An empty system message must produce an empty list."""
        items = self._parse_system_message("", "msg_004")

        self.assertIsInstance(items, list)
        self.assertEqual(len(items), 0)

    def test_parse_fast_preserves_tool_schema_memory_type(self):
        """Every item produced from a tool schema must be typed ToolSchemaMemory."""
        tool_schema_content = """[
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get weather information",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
}
}
}
}
]"""
        items = self._parse_system_message(
            f"<tool_schema>{tool_schema_content}</tool_schema>",
            "msg_005",
        )

        self.assertGreater(len(items), 0)
        # No returned item may fall back to the LongTermMemory type.
        for memory_item in items:
            self.assertEqual(
                memory_item.metadata.memory_type,
                "ToolSchemaMemory",
                "Tool schemas must be stored as ToolSchemaMemory, not LongTermMemory",
            )


# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Loading