Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions dimos/perception/experimental/temporal_memory/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,6 @@ Notes
- Evidence is extracted in sliding windows, so queries can refer to recent or past entities.
- Distance estimation can run in the background to enrich graph relations.
- If you want a different output directory, set `TemporalMemoryConfig(output_dir=...)`.

To visualize, run
`python -m dimos.perception.experimental.temporal_memory.graph_viz_server assets/temporal_memory/entity_graph.db` and open in `localhost:8080` in your browser.
55 changes: 55 additions & 0 deletions dimos/perception/experimental/temporal_memory/entity_graph_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,61 @@ def get_entity(self, entity_id: str) -> dict[str, Any] | None:
"metadata": json.loads(row["metadata"]) if row["metadata"] else None,
}

def update_entity(
self,
entity_id: str,
descriptor: str | None = None,
metadata: dict[str, Any] | None = None,
) -> bool:
"""
Update an entity's descriptor and/or metadata.

Args:
entity_id: Entity ID to update
descriptor: New descriptor (optional)
metadata: New metadata dict (optional, will merge with existing)

Returns:
True if entity was updated, False if not found
"""
conn = self._get_connection()
cursor = conn.cursor()

# Get existing entity
cursor.execute("SELECT metadata FROM entities WHERE entity_id = ?", (entity_id,))
row = cursor.fetchone()
if row is None:
return False

# Merge metadata if provided
existing_metadata = json.loads(row["metadata"]) if row["metadata"] else {}
if metadata:
existing_metadata.update(metadata)

# Update descriptor and/or metadata
updates = []
params: list[Any] = []

if descriptor is not None:
updates.append("descriptor = ?")
params.append(descriptor)

if metadata is not None:
updates.append("metadata = ?")
params.append(json.dumps(existing_metadata))

if not updates:
return True # Nothing to update

params.append(entity_id)
cursor.execute(
f"UPDATE entities SET {', '.join(updates)} WHERE entity_id = ?",
params,
)
conn.commit()
logger.debug(f"Updated entity {entity_id}")
return True

def get_all_entities(self, entity_type: str | None = None) -> list[dict[str, Any]]:
"""Get all entities, optionally filtered by type."""
conn = self._get_connection()
Expand Down
Loading
Loading