Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/memos/api/handlers/component_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
)
from memos.memories.textual.simple_preference import SimplePreferenceTextMemory
from memos.memories.textual.simple_tree import SimpleTreeTextMemory
from memos.memories.textual.tree_text_memory.organize.history_manager import MemoryHistoryManager
from memos.memories.textual.tree_text_memory.organize.manager import MemoryManager
from memos.memories.textual.tree_text_memory.retrieve.retrieve_utils import FastTokenizer

Expand Down Expand Up @@ -190,6 +191,7 @@ def init_server() -> dict[str, Any]:
)
embedder = EmbedderFactory.from_config(embedder_config)
nli_client = NLIClient(base_url=nli_client_config["base_url"])
memory_history_manager = MemoryHistoryManager(nli_client=nli_client, graph_db=graph_db)
# Pass graph_db to mem_reader for recall operations (deduplication, conflict detection)
mem_reader = MemReaderFactory.from_config(mem_reader_config, graph_db=graph_db)
reranker = RerankerFactory.from_config(reranker_config)
Expand Down Expand Up @@ -393,4 +395,5 @@ def init_server() -> dict[str, Any]:
"redis_client": redis_client,
"deepsearch_agent": deepsearch_agent,
"nli_client": nli_client,
"memory_history_manager": memory_history_manager,
}
61 changes: 59 additions & 2 deletions src/memos/memories/textual/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,43 @@ class SourceMessage(BaseModel):
model_config = ConfigDict(extra="allow")


class ArchivedTextualMemory(BaseModel):
"""
This is a light-weighted class for storing archived versions of memories.

When an existing memory item needs to be updated due to conflict/duplicate with new memory contents,
its previous contents will be preserved, in 2 places:
1. ArchivedTextualMemory, which only contains minimal information, like memory content and create time,
stored in the 'history' field of the original node.
2. A new memory node, storing full original information including sources and embedding,
and referenced by 'archived_memory_id'.
"""

version: int = Field(
default=1,
description="The version of the archived memory content. Will be compared to the version of the active memory item(in Metadata)",
)
is_fast: bool = Field(
default=False,
description="Whether this archived memory was created in fast mode, thus raw.",
)
memory: str | None = Field(
default_factory=lambda: "", description="The content of the archived version of the memory."
)
update_type: Literal["conflict", "duplicate", "extract", "unrelated"] = Field(
default="unrelated",
description="The type of the memory (e.g., `conflict`, `duplicate`, `extract`, `unrelated`).",
)
archived_memory_id: str | None = Field(
default=None,
description="Link to a memory node with status='archived', storing full original information, including sources and embedding.",
)
created_at: str | None = Field(
default_factory=lambda: datetime.now().isoformat(),
description="The time the memory was created.",
)


class TextualMemoryMetadata(BaseModel):
"""Metadata for a memory item.

Expand All @@ -60,9 +97,29 @@ class TextualMemoryMetadata(BaseModel):
default=None,
description="The ID of the session during which the memory was created. Useful for tracking context in conversations.",
)
status: Literal["activated", "archived", "deleted"] | None = Field(
status: Literal["activated", "resolving", "archived", "deleted"] | None = Field(
default="activated",
description="The status of the memory, e.g., 'activated', 'archived', 'deleted'.",
description="The status of the memory, e.g., 'activated', 'resolving'(updating with conflicting/duplicating new memories), 'archived', 'deleted'.",
)
is_fast: bool | None = Field(
default=None,
description="Whether or not the memory was created in fast mode, carrying raw memory contents that haven't been edited by llms yet.",
)
evolve_to: list[str] | None = Field(
default_factory=list,
description="Only valid if a node was once a (raw)fast node. Recording which new memory nodes it 'evolves' to after llm extraction.",
)
version: int | None = Field(
default=None,
description="The version of the memory. Will be incremented when the memory is updated.",
)
history: list[ArchivedTextualMemory] | None = Field(
default_factory=list,
description="Storing the archived versions of the memory. Only preserving core information of each version.",
)
working_binding: str | None = Field(
default=None,
description="The working memory id binding of the (fast) memory.",
)
type: str | None = Field(default=None)
key: str | None = Field(default=None, description="Memory key or title.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import logging

from typing import Literal

from memos.context.context import ContextThreadPoolExecutor
from memos.extras.nli_model.client import NLIClient
from memos.extras.nli_model.types import NLIResult
from memos.graph_dbs.base import BaseGraphDB
from memos.memories.textual.item import ArchivedTextualMemory, TextualMemoryItem


logger = logging.getLogger(__name__)

CONFLICT_MEMORY_TITLE = "[possibly conflicting memories]"
DUPLICATE_MEMORY_TITLE = "[possibly duplicate memories]"


def _append_related_content(
new_item: TextualMemoryItem, duplicates: list[str], conflicts: list[str]
) -> None:
"""
Append duplicate and conflict memory contents to the new item's memory text,
truncated to avoid excessive length.
"""
max_per_item_len = 200
max_section_len = 1000

def _format_section(title: str, items: list[str]) -> str:
if not items:
return ""

section_content = ""
for mem in items:
# Truncate individual item
snippet = mem[:max_per_item_len] + "..." if len(mem) > max_per_item_len else mem
# Check total section length
if len(section_content) + len(snippet) + 5 > max_section_len:
section_content += "\n- ... (more items truncated)"
break
section_content += f"\n- {snippet}"

return f"\n\n{title}:{section_content}"

append_text = ""
append_text += _format_section(CONFLICT_MEMORY_TITLE, conflicts)
append_text += _format_section(DUPLICATE_MEMORY_TITLE, duplicates)

if append_text:
new_item.memory += append_text


def _detach_related_content(new_item: TextualMemoryItem) -> None:
"""
Detach duplicate and conflict memory contents from the new item's memory text.
"""
markers = [f"\n\n{CONFLICT_MEMORY_TITLE}:", f"\n\n{DUPLICATE_MEMORY_TITLE}:"]

cut_index = -1
for marker in markers:
idx = new_item.memory.find(marker)
if idx != -1 and (cut_index == -1 or idx < cut_index):
cut_index = idx

if cut_index != -1:
new_item.memory = new_item.memory[:cut_index]

return


class MemoryHistoryManager:
def __init__(self, nli_client: NLIClient, graph_db: BaseGraphDB) -> None:
"""
Initialize the MemoryHistoryManager.

