Skip to content
Open
30 changes: 30 additions & 0 deletions dimos/agents/mcp/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
# limitations under the License.

from collections.abc import Callable
from dataclasses import dataclass
import os
from queue import Empty, Queue
import sys
from threading import Event, RLock, Thread
import time
from typing import Any
Expand Down Expand Up @@ -59,6 +62,7 @@ class McpClient(Module):
_message_queue: Queue[BaseMessage]
_tool_registry: dict[str, dict[str, Any]]
_history: list[BaseMessage]
_parent_session_id: str | None
_thread: Thread
_stop_event: Event
_http_client: httpx.Client
Expand All @@ -72,6 +76,7 @@ def __init__(self, **kwargs: Any) -> None:
self._message_queue = Queue()
self._tool_registry = {}
self._history = []
self._parent_session_id: str | None = None
self._thread = Thread(
target=self._thread_loop,
name=f"{self.__class__.__name__}-thread",
Expand Down Expand Up @@ -218,6 +223,15 @@ def on_system_modules(self, _modules: list[RPCClient]) -> None:

model = MockModel(json_path=self.config.model_fixture)

from dimos.core.global_config import global_config
from dimos.core.session_store import restore_session

self._history, self._parent_session_id = restore_session(
blueprint=os.environ.get("DIMOS_BLUEPRINT", ""),
restore_session_id=global_config.restore_session,
no_restore=global_config.no_restore,
)

with self._lock:
self._state_graph = create_agent(
model=model,
Expand All @@ -238,6 +252,22 @@ def stop(self) -> None:
if self._thread.is_alive():
self._thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
self._http_client.close()

run_id = os.environ.get("DIMOS_RUN_ID", "")
blueprint = os.environ.get("DIMOS_BLUEPRINT", "")
if run_id and blueprint and self._history:
from dimos.core.session_store import save_session

save_session(
run_id=run_id,
blueprint=blueprint,
model=self.config.model,
started_at=os.environ.get("DIMOS_STARTED_AT"),
original_argv=sys.argv,
history=self._history,
parent_session_id=self._parent_session_id,
)

super().stop()

@rpc
Expand Down
2 changes: 2 additions & 0 deletions dimos/core/global_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class GlobalConfig(BaseSettings):
obstacle_avoidance: bool = True
detection_model: VlModelName = "moondream"
listen_host: str = "127.0.0.1"
restore_session: str | None = None
no_restore: bool = False

model_config = SettingsConfigDict(
env_file=".env",
Expand Down
132 changes: 132 additions & 0 deletions dimos/core/session_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Persistent storage for agent conversation history across dimos runs.

Layout on disk:
~/.local/state/dimos/sessions/
<blueprint>/
<timestamp>.json
"""

from __future__ import annotations

from datetime import datetime, timezone
import json
import re
from pathlib import Path

from langchain_core.messages import messages_from_dict, messages_to_dict
from langchain_core.messages.base import BaseMessage

from dimos.constants import STATE_DIR
from dimos.utils.logging_config import setup_logger

logger = setup_logger()

# run_id format: "20260502-143022-<blueprint>"
_RUN_ID_RE = re.compile(r"^(\d{8}-\d{6})-(.*)")


def _parse_run_id(run_id: str) -> tuple[str, str]:
    """Split a run_id into its ``(timestamp, blueprint)`` parts.

    The blueprint part is used as a directory name when the session path is
    built, so values that would escape the sessions directory are rejected.

    Args:
        run_id: Identifier of the form ``"YYYYMMDD-HHMMSS-<blueprint>"``.

    Returns:
        A ``(timestamp, blueprint)`` tuple.

    Raises:
        ValueError: If ``run_id`` does not have the expected shape, or if its
            blueprint part is ``"."``, ``".."`` or contains a path separator
            or NUL character.
    """
    # fullmatch instead of match: "." never matches a newline, so a run_id
    # carrying extra lines is rejected rather than silently truncated.
    m = _RUN_ID_RE.fullmatch(run_id)
    if not m:
        raise ValueError(f"Cannot parse run_id: {run_id!r}")
    timestamp, blueprint = m.group(1), m.group(2)
    # The blueprint becomes a single path component; refuse anything that
    # could address a different directory.
    if blueprint in {".", ".."} or any(ch in blueprint for ch in "/\\\0"):
        raise ValueError(f"Unsafe blueprint name in run_id: {run_id!r}")
    return timestamp, blueprint


def _session_path(run_id: str) -> Path:
    """Return the JSON file path that stores the session for ``run_id``.

    The path is ``STATE_DIR / "sessions" / <blueprint> / "<timestamp>.json"``,
    with both parts taken from the run_id itself.
    """
    stamp, bp_name = _parse_run_id(run_id)
    sessions_root = STATE_DIR / "sessions"
    return sessions_root / bp_name / f"{stamp}.json"


def save_session(
    run_id: str,
    blueprint: str,
    model: str,
    started_at: str | None,
    original_argv: list[str],
    history: list[BaseMessage],
    parent_session_id: str | None = None,
) -> None:
    """Persist agent conversation history and session metadata to disk.

    The destination file is derived from ``run_id`` (not from the
    ``blueprint`` argument, which is only recorded as metadata). The file is
    written via a temporary sibling and then renamed into place, so a process
    killed mid-write does not leave a truncated session file behind.

    Args:
        run_id: Identifier of the run; determines the output path.
        blueprint: Blueprint name stored in the metadata.
        model: Model name stored in the metadata.
        started_at: Start timestamp stored as-is, or None.
        original_argv: Command line stored in the metadata.
        history: Messages to serialize.
        parent_session_id: run_id of the session this one was restored from,
            if any.

    Raises:
        ValueError: If ``run_id`` cannot be parsed.
        OSError: If the directory or file cannot be written.
    """
    path = _session_path(run_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    data = {
        "run_id": run_id,
        "blueprint": blueprint,
        "model": model,
        "started_at": started_at,
        "ended_at": datetime.now(timezone.utc).isoformat(),
        "original_argv": original_argv,
        "parent_session_id": parent_session_id,
        "messages": messages_to_dict(history),
    }
    payload = json.dumps(data, indent=2)
    # The ".json.tmp" suffix keeps the partial file out of the "*.json" glob
    # used to find the latest session; the rename then publishes the complete
    # file in one step.
    tmp_path = path.with_name(f"{path.name}.tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    tmp_path.replace(path)
    logger.info("Saved agent session.", run_id=run_id, n_messages=len(history))


def load_session(run_id: str) -> tuple[list[BaseMessage], dict[str, object]]:
    """Read a stored session back from disk.

    Args:
        run_id: Identifier of the session to load.

    Returns:
        ``(messages, metadata)``, where ``metadata`` holds every stored field
        except ``"messages"``.

    Raises:
        FileNotFoundError: If no session file exists for ``run_id``.
    """
    session_file = _session_path(run_id)
    payload = json.loads(session_file.read_text())
    restored = messages_from_dict(payload["messages"])
    extras = {}
    for key, value in payload.items():
        if key != "messages":
            extras[key] = value
    logger.info("Restored agent session.", run_id=run_id, n_messages=len(restored))
    return restored, extras


def restore_session(
    blueprint: str,
    restore_session_id: str | None,
    no_restore: bool,
) -> tuple[list[BaseMessage], str | None]:
    """Restore history from a previous session.

    Restoration is best-effort: any failure while loading is logged together
    with its cause and the caller starts with an empty history.

    Args:
        blueprint: Blueprint whose latest session is used when no explicit
            id is given. An empty value disables restoring.
        restore_session_id: Explicit run_id to restore, or None to pick the
            latest session of ``blueprint``.
        no_restore: When True, never restore.

    Returns:
        ``(history, parent_session_id)``; ``([], None)`` if there is nothing
        to restore or loading failed.
    """
    if no_restore or not blueprint:
        return [], None
    session_id = restore_session_id or find_latest_session(blueprint)
    if not session_id:
        return [], None
    try:
        history, metadata = load_session(session_id)
        return history, str(metadata["run_id"])
    except Exception as exc:
        # Record why the restore failed; otherwise a missing file, corrupt
        # JSON and a missing key are indistinguishable in the logs.
        logger.warning(
            "Failed to restore session, starting fresh.",
            session_id=session_id,
            error=repr(exc),
        )
        return [], None
Comment on lines +109 to +114
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Narrow exception scope crashes agent startup on corrupt sessions

restore_session only catches FileNotFoundError. If the stored JSON is malformed, json.JSONDecodeError propagates from load_session; if the saved file is missing the "run_id" key, a KeyError propagates from str(metadata["run_id"]). Either exception will crash the agent during on_system_modules with no graceful fallback. The catch block should be broadened (e.g., except Exception) and log a warning before returning ([], None), matching the robustness level of find_latest_session.



def find_latest_session(blueprint: str) -> str | None:
    """Return the run_id of the most recent saved session for a blueprint.

    "Most recent" is the ``*.json`` file whose name sorts last in the
    blueprint's session directory. Only that file is inspected; if it cannot
    be read, older files are not tried.

    Args:
        blueprint: Name of the blueprint whose sessions are searched.

    Returns:
        The stored ``run_id``, or None if the directory or files are missing
        or the newest file cannot be read.
    """
    bp_dir = STATE_DIR / "sessions" / blueprint
    if not bp_dir.exists():
        return None
    files = sorted(bp_dir.glob("*.json"))
    if not files:
        return None
    latest = files[-1]
    try:
        data = json.loads(latest.read_text())
        return str(data["run_id"])
    except Exception as exc:
        # Include the cause so an unreadable file can be told apart from a
        # corrupt or incomplete one.
        logger.warning(
            "Failed to read latest session file.",
            path=str(latest),
            error=repr(exc),
        )
        return None
Comment on lines +125 to +130
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 find_latest_session silently swallows all exceptions with a bare except Exception: return None. A permission error, decode error, or I/O failure is indistinguishable from "no sessions found", making failures very hard to diagnose. Log a warning before returning.

Suggested change
try:
data = json.loads(files[-1].read_text())
return str(data["run_id"])
except Exception:
return None
try:
data = json.loads(files[-1].read_text())
return str(data["run_id"])
except Exception:
logger.warning("Failed to read latest session file.", path=str(files[-1]))
return None



114 changes: 114 additions & 0 deletions dimos/core/test_session_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for session_store save/load/restore/find_latest."""

from __future__ import annotations

import pytest
from langchain_core.messages import AIMessage, HumanMessage

import dimos.core.session_store as session_store_mod
from dimos.core.session_store import (
find_latest_session,
load_session,
restore_session,
save_session,
)


@pytest.fixture(autouse=True)
def _isolated_state_dir(tmp_path, monkeypatch):
    """Point the session store at a per-test temporary directory."""
    monkeypatch.setattr(
        session_store_mod,
        "STATE_DIR",
        tmp_path,
    )


class TestSaveAndLoadSession:
    """Round-trip coverage for ``save_session`` followed by ``load_session``."""

    def test_round_trip(self) -> None:
        """Saved messages and metadata come back unchanged."""
        session_id = "20260503-120000-demo-agent"
        conversation = [HumanMessage(content="hello"), AIMessage(content="hi")]
        save_session(
            run_id=session_id,
            blueprint="demo-agent",
            model="gpt-4",
            started_at="2026-05-03T12:00:00+00:00",
            original_argv=["dimos", "run", "demo-agent"],
            history=conversation,
        )

        restored, meta = load_session(session_id)

        assert [msg.content for msg in restored] == ["hello", "hi"]
        assert (meta["run_id"], meta["blueprint"]) == (session_id, "demo-agent")


class TestFindLatestSession:
    """Behavior of ``find_latest_session`` for empty, valid and corrupt stores."""

    def test_returns_none_when_no_sessions(self) -> None:
        """No session directory for the blueprint yields None."""
        assert find_latest_session("demo-agent") is None

    def test_returns_latest_run_id(self) -> None:
        """With several sessions saved, the newest timestamp wins."""
        for ts in ["20260503-100000", "20260503-120000"]:
            save_session(
                run_id=f"{ts}-demo-agent",
                blueprint="demo-agent",
                model="gpt-4",
                started_at=None,
                original_argv=[],
                history=[HumanMessage(content="hi")],
            )
        assert find_latest_session("demo-agent") == "20260503-120000-demo-agent"

    def test_returns_none_on_corrupt_file(self, tmp_path) -> None:
        """An unparsable newest file yields None instead of raising."""
        bp_dir = tmp_path / "sessions" / "demo-agent"
        bp_dir.mkdir(parents=True)
        # Use the same "<timestamp>.json" name that save_session produces so
        # the fixture mirrors the real on-disk layout.
        (bp_dir / "20260503-120000.json").write_text("not json")
        assert find_latest_session("demo-agent") is None


class TestRestoreSession:
    """Behavior of ``restore_session`` for disabled, normal and corrupt cases."""

    def test_returns_empty_when_no_restore_flag(self) -> None:
        """``no_restore=True`` short-circuits to an empty result."""
        result = restore_session(
            blueprint="demo-agent",
            restore_session_id=None,
            no_restore=True,
        )
        restored, parent_id = result
        assert restored == []
        assert parent_id is None

    def test_restores_latest_session(self) -> None:
        """Without an explicit id, the latest saved session is restored."""
        session_id = "20260503-120000-demo-agent"
        save_session(
            run_id=session_id,
            blueprint="demo-agent",
            model="gpt-4",
            started_at=None,
            original_argv=[],
            history=[HumanMessage(content="hello")],
        )

        restored, parent_id = restore_session(
            blueprint="demo-agent",
            restore_session_id=None,
            no_restore=False,
        )

        assert [msg.content for msg in restored] == ["hello"]
        assert parent_id == session_id

    def test_returns_empty_on_corrupt_session(self, tmp_path) -> None:
        """A session file with invalid JSON falls back to an empty history."""
        session_dir = tmp_path / "sessions" / "demo-agent"
        session_dir.mkdir(parents=True)
        corrupt_file = session_dir / "20260503-120000.json"
        corrupt_file.write_text("not json")

        restored, parent_id = restore_session(
            blueprint="demo-agent",
            restore_session_id="20260503-120000-demo-agent",
            no_restore=False,
        )

        assert restored == []
        assert parent_id is None
Comment on lines +104 to +114
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 test_returns_empty_on_corrupt_session exercises FileNotFoundError, not JSONDecodeError

`restore_session` calls `load_session("20260503-120000-demo-agent")`, which builds the path via `_session_path` → `STATE_DIR / "sessions" / "demo-agent" / "20260503-120000.json"`. The corrupt file the test places at `"20260503-120000-demo-agent.json"` is never read — `load_session` raises `FileNotFoundError` (missing file) rather than `JSONDecodeError` (corrupt content). The test passes but does not validate that `restore_session` handles malformed JSON gracefully; a narrowed `except FileNotFoundError` in `restore_session` would make this test pass while leaving JSON decode errors unhandled.

5 changes: 5 additions & 0 deletions dimos/robot/cli/dimos.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ def run(
# Workers inherit DIMOS_RUN_LOG_DIR env var via forkserver.
set_run_log_dir(log_dir)

started_at = datetime.now(timezone.utc).isoformat()
os.environ["DIMOS_RUN_ID"] = run_id
os.environ["DIMOS_BLUEPRINT"] = blueprint_name
os.environ["DIMOS_STARTED_AT"] = started_at

blueprint = autoconnect(*map(get_by_name_or_exit, robot_types))

if disable:
Expand Down