From 4c89e59d6d883d73773caba4cbed4009513e7c41 Mon Sep 17 00:00:00 2001 From: xiami762 Date: Thu, 4 Jun 2026 09:34:29 +0800 Subject: [PATCH 1/2] fix(session): recover orphaned running tools after server restart Mark persisted running tool parts as interrupted after unexpected server exits so session history no longer shows stale in-progress tools. Add startup, session-read, and session-loop recovery coverage with regression tests for idle and busy sessions. Co-authored-by: Cursor --- flocks/server/app.py | 14 +++ flocks/server/routes/session.py | 7 +- flocks/session/orphan_tools.py | 88 +++++++++++++ flocks/session/session_loop.py | 55 +-------- tests/server/routes/test_session_routes.py | 73 +++++++++++ tests/session/test_orphan_tools.py | 137 +++++++++++++++++++++ 6 files changed, 320 insertions(+), 54 deletions(-) create mode 100644 flocks/session/orphan_tools.py create mode 100644 tests/session/test_orphan_tools.py diff --git a/flocks/server/app.py b/flocks/server/app.py index 402a75a9a..2a90f30c0 100644 --- a/flocks/server/app.py +++ b/flocks/server/app.py @@ -182,6 +182,20 @@ async def lifespan(app: FastAPI): await _run_startup_phase(log, "storage.init", Storage.init) log.info("storage.initialized") + async def _recover_orphan_tool_parts() -> None: + from flocks.session.orphan_tools import abort_all_orphan_running_parts + + repaired = await abort_all_orphan_running_parts() + if repaired: + log.info("session.orphan_tools.recovered", {"count": repaired}) + + _schedule_startup_phase( + app, + log, + "session.recover_orphan_tools", + _recover_orphan_tool_parts, + ) + # Ensure default device room exists, then migrate legacy device API # configs from flocks.json → device_integrations table. try: diff --git a/flocks/server/routes/session.py b/flocks/server/routes/session.py index 23eac9bac..e53cde68d 100644 --- a/flocks/server/routes/session.py +++ b/flocks/server/routes/session.py @@ -1187,6 +1187,11 @@ async def get_session_messages( _require_session_read_access(session, current_user) try: + from flocks.session.orphan_tools import abort_orphan_running_parts + from flocks.session.core.status import SessionStatus + + if sessionID not in SessionStatus.get_busy_session_ids(): + await abort_orphan_running_parts(sessionID) messages_with_parts = await Message.list_with_parts(sessionID, include_archived=True) if limit: messages_with_parts = messages_with_parts[-limit:] @@ -3847,5 +3852,3 @@ async def clear_session(sessionID: str, http_request: Request): except Exception as e: log.error("session.clear.error", {"sessionID": sessionID, "error": str(e)}) raise HTTPException(status_code=500, detail=f"Failed to clear session: {str(e)}") - - diff --git a/flocks/session/orphan_tools.py b/flocks/session/orphan_tools.py new file mode 100644 index 000000000..5646faeba --- /dev/null +++ b/flocks/session/orphan_tools.py @@ -0,0 +1,88 @@ +"""Recovery helpers for tool calls left running by interrupted processes.""" + +import time +from typing import Iterable, Optional + +from flocks.session.message import Message, ToolPart, ToolStateError +from flocks.session.session import SessionInfo +from flocks.storage.storage import Storage +from flocks.utils.log import Log + + +log = Log.create(service="session.orphan_tools") + + +INTERRUPTED_TOOL_ERROR = "Interrupted by server restart" + + +async def abort_orphan_running_parts(session_id: str) -> int: + """Mark persisted running tool parts as interrupted errors.""" + messages = await Message.list(session_id) + now_ms = int(time.time() * 1000) + repaired = 0 + + for msg in messages: + parts = await Message.parts(msg.id, session_id) + for part in parts: + if not isinstance(part, ToolPart): + continue + state = part.state + if getattr(state, "status", None) != "running": + continue + + time_info = getattr(state, "time", {}) or {} + start_ms = time_info.get("start", now_ms) + + error_state = ToolStateError( + status="error", + input=getattr(state, "input", {}), + error=INTERRUPTED_TOOL_ERROR, + metadata=getattr(state, "metadata", None), + time={"start": start_ms, "end": now_ms}, + ) + part.state = error_state + await Message.store_part(session_id, msg.id, part) + repaired += 1 + + if repaired: + log.info("session.orphan_tools.aborted", { + "session_id": session_id, + "count": repaired, + }) + return repaired + + +async def abort_orphan_running_parts_for_sessions( + session_ids: Iterable[str], + *, + skip_busy: bool = False, +) -> int: + """Best-effort recovery for a known set of sessions.""" + total = 0 + for session_id in dict.fromkeys(session_ids): + try: + if skip_busy: + from flocks.session.core.status import SessionStatus + + if session_id in SessionStatus.get_busy_session_ids(): + continue + total += await abort_orphan_running_parts(session_id) + except Exception as exc: + log.warn("session.orphan_tools.session_failed", { + "session_id": session_id, + "error": str(exc), + }) + return total + + +async def abort_all_orphan_running_parts(*, limit: Optional[int] = None) -> int: + """Best-effort startup recovery for all persisted sessions.""" + entries = await Storage.list_entries(prefix="session:", model=SessionInfo) + session_ids = [ + session.id + for _, session in entries + if getattr(session, "status", None) != "deleted" + ] + if limit is not None: + session_ids = session_ids[:limit] + return await abort_orphan_running_parts_for_sessions(session_ids, skip_busy=True) diff --git a/flocks/session/session_loop.py b/flocks/session/session_loop.py index 6b2e1f455..98dc49182 100644 --- a/flocks/session/session_loop.py +++ b/flocks/session/session_loop.py @@ -345,7 +345,9 @@ async def run( # Mark orphaned running tool parts as error (e.g. from server restart). # Wrapped in try/except so cleanup failures never block the session loop. try: - await cls._abort_orphan_running_parts(session_id) + from flocks.session.orphan_tools import abort_orphan_running_parts + + await abort_orphan_running_parts(session_id) except Exception as exc: log.warn("loop.orphan_cleanup_failed", { "session_id": session_id, @@ -394,57 +396,6 @@ async def run( except Exception as exc: log.warn("loop.idle.event_error", {"error": str(exc)}) - @classmethod - async def _abort_orphan_running_parts(cls, session_id: str) -> None: - """Mark any tool parts stuck in 'running' status as error. - - When the server restarts while a synchronous tool (e.g. delegate_task) - is executing, the tool part stays 'running' in storage forever. On the - next session loop start we know nothing is actually executing yet, so - any 'running' parts are orphans. - """ - import time as _time - from flocks.session.message import ( - ToolPart, ToolStateError, - ) - - messages = await Message.list(session_id) - now_ms = int(_time.time() * 1000) - repaired = 0 - - for msg in messages: - parts = await Message.parts(msg.id, session_id) - for part in parts: - if not isinstance(part, ToolPart): - continue - state = part.state - if getattr(state, "status", None) != "running": - continue - - time_info = getattr(state, "time", {}) or {} - start_ms = time_info.get("start", now_ms) - - error_state = ToolStateError( - status="error", - input=getattr(state, "input", {}), - error="Interrupted by server restart", - time={"start": start_ms, "end": now_ms}, - ) - # Preserve metadata (e.g. sessionId) so the card still works - meta = getattr(state, "metadata", None) - if meta: - error_state.metadata = meta - - part.state = error_state - await Message.store_part(session_id, msg.id, part) - repaired += 1 - - if repaired: - log.info("loop.orphan_parts_aborted", { - "session_id": session_id, - "count": repaired, - }) - @staticmethod async def _resolve_model( session: Any, diff --git a/tests/server/routes/test_session_routes.py b/tests/server/routes/test_session_routes.py index e649d7351..73e893665 100644 --- a/tests/server/routes/test_session_routes.py +++ b/tests/server/routes/test_session_routes.py @@ -20,6 +20,15 @@ from fastapi import HTTPException, status from httpx import AsyncClient from flocks.auth.context import AuthUser +from flocks.session.core.status import SessionStatus, SessionStatusBusy +from flocks.session.message import ( + Message, + MessageRole, + ToolPart, + ToolStateError, + ToolStateRunning, +) +from flocks.session.orphan_tools import INTERRUPTED_TOOL_ERROR from flocks.session.session import Session # =========================================================================== @@ -178,6 +187,70 @@ async def test_send_message_noReply(self, client: AsyncClient, session_id: str): for m in messages ) + @pytest.mark.asyncio + async def test_list_messages_keeps_running_tool_when_session_busy( + self, + client: AsyncClient, + session_id: str, + ): + msg = await Message.create(session_id, MessageRole.ASSISTANT, "") + part = ToolPart( + id="part_busy_running", + sessionID=session_id, + messageID=msg.id, + callID="call_busy_running", + tool="bash", + state=ToolStateRunning( + input={"cmd": "sleep 60"}, + time={"start": 1000}, + ), + ) + await Message.store_part(session_id, msg.id, part) + + SessionStatus.set(session_id, SessionStatusBusy()) + try: + resp = await client.get(f"/api/session/{session_id}/message") + finally: + SessionStatus.clear(session_id) + + assert resp.status_code == status.HTTP_200_OK + parts = await Message.parts(msg.id, session_id) + running_part = next(p for p in parts if p.id == "part_busy_running") + assert running_part.state.status == "running" + + @pytest.mark.asyncio + async def test_list_messages_recovers_orphan_running_tool_when_session_idle( + self, + client: AsyncClient, + session_id: str, + ): + msg = await Message.create(session_id, MessageRole.ASSISTANT, "") + part = ToolPart( + id="part_idle_running", + sessionID=session_id, + messageID=msg.id, + callID="call_idle_running", + tool="bash", + state=ToolStateRunning( + input={"cmd": "sleep 60"}, + metadata={"sessionId": "ses_child"}, + time={"start": 1000}, + ), + ) + await Message.store_part(session_id, msg.id, part) + + resp = await client.get(f"/api/session/{session_id}/message") + + assert resp.status_code == status.HTTP_200_OK + parts = await Message.parts(msg.id, session_id) + repaired_part = next(p for p in parts if p.id == "part_idle_running") + assert isinstance(repaired_part.state, ToolStateError) + assert repaired_part.state.status == "error" + assert repaired_part.state.error == INTERRUPTED_TOOL_ERROR + assert repaired_part.state.metadata == {"sessionId": "ses_child"} + assert repaired_part.state.time["start"] == 1000 + assert repaired_part.state.time["end"] >= 1000 + # =========================================================================== # Delete permissions (single-admin model) diff --git a/tests/session/test_orphan_tools.py b/tests/session/test_orphan_tools.py new file mode 100644 index 000000000..e0b1471ef --- /dev/null +++ b/tests/session/test_orphan_tools.py @@ -0,0 +1,137 @@ +import pytest + +from flocks.session.message import ( + Message, + MessageRole, + ToolPart, + ToolStateCompleted, + ToolStateError, + ToolStateRunning, +) +from flocks.session.orphan_tools import ( + INTERRUPTED_TOOL_ERROR, + abort_all_orphan_running_parts, + abort_orphan_running_parts, +) +from flocks.session.core.status import SessionStatus, SessionStatusBusy +from flocks.session.session import Session + + +@pytest.mark.asyncio +async def test_abort_orphan_running_parts_marks_running_tools_error(): + session = await Session.create(project_id="proj_orphan", directory="/tmp") + msg = await Message.create(session.id, MessageRole.ASSISTANT, "") + part = ToolPart( + id="part_orphan_running", + sessionID=session.id, + messageID=msg.id, + callID="call_orphan_running", + tool="delegate_task", + state=ToolStateRunning( + input={"prompt": "keep going"}, + title="delegate_task", + metadata={"sessionId": "ses_child"}, + time={"start": 1234}, + ), + ) + + await Message.store_part(session.id, msg.id, part) + + repaired = await abort_orphan_running_parts(session.id) + parts = await Message.parts(msg.id, session.id) + repaired_part = next(p for p in parts if p.id == "part_orphan_running") + + assert repaired == 1 + assert isinstance(repaired_part.state, ToolStateError) + assert repaired_part.state.status == "error" + assert repaired_part.state.error == INTERRUPTED_TOOL_ERROR + assert repaired_part.state.input == {"prompt": "keep going"} + assert repaired_part.state.metadata == {"sessionId": "ses_child"} + assert repaired_part.state.time["start"] == 1234 + assert repaired_part.state.time["end"] >= 1234 + + +@pytest.mark.asyncio +async def test_abort_orphan_running_parts_leaves_terminal_tools_unchanged(): + session = await Session.create(project_id="proj_orphan_terminal", directory="/tmp") + msg = await Message.create(session.id, MessageRole.ASSISTANT, "") + completed = ToolPart( + id="part_orphan_completed", + sessionID=session.id, + messageID=msg.id, + callID="call_orphan_completed", + tool="bash", + state=ToolStateCompleted( + input={"cmd": "pwd"}, + output="/tmp", + title="bash", + metadata={}, + time={"start": 1000, "end": 2000}, + ), + ) + + await Message.store_part(session.id, msg.id, completed) + + repaired = await abort_orphan_running_parts(session.id) + parts = await Message.parts(msg.id, session.id) + completed_part = next(p for p in parts if p.id == "part_orphan_completed") + + assert repaired == 0 + assert completed_part.state.status == "completed" + assert completed_part.state.time == {"start": 1000, "end": 2000} + + +@pytest.mark.asyncio +async def test_abort_all_orphan_running_parts_scans_persisted_sessions(): + session = await Session.create(project_id="proj_orphan_all", directory="/tmp") + msg = await Message.create(session.id, MessageRole.ASSISTANT, "") + part = ToolPart( + id="part_orphan_all", + sessionID=session.id, + messageID=msg.id, + callID="call_orphan_all", + tool="bash", + state=ToolStateRunning( + input={"cmd": "sleep 60"}, + time={"start": 5000}, + ), + ) + + await Message.store_part(session.id, msg.id, part) + + repaired = await abort_all_orphan_running_parts() + parts = await Message.parts(msg.id, session.id) + repaired_part = next(p for p in parts if p.id == "part_orphan_all") + + assert repaired == 1 + assert repaired_part.state.status == "error" + assert repaired_part.state.error == INTERRUPTED_TOOL_ERROR + + +@pytest.mark.asyncio +async def test_abort_all_orphan_running_parts_skips_busy_sessions(): + session = await Session.create(project_id="proj_orphan_busy", directory="/tmp") + msg = await Message.create(session.id, MessageRole.ASSISTANT, "") + part = ToolPart( + id="part_orphan_busy", + sessionID=session.id, + messageID=msg.id, + callID="call_orphan_busy", + tool="bash", + state=ToolStateRunning( + input={"cmd": "sleep 60"}, + time={"start": 5000}, + ), + ) + + await Message.store_part(session.id, msg.id, part) + SessionStatus.set(session.id, SessionStatusBusy()) + try: + repaired = await abort_all_orphan_running_parts() + finally: + SessionStatus.clear(session.id) + parts = await Message.parts(msg.id, session.id) + running_part = next(p for p in parts if p.id == "part_orphan_busy") + + assert repaired == 0 + assert running_part.state.status == "running" From 37e98e977bdf0baf5c81c273ff3b48bb17550125 Mon Sep 17 00:00:00 2001 From: xiami762 <> Date: Fri, 5 Jun 2026 10:25:12 +0800 Subject: [PATCH 2/2] fix(session): avoid duplicate orphan tool scans in message listing Reuse preloaded message parts during orphan tool recovery so GET /message no longer performs a redundant scan before returning results. Co-authored-by: Cursor --- flocks/server/routes/session.py | 6 +-- flocks/session/orphan_tools.py | 50 ++++++++++++++-------- tests/server/routes/test_session_routes.py | 20 +++++++++ tests/session/test_orphan_tools.py | 49 +++++++++++++++++++++ 4 files changed, 103 insertions(+), 22 deletions(-) diff --git a/flocks/server/routes/session.py b/flocks/server/routes/session.py index e53cde68d..12a8fd0b6 100644 --- a/flocks/server/routes/session.py +++ b/flocks/server/routes/session.py @@ -1187,12 +1187,12 @@ async def get_session_messages( _require_session_read_access(session, current_user) try: - from flocks.session.orphan_tools import abort_orphan_running_parts + from flocks.session.orphan_tools import abort_orphan_running_parts_in_messages from flocks.session.core.status import SessionStatus - if sessionID not in SessionStatus.get_busy_session_ids(): - await abort_orphan_running_parts(sessionID) messages_with_parts = await Message.list_with_parts(sessionID, include_archived=True) + if sessionID not in SessionStatus.get_busy_session_ids(): + await abort_orphan_running_parts_in_messages(sessionID, messages_with_parts) if limit: messages_with_parts = messages_with_parts[-limit:] diff --git a/flocks/session/orphan_tools.py b/flocks/session/orphan_tools.py index 5646faeba..dd97c724b 100644 --- a/flocks/session/orphan_tools.py +++ b/flocks/session/orphan_tools.py @@ -3,7 +3,7 @@ import time from typing import Iterable, Optional -from flocks.session.message import Message, ToolPart, ToolStateError +from flocks.session.message import Message, MessageWithParts, ToolPart, ToolStateError from flocks.session.session import SessionInfo from flocks.storage.storage import Storage from flocks.utils.log import Log @@ -15,33 +15,39 @@ INTERRUPTED_TOOL_ERROR = "Interrupted by server restart" -async def abort_orphan_running_parts(session_id: str) -> int: - """Mark persisted running tool parts as interrupted errors.""" - messages = await Message.list(session_id) +def _build_interrupted_error_state(state: object, now_ms: int) -> ToolStateError: + """Create the terminal error state used for recovered orphaned tools.""" + time_info = getattr(state, "time", {}) or {} + start_ms = time_info.get("start", now_ms) + + return ToolStateError( + status="error", + input=getattr(state, "input", {}), + error=INTERRUPTED_TOOL_ERROR, + metadata=getattr(state, "metadata", None), + time={"start": start_ms, "end": now_ms}, + ) + + +async def abort_orphan_running_parts_in_messages( + session_id: str, + messages_with_parts: Iterable[MessageWithParts], +) -> int: + """Mark running tool parts as interrupted using preloaded message parts.""" now_ms = int(time.time() * 1000) repaired = 0 - for msg in messages: - parts = await Message.parts(msg.id, session_id) - for part in parts: + for msg_with_parts in messages_with_parts: + message_id = msg_with_parts.info.id + for part in msg_with_parts.parts: if not isinstance(part, ToolPart): continue state = part.state if getattr(state, "status", None) != "running": continue - time_info = getattr(state, "time", {}) or {} - start_ms = time_info.get("start", now_ms) - - error_state = ToolStateError( - status="error", - input=getattr(state, "input", {}), - error=INTERRUPTED_TOOL_ERROR, - metadata=getattr(state, "metadata", None), - time={"start": start_ms, "end": now_ms}, - ) - part.state = error_state - await Message.store_part(session_id, msg.id, part) + part.state = _build_interrupted_error_state(state, now_ms) + await Message.store_part(session_id, message_id, part) repaired += 1 if repaired: @@ -52,6 +58,12 @@ async def abort_orphan_running_parts(session_id: str) -> int: return repaired +async def abort_orphan_running_parts(session_id: str) -> int: + """Mark persisted running tool parts as interrupted errors.""" + messages_with_parts = await Message.list_with_parts(session_id) + return await abort_orphan_running_parts_in_messages(session_id, messages_with_parts) + + async def abort_orphan_running_parts_for_sessions( session_ids: Iterable[str], *, diff --git a/tests/server/routes/test_session_routes.py b/tests/server/routes/test_session_routes.py index 73e893665..39ae2765b 100644 --- a/tests/server/routes/test_session_routes.py +++ b/tests/server/routes/test_session_routes.py @@ -251,6 +251,26 @@ async def test_list_messages_recovers_orphan_running_tool_when_session_idle( assert repaired_part.state.time["start"] == 1000 assert repaired_part.state.time["end"] >= 1000 + @pytest.mark.asyncio + async def test_list_messages_uses_preloaded_orphan_recovery_path( + self, + client: AsyncClient, + session_id: str, + monkeypatch: pytest.MonkeyPatch, + ): + from flocks.session import orphan_tools + + preloaded_recovery = AsyncMock(return_value=0) + legacy_recovery = AsyncMock(side_effect=AssertionError("legacy recovery should not be called")) + monkeypatch.setattr(orphan_tools, "abort_orphan_running_parts_in_messages", preloaded_recovery) + monkeypatch.setattr(orphan_tools, "abort_orphan_running_parts", legacy_recovery) + + resp = await client.get(f"/api/session/{session_id}/message") + + assert resp.status_code == status.HTTP_200_OK + preloaded_recovery.assert_awaited_once() + legacy_recovery.assert_not_called() + # =========================================================================== # Delete permissions (single-admin model) diff --git a/tests/session/test_orphan_tools.py b/tests/session/test_orphan_tools.py index e0b1471ef..bee3a84a0 100644 --- a/tests/session/test_orphan_tools.py +++ b/tests/session/test_orphan_tools.py @@ -12,6 +12,7 @@ INTERRUPTED_TOOL_ERROR, abort_all_orphan_running_parts, abort_orphan_running_parts, + abort_orphan_running_parts_in_messages, ) from flocks.session.core.status import SessionStatus, SessionStatusBusy from flocks.session.session import Session @@ -81,6 +82,54 @@ async def test_abort_orphan_running_parts_leaves_terminal_tools_unchanged(): assert completed_part.state.time == {"start": 1000, "end": 2000} +@pytest.mark.asyncio +async def test_abort_orphan_running_parts_in_messages_reuses_loaded_parts(): + session = await Session.create(project_id="proj_orphan_loaded", directory="/tmp") + msg = await Message.create(session.id, MessageRole.ASSISTANT, "") + running = ToolPart( + id="part_orphan_loaded_running", + sessionID=session.id, + messageID=msg.id, + callID="call_orphan_loaded_running", + tool="bash", + state=ToolStateRunning( + input={"cmd": "sleep 60"}, + metadata={"sessionId": "ses_child"}, + time={"start": 4321}, + ), + ) + completed = ToolPart( + id="part_orphan_loaded_completed", + sessionID=session.id, + messageID=msg.id, + callID="call_orphan_loaded_completed", + tool="bash", + state=ToolStateCompleted( + input={"cmd": "pwd"}, + output="/tmp", + title="bash", + metadata={}, + time={"start": 1000, "end": 2000}, + ), + ) + + await Message.store_part(session.id, msg.id, running) + await Message.store_part(session.id, msg.id, completed) + + messages = await Message.list_with_parts(session.id) + repaired = await abort_orphan_running_parts_in_messages(session.id, messages) + + assert repaired == 1 + repaired_running = next(p for p in messages[0].parts if p.id == "part_orphan_loaded_running") + untouched_completed = next(p for p in messages[0].parts if p.id == "part_orphan_loaded_completed") + assert isinstance(repaired_running.state, ToolStateError) + assert repaired_running.state.error == INTERRUPTED_TOOL_ERROR + assert repaired_running.state.metadata == {"sessionId": "ses_child"} + assert repaired_running.state.time["start"] == 4321 + assert repaired_running.state.time["end"] >= 4321 + assert untouched_completed.state.status == "completed" + + @pytest.mark.asyncio async def test_abort_all_orphan_running_parts_scans_persisted_sessions(): session = await Session.create(project_id="proj_orphan_all", directory="/tmp")