diff --git a/docs/issues/agent-loop-input-exec-responsiveness/plan.md b/docs/issues/agent-loop-input-exec-responsiveness/plan.md new file mode 100644 index 000000000..d3f9dfe05 --- /dev/null +++ b/docs/issues/agent-loop-input-exec-responsiveness/plan.md @@ -0,0 +1,39 @@ +# Agent Loop Input And Exec Responsiveness Plan + +## Runtime Input Flow + +- Keep `chat.steerActiveTurn` as the active-turn entry point. +- Remove hidden steer injection from provider request construction. +- Store active steer input as a priority pending row while the current loop turn continues, so steer + never aborts the in-flight provider request. +- At the process loop boundary after tool calls have returned, yield before continuing to the next + provider request when a pending steer exists; the outer runtime then drains steer through + `processMessage()` as a normal user message. +- Drain pending steer rows before pending queue rows by claiming the row and passing its payload to + `processMessage()` with visible user-message persistence. +- Keep steer rows locked and non-editable, but show not-yet-entered steer rows in the pending input + rail. + +## Exec Isolation + +- Keep the existing background exec core manager as the utility host implementation. +- Replace the exported singleton with a main-process RPC proxy that starts an Electron + `utilityProcess` from the existing main bundle using a dedicated host flag. +- Route `start`, `waitForCompletionOrYield`, `poll`, `log`, `write`, `kill`, `clear`, `remove`, + `cleanupConversation`, and `shutdown` through JSON-serializable messages. +- Track started sessions in the proxy so an unexpected utility exit can return diagnostic error + snapshots for affected sessions. + +## Compatibility + +- `PendingSessionInputMode` remains `queue | steer`. +- Existing `sessions.convertPendingInputToSteer` route remains available for stored and older UI + flows. +- `AgentBashHandler` keeps its current public return shape for completed and yielded commands. 
+ +## Validation + +- Update agent runtime/session integration tests for visible steer turns. +- Update pending input rail tests to assert pending steer rows render as locked items. +- Preserve existing background exec core tests and add coverage around the utility proxy behavior + where practical. diff --git a/docs/issues/agent-loop-input-exec-responsiveness/spec.md b/docs/issues/agent-loop-input-exec-responsiveness/spec.md new file mode 100644 index 000000000..f06e27401 --- /dev/null +++ b/docs/issues/agent-loop-input-exec-responsiveness/spec.md @@ -0,0 +1,30 @@ +# Agent Loop Input And Exec Responsiveness + +## User Stories + +- As a user steering an active agent turn, I want my steering input to appear as a normal user + message so the conversation transcript matches what the agent saw. +- As a user running long shell commands, I want `exec` to yield quickly and keep DeepChat's main + process responsive while the command continues in a managed background session. + +## Acceptance Criteria + +- Active steer does not interrupt the current provider request; it records a priority steer input, + lets the current loop iteration finish including tool results, then yields before the next + provider loop so the steer payload is inserted as a normal visible user turn. +- Pending rows with `mode: "steer"` remain readable for compatibility, but drain before ordinary + queued rows as visible user turns instead of hidden request injections. +- Pending input UI shows not-yet-entered steer rows in the waiting lane as locked items, and keeps + ordinary queued follow-ups editable. +- Foreground `exec` returns a normal result if it finishes inside `yieldMs`; otherwise it returns a + running `sessionId`. +- Shell process spawning, output decoding, output offload, timeout, and process-tree termination + run in an Electron utility process rather than the main event loop. 
+- If the utility process exits unexpectedly, affected sessions surface an error snapshot instead of + blocking the main process. + +## Non-Goals + +- Do not change the public `exec` tool schema or permission semantics. +- Do not add renderer settings for exec isolation. +- Do not refactor the full agent runtime or provider loop. diff --git a/docs/issues/agent-loop-input-exec-responsiveness/tasks.md b/docs/issues/agent-loop-input-exec-responsiveness/tasks.md new file mode 100644 index 000000000..4a6b199b9 --- /dev/null +++ b/docs/issues/agent-loop-input-exec-responsiveness/tasks.md @@ -0,0 +1,12 @@ +# Tasks + +- [x] Add SDD artifacts for the combined responsiveness issue. +- [x] Queue active steer until the current loop iteration finishes without aborting the stream. +- [x] Yield the agent loop after completed tool calls when a pending steer should enter next. +- [x] Convert pending steer drain into visible user turns. +- [x] Remove hidden steer request injection. +- [x] Show not-yet-entered steer rows in the renderer pending rail. +- [x] Add utility-process RPC host for background exec. +- [x] Replace the production background exec singleton with a proxy. +- [x] Update and run targeted tests. +- [x] Run repository formatting, i18n, lint, and typecheck checks. diff --git a/docs/issues/telegram-message-markdown-render/plan.md b/docs/issues/telegram-message-markdown-render/plan.md index 72b6a1c83..e46ecc17c 100644 --- a/docs/issues/telegram-message-markdown-render/plan.md +++ b/docs/issues/telegram-message-markdown-render/plan.md @@ -5,6 +5,7 @@ - Add `src/main/presenter/remoteControlPresenter/telegram/telegramMarkdown.ts` exposing `convertMarkdownToTelegramHtml(text: string): string`, mirroring the Feishu-side `feishuMarkdown.ts` module location and shape. - The converter: - Escapes `&`, `<`, `>` first to make raw text safe for `parse_mode: 'HTML'`. + - Converts common GFM pipe tables into fenced fixed-width text before code-block extraction. 
- Handles fenced code blocks (` ``` `) by emitting `<pre><code>...</code></pre>
` and protecting the body from further Markdown processing. - Handles inline code (` `…` `), bold (`**`/`__`), italic (`*`/`_`), strikethrough (`~~`), links, headings (`#…######`), unordered/ordered lists, and blockquotes (`>`). - Auto-closes a dangling fenced block when called on a chunk that ends mid-block, so each chunk produces valid HTML for Telegram. @@ -12,8 +13,9 @@ - In `TelegramPoller`: - Convert chunk text via `convertMarkdownToTelegramHtml` before `sendMessage`/`editMessageText` calls in `syncDeliverySegment`, `sendChunkedMessage`, `dispatchOutboundActions`, and `editMessageText`. Pass `parseMode: 'HTML'`. - Apply conversion to the interaction prompt text as well so callback prompts render formatting consistently. + - Retry the original plain-text chunk when Telegram returns a 400 entity-parse error for converted HTML. ## Validation -- Run `pnpm test test/main/presenter/remoteControlPresenter/telegramClient.test.ts` (extended) and a new `telegramMarkdown.test.ts` covering core conversion rules and chunk-boundary behavior. +- Run `pnpm test test/main/presenter/remoteControlPresenter/telegramClient.test.ts` (extended) and a new `telegramMarkdown.test.ts` covering core conversion rules, table fallback, and chunk-boundary behavior. - Run `pnpm run typecheck:node` to confirm no signature break in callers (Poller, Adapter). diff --git a/docs/issues/telegram-message-markdown-render/spec.md b/docs/issues/telegram-message-markdown-render/spec.md index 6a3134103..eb840b7a0 100644 --- a/docs/issues/telegram-message-markdown-render/spec.md +++ b/docs/issues/telegram-message-markdown-render/spec.md @@ -8,8 +8,10 @@ When DeepChat's Telegram remote control bot delivers AI replies, command output, - `telegramClient.sendMessage` and `telegramClient.editMessageText` call the Telegram Bot API with `parse_mode: 'HTML'` when the outbound text contains formatted content. 
- AI answer / process delivery segments routed through `TelegramPoller.syncDeliverySegment` and outbound actions dispatched via `dispatchOutboundActions` go through a Markdown → Telegram-HTML converter that handles bold, italic, strikethrough, inline code, fenced code blocks, headings, links, ordered/unordered lists, blockquotes, and horizontal rules. +- Common GFM pipe tables render as fixed-width preformatted text because Telegram does not support native table entities. - Plain text (system replies, error messages, command echoes) is HTML-escaped and accepted by Telegram without parse-mode errors. - Chunked streaming (4096 char limit) keeps each chunk independently renderable — partial Markdown left at a chunk boundary (e.g. an unclosed code fence) renders as text or a safely balanced block instead of breaking the Telegram parse. +- If Telegram rejects converted HTML with an entity-parse error, DeepChat retries the same outbound chunk as plain text. - Existing Telegram client tests pass; a new test covers the converter and parse-mode wiring. ## Constraints diff --git a/docs/issues/telegram-message-markdown-render/tasks.md b/docs/issues/telegram-message-markdown-render/tasks.md index 93b7e6940..d06dd3076 100644 --- a/docs/issues/telegram-message-markdown-render/tasks.md +++ b/docs/issues/telegram-message-markdown-render/tasks.md @@ -2,8 +2,8 @@ - [x] Capture the reproduction from issue #1665 and confirm `sendMessage`/`editMessageText` ship raw Markdown without `parse_mode`. - [x] Draft SDD spec, plan, tasks documents. -- [ ] Implement `telegram/telegramMarkdown.ts` with `convertMarkdownToTelegramHtml`. -- [ ] Thread an optional `parseMode` through `TelegramClient.sendMessage`, `editMessageText`, and `sendPhoto`. -- [ ] Update `TelegramPoller` to apply the converter and pass `parse_mode: 'HTML'` on all generated text paths. -- [ ] Add focused tests for the converter and parse-mode wiring; keep existing telegram tests green. 
+- [x] Implement `telegram/telegramMarkdown.ts` with `convertMarkdownToTelegramHtml`. +- [x] Thread an optional `parseMode` through `TelegramClient.sendMessage`, `editMessageText`, and `sendPhoto`. +- [x] Update `TelegramPoller` to apply the converter and pass `parse_mode: 'HTML'` on all generated text paths. +- [x] Add focused tests for the converter, table fallback, parse-mode wiring, and plain-text retry. - [ ] Run `pnpm run format`, `pnpm run lint`, `pnpm run typecheck:node`, and the focused test suites. diff --git a/src/main/appMain.ts b/src/main/appMain.ts new file mode 100644 index 000000000..1db239f86 --- /dev/null +++ b/src/main/appMain.ts @@ -0,0 +1,185 @@ +import { app, dialog } from 'electron' +import { LifecycleManager, registerCoreHooks } from './presenter/lifecyclePresenter' +import { getInstance, Presenter } from './presenter' +import { electronApp } from '@electron-toolkit/utils' +import log from 'electron-log' +import { eventBus, SendTarget } from './eventbus' +import { NOTIFICATION_EVENTS } from './events' +import { registerWorkspacePreviewSchemes } from './presenter/workspacePresenter/workspacePreviewProtocol' +import { + findDeepLinkArg, + findStartupDeepLink, + isDeepLinkUrl, + storeStartupDeepLink +} from './lib/startupDeepLink' +import { isInsecureTlsAllowed } from './lib/insecureTls' + +registerWorkspacePreviewSchemes() + +// Handle unhandled exceptions to prevent app crash or error dialogs +process.on('uncaughtException', (error) => { + log.error('Uncaught Exception:', error) + + const msg = error.message || 'Unknown error' + const isNetworkError = [ + 'net::ERR', + 'ECONNRESET', + 'ETIMEDOUT', + 'ENOTFOUND', + 'Network Error', + 'fetch failed' + ].some((k) => msg.includes(k)) + + if (isNetworkError) { + // Send error to renderer to show a toast notification + // This is "elegant" and non-blocking + eventBus.sendToRenderer(NOTIFICATION_EVENTS.SHOW_ERROR, SendTarget.ALL_WINDOWS, { + id: Date.now().toString(), + title: 'Network Error', + 
message: msg, + type: 'error' + }) + } +}) + +process.on('unhandledRejection', (reason) => { + log.error('Unhandled Rejection:', reason) +}) + +// Set application command line arguments +app.commandLine.appendSwitch('autoplay-policy', 'no-user-gesture-required') // Allow video autoplay +app.commandLine.appendSwitch('webrtc-max-cpu-consumption-percentage', '100') // Set WebRTC max CPU usage +app.commandLine.appendSwitch('js-flags', '--max-old-space-size=4096') // Set V8 heap memory size +if (isInsecureTlsAllowed()) { + // This disables certificate validation app-wide, so keep it limited to local debugging. + app.commandLine.appendSwitch('ignore-certificate-errors') +} + +// Set platform-specific command line arguments +if (process.platform == 'win32') { + // Windows platform specific parameters (currently commented out) + // app.commandLine.appendSwitch('in-process-gpu') + // app.commandLine.appendSwitch('wm-window-animations-disabled') +} +if (process.platform === 'darwin') { + // macOS platform specific parameters + app.commandLine.appendSwitch('disable-features', 'DesktopCaptureMacV2,IOSurfaceCapturer') +} + +const gotSingleInstanceLock = app.requestSingleInstanceLock() +if (!gotSingleInstanceLock) { + console.log('Another DeepChat instance is already running. 
Exiting current process.') + app.quit() +} + +// Initialize presenter after ready +let presenter: Presenter | undefined + +console.log('Main process starting, checking for deeplink...') +console.log('Full command line arguments:', process.argv) +const startupDeepLink = findStartupDeepLink(process.argv, process.env) +if (startupDeepLink) { + console.log('Found startup deeplink during initialization:', startupDeepLink) + storeStartupDeepLink(startupDeepLink) +} else { + console.log('No startup deeplink detected during initialization') +} + +const focusExistingAppWindow = () => { + const targetWindow = presenter?.windowPresenter.getAllWindows()[0] + if (!targetWindow || targetWindow.isDestroyed()) { + return + } + + if (targetWindow.isMinimized()) { + targetWindow.restore() + } + targetWindow.show() + targetWindow.focus() +} + +const routeIncomingDeeplink = (url: string, source: string) => { + if (!isDeepLinkUrl(url)) { + return + } + + console.log(`${source}:`, url) + const normalizedUrl = storeStartupDeepLink(url) + if (!normalizedUrl) { + return + } + + if (presenter && app.isReady()) { + void presenter.deeplinkPresenter.handleDeepLink(normalizedUrl) + } +} + +// Listen for open-url events that might occur during startup +// This must be set before app.whenReady() because open-url events can fire before that +app.on('open-url', (event, url) => { + event.preventDefault() + routeIncomingDeeplink(url, 'Received open-url event') +}) + +// Also listen for second-instance events (Windows/Linux) +if (gotSingleInstanceLock) { + app.on('second-instance', (_event, commandLine) => { + console.log('Received second-instance event with command line:', commandLine) + focusExistingAppWindow() + + const deepLinkUrl = findDeepLinkArg(commandLine) + if (deepLinkUrl) { + routeIncomingDeeplink(deepLinkUrl, 'Received second-instance deeplink') + } + }) +} + +// Initialize lifecycle manager and register core hooks +const lifecycleManager = new LifecycleManager() 
+registerCoreHooks(lifecycleManager) + +function clearPresenterPermissionCaches(activePresenter?: Presenter): void { + if (!activePresenter) return + + activePresenter.commandPermissionService.clearAll() + activePresenter.filePermissionService.clearAll() + activePresenter.settingsPermissionService.clearAll() +} + +// Start the lifecycle management system instead of using app.whenReady() +app.whenReady().then(async () => { + // Set app user model id for windows + electronApp.setAppUserModelId('com.wefonk.deepchat') + try { + console.log('main: Application lifecycle startup') + await lifecycleManager.start() + presenter = getInstance(lifecycleManager) + console.log('main: Application lifecycle startup completed successfully') + } catch (error) { + console.error('main: Application lifecycle startup failed:', error) + dialog.showErrorBox( + 'Application startup failed', + error instanceof Error ? error.message : String(error) + ) + app.quit() // Serious error, exit the program + } +}) + +app.on('before-quit', () => { + clearPresenterPermissionCaches(presenter) +}) + +// Handle window-all-closed event +app.on('window-all-closed', () => { + clearPresenterPermissionCaches(presenter) + if (!presenter) return + + // Check if there are any non-floating-button windows + const mainWindows = presenter.windowPresenter.getAllWindows() + + if (mainWindows.length === 0) { + // When only floating button windows exist, quit app on non-macOS platforms + console.log('main: All main windows closed, requesting shutdown') + app.quit() // Keep this event to avoid unexpected situations + } +}) diff --git a/src/main/index.ts b/src/main/index.ts index 77c0a4fe3..e45139758 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -1,181 +1,5 @@ -import { app, dialog } from 'electron' -import { LifecycleManager, registerCoreHooks } from './presenter/lifecyclePresenter' -import { getInstance, Presenter } from './presenter' -import { electronApp } from '@electron-toolkit/utils' -import log from 
'electron-log' -import { eventBus, SendTarget } from './eventbus' -import { NOTIFICATION_EVENTS } from './events' -import { registerWorkspacePreviewSchemes } from './presenter/workspacePresenter/workspacePreviewProtocol' -import { - findDeepLinkArg, - findStartupDeepLink, - isDeepLinkUrl, - storeStartupDeepLink -} from './lib/startupDeepLink' +import { runBackgroundExecUtilityHostIfRequested } from './lib/agentRuntime/backgroundExecUtilityHost' -registerWorkspacePreviewSchemes() - -// Handle unhandled exceptions to prevent app crash or error dialogs -process.on('uncaughtException', (error) => { - log.error('Uncaught Exception:', error) - - const msg = error.message || 'Unknown error' - const isNetworkError = [ - 'net::ERR', - 'ECONNRESET', - 'ETIMEDOUT', - 'ENOTFOUND', - 'Network Error', - 'fetch failed' - ].some((k) => msg.includes(k)) - - if (isNetworkError) { - // Send error to renderer to show a toast notification - // This is "elegant" and non-blocking - eventBus.sendToRenderer(NOTIFICATION_EVENTS.SHOW_ERROR, SendTarget.ALL_WINDOWS, { - id: Date.now().toString(), - title: 'Network Error', - message: msg, - type: 'error' - }) - } -}) - -process.on('unhandledRejection', (reason) => { - log.error('Unhandled Rejection:', reason) -}) - -// Set application command line arguments -app.commandLine.appendSwitch('autoplay-policy', 'no-user-gesture-required') // Allow video autoplay -app.commandLine.appendSwitch('webrtc-max-cpu-consumption-percentage', '100') // Set WebRTC max CPU usage -app.commandLine.appendSwitch('js-flags', '--max-old-space-size=4096') // Set V8 heap memory size -app.commandLine.appendSwitch('ignore-certificate-errors') // Ignore certificate errors (for dev or specific scenarios) - -// Set platform-specific command line arguments -if (process.platform == 'win32') { - // Windows platform specific parameters (currently commented out) - // app.commandLine.appendSwitch('in-process-gpu') - // app.commandLine.appendSwitch('wm-window-animations-disabled') 
-} -if (process.platform === 'darwin') { - // macOS platform specific parameters - app.commandLine.appendSwitch('disable-features', 'DesktopCaptureMacV2,IOSurfaceCapturer') -} - -const gotSingleInstanceLock = app.requestSingleInstanceLock() -if (!gotSingleInstanceLock) { - console.log('Another DeepChat instance is already running. Exiting current process.') - app.quit() -} - -// Initialize presenter after ready -let presenter: Presenter | undefined - -console.log('Main process starting, checking for deeplink...') -console.log('Full command line arguments:', process.argv) -const startupDeepLink = findStartupDeepLink(process.argv, process.env) -if (startupDeepLink) { - console.log('Found startup deeplink during initialization:', startupDeepLink) - storeStartupDeepLink(startupDeepLink) -} else { - console.log('No startup deeplink detected during initialization') -} - -const focusExistingAppWindow = () => { - const targetWindow = presenter?.windowPresenter.getAllWindows()[0] - if (!targetWindow || targetWindow.isDestroyed()) { - return - } - - if (targetWindow.isMinimized()) { - targetWindow.restore() - } - targetWindow.show() - targetWindow.focus() -} - -const routeIncomingDeeplink = (url: string, source: string) => { - if (!isDeepLinkUrl(url)) { - return - } - - console.log(`${source}:`, url) - const normalizedUrl = storeStartupDeepLink(url) - if (!normalizedUrl) { - return - } - - if (presenter && app.isReady()) { - void presenter.deeplinkPresenter.handleDeepLink(normalizedUrl) - } -} - -// Listen for open-url events that might occur during startup -// This must be set before app.whenReady() because open-url events can fire before that -app.on('open-url', (event, url) => { - event.preventDefault() - routeIncomingDeeplink(url, 'Received open-url event') -}) - -// Also listen for second-instance events (Windows/Linux) -if (gotSingleInstanceLock) { - app.on('second-instance', (_event, commandLine) => { - console.log('Received second-instance event with command line:', 
commandLine) - focusExistingAppWindow() - - const deepLinkUrl = findDeepLinkArg(commandLine) - if (deepLinkUrl) { - routeIncomingDeeplink(deepLinkUrl, 'Received second-instance deeplink') - } - }) -} - -// Initialize lifecycle manager and register core hooks -const lifecycleManager = new LifecycleManager() -registerCoreHooks(lifecycleManager) - -function clearPresenterPermissionCaches(activePresenter?: Presenter): void { - if (!activePresenter) return - - activePresenter.commandPermissionService.clearAll() - activePresenter.filePermissionService.clearAll() - activePresenter.settingsPermissionService.clearAll() +if (!runBackgroundExecUtilityHostIfRequested()) { + void import('./appMain') } - -// Start the lifecycle management system instead of using app.whenReady() -app.whenReady().then(async () => { - // Set app user model id for windows - electronApp.setAppUserModelId('com.wefonk.deepchat') - try { - console.log('main: Application lifecycle startup') - await lifecycleManager.start() - presenter = getInstance(lifecycleManager) - console.log('main: Application lifecycle startup completed successfully') - } catch (error) { - console.error('main: Application lifecycle startup failed:', error) - dialog.showErrorBox( - 'Application startup failed', - error instanceof Error ? 
error.message : String(error) - ) - app.quit() // Serious error, exit the program - } -}) - -app.on('before-quit', () => { - clearPresenterPermissionCaches(presenter) -}) - -// Handle window-all-closed event -app.on('window-all-closed', () => { - clearPresenterPermissionCaches(presenter) - if (!presenter) return - - // Check if there are any non-floating-button windows - const mainWindows = presenter.windowPresenter.getAllWindows() - - if (mainWindows.length === 0) { - // When only floating button windows exist, quit app on non-macOS platforms - console.log('main: All main windows closed, requesting shutdown') - app.quit() // Keep this event to avoid unexpected situations - } -}) diff --git a/src/main/lib/agentRuntime/backgroundExecSessionManager.ts b/src/main/lib/agentRuntime/backgroundExecSessionManager.ts index 2e8ccafdb..2967f0d3d 100644 --- a/src/main/lib/agentRuntime/backgroundExecSessionManager.ts +++ b/src/main/lib/agentRuntime/backgroundExecSessionManager.ts @@ -1,6 +1,8 @@ import { spawn, type ChildProcess } from 'child_process' import fs from 'fs' import path from 'path' +import { fileURLToPath } from 'url' +import type { UtilityProcess } from 'electron' import { nanoid } from 'nanoid' import logger from '@shared/logger' import { getUserShell } from './shellEnvHelper' @@ -108,6 +110,52 @@ interface LogResult { timedOut?: boolean } +export type BackgroundExecRpcMethod = + | 'start' + | 'list' + | 'poll' + | 'log' + | 'waitForCompletionOrYield' + | 'getCompletionResult' + | 'write' + | 'kill' + | 'clear' + | 'remove' + | 'cleanupConversation' + | 'shutdown' + +export interface BackgroundExecRpcRequest { + type: 'background-exec:request' + id: string + method: BackgroundExecRpcMethod + args: unknown[] +} + +export type BackgroundExecRpcResponse = + | { + type: 'background-exec:response' + id: string + ok: true + data: unknown + } + | { + type: 'background-exec:response' + id: string + ok: false + error: { + message: string + stack?: string + } + } + 
+interface TrackedSessionMeta { + conversationId: string + sessionId: string + command: string + createdAt: number + lastAccessedAt: number +} + export class BackgroundExecSessionManager { private sessions = new Map>() private cleanupIntervalId?: NodeJS.Timeout @@ -893,4 +941,418 @@ export class BackgroundExecSessionManager { } } -export const backgroundExecSessionManager = new BackgroundExecSessionManager() +class BackgroundExecUtilityProxy { + private host: UtilityProcess | null = null + private hostReady: Promise | null = null + private requestId = 0 + private shuttingDown = false + private readonly pendingRequests = new Map< + string, + { + resolve: (value: unknown) => void + reject: (error: unknown) => void + } + >() + private readonly activeSessions = new Map() + private readonly crashedSessions = new Map() + + async start( + conversationId: string, + command: string, + cwd: string, + options?: { + timeout?: number + env?: Record + outputPrefix?: string + } + ): Promise { + const result = await this.request('start', [ + conversationId, + command, + cwd, + options + ]) + this.activeSessions.set(result.sessionId, { + conversationId, + sessionId: result.sessionId, + command, + createdAt: Date.now(), + lastAccessedAt: Date.now() + }) + return result + } + + async list(conversationId: string): Promise { + const active = Array.from(this.activeSessions.values()) + .filter((session) => session.conversationId === conversationId) + .map((session) => this.toActiveSessionMeta(session)) + const hostSessions = this.host + ? 
await this.request('list', [conversationId]).catch((error) => { + logger.warn('[BackgroundExecProxy] Failed to list utility sessions:', error) + return active + }) + : active + const crashed = Array.from(this.crashedSessions.values()) + .filter((session) => session.conversationId === conversationId) + .map((session) => this.toCrashedSessionMeta(session)) + + const sessionIds = new Set() + return [...hostSessions, ...crashed].filter((session) => { + if (sessionIds.has(session.sessionId)) { + return false + } + sessionIds.add(session.sessionId) + return true + }) + } + + async poll(conversationId: string, sessionId: string): Promise { + const crashed = this.getCrashedSession(conversationId, sessionId) + if (crashed) { + return this.toCrashedPollResult(crashed) + } + const result = await this.request('poll', [conversationId, sessionId]) + this.touchOrCompleteSession(conversationId, sessionId, result.status) + return result + } + + async log( + conversationId: string, + sessionId: string, + offset = 0, + limit = 1000 + ): Promise { + const crashed = this.getCrashedSession(conversationId, sessionId) + if (crashed) { + return { + ...this.toCrashedPollResult(crashed), + totalLength: this.crashMessage(crashed).length + } + } + const result = await this.request('log', [conversationId, sessionId, offset, limit]) + this.touchOrCompleteSession(conversationId, sessionId, result.status) + return result + } + + async waitForCompletionOrYield( + conversationId: string, + sessionId: string, + yieldMs = getConfig().backgroundMs + ): Promise { + const crashed = this.getCrashedCompletionResult(conversationId, sessionId) + if (crashed) { + return { + kind: 'completed', + result: crashed + } + } + + const result = await this.request('waitForCompletionOrYield', [ + conversationId, + sessionId, + yieldMs + ]) + if (result.kind === 'completed') { + this.activeSessions.delete(sessionId) + } + return result + } + + async getCompletionResult( + conversationId: string, + sessionId: string, + 
previewChars = FOREGROUND_PREVIEW_CHARS + ): Promise { + const crashed = this.getCrashedCompletionResult(conversationId, sessionId) + if (crashed) { + return crashed + } + + const result = await this.request('getCompletionResult', [ + conversationId, + sessionId, + previewChars + ]) + this.activeSessions.delete(sessionId) + return result + } + + async write(conversationId: string, sessionId: string, data: string, eof = false): Promise { + await this.request('write', [conversationId, sessionId, data, eof]) + } + + async kill(conversationId: string, sessionId: string): Promise { + await this.request('kill', [conversationId, sessionId]) + } + + async clear(conversationId: string, sessionId: string): Promise { + await this.request('clear', [conversationId, sessionId]) + } + + async remove(conversationId: string, sessionId: string): Promise { + this.activeSessions.delete(sessionId) + if (this.getCrashedSession(conversationId, sessionId)) { + this.crashedSessions.delete(sessionId) + return + } + await this.request('remove', [conversationId, sessionId]) + } + + async cleanupConversation(conversationId: string): Promise { + for (const [sessionId, session] of this.activeSessions) { + if (session.conversationId === conversationId) { + this.activeSessions.delete(sessionId) + } + } + for (const [sessionId, session] of this.crashedSessions) { + if (session.conversationId === conversationId) { + this.crashedSessions.delete(sessionId) + } + } + await this.request('cleanupConversation', [conversationId]) + } + + async shutdown(): Promise { + this.shuttingDown = true + try { + if (this.host) { + await this.request('shutdown', []) + } + } finally { + this.host?.kill() + this.host = null + this.hostReady = null + this.rejectPendingRequests(new Error('Background exec utility process shut down.')) + this.activeSessions.clear() + } + } + + private async request(method: BackgroundExecRpcMethod, args: unknown[]): Promise { + const host = await this.ensureHost() + const id = 
`exec_rpc_${++this.requestId}` + + return await new Promise((resolve, reject) => { + this.pendingRequests.set(id, { + resolve: (value) => resolve(value as T), + reject + }) + + const payload: BackgroundExecRpcRequest = { + type: 'background-exec:request', + id, + method, + args + } + + try { + host.postMessage(payload) + } catch (error) { + this.pendingRequests.delete(id) + reject(error) + } + }) + } + + private async ensureHost(): Promise { + if (this.host) { + return this.host + } + if (this.hostReady) { + return await this.hostReady + } + + this.shuttingDown = false + this.hostReady = this.startHost() + try { + return await this.hostReady + } finally { + this.hostReady = null + } + } + + private async startHost(): Promise { + const { utilityProcess } = await import('electron') + const modulePath = this.resolveUtilityHostEntryPoint() + const host = utilityProcess.fork(modulePath, ['--deepchat-exec-utility-host'], { + serviceName: 'DeepChat Exec Utility', + stdio: 'ignore', + env: { + ...process.env, + DEEPCHAT_EXEC_UTILITY_HOST: '1' + } + }) + + host.on('message', (message) => this.handleHostMessage(message)) + host.on('exit', (code) => this.handleHostExit(code)) + host.on('error', (type, location) => { + logger.error('[BackgroundExecProxy] Utility process error:', { type, location }) + }) + + return await new Promise((resolve, reject) => { + let settled = false + const settle = (callback: () => void) => { + if (settled) { + return + } + settled = true + host.off('spawn', onSpawn) + host.off('exit', onExit) + callback() + } + const onSpawn = () => { + settle(() => { + this.host = host + resolve(host) + }) + } + const onExit = (code: number) => { + settle(() => { + reject(new Error(`Background exec utility process exited before spawn: ${code}`)) + }) + } + + host.once('spawn', onSpawn) + host.once('exit', onExit) + }) + } + + private resolveUtilityHostEntryPoint(): string { + const modulePath = fileURLToPath(import.meta.url) + if (path.basename(modulePath) === 
'index.js') { + return modulePath + } + return fileURLToPath(new URL('../../index.js', import.meta.url)) + } + + private handleHostMessage(message: unknown): void { + if (!message || typeof message !== 'object') { + return + } + const response = message as BackgroundExecRpcResponse + if (response.type !== 'background-exec:response') { + return + } + const pending = this.pendingRequests.get(response.id) + if (!pending) { + return + } + this.pendingRequests.delete(response.id) + if (response.ok) { + pending.resolve(response.data) + return + } + const error = new Error(response.error.message) + if (response.error.stack) { + error.stack = response.error.stack + } + pending.reject(error) + } + + private handleHostExit(code: number): void { + const error = new Error(`Background exec utility process exited with code ${code}.`) + if (!this.shuttingDown) { + for (const session of this.activeSessions.values()) { + this.crashedSessions.set(session.sessionId, { + ...session, + lastAccessedAt: Date.now() + }) + } + } + this.host = null + this.hostReady = null + this.activeSessions.clear() + this.rejectPendingRequests(error) + } + + private rejectPendingRequests(error: Error): void { + for (const pending of this.pendingRequests.values()) { + pending.reject(error) + } + this.pendingRequests.clear() + } + + private getCrashedSession(conversationId: string, sessionId: string): TrackedSessionMeta | null { + const session = this.crashedSessions.get(sessionId) + return session?.conversationId === conversationId ? 
session : null + } + + private getCrashedCompletionResult( + conversationId: string, + sessionId: string + ): SessionCompletionResult | null { + const session = this.getCrashedSession(conversationId, sessionId) + if (!session) { + return null + } + session.lastAccessedAt = Date.now() + this.activeSessions.delete(sessionId) + return this.toCrashedCompletionResult(session) + } + + private touchOrCompleteSession( + conversationId: string, + sessionId: string, + status: PollResult['status'] + ): void { + const session = this.activeSessions.get(sessionId) + if (!session || session.conversationId !== conversationId) { + return + } + if (status === 'running') { + session.lastAccessedAt = Date.now() + return + } + this.activeSessions.delete(sessionId) + } + + private toCrashedSessionMeta(session: TrackedSessionMeta): SessionMeta { + return { + sessionId: session.sessionId, + command: session.command, + status: 'error', + createdAt: session.createdAt, + lastAccessedAt: session.lastAccessedAt, + outputLength: this.crashMessage(session).length, + offloaded: false, + timedOut: false + } + } + + private toActiveSessionMeta(session: TrackedSessionMeta): SessionMeta { + return { + sessionId: session.sessionId, + command: session.command, + status: 'running', + createdAt: session.createdAt, + lastAccessedAt: session.lastAccessedAt, + outputLength: 0, + offloaded: false, + timedOut: false + } + } + + private toCrashedPollResult(session: TrackedSessionMeta): PollResult { + return { + status: 'error', + output: this.crashMessage(session), + offloaded: false, + timedOut: false + } + } + + private toCrashedCompletionResult(session: TrackedSessionMeta): SessionCompletionResult { + return { + status: 'error', + output: this.crashMessage(session), + exitCode: null, + offloaded: false, + timedOut: false + } + } + + private crashMessage(session: TrackedSessionMeta): string { + return `Background exec utility process exited before session ${session.sessionId} completed. 
The command may have been terminated: ${session.command}` + } +} + +export const backgroundExecSessionManager = new BackgroundExecUtilityProxy() diff --git a/src/main/lib/agentRuntime/backgroundExecUtilityHost.ts b/src/main/lib/agentRuntime/backgroundExecUtilityHost.ts new file mode 100644 index 000000000..d689f40a0 --- /dev/null +++ b/src/main/lib/agentRuntime/backgroundExecUtilityHost.ts @@ -0,0 +1,100 @@ +import { + BackgroundExecSessionManager, + type BackgroundExecRpcRequest, + type BackgroundExecRpcResponse +} from './backgroundExecSessionManager' + +const EXEC_UTILITY_HOST_ARG = '--deepchat-exec-utility-host' + +type ParentPort = { + postMessage(message: unknown): void + on(event: 'message', listener: (message: unknown) => void): void +} + +function getParentPort(): ParentPort | null { + const maybeProcess = process as NodeJS.Process & { + parentPort?: ParentPort + } + return maybeProcess.parentPort ?? null +} + +function isExecUtilityHostRequest(): boolean { + return ( + process.env.DEEPCHAT_EXEC_UTILITY_HOST === '1' || process.argv.includes(EXEC_UTILITY_HOST_ARG) + ) +} + +function serializeError(error: unknown): { message: string; stack?: string } { + if (error instanceof Error) { + return { + message: error.message, + stack: error.stack + } + } + return { + message: String(error) + } +} + +function sendResponse(parentPort: ParentPort, response: BackgroundExecRpcResponse): void { + parentPort.postMessage(response) +} + +async function handleRequest( + manager: BackgroundExecSessionManager, + parentPort: ParentPort, + request: BackgroundExecRpcRequest +): Promise { + try { + const target = manager as unknown as Record unknown> + const method = target[request.method] + if (typeof method !== 'function') { + throw new Error(`Unknown background exec method: ${request.method}`) + } + + const data = await method.apply(manager, request.args) + sendResponse(parentPort, { + type: 'background-exec:response', + id: request.id, + ok: true, + data + }) + } catch 
(error) { + sendResponse(parentPort, { + type: 'background-exec:response', + id: request.id, + ok: false, + error: serializeError(error) + }) + } +} + +export function runBackgroundExecUtilityHostIfRequested(): boolean { + if (!isExecUtilityHostRequest()) { + return false + } + + const parentPort = getParentPort() + if (!parentPort) { + throw new Error('Background exec utility host started without a parent port.') + } + + const manager = new BackgroundExecSessionManager() + + parentPort.on('message', (message) => { + if (!message || typeof message !== 'object') { + return + } + const request = message as BackgroundExecRpcRequest + if (request.type !== 'background-exec:request') { + return + } + void handleRequest(manager, parentPort, request) + }) + + process.once('beforeExit', () => { + void manager.shutdown() + }) + + return true +} diff --git a/src/main/lib/insecureTls.ts b/src/main/lib/insecureTls.ts new file mode 100644 index 000000000..f7d96e964 --- /dev/null +++ b/src/main/lib/insecureTls.ts @@ -0,0 +1,5 @@ +import { is } from '@electron-toolkit/utils' + +export function isInsecureTlsAllowed(): boolean { + return is.dev || process.env.DEEPCHAT_ALLOW_INSECURE_TLS === '1' +} diff --git a/src/main/presenter/agentRuntimePresenter/index.ts b/src/main/presenter/agentRuntimePresenter/index.ts index 473f8bd01..173c46459 100644 --- a/src/main/presenter/agentRuntimePresenter/index.ts +++ b/src/main/presenter/agentRuntimePresenter/index.ts @@ -72,13 +72,7 @@ import { buildRuntimeCapabilitiesPrompt, buildSystemEnvPrompt } from '@/lib/agentRuntime/systemEnvPromptBuilder' -import { - buildContext, - buildResumeContext, - createUserChatMessage, - fitMessagesToContextWindow, - isContextHistoryRecord -} from './contextBuilder' +import { buildContext, buildResumeContext, isContextHistoryRecord } from './contextBuilder' import { capAgentDefaultMaxTokens, capAgentRequestMaxTokens, @@ -120,6 +114,8 @@ type PendingInteractionEntry = { blockIndex: number } +type 
ProcessPendingInputSource = PendingInputEnqueueSource | 'steer' + type DeferredToolExecutionResult = { responseText: string isError: boolean @@ -224,8 +220,6 @@ type ActiveGeneration = { abortController: AbortController } -type ActiveGenerationAbortReason = 'user_stop' | 'steer' - const RATE_LIMIT_STREAM_MESSAGE_PREFIX = '__rate_limit__:' const createAbortError = (): Error => { if (typeof DOMException !== 'undefined') { @@ -251,9 +245,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { private readonly abortControllers: Map = new Map() private readonly deferredToolAbortControllers: Map = new Map() private readonly activeGenerations: Map = new Map() - private readonly activeGenerationAbortReasons: Map = - new Map() - private readonly steerInterruptInputs: Map = new Map() + private readonly activeSteerPendingInputIds: Map = new Map() private readonly sessionAgentIds: Map = new Map() private readonly sessionProjectDirs: Map = new Map() private readonly systemPromptCache: Map = new Map() @@ -412,8 +404,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { } this.abortDeferredToolAbortControllers(sessionId) this.activeGenerations.delete(sessionId) - this.activeGenerationAbortReasons.delete(sessionId) - this.steerInterruptInputs.delete(sessionId) + this.activeSteerPendingInputIds.delete(sessionId) this.clearActiveProviderPermissionsForSession(sessionId) this.pendingInputCoordinator.deleteBySession(sessionId) @@ -524,30 +515,17 @@ export class AgentRuntimePresenter implements IAgentImplementation { } const activeGeneration = this.activeGenerations.get(sessionId) - if (!activeGeneration) { - const preStreamController = this.abortControllers.get(sessionId) - if (state.status === 'generating' && preStreamController) { - this.enqueueSteerInterruptInput(sessionId, normalizedInput) - this.activeGenerationAbortReasons.set(sessionId, 'steer') - preStreamController.abort() - this.abortDeferredToolAbortControllers(sessionId) - 
this.clearActiveProviderPermissionsForSession(sessionId) - return - } - - void this.processMessage(sessionId, normalizedInput, { - projectDir: this.resolveProjectDir(sessionId) - }).catch((error) => { - console.error('[AgentRuntime] Failed to process steer input:', error) - }) + const preStreamController = this.abortControllers.get(sessionId) + if (activeGeneration || preStreamController) { + this.queueVisibleSteerInput(sessionId, normalizedInput) return } - this.enqueueSteerInterruptInput(sessionId, normalizedInput) - this.activeGenerationAbortReasons.set(sessionId, 'steer') - activeGeneration.abortController.abort() - this.abortDeferredToolAbortControllers(sessionId) - this.clearActiveProviderPermissionsForSession(sessionId) + void this.processMessage(sessionId, normalizedInput, { + projectDir: this.resolveProjectDir(sessionId) + }).catch((error) => { + console.error('[AgentRuntime] Failed to process steer input:', error) + }) } async updateQueuedInput( @@ -600,7 +578,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { projectDir?: string | null emitRefreshBeforeStream?: boolean pendingQueueItemId?: string - pendingQueueItemSource?: PendingInputEnqueueSource + pendingQueueItemSource?: ProcessPendingInputSource } ): Promise { const state = this.runtimeState.get(sessionId) @@ -620,6 +598,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { this.setSessionStatus(sessionId, 'generating') const preStreamAbortController = this.ensureSessionAbortController(sessionId) const preStreamAbortSignal = preStreamAbortController.signal + const pendingInputSource: ProcessPendingInputSource = context?.pendingQueueItemSource ?? 
'send' let consumedPendingQueueItem = false let userMessageId: string | null = null let assistantMessageId: string | null = null @@ -754,7 +733,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { assistantMessageId = this.messageStore.createAssistantMessage(sessionId, assistantOrderSeq) this.throwIfAbortRequested(preStreamAbortSignal) - if (context?.pendingQueueItemId && context.pendingQueueItemSource !== 'queue') { + if (context?.pendingQueueItemId && pendingInputSource === 'send') { this.pendingInputCoordinator.consumeQueuedInput(sessionId, context.pendingQueueItemId) consumedPendingQueueItem = true } @@ -774,12 +753,21 @@ export class AgentRuntimePresenter implements IAgentImplementation { interleavedReasoning }) if (context?.pendingQueueItemId && !consumedPendingQueueItem) { - if (context.pendingQueueItemSource === 'queue') { + if (pendingInputSource === 'queue' || pendingInputSource === 'steer') { if (result.status === 'completed' || result.status === 'paused') { - this.pendingInputCoordinator.consumeQueuedInput(sessionId, context.pendingQueueItemId) + this.consumeClaimedPendingInput( + sessionId, + context.pendingQueueItemId, + pendingInputSource + ) consumedPendingQueueItem = true } else { - this.rollbackClaimedQueueInputTurn(sessionId, context.pendingQueueItemId, userMessageId) + this.rollbackClaimedPendingInputTurn( + sessionId, + context.pendingQueueItemId, + pendingInputSource, + userMessageId + ) consumedPendingQueueItem = true } } else { @@ -787,20 +775,6 @@ export class AgentRuntimePresenter implements IAgentImplementation { consumedPendingQueueItem = true } } - const steerInput = result.status === 'aborted' ? 
this.consumeAbortSteerInput(sessionId) : null - if (steerInput) { - try { - this.settleSteerInterruptedAssistant(sessionId, assistantMessageId) - this.setSessionStatus(sessionId, 'idle') - } finally { - this.clearActiveGeneration(sessionId, runId) - } - this.continueWithSteerInput(sessionId, steerInput, projectDir) - return { - requestId: assistantMessageId, - messageId: assistantMessageId - } - } try { this.applyProcessResultStatus(sessionId, result, runId) } finally { @@ -817,12 +791,18 @@ export class AgentRuntimePresenter implements IAgentImplementation { console.error('[DeepChatAgent] processMessage error:', err) if (context?.pendingQueueItemId && !consumedPendingQueueItem) { try { - if (context.pendingQueueItemSource === 'queue') { - this.rollbackClaimedQueueInputTurn(sessionId, context.pendingQueueItemId, userMessageId) + if (pendingInputSource === 'queue' || pendingInputSource === 'steer') { + this.rollbackClaimedPendingInputTurn( + sessionId, + context.pendingQueueItemId, + pendingInputSource, + userMessageId + ) } else { - this.pendingInputCoordinator.releaseClaimedQueueInput( + this.releaseClaimedPendingInput( sessionId, - context.pendingQueueItemId + context.pendingQueueItemId, + pendingInputSource ) } consumedPendingQueueItem = true @@ -831,37 +811,27 @@ export class AgentRuntimePresenter implements IAgentImplementation { } } if (this.isAbortError(err) || preStreamAbortSignal.aborted) { - const steerInput = this.consumeAbortSteerInput(sessionId) if (userMessageId) { this.emitMessageRefresh(sessionId, userMessageId) } if (assistantMessageId) { - if (steerInput) { - this.settleSteerInterruptedAssistant(sessionId, assistantMessageId) - } else { - const existingAssistant = this.messageStore.getMessage(assistantMessageId) - const existingBlocks = existingAssistant - ? 
this.parseAssistantBlocks(existingAssistant.content) - : [] - const blocks = buildTerminalErrorBlocks( - existingBlocks, - 'common.error.userCanceledGeneration' - ) - this.messageStore.setMessageError(assistantMessageId, blocks) - this.emitMessageRefresh(sessionId, assistantMessageId) - } - } - if (!steerInput) { - this.dispatchTerminalHooks(sessionId, state, { - status: 'aborted', - stopReason: 'user_stop', - errorMessage: 'common.error.userCanceledGeneration' - }) + const existingAssistant = this.messageStore.getMessage(assistantMessageId) + const existingBlocks = existingAssistant + ? this.parseAssistantBlocks(existingAssistant.content) + : [] + const blocks = buildTerminalErrorBlocks( + existingBlocks, + 'common.error.userCanceledGeneration' + ) + this.messageStore.setMessageError(assistantMessageId, blocks) + this.emitMessageRefresh(sessionId, assistantMessageId) } + this.dispatchTerminalHooks(sessionId, state, { + status: 'aborted', + stopReason: 'user_stop', + errorMessage: 'common.error.userCanceledGeneration' + }) this.setSessionStatus(sessionId, 'idle') - if (steerInput) { - this.continueWithSteerInput(sessionId, steerInput, projectDir) - } return { requestId: assistantMessageId, messageId: assistantMessageId @@ -1269,10 +1239,8 @@ export class AgentRuntimePresenter implements IAgentImplementation { } async cancelGeneration(sessionId: string): Promise { - this.steerInterruptInputs.delete(sessionId) const activeGeneration = this.activeGenerations.get(sessionId) if (activeGeneration) { - this.activeGenerationAbortReasons.set(sessionId, 'user_stop') activeGeneration.abortController.abort() this.clearActiveGeneration(sessionId, activeGeneration.runId) @@ -1937,9 +1905,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { const traceEnabled = this.configPresenter.getSetting('traceDebugEnabled') === true const llmProviderPresenter = this.llmProviderPresenter - const pendingInputCoordinator = this.pendingInputCoordinator const 
shouldBypassContextBudget = this.shouldBypassDeepChatContextBudget.bind(this) - const injectSteerInputsIntoRequest = this.injectSteerInputsIntoRequest.bind(this) const recoverContextPressure = this.recoverRequestContextPressure.bind(this) const replaceLeadingSystemPromptInPlace = this.replaceLeadingSystemPromptInPlace.bind(this) const persistMessageTrace = this.persistMessageTrace.bind(this) @@ -2004,39 +1970,21 @@ export class AgentRuntimePresenter implements IAgentImplementation { state.providerId, requestModelConfig ) - const claimedSteerBatch = pendingInputCoordinator.claimSteerBatchForNextLoop(sessionId) - const injectedMessages = injectSteerInputsIntoRequest( - requestMessages, - claimedSteerBatch, - supportsVision, - supportsAudioInput, - requestBypassesContextBudget - ? Number.MAX_SAFE_INTEGER - : requestModelConfig.contextLength, - requestMaxTokens - ) - - let didConsumeSteerBatch = false let queuedForRateLimit = false try { - let providerMessages = injectedMessages + let providerMessages = requestMessages let providerMaxTokens = requestMaxTokens const isTtsRequest = isTtsModelConfig(requestModelConfig) || isTtsModelId(requestModelId) const effectiveRequestTools: MCPToolDefinition[] = isTtsRequest ? [] : requestTools if (!requestBypassesContextBudget) { - const protectedSteerTailCount = - claimedSteerBatch.length > 0 - ? claimedSteerBatch.length + (requestMessages.at(-1)?.role === 'user' ? 
1 : 0) - : 0 let requestPreflight = preflightRequestContext({ - messages: injectedMessages, + messages: requestMessages, tools: effectiveRequestTools, contextLength: requestModelConfig.contextLength, - requestedMaxTokens: requestMaxTokens, - minimumProtectedTailCount: protectedSteerTailCount + requestedMaxTokens: requestMaxTokens }) if ( requestPreflight.requiresContextPressureRecovery || @@ -2054,7 +2002,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { supportsVision, supportsAudioInput, interleavedReasoning, - minimumProtectedTailCount: protectedSteerTailCount, + minimumProtectedTailCount: 0, signal: abortController.signal }) requestMessages.splice(0, requestMessages.length, ...recovered.messages) @@ -2065,8 +2013,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { messages: requestMessages, tools: effectiveRequestTools, contextLength: requestModelConfig.contextLength, - requestedMaxTokens: requestMaxTokens, - minimumProtectedTailCount: protectedSteerTailCount + requestedMaxTokens: requestMaxTokens }) requestMessages.splice(0, requestMessages.length, ...requestPreflight.messages) } @@ -2105,23 +2052,12 @@ export class AgentRuntimePresenter implements IAgentImplementation { providerMaxTokens, effectiveRequestTools )) { - if (!didConsumeSteerBatch && claimedSteerBatch.length > 0) { - pendingInputCoordinator.consumeClaimedSteerBatch(sessionId) - didConsumeSteerBatch = true - } yield event } - - if (!didConsumeSteerBatch && claimedSteerBatch.length > 0) { - pendingInputCoordinator.consumeClaimedSteerBatch(sessionId) - } } catch (error) { if (queuedForRateLimit) { clearRateLimitWaitingMessage(sessionId, rateLimitMessageId, activeGeneration.runId) } - if (!didConsumeSteerBatch && claimedSteerBatch.length > 0) { - pendingInputCoordinator.releaseClaimedInputs(sessionId) - } throw error } }, @@ -2134,6 +2070,8 @@ export class AgentRuntimePresenter implements IAgentImplementation { permissionMode: state.permissionMode, 
toolOutputGuard: this.toolOutputGuard, initialBlocks, + shouldYieldForPendingInput: () => + Boolean(this.pendingInputCoordinator.getNextSteerInput(sessionId)), hooks: { onPreToolUse: (tool) => { this.dispatchHook('PreToolUse', { @@ -2326,37 +2264,6 @@ export class AgentRuntimePresenter implements IAgentImplementation { messages.unshift({ role: 'system', content: systemPrompt }) } - private injectSteerInputsIntoRequest( - messages: ChatMessage[], - steerInputs: PendingSessionInputRecord[], - supportsVision: boolean, - supportsAudioInput: boolean, - contextLength: number, - reserveTokens: number - ): ChatMessage[] { - if (steerInputs.length === 0) { - return messages - } - - const steerMessages = steerInputs.map((input) => - createUserChatMessage(input.payload, supportsVision, supportsAudioInput) - ) - const clonedMessages = [...messages] - const lastMessage = clonedMessages[clonedMessages.length - 1] - const trailingUserCount = lastMessage?.role === 'user' ? 1 : 0 - const injectedMessages = - trailingUserCount > 0 - ? [...clonedMessages.slice(0, -1), ...steerMessages, lastMessage] - : [...clonedMessages, ...steerMessages] - - return fitMessagesToContextWindow( - injectedMessages, - contextLength, - reserveTokens, - steerMessages.length + trailingUserCount - ) - } - private async drainPendingQueueIfPossible( sessionId: string, reason: 'enqueue' | 'resume' | 'completed' @@ -2376,20 +2283,29 @@ export class AgentRuntimePresenter implements IAgentImplementation { return false } - const nextQueuedInput = this.pendingInputCoordinator.getNextQueuedInput(sessionId) - if (!nextQueuedInput) { + const nextSteerInput = this.pendingInputCoordinator.getNextSteerInput(sessionId) + const nextQueuedInput = nextSteerInput + ? null + : this.pendingInputCoordinator.getNextQueuedInput(sessionId) + const nextPendingInput = nextSteerInput ?? 
nextQueuedInput + if (!nextPendingInput) { return false } this.drainingPendingQueues.add(sessionId) try { - const claimedInput = this.pendingInputCoordinator.claimQueuedInput( - sessionId, - nextQueuedInput.id - ) + const pendingInputSource: ProcessPendingInputSource = nextSteerInput ? 'steer' : 'queue' + const claimedInput = + pendingInputSource === 'steer' + ? this.pendingInputCoordinator.claimSteerInput(sessionId, nextPendingInput.id) + : this.pendingInputCoordinator.claimQueuedInput(sessionId, nextPendingInput.id) + if (pendingInputSource === 'steer') { + this.activeSteerPendingInputIds.delete(sessionId) + } await this.processMessage(sessionId, claimedInput.payload, { projectDir: this.resolveProjectDir(sessionId), - pendingQueueItemId: claimedInput.id + pendingQueueItemId: claimedInput.id, + pendingQueueItemSource: pendingInputSource }) return true } catch (error) { @@ -2398,7 +2314,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { } finally { this.drainingPendingQueues.delete(sessionId) if ( - this.pendingInputCoordinator.getNextQueuedInput(sessionId) && + this.pendingInputCoordinator.hasPendingTurnInput(sessionId) && (await this.getSessionState(sessionId))?.status === 'idle' && !this.hasPendingInteractions(sessionId) ) { @@ -2420,7 +2336,7 @@ export class AgentRuntimePresenter implements IAgentImplementation { if (this.drainingPendingQueues.has(sessionId)) { return false } - return this.pendingInputCoordinator.getNextQueuedInput(sessionId) === null + return !this.pendingInputCoordinator.hasPendingTurnInput(sessionId) } private canDrainPendingQueueFromStatus( @@ -2434,9 +2350,10 @@ export class AgentRuntimePresenter implements IAgentImplementation { return (reason === 'enqueue' || reason === 'resume') && status === 'error' } - private rollbackClaimedQueueInputTurn( + private rollbackClaimedPendingInputTurn( sessionId: string, pendingQueueItemId: string, + pendingInputSource: ProcessPendingInputSource, userMessageId: string | null ): 
void { const userMessage = userMessageId ? this.messageStore.getMessage(userMessageId) : null @@ -2444,7 +2361,31 @@ export class AgentRuntimePresenter implements IAgentImplementation { this.invalidateSummaryIfNeeded(sessionId, userMessage.orderSeq) this.messageStore.deleteFromOrderSeq(sessionId, userMessage.orderSeq) } - this.pendingInputCoordinator.releaseClaimedQueueInput(sessionId, pendingQueueItemId) + this.releaseClaimedPendingInput(sessionId, pendingQueueItemId, pendingInputSource) + } + + private consumeClaimedPendingInput( + sessionId: string, + pendingInputId: string, + pendingInputSource: ProcessPendingInputSource + ): void { + if (pendingInputSource === 'steer') { + this.pendingInputCoordinator.consumeSteerInput(sessionId, pendingInputId) + return + } + this.pendingInputCoordinator.consumeQueuedInput(sessionId, pendingInputId) + } + + private releaseClaimedPendingInput( + sessionId: string, + pendingInputId: string, + pendingInputSource: ProcessPendingInputSource + ): void { + if (pendingInputSource === 'steer') { + this.pendingInputCoordinator.releaseClaimedInput(sessionId, pendingInputId) + return + } + this.pendingInputCoordinator.releaseClaimedQueueInput(sessionId, pendingInputId) } private registerActiveGeneration( @@ -3914,67 +3855,21 @@ export class AgentRuntimePresenter implements IAgentImplementation { return { text, files } } - private enqueueSteerInterruptInput(sessionId: string, input: SendMessageInput): void { - const existing = this.steerInterruptInputs.get(sessionId) ?? [] - existing.push(input) - this.steerInterruptInputs.set(sessionId, existing) - } - - private consumeAbortSteerInput(sessionId: string): SendMessageInput | null { - const abortReason = this.activeGenerationAbortReasons.get(sessionId) ?? 'user_stop' - this.activeGenerationAbortReasons.delete(sessionId) - return abortReason === 'steer' ? 
this.consumeSteerInterruptInput(sessionId) : null - } - - private consumeSteerInterruptInput(sessionId: string): SendMessageInput | null { - const inputs = this.steerInterruptInputs.get(sessionId) - if (!inputs || inputs.length === 0) { - return null - } - - this.steerInterruptInputs.delete(sessionId) - const text = inputs - .map((input) => input.text.trim()) - .filter(Boolean) - .join('\n\n') - const files = inputs.flatMap((input) => input.files ?? []).filter(Boolean) - return { text, files } - } - - private settleSteerInterruptedAssistant(sessionId: string, assistantMessageId: string): void { - const existingAssistant = this.messageStore.getMessage(assistantMessageId) - const existingBlocks = existingAssistant - ? this.parseAssistantBlocks(existingAssistant.content) - : [] - const visibleBlocks = existingBlocks.filter( - (block) => - !(block.type === 'error' && block.content === 'common.error.userCanceledGeneration') - ) - - if (visibleBlocks.length === 0) { - this.messageStore.deleteMessage(assistantMessageId) - this.emitMessageRefresh(sessionId, assistantMessageId) - return + private queueVisibleSteerInput(sessionId: string, input: SendMessageInput): void { + const mergeItemId = this.activeSteerPendingInputIds.get(sessionId) ?? null + try { + const record = this.pendingInputCoordinator.queueSteerInput(sessionId, input, { + mergeItemId + }) + this.activeSteerPendingInputIds.set(sessionId, record.id) + } catch (error) { + if (!mergeItemId) { + throw error + } + this.activeSteerPendingInputIds.delete(sessionId) + const record = this.pendingInputCoordinator.queueSteerInput(sessionId, input) + this.activeSteerPendingInputIds.set(sessionId, record.id) } - - const settledBlocks = visibleBlocks.map((block) => - block.status === 'pending' || block.status === 'loading' - ? 
{ ...block, status: 'success' as const } - : block - ) - this.messageStore.updateAssistantContent(assistantMessageId, settledBlocks) - this.messageStore.updateMessageStatus(assistantMessageId, 'sent') - this.emitMessageRefresh(sessionId, assistantMessageId) - } - - private continueWithSteerInput( - sessionId: string, - steerInput: SendMessageInput, - projectDir: string | null - ): void { - void this.processMessage(sessionId, steerInput, { projectDir }).catch((error) => { - console.error('[AgentRuntime] Failed to restart after steer interrupt:', error) - }) } private supportsVision(providerId: string, modelId: string): boolean { diff --git a/src/main/presenter/agentRuntimePresenter/pendingInputCoordinator.ts b/src/main/presenter/agentRuntimePresenter/pendingInputCoordinator.ts index 4d9eceb03..21f7ce902 100644 --- a/src/main/presenter/agentRuntimePresenter/pendingInputCoordinator.ts +++ b/src/main/presenter/agentRuntimePresenter/pendingInputCoordinator.ts @@ -49,6 +49,23 @@ export class PendingInputCoordinator { return record } + queueSteerInput( + sessionId: string, + input: string | SendMessageInput, + options?: { + mergeItemId?: string | null + } + ): PendingSessionInputRecord { + let record: PendingSessionInputRecord + if (options?.mergeItemId) { + record = this.store.appendSteerInput(options.mergeItemId, normalizeInput(input)) + } else { + record = this.store.createSteerInput(sessionId, normalizeInput(input)) + } + this.emitUpdated(sessionId) + return record + } + updateQueuedInput( sessionId: string, itemId: string, @@ -84,6 +101,14 @@ export class PendingInputCoordinator { return this.store.getNextPendingQueueInput(sessionId) } + getNextSteerInput(sessionId: string): PendingSessionInputRecord | null { + return this.store.getNextPendingSteerInput(sessionId) + } + + hasPendingTurnInput(sessionId: string): boolean { + return Boolean(this.getNextSteerInput(sessionId) ?? 
this.getNextQueuedInput(sessionId)) + } + claimQueuedInput(sessionId: string, itemId: string): PendingSessionInputRecord { this.assertQueueInput(sessionId, itemId) const record = this.store.claimQueueInput(itemId) @@ -91,39 +116,37 @@ export class PendingInputCoordinator { return record } - releaseClaimedQueueInput(sessionId: string, itemId: string): PendingSessionInputRecord { - const record = this.store.releaseClaimedQueueInput(itemId) + claimSteerInput(sessionId: string, itemId: string): PendingSessionInputRecord { + this.assertSteerInput(sessionId, itemId) + const record = this.store.claimSteerInput(itemId) this.emitUpdated(sessionId) return record } - consumeQueuedInput(sessionId: string, itemId: string): void { - this.store.consumeQueueInput(itemId) + releaseClaimedQueueInput(sessionId: string, itemId: string): PendingSessionInputRecord { + this.assertQueueInputForSession(sessionId, itemId) + const record = this.store.releaseClaimedQueueInput(itemId) this.emitUpdated(sessionId) + return record } - claimSteerBatchForNextLoop(sessionId: string): PendingSessionInputRecord[] { - const claimed = this.store.claimSteerBatch(sessionId) - if (claimed.length > 0) { - this.emitUpdated(sessionId) - } - return claimed + releaseClaimedInput(sessionId: string, itemId: string): PendingSessionInputRecord { + this.assertInputOwnedBySession(sessionId, itemId) + const record = this.store.releaseClaimedInput(itemId) + this.emitUpdated(sessionId) + return record } - releaseClaimedInputs(sessionId: string): number { - const released = this.store.releaseClaimedInputs(sessionId) - if (released > 0) { - this.emitUpdated(sessionId) - } - return released + consumeQueuedInput(sessionId: string, itemId: string): void { + this.assertQueueInputForSession(sessionId, itemId) + this.store.consumeQueueInput(itemId) + this.emitUpdated(sessionId) } - consumeClaimedSteerBatch(sessionId: string): number { - const consumed = this.store.consumeClaimedSteerBatch(sessionId) - if (consumed > 0) { - 
this.emitUpdated(sessionId) - } - return consumed + consumeSteerInput(sessionId: string, itemId: string): void { + this.assertSteerInputForSession(sessionId, itemId) + this.store.consumeSteerInput(itemId) + this.emitUpdated(sessionId) } recoverClaimedInputsAfterRestart(): number { @@ -139,7 +162,7 @@ export class PendingInputCoordinator { } isAtCapacity(sessionId: string): boolean { - return this.store.countActive(sessionId) >= MAX_ACTIVE_PENDING_INPUTS + return this.store.countActiveQueue(sessionId) >= MAX_ACTIVE_PENDING_INPUTS } deleteBySession(sessionId: string): void { @@ -148,7 +171,7 @@ export class PendingInputCoordinator { } private ensureWithinLimit(sessionId: string): void { - if (this.store.countActive(sessionId) >= MAX_ACTIVE_PENDING_INPUTS) { + if (this.store.countActiveQueue(sessionId) >= MAX_ACTIVE_PENDING_INPUTS) { throw new Error('Pending input limit reached for this session.') } } @@ -163,6 +186,41 @@ export class PendingInputCoordinator { } } + private assertSteerInput(sessionId: string, itemId: string): void { + const record = this.store.listPendingInputs(sessionId).find((item) => item.id === itemId) + if (!record) { + throw new Error(`Pending input not found: ${itemId}`) + } + if (record.mode !== 'steer') { + throw new Error('Pending input is not a steer item.') + } + } + + private assertInputOwnedBySession(sessionId: string, itemId: string): PendingSessionInputRecord { + const record = this.store.getInput(itemId) + if (!record) { + throw new Error(`Pending input not found: ${itemId}`) + } + if (record.sessionId !== sessionId) { + throw new Error(`Pending input ${itemId} does not belong to session ${sessionId}`) + } + return record + } + + private assertQueueInputForSession(sessionId: string, itemId: string): void { + const record = this.assertInputOwnedBySession(sessionId, itemId) + if (record.mode !== 'queue') { + throw new Error('Steer inputs are locked and cannot be modified.') + } + } + + private assertSteerInputForSession(sessionId: 
string, itemId: string): void { + const record = this.assertInputOwnedBySession(sessionId, itemId) + if (record.mode !== 'steer') { + throw new Error('Pending input is not a steer item.') + } + } + private emitUpdated(sessionId: string): void { eventBus.sendToRenderer(SESSION_EVENTS.PENDING_INPUTS_UPDATED, SendTarget.ALL_WINDOWS, { sessionId diff --git a/src/main/presenter/agentRuntimePresenter/pendingInputStore.ts b/src/main/presenter/agentRuntimePresenter/pendingInputStore.ts index 6ecebd2ea..a5a070f15 100644 --- a/src/main/presenter/agentRuntimePresenter/pendingInputStore.ts +++ b/src/main/presenter/agentRuntimePresenter/pendingInputStore.ts @@ -28,7 +28,7 @@ export class DeepChatPendingInputStore { listPendingInputs(sessionId: string): PendingSessionInputRecord[] { return this.sqlitePresenter.deepchatPendingInputsTable .listActiveBySession(sessionId) - .filter((row) => !(row.mode === 'queue' && row.state === 'claimed')) + .filter((row) => row.state !== 'claimed') .map((row) => this.toRecord(row)) } @@ -36,6 +36,17 @@ export class DeepChatPendingInputStore { return this.sqlitePresenter.deepchatPendingInputsTable.countActiveBySession(sessionId) } + countActiveQueue(sessionId: string): number { + return this.sqlitePresenter.deepchatPendingInputsTable + .listActiveBySession(sessionId) + .filter((row) => row.mode === 'queue').length + } + + getInput(itemId: string): PendingSessionInputRecord | null { + const row = this.sqlitePresenter.deepchatPendingInputsTable.get(itemId) + return row ? 
this.toRecord(row) : null + } + createQueueInput(sessionId: string, input: string | SendMessageInput): PendingSessionInputRecord { return this.createQueueInputWithState(sessionId, input, 'pending') } @@ -65,6 +76,44 @@ export class DeepChatPendingInputStore { return this.toRecord(row) } + createSteerInput(sessionId: string, input: string | SendMessageInput): PendingSessionInputRecord { + const normalized = normalizeInput(input) + const id = nanoid() + this.sqlitePresenter.deepchatPendingInputsTable.insert({ + id, + sessionId, + mode: 'steer', + state: 'pending', + payloadJson: JSON.stringify(normalized), + queueOrder: null, + claimedAt: null + }) + const row = this.sqlitePresenter.deepchatPendingInputsTable.get(id) + if (!row) { + throw new Error(`Failed to create steer input ${id}`) + } + return this.toRecord(row) + } + + appendSteerInput(itemId: string, input: string | SendMessageInput): PendingSessionInputRecord { + const row = this.requireRow(itemId) + if (row.mode !== 'steer') { + throw new Error(`Pending input ${itemId} is not a steer item.`) + } + if (row.state !== 'pending') { + throw new Error(`Pending steer item ${itemId} is not editable.`) + } + + const existing = this.parsePayload(row.payload_json) + const next = normalizeInput(input) + const text = [existing.text.trim(), next.text.trim()].filter(Boolean).join('\n\n') + const files = [...(existing.files ?? []), ...(next.files ?? [])].filter(Boolean) + this.sqlitePresenter.deepchatPendingInputsTable.update(itemId, { + payload_json: JSON.stringify({ text, files }) + }) + return this.toRecord(this.requireRow(itemId, row.session_id)) + } + updateQueueInput(itemId: string, input: string | SendMessageInput): PendingSessionInputRecord { const row = this.requireRow(itemId) this.sqlitePresenter.deepchatPendingInputsTable.update(itemId, { @@ -115,6 +164,11 @@ export class DeepChatPendingInputStore { return row ? 
this.toRecord(row) : null } + getNextPendingSteerInput(sessionId: string): PendingSessionInputRecord | null { + const row = this.getPendingSteerRows(sessionId)[0] + return row ? this.toRecord(row) : null + } + claimQueueInput(itemId: string): PendingSessionInputRecord { const row = this.requireRow(itemId) if (row.mode !== 'queue') { @@ -131,11 +185,35 @@ export class DeepChatPendingInputStore { return this.toRecord(this.requireRow(itemId, row.session_id)) } + claimSteerInput(itemId: string): PendingSessionInputRecord { + const row = this.requireRow(itemId) + if (row.mode !== 'steer') { + throw new Error(`Pending input ${itemId} is not a steer item.`) + } + if (row.state !== 'pending') { + throw new Error(`Pending steer item ${itemId} is not claimable.`) + } + + this.sqlitePresenter.deepchatPendingInputsTable.update(itemId, { + state: 'claimed', + claimed_at: Date.now() + }) + return this.toRecord(this.requireRow(itemId, row.session_id)) + } + releaseClaimedQueueInput(itemId: string): PendingSessionInputRecord { const row = this.requireRow(itemId) if (row.mode !== 'queue') { throw new Error(`Pending input ${itemId} is not a queue item.`) } + return this.releaseClaimedInput(itemId, row) + } + + releaseClaimedInput( + itemId: string, + existingRow?: DeepChatPendingInputRow + ): PendingSessionInputRecord { + const row = existingRow ?? 
this.requireRow(itemId) if (row.state !== 'claimed') { return this.toRecord(row) } @@ -151,36 +229,15 @@ export class DeepChatPendingInputStore { this.deleteInput(itemId) } - claimSteerBatch(sessionId: string): PendingSessionInputRecord[] { - const now = Date.now() - const steerRows = this.getSteerRows(sessionId).filter((row) => row.state === 'pending') - if (steerRows.length === 0) { - return [] - } - - for (const row of steerRows) { - this.sqlitePresenter.deepchatPendingInputsTable.update(row.id, { - state: 'claimed', - claimed_at: now - }) - } - - return this.getSteerRows(sessionId) - .filter((row) => row.state === 'claimed') - .map((row) => this.toRecord(row)) - } - - releaseClaimedInputs(sessionId: string): number { - const claimedRows = this.sqlitePresenter.deepchatPendingInputsTable - .listActiveBySession(sessionId) - .filter((row) => row.state === 'claimed') - for (const row of claimedRows) { - this.sqlitePresenter.deepchatPendingInputsTable.update(row.id, { - state: 'pending', - claimed_at: null - }) + consumeSteerInput(itemId: string): void { + const row = this.requireRow(itemId) + if (row.mode !== 'steer') { + throw new Error(`Pending input ${itemId} is not a steer item.`) } - return claimedRows.length + this.sqlitePresenter.deepchatPendingInputsTable.update(itemId, { + state: 'consumed', + consumed_at: Date.now() + }) } recoverClaimedInputs(): string[] { @@ -202,22 +259,6 @@ export class DeepChatPendingInputStore { return Array.from(recoveredSessionIds) } - consumeClaimedSteerBatch(sessionId: string): number { - const claimedSteerRows = this.getSteerRows(sessionId).filter((row) => row.state === 'claimed') - if (claimedSteerRows.length === 0) { - return 0 - } - - const now = Date.now() - for (const row of claimedSteerRows) { - this.sqlitePresenter.deepchatPendingInputsTable.update(row.id, { - state: 'consumed', - consumed_at: now - }) - } - return claimedSteerRows.length - } - deleteBySession(sessionId: string): void { 
this.sqlitePresenter.deepchatPendingInputsTable.deleteBySession(sessionId) } @@ -257,6 +298,10 @@ export class DeepChatPendingInputStore { .sort((left, right) => left.created_at - right.created_at) } + private getPendingSteerRows(sessionId: string): DeepChatPendingInputRow[] { + return this.getSteerRows(sessionId).filter((row) => row.state === 'pending') + } + private listClaimedRows(): DeepChatPendingInputRow[] { return this.sqlitePresenter.deepchatPendingInputsTable.listClaimed() } diff --git a/src/main/presenter/agentRuntimePresenter/process.ts b/src/main/presenter/agentRuntimePresenter/process.ts index a997f2cbf..8f338ea0e 100644 --- a/src/main/presenter/agentRuntimePresenter/process.ts +++ b/src/main/presenter/agentRuntimePresenter/process.ts @@ -423,14 +423,6 @@ export async function processStream(params: ProcessParams): Promise boolean hooks?: ProcessHooks io: IoParams } diff --git a/src/main/presenter/llmProviderPresenter/providers/ollamaProvider.ts b/src/main/presenter/llmProviderPresenter/providers/ollamaProvider.ts index 17c722c1a..e74d873f4 100644 --- a/src/main/presenter/llmProviderPresenter/providers/ollamaProvider.ts +++ b/src/main/presenter/llmProviderPresenter/providers/ollamaProvider.ts @@ -25,6 +25,7 @@ import { } from '../aiSdk' import { normalizeOllamaOpenAIBaseUrl, normalizeOllamaSdkHost } from '../aiSdk/providerFactory' import type { ProviderMcpRuntimePort } from '../runtimePorts' +import { isInsecureTlsAllowed } from '@/lib/insecureTls' const OLLAMA_LIST_TIMEOUT_MS = 5000 @@ -678,7 +679,7 @@ export class OllamaProvider extends BaseLLMProvider { try { const stream = await this.ollama.pull({ model: modelName, - insecure: true, + insecure: isInsecureTlsAllowed(), stream: true }) diff --git a/src/main/presenter/remoteControlPresenter/telegram/telegramMarkdown.ts b/src/main/presenter/remoteControlPresenter/telegram/telegramMarkdown.ts index b1894a220..cc660ebb7 100644 --- a/src/main/presenter/remoteControlPresenter/telegram/telegramMarkdown.ts 
+++ b/src/main/presenter/remoteControlPresenter/telegram/telegramMarkdown.ts @@ -16,6 +16,7 @@ * - Links `[label](url)` -> `label` * - Headings `# … ######` -> `text` * - Unordered list markers `- / * / +` -> `• ` + * - GFM pipe tables -> fixed-width `
<pre>` text
  * - Blockquote lines `> ` -> grouped into `<blockquote>...</blockquote>
/**
 * Splits one table row into raw (untrimmed) cells on unescaped pipes.
 *
 * A backslash-escaped pipe (`\|`) does not end a cell; it is emitted as a
 * literal `|` inside the current cell.
 */
const splitMarkdownTableCells = (row: string): string[] => {
  const cells: string[] = []
  let current = ''

  for (let index = 0; index < row.length; index += 1) {
    const char = row.charAt(index)
    if (char === '\\' && row.charAt(index + 1) === '|') {
      current += '|'
      index += 1
    } else if (char === '|') {
      cells.push(current)
      current = ''
    } else {
      current += char
    }
  }

  cells.push(current)
  return cells
}

/**
 * Parses a single line as a pipe-table row.
 *
 * Leading and trailing pipes are optional and are stripped independently, so
 * `| a | b |`, `| a | b`, `a | b |` and `a | b` all yield `['a', 'b']`.
 * Escaped pipes (`\|`) stay inside their cell as a literal `|`.
 *
 * @param line Raw source line (surrounding whitespace is ignored).
 * @returns Trimmed cell texts, or `null` when the line contains no pipe or
 *          has fewer than two cells.
 */
const parseMarkdownTableRow = (line: string): string[] | null => {
  const trimmed = line.trim()
  if (!trimmed.includes('|')) {
    return null
  }

  const rawCells = splitMarkdownTableCells(trimmed)
  // A leading pipe always produces an empty first cell; drop it.
  if (trimmed.startsWith('|')) {
    rawCells.shift()
  }
  // `trimmed` has no trailing whitespace, so an empty last raw cell can only
  // come from a trailing unescaped pipe; drop it.
  if (rawCells.length > 0 && rawCells[rawCells.length - 1] === '') {
    rawCells.pop()
  }

  const cells = rawCells.map((cell) => cell.trim())
  return cells.length >= 2 ? cells : null
}

/**
 * Reports whether parsed cells form a table delimiter row: at least two
 * cells, each made of three or more hyphens with an optional leading and/or
 * trailing colon (whitespace inside a cell is ignored).
 */
const isMarkdownTableSeparator = (cells: string[]): boolean =>
  cells.length >= 2 &&
  cells.every((cell) => {
    const normalized = cell.replace(/\s/g, '')
    return /^:?-{3,}:?$/.test(normalized)
  })

/**
 * Width of a cell measured in Unicode code points (not UTF-16 units).
 * Double-width glyphs are not given extra width.
 */
const getCellWidth = (cell: string): number => Array.from(cell).length

/** Right-pads `cell` with spaces up to `width` code points; never truncates. */
const padCell = (cell: string, width: number): string =>
  `${cell}${' '.repeat(Math.max(0, width - getCellWidth(cell)))}`

/**
 * Renders parsed table rows as aligned plain text.
 *
 * Short rows are padded with empty cells to the widest row, every column is
 * at least two characters wide, and a dashed rule is inserted after the first
 * (header) row.
 *
 * @param rows Parsed rows; the first entry is treated as the header.
 * @returns Newline-joined text with empty lines removed.
 */
const formatMarkdownTableAsText = (rows: string[][]): string => {
  const columnCount = rows.reduce((max, row) => Math.max(max, row.length), 0)
  const normalizedRows = rows.map((row) =>
    Array.from({ length: columnCount }, (_, index) => row[index] ?? '')
  )
  const widths = Array.from({ length: columnCount }, (_, index) =>
    Math.max(2, ...normalizedRows.map((row) => getCellWidth(row[index] ?? '')))
  )

  const formatRow = (row: string[]): string =>
    row
      .map((cell, index) => padCell(cell, widths[index] ?? 2))
      .join(' | ')
      // Padding after the last cell carries no information.
      .trimEnd()
  const separator = widths.map((width) => '-'.repeat(width)).join('-|-')

  return [formatRow(normalizedRows[0] ?? []), separator, ...normalizedRows.slice(1).map(formatRow)]
    .filter(Boolean)
    .join('\n')
}

/**
 * Rewrites every pipe table in `text` as a fenced code block containing the
 * aligned plain-text rendering of that table.
 *
 * A table is a row line immediately followed by a delimiter row; following
 * lines are consumed as body rows until a line that is not a row (or is
 * another delimiter row). Lines inside existing ``` / ~~~ fences are copied
 * through untouched.
 *
 * @param text Markdown with `\n` line endings.
 * @returns The text with tables replaced by fenced blocks.
 */
const convertMarkdownTablesToCodeBlocks = (text: string): string => {
  const lines = text.split('\n')
  const output: string[] = []
  let index = 0
  let fenceMarker: string | null = null

  while (index < lines.length) {
    const line = lines[index] ?? ''
    const fenceMatch = line.match(/^\s*(`{3,}|~{3,})/)
    if (fenceMatch) {
      const marker = fenceMatch[1] ?? ''
      if (!fenceMarker) {
        fenceMarker = marker
      } else if (marker[0] === fenceMarker[0] && marker.length >= fenceMarker.length) {
        // Same fence character and at least as long: this closes the block.
        fenceMarker = null
      }
      output.push(line)
      index += 1
      continue
    }

    if (fenceMarker) {
      output.push(line)
      index += 1
      continue
    }

    const header = parseMarkdownTableRow(line)
    const separator = parseMarkdownTableRow(lines[index + 1] ?? '')

    if (header && separator && isMarkdownTableSeparator(separator)) {
      const rows: string[][] = [header]
      // Skip the header and the delimiter row.
      index += 2

      while (index < lines.length) {
        const row = parseMarkdownTableRow(lines[index] ?? '')
        if (!row || isMarkdownTableSeparator(row)) {
          break
        }
        rows.push(row)
        index += 1
      }

      output.push('```')
      output.push(formatMarkdownTableAsText(rows))
      output.push('```')
      continue
    }

    output.push(line)
    index += 1
  }

  return output.join('\n')
}
undefined + }) + } catch (fallbackError) { + if (this.isMessageNotModifiedError(fallbackError)) { + return + } + throw fallbackError + } + return + } + throw error } } @@ -887,6 +912,16 @@ export class TelegramPoller { ) } + private isTelegramEntityParseError(error: unknown): boolean { + return ( + error instanceof TelegramApiRequestError && + error.code === 400 && + /parse entities|can't parse entities|unsupported start tag|can't find end tag/i.test( + error.message + ) + ) + } + private isFatalPollError(error: unknown): boolean { if (error instanceof TelegramApiRequestError) { return typeof error.code === 'number' && error.code >= 400 && error.code < 500 diff --git a/src/main/presenter/skillPresenter/skillExecutionService.ts b/src/main/presenter/skillPresenter/skillExecutionService.ts index 1eaa86a88..c10afe7df 100644 --- a/src/main/presenter/skillPresenter/skillExecutionService.ts +++ b/src/main/presenter/skillPresenter/skillExecutionService.ts @@ -103,7 +103,7 @@ export class SkillExecutionService { ) if (input.stdin) { - backgroundExecSessionManager.write( + await backgroundExecSessionManager.write( options.conversationId, result.sessionId, input.stdin, diff --git a/src/main/presenter/toolPresenter/agentTools/agentBashHandler.ts b/src/main/presenter/toolPresenter/agentTools/agentBashHandler.ts index dfa2c8f0b..5ba12ed2f 100644 --- a/src/main/presenter/toolPresenter/agentTools/agentBashHandler.ts +++ b/src/main/presenter/toolPresenter/agentTools/agentBashHandler.ts @@ -288,7 +288,12 @@ export class AgentBashHandler { outputPrefix: options.outputPrefix }) - backgroundExecSessionManager.write(conversationId, session.sessionId, options.stdin ?? '', true) + await backgroundExecSessionManager.write( + conversationId, + session.sessionId, + options.stdin ?? 
'', + true + ) const yielded = await backgroundExecSessionManager.waitForCompletionOrYield( conversationId, @@ -584,7 +589,12 @@ export class AgentBashHandler { }) if (options.stdin !== undefined) { - backgroundExecSessionManager.write(conversationId, result.sessionId, options.stdin, true) + await backgroundExecSessionManager.write( + conversationId, + result.sessionId, + options.stdin, + true + ) } return { diff --git a/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts b/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts index c1c239c29..b533308b3 100644 --- a/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts +++ b/src/main/presenter/toolPresenter/agentTools/agentToolManager.ts @@ -696,7 +696,7 @@ export class AgentToolManager { switch (action) { case 'list': { - const sessions = backgroundExecSessionManager.list(conversationId) + const sessions = await backgroundExecSessionManager.list(conversationId) return { content: JSON.stringify({ status: 'ok', sessions }, null, 2) } @@ -731,7 +731,7 @@ export class AgentToolManager { if (!sessionId) { throw new Error('sessionId is required for write action') } - backgroundExecSessionManager.write(conversationId, sessionId, data ?? '', eof) + await backgroundExecSessionManager.write(conversationId, sessionId, data ?? 
'', eof) return { content: JSON.stringify({ status: 'ok', sessionId }) } @@ -751,7 +751,7 @@ export class AgentToolManager { if (!sessionId) { throw new Error('sessionId is required for clear action') } - backgroundExecSessionManager.clear(conversationId, sessionId) + await backgroundExecSessionManager.clear(conversationId, sessionId) return { content: JSON.stringify({ status: 'ok', sessionId }) } diff --git a/src/renderer/src/stores/ui/pendingInput.ts b/src/renderer/src/stores/ui/pendingInput.ts index e9531e871..e56c249e6 100644 --- a/src/renderer/src/stores/ui/pendingInput.ts +++ b/src/renderer/src/stores/ui/pendingInput.ts @@ -19,7 +19,7 @@ export const usePendingInputStore = defineStore('pendingInput', () => { .filter((item) => item.mode === 'queue') .sort((left, right) => (left.queueOrder ?? 0) - (right.queueOrder ?? 0)) ) - const activeCount = computed(() => items.value.length) + const activeCount = computed(() => queueItems.value.length) const isAtCapacity = computed(() => activeCount.value >= MAX_PENDING_INPUTS) async function loadPendingInputs(sessionId: string): Promise { diff --git a/test/main/lib/agentRuntime/backgroundExecSessionManager.test.ts b/test/main/lib/agentRuntime/backgroundExecSessionManager.test.ts index c525e6511..3f3c7b20b 100644 --- a/test/main/lib/agentRuntime/backgroundExecSessionManager.test.ts +++ b/test/main/lib/agentRuntime/backgroundExecSessionManager.test.ts @@ -4,6 +4,10 @@ import { spawn } from 'child_process' import fs from 'fs' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +const { mockUtilityProcessFork } = vi.hoisted(() => ({ + mockUtilityProcessFork: vi.fn() +})) + vi.mock('child_process', () => ({ spawn: vi.fn() })) @@ -11,6 +15,9 @@ vi.mock('child_process', () => ({ vi.mock('electron', () => ({ app: { getPath: vi.fn((name: string) => (name === 'userData' ? 
'/mock/userData' : '/mock/home')) + }, + utilityProcess: { + fork: mockUtilityProcessFork } })) @@ -28,7 +35,10 @@ vi.mock('@shared/logger', () => ({ } })) -import { BackgroundExecSessionManager } from '@/lib/agentRuntime/backgroundExecSessionManager' +import { + BackgroundExecSessionManager, + backgroundExecSessionManager +} from '@/lib/agentRuntime/backgroundExecSessionManager' class MockStream extends EventEmitter {} @@ -43,6 +53,11 @@ class MockChildProcess extends EventEmitter { pid = 321 } +class MockUtilityProcess extends EventEmitter { + postMessage = vi.fn() + kill = vi.fn() +} + function mockStats(kind: 'file' | 'directory'): fs.Stats { return { isFile: () => kind === 'file', @@ -63,6 +78,7 @@ describe('BackgroundExecSessionManager', () => { beforeEach(() => { manager = new BackgroundExecSessionManager() clearInterval((manager as never).cleanupIntervalId) + mockUtilityProcessFork.mockReset() vi.spyOn(fs, 'existsSync').mockReturnValue(true) vi.spyOn(fs, 'statSync').mockImplementation((candidate) => String(candidate).includes('workspace') ? 
mockStats('directory') : mockStats('file') @@ -407,3 +423,88 @@ describe('BackgroundExecSessionManager', () => { }) }) }) + +describe('backgroundExecSessionManager utility proxy', () => { + const resetProxyState = () => { + const proxy = backgroundExecSessionManager as any + proxy.host = null + proxy.hostReady = null + proxy.shuttingDown = false + proxy.activeSessions.clear() + proxy.crashedSessions.clear() + proxy.pendingRequests.clear() + } + + beforeEach(() => { + mockUtilityProcessFork.mockReset() + resetProxyState() + }) + + afterEach(() => { + resetProxyState() + }) + + it('forks the main bootstrap entrypoint for the utility host', async () => { + const host = new MockUtilityProcess() + mockUtilityProcessFork.mockReturnValue(host) + + const startPromise = (backgroundExecSessionManager as any).startHost() + await vi.waitFor(() => { + expect(mockUtilityProcessFork).toHaveBeenCalled() + }) + host.emit('spawn') + + await expect(startPromise).resolves.toBe(host) + expect(mockUtilityProcessFork).toHaveBeenCalledWith( + expect.stringMatching(/[\\/]src[\\/]main[\\/]index\.js$/), + ['--deepchat-exec-utility-host'], + expect.objectContaining({ + serviceName: 'DeepChat Exec Utility', + env: expect.objectContaining({ + DEEPCHAT_EXEC_UTILITY_HOST: '1' + }) + }) + ) + }) + + it('returns crashed completion results without starting a fresh utility host', async () => { + const proxy = backgroundExecSessionManager as any + proxy.crashedSessions.set('bg_crashed', { + conversationId: 'conv-1', + sessionId: 'bg_crashed', + command: 'pnpm test', + createdAt: 1, + lastAccessedAt: 1 + }) + + await expect( + backgroundExecSessionManager.waitForCompletionOrYield('conv-1', 'bg_crashed', 10) + ).resolves.toEqual({ + kind: 'completed', + result: { + status: 'error', + output: expect.stringContaining('pnpm test'), + exitCode: null, + offloaded: false, + timedOut: false + } + }) + expect(mockUtilityProcessFork).not.toHaveBeenCalled() + }) + + it('removes crashed sessions locally without 
RPC', async () => { + const proxy = backgroundExecSessionManager as any + proxy.crashedSessions.set('bg_crashed', { + conversationId: 'conv-1', + sessionId: 'bg_crashed', + command: 'pnpm test', + createdAt: 1, + lastAccessedAt: 1 + }) + + await backgroundExecSessionManager.remove('conv-1', 'bg_crashed') + + expect(proxy.crashedSessions.has('bg_crashed')).toBe(false) + expect(mockUtilityProcessFork).not.toHaveBeenCalled() + }) +}) diff --git a/test/main/presenter/agentRuntimePresenter/agentRuntimePresenter.test.ts b/test/main/presenter/agentRuntimePresenter/agentRuntimePresenter.test.ts index 466e3425b..5452e22af 100644 --- a/test/main/presenter/agentRuntimePresenter/agentRuntimePresenter.test.ts +++ b/test/main/presenter/agentRuntimePresenter/agentRuntimePresenter.test.ts @@ -117,6 +117,69 @@ function createMockSqlitePresenter() { summary_cursor_order_seq: 1, summary_updated_at: null } + const pendingRows: any[] = [] + let pendingRowClock = 1 + const pendingInputsTable = { + insert: vi.fn((input: any) => { + const now = pendingRowClock++ + const existingIndex = pendingRows.findIndex((row) => row.id === input.id) + const row = { + id: input.id, + session_id: input.sessionId ?? input.session_id, + mode: input.mode, + state: input.state, + payload_json: input.payloadJson ?? input.payload_json, + queue_order: input.queueOrder ?? input.queue_order ?? null, + claimed_at: input.claimedAt ?? input.claimed_at ?? null, + consumed_at: input.consumedAt ?? input.consumed_at ?? 
null, + created_at: now, + updated_at: now + } + if (existingIndex >= 0) { + pendingRows.splice(existingIndex, 1, row) + } else { + pendingRows.push(row) + } + }), + get: vi.fn((id: string) => pendingRows.find((row) => row.id === id)), + listBySession: vi.fn((sessionId: string) => + pendingRows.filter((row) => row.session_id === sessionId) + ), + listClaimed: vi.fn(() => pendingRows.filter((row) => row.state === 'claimed')), + listActiveBySession: vi.fn((sessionId: string) => + pendingRows.filter((row) => row.session_id === sessionId && row.state !== 'consumed') + ), + countActiveBySession: vi.fn( + (sessionId: string) => + pendingRows.filter( + (row) => + row.session_id === sessionId && + row.state !== 'consumed' && + !(row.mode === 'queue' && row.state === 'claimed') + ).length + ), + update: vi.fn((id: string, patch: Record) => { + const row = pendingRows.find((item) => item.id === id) + if (!row) { + return + } + Object.assign(row, patch, { updated_at: pendingRowClock++ }) + }), + delete: vi.fn((id: string) => { + for (let index = pendingRows.length - 1; index >= 0; index -= 1) { + if (pendingRows[index].id === id) { + pendingRows.splice(index, 1) + } + } + }), + deleteBySession: vi.fn((sessionId: string) => { + for (let index = pendingRows.length - 1; index >= 0; index -= 1) { + if (pendingRows[index].session_id === sessionId) { + pendingRows.splice(index, 1) + } + } + }) + } const deepchatMessagesTable = { insert: vi.fn(), updateContent: vi.fn(), @@ -231,17 +294,7 @@ function createMockSqlitePresenter() { deleteByMessageIds: vi.fn(), deleteBySessionId: vi.fn() }, - deepchatPendingInputsTable: { - insert: vi.fn(), - get: vi.fn(), - listBySession: vi.fn().mockReturnValue([]), - listClaimed: vi.fn().mockReturnValue([]), - listActiveBySession: vi.fn().mockReturnValue([]), - countActiveBySession: vi.fn().mockReturnValue(0), - update: vi.fn(), - delete: vi.fn(), - deleteBySession: vi.fn() - } + deepchatPendingInputsTable: pendingInputsTable } as any } @@ -742,7 
+795,7 @@ describe('AgentRuntimePresenter', () => { ) }) - it('steers during pre-stream setup without starting a parallel turn', async () => { + it('queues steer during pre-stream setup and drains it as the next visible turn', async () => { let releaseTools: (() => void) | null = null toolPresenter.getAllToolDefinitions.mockImplementationOnce( () => @@ -761,20 +814,21 @@ describe('AgentRuntimePresenter', () => { releaseTools?.() await firstProcess - let steeredUserInsert: any = null for (let attempt = 0; attempt < 20; attempt += 1) { - steeredUserInsert = sqlitePresenter.deepchatMessagesTable.insert.mock.calls.find( - ([row]) => row.role === 'user' - )?.[0] - if (steeredUserInsert) { + if ((processStream as ReturnType).mock.calls.length > 1) { break } await new Promise((resolve) => setTimeout(resolve, 0)) } - expect(steeredUserInsert).toBeTruthy() - expect(JSON.parse(steeredUserInsert.content).text).toBe('Refine before stream') - expect(processStream).toHaveBeenCalledTimes(1) + const userInserts = sqlitePresenter.deepchatMessagesTable.insert.mock.calls + .map(([row]) => row) + .filter((row) => row.role === 'user') + + expect(userInserts).toHaveLength(2) + expect(JSON.parse(userInserts[0].content).text).toBe('First prompt') + expect(JSON.parse(userInserts[1].content).text).toBe('Refine before stream') + expect(processStream).toHaveBeenCalledTimes(2) for (let attempt = 0; attempt < 20; attempt += 1) { if ((await agent.getSessionState('s1'))?.status === 'idle') { @@ -785,53 +839,25 @@ describe('AgentRuntimePresenter', () => { expect((await agent.getSessionState('s1'))?.status).toBe('idle') }) - it('interrupts an active stream for steer without marking the partial assistant as error', async () => { + it('queues active stream steer without aborting the current stream', async () => { + let releaseFirstStream: (() => void) | null = null + let firstAbortSignal: AbortSignal | null = null ;(processStream as ReturnType) .mockImplementationOnce( async (params: { io: { 
abortSignal: AbortSignal } }) => await new Promise((resolve) => { - params.io.abortSignal.addEventListener( - 'abort', - () => { - resolve({ - status: 'aborted', - stopReason: 'user_stop', - errorMessage: 'common.error.userCanceledGeneration' - }) - }, - { once: true } - ) + firstAbortSignal = params.io.abortSignal + releaseFirstStream = () => + resolve({ + status: 'completed', + stopReason: 'complete' + }) }) ) .mockResolvedValueOnce({ status: 'completed', stopReason: 'complete' }) - sqlitePresenter.deepchatMessagesTable.get.mockReturnValue({ - id: 'mock-msg-id', - session_id: 's1', - order_seq: 2, - role: 'assistant', - content: JSON.stringify([ - { - type: 'content', - content: 'partial', - status: 'pending', - timestamp: 1 - }, - { - type: 'error', - content: 'common.error.userCanceledGeneration', - status: 'error', - timestamp: 2 - } - ]), - status: 'pending', - is_context_edge: 0, - metadata: null, - created_at: 1, - updated_at: 1 - }) await agent.initSession('s1', { providerId: 'openai', modelId: 'gpt-4' }) const firstProcess = agent.processMessage('s1', 'First prompt') @@ -844,6 +870,19 @@ describe('AgentRuntimePresenter', () => { } await agent.steerActiveTurn('s1', 'Refine active stream') + await agent.steerActiveTurn('s1', 'Add second steer note') + expect(firstAbortSignal?.aborted).toBe(false) + expect(processStream).toHaveBeenCalledTimes(1) + expect((processStream as ReturnType).mock.calls[0][0]).toEqual( + expect.objectContaining({ + shouldYieldForPendingInput: expect.any(Function) + }) + ) + expect( + (processStream as ReturnType).mock.calls[0][0].shouldYieldForPendingInput() + ).toBe(true) + + releaseFirstStream?.() await firstProcess for (let attempt = 0; attempt < 20; attempt += 1) { @@ -853,26 +892,20 @@ describe('AgentRuntimePresenter', () => { await new Promise((resolve) => setTimeout(resolve, 0)) } - expect(sqlitePresenter.deepchatMessagesTable.updateStatus).toHaveBeenCalledWith( - 'mock-msg-id', - 'sent' - ) - 
expect(sqlitePresenter.deepchatMessagesTable.updateContent).toHaveBeenCalledWith( - 'mock-msg-id', - JSON.stringify([ - { - type: 'content', - content: 'partial', - status: 'success', - timestamp: 1 - } - ]) - ) expect(sqlitePresenter.deepchatMessagesTable.updateContentAndStatus).not.toHaveBeenCalledWith( 'mock-msg-id', expect.any(String), 'error' ) + const userInserts = sqlitePresenter.deepchatMessagesTable.insert.mock.calls + .map(([row]) => row) + .filter((row) => row.role === 'user') + + expect(userInserts).toHaveLength(2) + expect(JSON.parse(userInserts[0].content).text).toBe('First prompt') + expect(JSON.parse(userInserts[1].content).text).toBe( + 'Refine active stream\n\nAdd second steer note' + ) expect(processStream).toHaveBeenCalledTimes(2) for (let attempt = 0; attempt < 20; attempt += 1) { diff --git a/test/main/presenter/agentRuntimePresenter/pendingInputCoordinator.test.ts b/test/main/presenter/agentRuntimePresenter/pendingInputCoordinator.test.ts new file mode 100644 index 000000000..343bfb5be --- /dev/null +++ b/test/main/presenter/agentRuntimePresenter/pendingInputCoordinator.test.ts @@ -0,0 +1,100 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { PendingInputCoordinator } from '@/presenter/agentRuntimePresenter/pendingInputCoordinator' +import type { PendingSessionInputRecord } from '@shared/types/agent-interface' + +vi.mock('@/eventbus', () => ({ + eventBus: { + sendToRenderer: vi.fn() + }, + SendTarget: { + ALL_WINDOWS: 'all' + } +})) + +vi.mock('@/events', () => ({ + SESSION_EVENTS: { + PENDING_INPUTS_UPDATED: 'session:pending-inputs-updated' + } +})) + +vi.mock('@/routes/publishDeepchatEvent', () => ({ + publishDeepchatEvent: vi.fn() +})) + +function createRecord( + id: string, + sessionId: string, + mode: PendingSessionInputRecord['mode'] +): PendingSessionInputRecord { + return { + id, + sessionId, + mode, + state: 'claimed', + payload: { + text: id, + files: [] + }, + queueOrder: mode === 'queue' ? 
1 : null, + claimedAt: 1, + consumedAt: null, + createdAt: 1, + updatedAt: 1 + } +} + +function createCoordinator(records: Map) { + const store = { + getInput: vi.fn((itemId: string) => records.get(itemId) ?? null), + releaseClaimedQueueInput: vi.fn((itemId: string) => records.get(itemId)!), + releaseClaimedInput: vi.fn((itemId: string) => records.get(itemId)!), + consumeQueueInput: vi.fn((itemId: string) => { + records.delete(itemId) + }), + consumeSteerInput: vi.fn((itemId: string) => { + const record = records.get(itemId) + if (record) { + records.set(itemId, { + ...record, + state: 'consumed', + consumedAt: 2 + }) + } + }) + } + + return { + coordinator: new PendingInputCoordinator(store as any), + store + } +} + +describe('PendingInputCoordinator claimed input ownership', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('does not release a claimed queue input from another session', () => { + const records = new Map([ + ['queue-1', createRecord('queue-1', 'session-2', 'queue')] + ]) + const { coordinator, store } = createCoordinator(records) + + expect(() => coordinator.releaseClaimedQueueInput('session-1', 'queue-1')).toThrow( + 'does not belong to session session-1' + ) + expect(store.releaseClaimedQueueInput).not.toHaveBeenCalled() + }) + + it('does not consume a claimed steer input from another session', () => { + const records = new Map([ + ['steer-1', createRecord('steer-1', 'session-2', 'steer')] + ]) + const { coordinator, store } = createCoordinator(records) + + expect(() => coordinator.consumeSteerInput('session-1', 'steer-1')).toThrow( + 'does not belong to session session-1' + ) + expect(store.consumeSteerInput).not.toHaveBeenCalled() + }) +}) diff --git a/test/main/presenter/agentRuntimePresenter/process.test.ts b/test/main/presenter/agentRuntimePresenter/process.test.ts index b84f8c5dd..dde564b4b 100644 --- a/test/main/presenter/agentRuntimePresenter/process.test.ts +++ b/test/main/presenter/agentRuntimePresenter/process.test.ts @@ 
-356,6 +356,49 @@ describe('processStream', () => { expect(toolResultMsg.content).toBe('Sunny, 72F') }) + it('yields after completed tool calls when a pending input should run next', async () => { + const coreStream = vi.fn(() => + (async function* () { + yield { + type: 'tool_call_start', + tool_call_id: 'tc1', + tool_call_name: 'get_weather' + } as LLMCoreStreamEvent + yield { + type: 'tool_call_end', + tool_call_id: 'tc1', + tool_call_arguments_complete: '{}' + } as LLMCoreStreamEvent + yield { type: 'stop', stop_reason: 'tool_use' } as LLMCoreStreamEvent + })() + ) as unknown as ProcessParams['coreStream'] + + const shouldYieldForPendingInput = vi.fn(() => true) + const toolPresenter = createMockToolPresenter({ get_weather: 'Sunny, 72F' }) + const params = createParams({ + coreStream, + toolPresenter, + tools: [makeTool('get_weather')], + shouldYieldForPendingInput + }) + + const promise = processStream(params) + await vi.runAllTimersAsync() + const result = await promise + + expect(coreStream).toHaveBeenCalledTimes(1) + expect(toolPresenter.callTool).toHaveBeenCalledTimes(1) + expect(shouldYieldForPendingInput).toHaveBeenCalledTimes(1) + expect(result).toMatchObject({ + status: 'completed', + stopReason: 'pending_input' + }) + + const finalizedBlocks = (messageStore.finalizeAssistantMessage as ReturnType).mock + .calls[0][1] + expect(finalizedBlocks[0].tool_call.response).toBe('Sunny, 72F') + }) + it('refreshes tools for the next loop iteration after skill_view activates a skill', async () => { let callCount = 0 const toolPresenter = { diff --git a/test/main/presenter/agentSessionPresenter/integration.test.ts b/test/main/presenter/agentSessionPresenter/integration.test.ts index 383e98636..06480bac6 100644 --- a/test/main/presenter/agentSessionPresenter/integration.test.ts +++ b/test/main/presenter/agentSessionPresenter/integration.test.ts @@ -1035,7 +1035,7 @@ describe('Integration: multi-turn context', () => { await 
expect(agentPresenter.listPendingInputs(session.id)).resolves.toEqual([]) }) - it('injects steer inputs before the next queued user message', async () => { + it('drains converted steer inputs as visible user messages before queued messages', async () => { let releaseFirstTurn: (() => void) | null = null const providerInstance = { coreStream: vi @@ -1068,26 +1068,40 @@ describe('Integration: multi-turn context', () => { await agentPresenter.convertPendingInputToSteer(session.id, pendingInputs[0].id) releaseFirstTurn?.() - await new Promise((r) => setTimeout(r, 80)) + await vi.waitFor(() => { + expect(providerInstance.coreStream).toHaveBeenCalledTimes(3) + }) - expect(providerInstance.coreStream).toHaveBeenCalledTimes(2) + expect(providerInstance.coreStream).toHaveBeenCalledTimes(3) const secondCallMessages = providerInstance.coreStream.mock.calls[1][0] - const trailingUserMessages = secondCallMessages.filter( + const secondCallUserMessages = secondCallMessages.filter( + (message: any) => message.role === 'user' + ) + const thirdCallMessages = providerInstance.coreStream.mock.calls[2][0] + const thirdCallUserMessages = thirdCallMessages.filter( (message: any) => message.role === 'user' ) - expect(trailingUserMessages[trailingUserMessages.length - 2]).toEqual({ + expect(secondCallUserMessages[secondCallUserMessages.length - 1]).toEqual({ role: 'user', content: 'Steer instruction' }) - expect(trailingUserMessages[trailingUserMessages.length - 1]).toEqual({ + expect(thirdCallUserMessages[thirdCallUserMessages.length - 1]).toEqual({ role: 'user', content: 'Queued target' }) + + const messages = sqlitePresenter.deepchatMessagesTable.getBySession(session.id) + const userMessages = messages.filter((message: any) => message.role === 'user') + expect(userMessages.map((message: any) => JSON.parse(message.content).text)).toEqual([ + 'Turn one', + 'Steer instruction', + 'Queued target' + ]) await expect(agentPresenter.listPendingInputs(session.id)).resolves.toEqual([]) }) - 
it('rebudgets long steer inputs before streaming the next queued turn', async () => { + it('rebudgets long converted steer inputs as their own visible turn', async () => { let releaseFirstTurn: (() => void) | null = null const firstPrompt = 'P'.repeat(2000) const firstResponse = 'R'.repeat(2000) @@ -1138,27 +1152,33 @@ describe('Integration: multi-turn context', () => { await agentPresenter.convertPendingInputToSteer(session.id, pendingInputs[0].id) releaseFirstTurn?.() - await new Promise((r) => setTimeout(r, 80)) + await vi.waitFor(() => { + expect(providerInstance.coreStream).toHaveBeenCalledTimes(3) + }) - expect(providerInstance.coreStream).toHaveBeenCalledTimes(2) + expect(providerInstance.coreStream).toHaveBeenCalledTimes(3) const secondCallMessages = providerInstance.coreStream.mock.calls[1][0] const secondCallContents = secondCallMessages.map((message: any) => typeof message.content === 'string' ? message.content : JSON.stringify(message.content) ) - const trailingUserMessages = secondCallMessages.filter( + const secondCallUserMessages = secondCallMessages.filter( + (message: any) => message.role === 'user' + ) + const thirdCallMessages = providerInstance.coreStream.mock.calls[2][0] + const thirdCallUserMessages = thirdCallMessages.filter( (message: any) => message.role === 'user' ) expect(secondCallContents).not.toContain(firstPrompt) expect(secondCallContents).not.toContain(firstResponse) expect(estimateMessagesTokens(secondCallMessages) + 128).toBeLessThanOrEqual(2048) - expect(trailingUserMessages[trailingUserMessages.length - 2].content).toEqual( + expect(secondCallUserMessages[secondCallUserMessages.length - 1].content).toEqual( expect.stringContaining('[Attached File 1]') ) - expect(trailingUserMessages[trailingUserMessages.length - 2].content).toEqual( + expect(secondCallUserMessages[secondCallUserMessages.length - 1].content).toEqual( expect.stringContaining('steer.txt') ) - expect(trailingUserMessages[trailingUserMessages.length - 1]).toEqual({ + 
expect(thirdCallUserMessages[thirdCallUserMessages.length - 1]).toEqual({ role: 'user', content: 'Queued target' }) diff --git a/test/main/presenter/llmProviderPresenter/ollamaProvider.test.ts b/test/main/presenter/llmProviderPresenter/ollamaProvider.test.ts index 1dff63648..3cda83053 100644 --- a/test/main/presenter/llmProviderPresenter/ollamaProvider.test.ts +++ b/test/main/presenter/llmProviderPresenter/ollamaProvider.test.ts @@ -1,4 +1,4 @@ -import { beforeEach, describe, expect, it, vi } from 'vitest' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { ModelType } from '../../../../src/shared/model' import type { IConfigPresenter, @@ -35,6 +35,12 @@ vi.mock('@shared/logger', () => ({ } })) +vi.mock('@electron-toolkit/utils', () => ({ + is: { + dev: false + } +})) + vi.mock('../../../../src/main/presenter/devicePresenter', () => ({ DevicePresenter: { getDefaultHeaders: () => ({}) @@ -82,10 +88,12 @@ const createModel = ( describe('OllamaProvider.fetchModels', () => { let configPresenter: IConfigPresenter let provider: LLM_PROVIDER + const originalAllowInsecureTls = process.env.DEEPCHAT_ALLOW_INSECURE_TLS beforeEach(() => { mockOllamaConstructorOptions.length = 0 mockExecFile.mockReset() + delete process.env.DEEPCHAT_ALLOW_INSECURE_TLS mockExecFile.mockImplementation((_command, _args, _options, callback) => { callback(null, '', '') }) @@ -119,6 +127,14 @@ describe('OllamaProvider.fetchModels', () => { } }) + afterEach(() => { + if (originalAllowInsecureTls === undefined) { + delete process.env.DEEPCHAT_ALLOW_INSECURE_TLS + } else { + process.env.DEEPCHAT_ALLOW_INSECURE_TLS = originalAllowInsecureTls + } + }) + it('normalizes Ollama SDK host and OpenAI-compatible runtime base URL', () => { const ollamaProvider = new OllamaProvider( { @@ -273,6 +289,38 @@ describe('OllamaProvider.fetchModels', () => { }) await expect(ollamaProvider.pullModel('qwen3:8b')).resolves.toBe(true) + expect((ollamaProvider as 
any).ollama.pull).toHaveBeenCalledWith( + expect.objectContaining({ + model: 'qwen3:8b', + insecure: false, + stream: true + }) + ) + }) + + it('only enables insecure pulls behind the explicit TLS debug flag', async () => { + process.env.DEEPCHAT_ALLOW_INSECURE_TLS = '1' + const ollamaProvider = new OllamaProvider(provider, configPresenter) + ;(ollamaProvider as any).ollama = { + pull: vi.fn(async () => ({ + async *[Symbol.asyncIterator]() { + yield { status: 'success' } + } + })), + list: vi.fn(async () => ({ models: [{ ...createModel('qwen3:8b') }] })), + show: vi.fn(async () => { + throw new Error('show unavailable') + }) + } + + await expect(ollamaProvider.pullModel('qwen3:8b')).resolves.toBe(true) + expect((ollamaProvider as any).ollama.pull).toHaveBeenCalledWith( + expect.objectContaining({ + model: 'qwen3:8b', + insecure: true, + stream: true + }) + ) }) it('treats latest tags from ollama list as a successful untagged pull', async () => { diff --git a/test/main/presenter/remoteControlPresenter/telegramMarkdown.test.ts b/test/main/presenter/remoteControlPresenter/telegramMarkdown.test.ts index 0eccce973..f3a0c95c8 100644 --- a/test/main/presenter/remoteControlPresenter/telegramMarkdown.test.ts +++ b/test/main/presenter/remoteControlPresenter/telegramMarkdown.test.ts @@ -39,6 +39,20 @@ describe('convertMarkdownToTelegramHtml', () => { expect(convertMarkdownToTelegramHtml(input)).toBe('
hello
') }) + it('renders GFM pipe tables as preformatted fixed-width text', () => { + const input = '| Name | Value |\n| --- | ---: |\n| Alpha | 1 |\n| Beta | 22 |' + expect(convertMarkdownToTelegramHtml(input)).toBe( + '
Name  | Value\n------|------\nAlpha | 1\nBeta  | 22
' + ) + }) + + it('does not convert pipe table text inside fenced code blocks', () => { + const input = '```\n| A | B |\n| --- | --- |\n| 1 | 2 |\n```' + expect(convertMarkdownToTelegramHtml(input)).toBe( + '
| A | B |\n| --- | --- |\n| 1 | 2 |
' + ) + }) + it('auto-closes a dangling fenced block at a chunk boundary', () => { const input = '```ts\nconst a = 1' expect(convertMarkdownToTelegramHtml(input)).toBe( diff --git a/test/main/presenter/remoteControlPresenter/telegramPoller.test.ts b/test/main/presenter/remoteControlPresenter/telegramPoller.test.ts index fdebfd0ef..aeed7ef86 100644 --- a/test/main/presenter/remoteControlPresenter/telegramPoller.test.ts +++ b/test/main/presenter/remoteControlPresenter/telegramPoller.test.ts @@ -395,6 +395,94 @@ describe('TelegramPoller', () => { await poller.stop() }) + it('retries formatted chunks as plain text when Telegram rejects entities', async () => { + const client = createClient() + const bindingStore = createBindingStore() + client.sendMessage.mockImplementation(async (_target, _text, _replyMarkup, options) => { + if (options?.parseMode === 'HTML') { + throw new TelegramApiRequestError("Bad Request: can't parse entities", 400) + } + return 100 + }) + client.getUpdates + .mockResolvedValueOnce([ + { + update_id: 1, + message: { + message_id: 20, + chat: { + id: 100, + type: 'private' + }, + from: { + id: 123 + }, + text: 'hello' + } + } + ]) + .mockImplementation(createBlockingUpdates()) + + const poller = new TelegramPoller({ + client: client as any, + parser: { + parseUpdate: vi.fn().mockReturnValue({ + kind: 'message', + updateId: 1, + chatId: 100, + messageThreadId: 0, + messageId: 20, + chatType: 'private', + fromId: 123, + text: 'hello', + command: null + }) + } as any, + router: { + handleMessage: vi.fn().mockResolvedValue({ + replies: [], + conversation: { + sessionId: 'session-1', + eventId: 'msg-1', + getSnapshot: vi.fn().mockResolvedValue({ + messageId: 'msg-1', + text: '**fallback**', + completed: true, + pendingInteraction: null + }) + } + }) + } as any, + bindingStore: bindingStore as any + }) + + await poller.start() + + await vi.waitFor(() => { + expect(client.sendMessage).toHaveBeenNthCalledWith( + 1, + { + chatId: 100, + messageThreadId: 0 
+ }, + 'fallback', + undefined, + { parseMode: 'HTML' } + ) + expect(client.sendMessage).toHaveBeenNthCalledWith( + 2, + { + chatId: 100, + messageThreadId: 0 + }, + '**fallback**', + undefined + ) + }) + + await poller.stop() + }) + it('streams answer text beside a persistent trace log', async () => { vi.useFakeTimers() @@ -1731,6 +1819,106 @@ describe('TelegramPoller', () => { warnSpy.mockRestore() }) + it('ignores not-modified errors from plain edit fallback', async () => { + const client = createClient() + client.editMessageText + .mockRejectedValueOnce(new TelegramApiRequestError("Bad Request: can't parse entities", 400)) + .mockRejectedValueOnce( + new TelegramApiRequestError( + 'Bad Request: message is not modified: specified new message content and reply markup are exactly the same as a current content and reply markup of the message', + 400 + ) + ) + client.getUpdates + .mockResolvedValueOnce([ + { + update_id: 2, + callback_query: { + id: 'callback-1', + from: { + id: 123 + }, + data: 'model:menu-token:p:0', + message: { + message_id: 30, + chat: { + id: 100, + type: 'private' + } + } + } + } + ]) + .mockImplementation(createBlockingUpdates()) + + const poller = new TelegramPoller({ + client: client as any, + parser: { + parseUpdate: vi.fn().mockReturnValue({ + kind: 'callback_query', + updateId: 2, + chatId: 100, + messageThreadId: 0, + messageId: 30, + chatType: 'private', + fromId: 123, + callbackQueryId: 'callback-1', + data: 'model:menu-token:p:0' + }) + } as any, + router: { + handleMessage: vi.fn().mockResolvedValue({ + replies: [], + outboundActions: [ + { + type: 'editMessageText', + messageId: 30, + text: '**fallback**', + replyMarkup: null + } + ], + callbackAnswer: { + text: 'Choose a model' + } + }) + } as any, + bindingStore: { + getPollOffset: vi.fn().mockReturnValue(0), + setPollOffset: vi.fn(), + getTelegramConfig: vi.fn().mockReturnValue({ + streamMode: 'draft' + }) + } as any + }) + + await poller.start() + + await vi.waitFor(() => { + 
expect(client.editMessageText).toHaveBeenCalledTimes(2) + }) + expect(client.editMessageText).toHaveBeenNthCalledWith(1, { + target: { + chatId: 100, + messageThreadId: 0 + }, + messageId: 30, + text: 'fallback', + replyMarkup: undefined, + parseMode: 'HTML' + }) + expect(client.editMessageText).toHaveBeenNthCalledWith(2, { + target: { + chatId: 100, + messageThreadId: 0 + }, + messageId: 30, + text: '**fallback**', + replyMarkup: undefined + }) + + await poller.stop() + }) + it('sends pending interaction prompts after completed conversation output', async () => { const client = createClient() const bindingStore = createBindingStore() diff --git a/test/renderer/components/PendingInputLane.test.ts b/test/renderer/components/PendingInputLane.test.ts index 0cf1f3621..6ac4a93ad 100644 --- a/test/renderer/components/PendingInputLane.test.ts +++ b/test/renderer/components/PendingInputLane.test.ts @@ -144,7 +144,8 @@ describe('PendingInputLane', () => { } }), buildPendingInput('queue-2', 'queue'), - buildPendingInput('queue-3', 'queue') + buildPendingInput('queue-3', 'queue'), + buildPendingInput('queue-4', 'queue') ] } }) diff --git a/test/renderer/stores/pendingInputStore.test.ts b/test/renderer/stores/pendingInputStore.test.ts index 2e3be84a0..2177077ae 100644 --- a/test/renderer/stores/pendingInputStore.test.ts +++ b/test/renderer/stores/pendingInputStore.test.ts @@ -10,10 +10,10 @@ function createDeferred() { return { promise, resolve, reject } } -const createPendingItem = (id: string, sessionId: string) => ({ +const createPendingItem = (id: string, sessionId: string, mode: 'queue' | 'steer' = 'queue') => ({ id, sessionId, - mode: 'queue' as const, + mode, state: 'pending' as const, payload: { text: id, @@ -122,4 +122,19 @@ describe('pendingInput store', () => { expect(unsubscribePendingInputsChanged).toHaveBeenCalledTimes(1) }) + + it('exposes steer inputs while counting only queue inputs toward queue capacity', async () => { + const { store, sessionClient } = await 
setupStore() + sessionClient.listPendingInputs.mockResolvedValueOnce([ + createPendingItem('q1', 's1'), + createPendingItem('steer1', 's1', 'steer') + ]) + + await store.loadPendingInputs('s1') + + expect(store.queueItems).toHaveLength(1) + expect(store.steerItems).toHaveLength(1) + expect(store.activeCount).toBe(1) + expect(store.isAtCapacity).toBe(false) + }) })