Args:
nli_client: NLIClient for conflict/duplicate detection.
graph_db: GraphDB instance for marking operations during history management.
"""
self.nli_client = nli_client
self.graph_db = graph_db

def resolve_history_via_nli(
self, new_item: TextualMemoryItem, related_items: list[TextualMemoryItem]
) -> list[TextualMemoryItem]:
"""
Detect relationships (Duplicate/Conflict) between the new item and related items using NLI,
and attach them as history to the new fast item.

Args:
new_item: The new memory item being added.
related_items: Existing memory items that might be related.

Returns:
List of duplicate or conflicting memory items judged by the NLI service.
"""
if not related_items:
return []

# 1. Call NLI
nli_results = self.nli_client.compare_one_to_many(
new_item.memory, [r.memory for r in related_items]
)

# 2. Process results and attach to history
duplicate_memories = []
conflict_memories = []

for r_item, nli_res in zip(related_items, nli_results, strict=False):
if nli_res == NLIResult.DUPLICATE:
update_type = "duplicate"
duplicate_memories.append(r_item.memory)
elif nli_res == NLIResult.CONTRADICTION:
update_type = "conflict"
conflict_memories.append(r_item.memory)
else:
update_type = "unrelated"

# Safely get created_at, fallback to updated_at
created_at = getattr(r_item.metadata, "created_at", None) or r_item.metadata.updated_at

archived = ArchivedTextualMemory(
version=r_item.metadata.version or 1,
is_fast=r_item.metadata.is_fast or False,
memory=r_item.memory,
update_type=update_type,
archived_memory_id=r_item.id,
created_at=created_at,
)
new_item.metadata.history.append(archived)
logger.info(
f"[MemoryHistoryManager] Archived related memory {r_item.id} as {update_type} for new item {new_item.id}"
)

# 3. Concat duplicate/conflict memories to new_item.memory
# We will mark those old memories as invisible during fine processing, this op helps to avoid information loss.
_append_related_content(new_item, duplicate_memories, conflict_memories)

return duplicate_memories + conflict_memories

def mark_memory_status(
self,
memory_items: list[TextualMemoryItem],
status: Literal["activated", "resolving", "archived", "deleted"],
) -> None:
"""
Support status marking operations during history management. Common usages are:
1. Mark conflict/duplicate old memories' status as "resolving",
to make them invisible to /search api, but still visible for PreUpdateRetriever.
2. Mark resolved memories' status as "activated", to restore their visibility.
"""
# Execute the actual marking operation - in db.
with ContextThreadPoolExecutor() as executor:
futures = []
for mem in memory_items:
futures.append(
executor.submit(
self.graph_db.update_node,
id=mem.id,
fields={"status": status},
)
)

# Wait for all tasks to complete and raise any exceptions
for future in futures:
future.result()
return
Loading