Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions nerve/agent/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ def __init__(self, config: NerveConfig, db: Database):
self._router = None # ChannelRouter — lazy-initialized via .router property
self._mcp_servers_cache = list(config.mcp_servers) # hot-reloadable
self._claude_code_plugins: list[dict[str, str]] = [] # plugin dirs
# Background task watcher tracking — one watcher per session max.
self._background_watchers: dict[str, asyncio.Task] = {}

async def initialize(self) -> None:
"""Initialize the agent engine — set up tools and main session."""
Expand Down Expand Up @@ -353,6 +355,12 @@ async def shutdown(self) -> None:
No memorization here — the periodic sweep handles that.
Sessions are marked idle so they can be resumed on next startup.
"""
# Cancel all background task watchers first
for watcher in self._background_watchers.values():
if not watcher.done():
watcher.cancel()
self._background_watchers.clear()

for sid, client in list(self.sessions._clients.items()):
try:
await self._safe_disconnect(client)
Expand Down Expand Up @@ -879,6 +887,10 @@ def register_task(self, session_id: str, task: asyncio.Task) -> None:

async def stop_session(self, session_id: str) -> bool:
"""Stop a running session."""
# Cancel the background task watcher if one is running
watcher = self._background_watchers.pop(session_id, None)
if watcher and not watcher.done():
watcher.cancel()
# Cancel any pending interactive tool prompts so the handler unblocks
handler = get_handler(session_id)
if handler:
Expand Down Expand Up @@ -1490,11 +1502,16 @@ async def _image_prompt():
for t in bg_tasks
],
})
asyncio.create_task(
# Cancel any existing watcher for this session before spawning
old_watcher = self._background_watchers.pop(session_id, None)
if old_watcher and not old_watcher.done():
old_watcher.cancel()
watcher = asyncio.create_task(
self._watch_background_tasks(
session_id, bg_tasks, source, channel,
)
)
self._background_watchers[session_id] = watcher

return full_response_text

Expand Down Expand Up @@ -1523,6 +1540,10 @@ async def _watch_background_tasks(
await asyncio.sleep(poll_interval)
elapsed += poll_interval

# Keep session alive — prevent idle sweep from killing the
# client while we wait for background tasks to finish.
self.sessions.touch(session_id)

all_done = True
newly_completed = False
for task in bg_tasks:
Expand Down Expand Up @@ -1602,10 +1623,16 @@ async def _watch_background_tasks(
)
return

# Tell the frontend to enter streaming mode BEFORE the run starts.
# This ensures the thinking cursor is visible and input is disabled
# so the user can't type during the auto-resume.
await broadcaster.broadcast_auto_resume_start(session_id)

# Trigger a new engine.run() so the model picks up the
# background task notifications from the SDK
task = asyncio.create_task(
self.run(
# background task results. We await instead of create_task
# so errors propagate and the lifecycle is controlled.
try:
await self.run(
session_id=session_id,
user_message=(
"[Background tasks completed. "
Expand All @@ -1615,14 +1642,25 @@ async def _watch_background_tasks(
channel=channel,
internal=True,
)
)
self.register_task(session_id, task)
except Exception as run_err:
logger.error(
"Auto-resume failed for session %s: %s",
session_id, run_err,
)

except asyncio.CancelledError:
logger.info(
"Background task watcher cancelled for session %s",
session_id,
)
except Exception as e:
logger.error(
"Background task watcher failed for session %s: %s",
session_id, e,
)
finally:
# Clean up watcher reference
self._background_watchers.pop(session_id, None)

# ------------------------------------------------------------------ #
# Cron / Hook runs #
Expand Down
12 changes: 12 additions & 0 deletions nerve/agent/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,18 @@ async def broadcast_file_changed(
"tool_use_id": tool_use_id,
})

async def broadcast_auto_resume_start(self, session_id: str) -> None:
"""Notify the frontend that a background-task auto-resume is starting.

Broadcast on the *session* channel (not __global__) so it reaches
the connected WebSocket listener and is buffered for replay.
The frontend uses this to enter streaming mode before tokens arrive.
"""
await self.broadcast(session_id, {
"type": "auto_resume_start",
"session_id": session_id,
})


# Global broadcaster instance
broadcaster = StreamBroadcaster()
1 change: 1 addition & 0 deletions web/src/api/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export type WSMessage =
| { type: 'notification_answered'; notification_id: string; session_id: string; answer: string; answered_by: string }
| { type: 'answer_injected'; session_id: string; notification_id: string; title: string; answer: string; answered_by: string; content: string }
| { type: 'session_running'; session_id: string; is_running: boolean }
| { type: 'auto_resume_start'; session_id: string }
| { type: 'background_tasks_update'; session_id: string; tasks: { task_id: string; label: string; tool: string; status: 'running' | 'done' | 'timeout' }[] }
| { type: 'hoa_progress'; session_id: string; event: Record<string, unknown> }
| { type: 'pong' };
Expand Down
27 changes: 21 additions & 6 deletions web/src/stores/chatStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -911,12 +911,12 @@ export const useChatStore = create<ChatState>((set, get) => ({
: sess,
) ?? null,
};
// Active session started running from a background trigger (e.g.,
// background task completion, answer injection) — enter streaming mode
// so the response is visible and input is disabled.
// Guard: sendMessage() already sets isStreaming before the WS message,
// so this only fires for server-initiated runs.
if (runMsg.session_id === s.activeSession && runMsg.is_running && !s.isStreaming) {
// Active session started running — enter streaming mode so the
// response is visible and input is disabled. No isStreaming guard:
// sendMessage() sets identical values (idempotent) and removing the
// guard ensures server-initiated runs (auto-resume, answer injection)
// always transition the UI into streaming mode reliably.
if (runMsg.session_id === s.activeSession && runMsg.is_running) {
updates.isStreaming = true;
updates.streamingBlocks = [];
updates.agentStatus = { state: 'thinking' };
Expand Down Expand Up @@ -1103,6 +1103,21 @@ export const useChatStore = create<ChatState>((set, get) => ({
break;
}

case 'auto_resume_start': {
// Background tasks completed — the backend is about to auto-resume.
// Enter streaming mode immediately so the thinking cursor is visible
// and chat input is disabled (prevents user from typing mid-resume).
const arMsg = msg as Extract<WSMessage, { type: 'auto_resume_start' }>;
if (arMsg.session_id === state.activeSession) {
set({
isStreaming: true,
streamingBlocks: [],
agentStatus: { state: 'thinking' },
});
}
break;
}

case 'background_tasks_update': {
const bt = msg as Extract<WSMessage, { type: 'background_tasks_update' }>;
if (bt.session_id === state.activeSession) {
Expand Down