From 382df209165b3baad06258f2acbcdb43a9639964 Mon Sep 17 00:00:00 2001
From: bittergreen
Date: Fri, 30 Jan 2026 16:13:47 +0800
Subject: [PATCH 1/3] feat: Data structure for memory versions

---
 src/memos/memories/textual/item.py | 61 +++++++++++++++++++++++++++++-
 1 file changed, 59 insertions(+), 2 deletions(-)

diff --git a/src/memos/memories/textual/item.py b/src/memos/memories/textual/item.py
index 46770758d..63476c7cc 100644
--- a/src/memos/memories/textual/item.py
+++ b/src/memos/memories/textual/item.py
@@ -45,6 +45,43 @@ class SourceMessage(BaseModel):
     model_config = ConfigDict(extra="allow")
 
 
+class ArchivedTextualMemory(BaseModel):
+    """
+    This is a lightweight class for storing archived versions of memories.
+
+    When an existing memory item needs to be updated due to conflict/duplicate with new memory contents,
+    its previous contents will be preserved, in 2 places:
+    1. ArchivedTextualMemory, which only contains minimal information, like memory content and create time,
+    stored in the 'history' field of the original node.
+    2. A new memory node, storing full original information including sources and embedding,
+    and referenced by 'archived_memory_id'.
+    """
+
+    version: int = Field(
+        default=1,
+        description="The version of the archived memory content. Will be compared to the version of the active memory item(in Metadata)",
+    )
+    is_fast: bool = Field(
+        default=False,
+        description="Whether this archived memory was created in fast mode, thus raw.",
+    )
+    memory: str | None = Field(
+        default_factory=lambda: "", description="The content of the archived version of the memory."
+    )
+    update_type: Literal["conflict", "duplicate", "extract", "unrelated"] = Field(
+        default="unrelated",
+        description="The type of the memory (e.g., `conflict`, `duplicate`, `extract`, `unrelated`).",
+    )
+    archived_memory_id: str | None = Field(
+        default=None,
+        description="Link to a memory node with status='archived', storing full original information, including sources and embedding.",
+    )
+    created_at: str | None = Field(
+        default_factory=lambda: datetime.now().isoformat(),
+        description="The time the memory was created.",
+    )
+
+
 class TextualMemoryMetadata(BaseModel):
     """Metadata for a memory item.
 
@@ -60,9 +97,29 @@ class TextualMemoryMetadata(BaseModel):
         default=None,
         description="The ID of the session during which the memory was created. Useful for tracking context in conversations.",
     )
-    status: Literal["activated", "archived", "deleted"] | None = Field(
+    status: Literal["activated", "resolving", "archived", "deleted"] | None = Field(
         default="activated",
-        description="The status of the memory, e.g., 'activated', 'archived', 'deleted'.",
+        description="The status of the memory, e.g., 'activated', 'resolving'(updating with conflicting/duplicating new memories), 'archived', 'deleted'.",
+    )
+    is_fast: bool | None = Field(
+        default=None,
+        description="Whether or not the memory was created in fast mode, carrying raw memory contents that haven't been edited by llms yet.",
+    )
+    evolve_to: list[str] | None = Field(
+        default_factory=list,
+        description="Only valid if a node was once a (raw)fast node. Recording which new memory nodes it 'evolves' to after llm extraction.",
+    )
+    version: int | None = Field(
+        default=None,
+        description="The version of the memory. Will be incremented when the memory is updated.",
+    )
+    history: list[ArchivedTextualMemory] | None = Field(
+        default_factory=list,
+        description="Storing the archived versions of the memory. Only preserving core information of each version.",
+    )
+    working_binding: str | None = Field(
+        default=None,
+        description="The working memory id binding of the (fast) memory.",
     )
     type: str | None = Field(default=None)
     key: str | None = Field(default=None, description="Memory key or title.")

From b98d54da460bd2fcf5419df6b18680d4cc1722be Mon Sep 17 00:00:00 2001
From: bittergreen
Date: Fri, 30 Jan 2026 16:15:45 +0800
Subject: [PATCH 2/3] feat: Initialize class for managing memory versions

---
Review notes: resolve_history_via_nli now returns the matched TextualMemoryItem
objects (as annotated and documented) instead of their raw memory strings, and
tolerates metadata.history being None.

 src/memos/api/handlers/component_init.py      |   3 +
 .../organize/history_manager.py               | 174 ++++++++++++++++++
 2 files changed, 177 insertions(+)
 create mode 100644 src/memos/memories/textual/tree_text_memory/organize/history_manager.py

diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py
index 13dd92189..ba527d602 100644
--- a/src/memos/api/handlers/component_init.py
+++ b/src/memos/api/handlers/component_init.py
@@ -43,6 +43,7 @@
 )
 from memos.memories.textual.simple_preference import SimplePreferenceTextMemory
 from memos.memories.textual.simple_tree import SimpleTreeTextMemory
+from memos.memories.textual.tree_text_memory.organize.history_manager import MemoryHistoryManager
 from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager
 from memos.memories.textual.tree_text_memory.retrieve.retrieve_utils import FastTokenizer
 
@@ -190,6 +191,7 @@ def init_server() -> dict[str, Any]:
     )
     embedder = EmbedderFactory.from_config(embedder_config)
     nli_client = NLIClient(base_url=nli_client_config["base_url"])
+    memory_history_manager = MemoryHistoryManager(nli_client=nli_client, graph_db=graph_db)
     # Pass graph_db to mem_reader for recall operations (deduplication, conflict detection)
     mem_reader = MemReaderFactory.from_config(mem_reader_config, graph_db=graph_db)
     reranker = RerankerFactory.from_config(reranker_config)
@@ -393,4 +395,5 @@ def init_server() -> dict[str, Any]:
         "redis_client": redis_client,
         "deepsearch_agent": deepsearch_agent,
         "nli_client": nli_client,
+        "memory_history_manager": memory_history_manager,
     }
diff --git a/src/memos/memories/textual/tree_text_memory/organize/history_manager.py b/src/memos/memories/textual/tree_text_memory/organize/history_manager.py
new file mode 100644
index 000000000..1afdc9281
--- /dev/null
+++ b/src/memos/memories/textual/tree_text_memory/organize/history_manager.py
@@ -0,0 +1,174 @@
+import logging
+
+from typing import Literal
+
+from memos.context.context import ContextThreadPoolExecutor
+from memos.extras.nli_model.client import NLIClient
+from memos.extras.nli_model.types import NLIResult
+from memos.graph_dbs.base import BaseGraphDB
+from memos.memories.textual.item import ArchivedTextualMemory, TextualMemoryItem
+
+
+logger = logging.getLogger(__name__)
+
+CONFLICT_MEMORY_TITLE = "[possibly conflicting memories]"
+DUPLICATE_MEMORY_TITLE = "[possibly duplicate memories]"
+
+
+def _append_related_content(
+    new_item: TextualMemoryItem, duplicates: list[str], conflicts: list[str]
+) -> None:
+    """
+    Append duplicate and conflict memory contents to the new item's memory text,
+    truncated to avoid excessive length.
+    """
+    max_per_item_len = 200
+    max_section_len = 1000
+
+    def _format_section(title: str, items: list[str]) -> str:
+        if not items:
+            return ""
+
+        section_content = ""
+        for mem in items:
+            # Truncate individual item
+            snippet = mem[:max_per_item_len] + "..." if len(mem) > max_per_item_len else mem
+            # Check total section length
+            if len(section_content) + len(snippet) + 5 > max_section_len:
+                section_content += "\n- ... (more items truncated)"
+                break
+            section_content += f"\n- {snippet}"
+
+        return f"\n\n{title}:{section_content}"
+
+    append_text = ""
+    append_text += _format_section(CONFLICT_MEMORY_TITLE, conflicts)
+    append_text += _format_section(DUPLICATE_MEMORY_TITLE, duplicates)
+
+    if append_text:
+        new_item.memory += append_text
+
+
+def _detach_related_content(new_item: TextualMemoryItem) -> None:
+    """
+    Detach duplicate and conflict memory contents from the new item's memory text.
+    """
+    markers = [f"\n\n{CONFLICT_MEMORY_TITLE}:", f"\n\n{DUPLICATE_MEMORY_TITLE}:"]
+
+    cut_index = -1
+    for marker in markers:
+        idx = new_item.memory.find(marker)
+        if idx != -1 and (cut_index == -1 or idx < cut_index):
+            cut_index = idx
+
+    if cut_index != -1:
+        new_item.memory = new_item.memory[:cut_index]
+
+    return
+
+
+class MemoryHistoryManager:
+    def __init__(self, nli_client: NLIClient, graph_db: BaseGraphDB) -> None:
+        """
+        Initialize the MemoryHistoryManager.
+
+        Args:
+            nli_client: NLIClient for conflict/duplicate detection.
+            graph_db: GraphDB instance for marking operations during history management.
+        """
+        self.nli_client = nli_client
+        self.graph_db = graph_db
+
+    def resolve_history_via_nli(
+        self, new_item: TextualMemoryItem, related_items: list[TextualMemoryItem]
+    ) -> list[TextualMemoryItem]:
+        """
+        Detect relationships (Duplicate/Conflict) between the new item and related items using NLI,
+        and attach them as history to the new fast item.
+
+        Args:
+            new_item: The new memory item being added.
+            related_items: Existing memory items that might be related.
+
+        Returns:
+            The related items judged duplicate or conflicting by the NLI service (duplicates first).
+        """
+        if not related_items:
+            return []
+
+        # `history` is typed as Optional; make sure there is a list to append to.
+        if new_item.metadata.history is None:
+            new_item.metadata.history = []
+
+        # 1. Call NLI
+        nli_results = self.nli_client.compare_one_to_many(
+            new_item.memory, [r.memory for r in related_items]
+        )
+
+        # 2. Process results and attach to history
+        duplicate_items: list[TextualMemoryItem] = []
+        conflict_items: list[TextualMemoryItem] = []
+
+        for r_item, nli_res in zip(related_items, nli_results, strict=False):
+            if nli_res == NLIResult.DUPLICATE:
+                update_type = "duplicate"
+                duplicate_items.append(r_item)
+            elif nli_res == NLIResult.CONTRADICTION:
+                update_type = "conflict"
+                conflict_items.append(r_item)
+            else:
+                update_type = "unrelated"
+
+            # Safely get created_at, fallback to updated_at
+            created_at = getattr(r_item.metadata, "created_at", None) or r_item.metadata.updated_at
+
+            archived = ArchivedTextualMemory(
+                version=r_item.metadata.version or 1,
+                is_fast=r_item.metadata.is_fast or False,
+                memory=r_item.memory,
+                update_type=update_type,
+                archived_memory_id=r_item.id,
+                created_at=created_at,
+            )
+            new_item.metadata.history.append(archived)
+            logger.info(
+                f"[MemoryHistoryManager] Archived related memory {r_item.id} as {update_type} for new item {new_item.id}"
+            )
+
+        # 3. Concat duplicate/conflict memories to new_item.memory
+        # We will mark those old memories as invisible during fine processing, this op helps to avoid information loss.
+        _append_related_content(
+            new_item,
+            [item.memory for item in duplicate_items],
+            [item.memory for item in conflict_items],
+        )
+
+        return duplicate_items + conflict_items
+
+    def mark_memory_status(
+        self,
+        memory_items: list[TextualMemoryItem],
+        status: Literal["activated", "resolving", "archived", "deleted"],
+    ) -> None:
+        """
+        Support status marking operations during history management. Common usages are:
+        1. Mark conflict/duplicate old memories' status as "resolving",
+            to make them invisible to /search api, but still visible for PreUpdateRetriever.
+        2. Mark resolved memories' status as "activated", to restore their visibility.
+        """
+        # Execute the actual marking operation - in db.
+        with ContextThreadPoolExecutor() as executor:
+            futures = []
+            for mem in memory_items:
+                futures.append(
+                    executor.submit(
+                        self.graph_db.update_node,
+                        id=mem.id,
+                        fields={"status": status},
+                    )
+                )
+
+            # Wait for all tasks to complete and raise any exceptions
+            for future in futures:
+                future.result()
+        return

From d9c2d6d3622faa3a36217e111d7a77e5cfd47188 Mon Sep 17 00:00:00 2001
From: bittergreen
Date: Fri, 30 Jan 2026 16:31:26 +0800
Subject: [PATCH 3/3] test: Unit test for managing memory versions

---
Review notes: added test_resolve_returns_related_items to pin the return value
of resolve_history_via_nli.

 .../memories/textual/test_history_manager.py  | 154 ++++++++++++++++++
 1 file changed, 154 insertions(+)
 create mode 100644 tests/memories/textual/test_history_manager.py

diff --git a/tests/memories/textual/test_history_manager.py b/tests/memories/textual/test_history_manager.py
new file mode 100644
index 000000000..46cf3a1f6
--- /dev/null
+++ b/tests/memories/textual/test_history_manager.py
@@ -0,0 +1,154 @@
+import uuid
+
+from unittest.mock import MagicMock
+
+import pytest
+
+from memos.extras.nli_model.client import NLIClient
+from memos.extras.nli_model.types import NLIResult
+from memos.graph_dbs.base import BaseGraphDB
+from memos.memories.textual.item import (
+    TextualMemoryItem,
+    TextualMemoryMetadata,
+)
+from memos.memories.textual.tree_text_memory.organize.history_manager import (
+    MemoryHistoryManager,
+    _append_related_content,
+    _detach_related_content,
+)
+
+
+@pytest.fixture
+def mock_nli_client():
+    client = MagicMock(spec=NLIClient)
+    return client
+
+
+@pytest.fixture
+def mock_graph_db():
+    return MagicMock(spec=BaseGraphDB)
+
+
+@pytest.fixture
+def history_manager(mock_nli_client, mock_graph_db):
+    return MemoryHistoryManager(nli_client=mock_nli_client, graph_db=mock_graph_db)
+
+
+def test_detach_related_content():
+    original_memory = "This is the original memory content."
+    item = TextualMemoryItem(memory=original_memory, metadata=TextualMemoryMetadata())
+
+    duplicates = ["Duplicate 1", "Duplicate 2"]
+    conflicts = ["Conflict 1", "Conflict 2"]
+
+    # 1. Append content
+    _append_related_content(item, duplicates, conflicts)
+
+    # Verify content was appended
+    assert item.memory != original_memory
+    assert "[possibly conflicting memories]" in item.memory
+    assert "[possibly duplicate memories]" in item.memory
+    assert "Duplicate 1" in item.memory
+    assert "Conflict 1" in item.memory
+
+    # 2. Detach content
+    _detach_related_content(item)
+
+    # 3. Verify content is restored
+    assert item.memory == original_memory
+
+
+def test_detach_only_conflicts():
+    original_memory = "Original memory."
+    item = TextualMemoryItem(memory=original_memory, metadata=TextualMemoryMetadata())
+
+    duplicates = []
+    conflicts = ["Conflict A"]
+
+    _append_related_content(item, duplicates, conflicts)
+    assert "Conflict A" in item.memory
+    assert "Duplicate" not in item.memory
+
+    _detach_related_content(item)
+    assert item.memory == original_memory
+
+
+def test_detach_only_duplicates():
+    original_memory = "Original memory."
+    item = TextualMemoryItem(memory=original_memory, metadata=TextualMemoryMetadata())
+
+    duplicates = ["Duplicate A"]
+    conflicts = []
+
+    _append_related_content(item, duplicates, conflicts)
+    assert "Duplicate A" in item.memory
+    assert "Conflict" not in item.memory
+
+    _detach_related_content(item)
+    assert item.memory == original_memory
+
+
+def test_truncation(history_manager, mock_nli_client):
+    # Setup
+    new_item = TextualMemoryItem(memory="Test")
+    long_memory = "A" * 300
+    related_item = TextualMemoryItem(memory=long_memory)
+
+    mock_nli_client.compare_one_to_many.return_value = [NLIResult.DUPLICATE]
+
+    # Action
+    history_manager.resolve_history_via_nli(new_item, [related_item])
+
+    # Assert
+    assert "possibly duplicate memories" in new_item.memory
+    assert "..." in new_item.memory  # Should be truncated
+    assert len(new_item.memory) < 1000  # Ensure reasonable length
+
+
+def test_resolve_returns_related_items(history_manager, mock_nli_client):
+    new_item = TextualMemoryItem(memory="New")
+    duplicate_item = TextualMemoryItem(memory="Duplicate memory")
+    conflict_item = TextualMemoryItem(memory="Conflicting memory")
+
+    mock_nli_client.compare_one_to_many.return_value = [
+        NLIResult.CONTRADICTION,
+        NLIResult.DUPLICATE,
+    ]
+
+    related = history_manager.resolve_history_via_nli(new_item, [conflict_item, duplicate_item])
+
+    # Items (not raw strings) are returned so they can be passed to mark_memory_status
+    assert [item.id for item in related] == [duplicate_item.id, conflict_item.id]
+    assert [h.update_type for h in new_item.metadata.history] == ["conflict", "duplicate"]
+
+
+def test_empty_related_items(history_manager, mock_nli_client):
+    new_item = TextualMemoryItem(memory="Test")
+    history_manager.resolve_history_via_nli(new_item, [])
+
+    mock_nli_client.compare_one_to_many.assert_not_called()
+    assert new_item.metadata.history is None or len(new_item.metadata.history) == 0
+
+
+def test_mark_memory_status(history_manager, mock_graph_db):
+    # Setup
+    id1 = uuid.uuid4().hex
+    id2 = uuid.uuid4().hex
+    id3 = uuid.uuid4().hex
+    items = [
+        TextualMemoryItem(memory="M1", id=id1),
+        TextualMemoryItem(memory="M2", id=id2),
+        TextualMemoryItem(memory="M3", id=id3),
+    ]
+    status = "resolving"
+
+    # Action
+    history_manager.mark_memory_status(items, status)
+
+    # Assert
+    assert mock_graph_db.update_node.call_count == 3
+
+    # Verify we called it correctly
+    mock_graph_db.update_node.assert_any_call(id=id1, fields={"status": status})
+    mock_graph_db.update_node.assert_any_call(id=id2, fields={"status": status})
+    mock_graph_db.update_node.assert_any_call(id=id3, fields={"status": status})