diff --git a/src/browser/components/ProjectSidebar/ProjectSidebar.test.tsx b/src/browser/components/ProjectSidebar/ProjectSidebar.test.tsx index e0847ac919..0c2c185273 100644 --- a/src/browser/components/ProjectSidebar/ProjectSidebar.test.tsx +++ b/src/browser/components/ProjectSidebar/ProjectSidebar.test.tsx @@ -425,6 +425,19 @@ function installProjectSidebarTestDoubles() { () => ({ getWorkspaceMetadata: () => undefined, + getWorkspaceSidebarState: () => ({ + canInterrupt: false, + isStarting: false, + awaitingUserQuestion: false, + lastAbortReason: null, + currentModel: null, + recencyTimestamp: null, + loadedSkills: [], + skillLoadErrors: [], + agentStatus: undefined, + terminalActiveCount: 0, + terminalSessionCount: 0, + }), getAggregator: () => undefined, subscribeKey: () => () => undefined, }) as unknown as ReturnType diff --git a/src/browser/components/ProjectSidebar/ProjectSidebar.tsx b/src/browser/components/ProjectSidebar/ProjectSidebar.tsx index 862b9c79fc..e3b753dc4e 100644 --- a/src/browser/components/ProjectSidebar/ProjectSidebar.tsx +++ b/src/browser/components/ProjectSidebar/ProjectSidebar.tsx @@ -155,6 +155,13 @@ function getWorkspaceAttentionSignal( } } +function isWorkspaceWorkingForSidebar( + workspaceStore: WorkspaceStore, + workspaceId: string +): boolean { + return getWorkspaceAttentionSignal(workspaceStore, workspaceId)?.isWorking ?? false; +} + function didWorkspaceAttentionSignalChange( prev: WorkspaceAttentionSignal | undefined, next: WorkspaceAttentionSignal @@ -990,12 +997,7 @@ const ProjectSidebarInner: React.FC = ({ if (result.success && result.data?.kind === "confirm-lossy-untracked-files") { const metadata = workspaceStore.getWorkspaceMetadata(workspaceId); const displayTitle = metadata?.title ?? metadata?.name ?? workspaceId; - const aggregator = workspaceStore.getAggregator(workspaceId); - const hasActiveStreams = (aggregator?.getActiveStreams().length ?? 0) > 0; - const pendingStreamStartTime = aggregator?.getPendingStreamStartTime(); - const isStarting = pendingStreamStartTime != null && !hasActiveStreams; - const awaitingUserQuestion = aggregator?.hasAwaitingUserQuestion() ?? false; - const isStreaming = (hasActiveStreams || isStarting) && !awaitingUserQuestion; + const isStreaming = isWorkspaceWorkingForSidebar(workspaceStore, workspaceId); setArchiveConfirmation({ workspaceId, displayTitle, @@ -1025,15 +1027,7 @@ const ProjectSidebarInner: React.FC = ({ displayTitle, buttonElement, untrackedPaths: preflight.data.paths, - isStreaming: (() => { - const aggregator = workspaceStore.getAggregator(workspaceId); - if (!aggregator) return false; - const hasActiveStreams = aggregator.getActiveStreams().length > 0; - const isStarting = - aggregator.getPendingStreamStartTime() !== null && !hasActiveStreams; - const awaitingUserQuestion = aggregator.hasAwaitingUserQuestion(); - return (hasActiveStreams || isStarting) && !awaitingUserQuestion; - })(), + isStreaming: isWorkspaceWorkingForSidebar(workspaceStore, workspaceId), }); return; } @@ -1060,26 +1054,17 @@ const ProjectSidebarInner: React.FC = ({ ); const hasActiveStream = useCallback( - (workspaceId: string) => { - const aggregator = workspaceStore.getAggregator(workspaceId); - if (!aggregator) return false; - const hasActiveStreams = aggregator.getActiveStreams().length > 0; - const isStarting = aggregator.getPendingStreamStartTime() !== null && !hasActiveStreams; - const awaitingUserQuestion = aggregator.hasAwaitingUserQuestion(); - return (hasActiveStreams || isStarting) && !awaitingUserQuestion; - }, + (workspaceId: string) => isWorkspaceWorkingForSidebar(workspaceStore, workspaceId), [workspaceStore] ); const workspaceHasAttention = useCallback( (workspace: FrontendWorkspaceMetadata) => { const workspaceId = workspace.id; - const aggregator = workspaceStore.getAggregator(workspaceId); - const hasActiveStreams = aggregator ? aggregator.getActiveStreams().length > 0 : false; - const isStarting = aggregator?.getPendingStreamStartTime() != null && !hasActiveStreams; - const awaitingUserQuestion = aggregator?.hasAwaitingUserQuestion() ?? false; - const isWorking = (hasActiveStreams || isStarting) && !awaitingUserQuestion; - const hasError = aggregator?.getLastAbortReason()?.reason === "system"; + const attentionSignal = getWorkspaceAttentionSignal(workspaceStore, workspaceId); + const isWorking = attentionSignal?.isWorking ?? false; + const awaitingUserQuestion = attentionSignal?.awaitingUserQuestion ?? false; + const hasError = attentionSignal?.hasSystemError ?? false; const isRemoving = workspace.isRemoving === true; const isArchiving = archivingWorkspaceIds.has(workspaceId); const isInitializing = workspace.isInitializing === true; diff --git a/src/browser/stores/WorkspaceStore.ts b/src/browser/stores/WorkspaceStore.ts index 6a6d96aa7d..de96dbd7ca 100644 --- a/src/browser/stores/WorkspaceStore.ts +++ b/src/browser/stores/WorkspaceStore.ts @@ -3399,8 +3399,9 @@ export class WorkspaceStore { } /** - * Check if data is a buffered event type by checking the handler map. - * This ensures isStreamEvent() and processStreamEvent() can never fall out of sync. + * Check if data is one of the standard buffered event types backed by WorkspaceStore handlers. + * Replayed stream-error opts into the same buffering path through getBufferedReplayEventBehavior() + * so live errors can still bypass buffering and surface immediately. */ private isBufferedEvent(data: WorkspaceChatMessage): boolean { if (!("type" in data)) { @@ -3416,6 +3417,31 @@ export class WorkspaceStore { ); } + private getBufferedReplayEventBehavior( + data: WorkspaceChatMessage + ): { previewDuringReplay: boolean } | null { + if (isStreamError(data)) { + return data.replay === true ? { previewDuringReplay: true } : null; + } + + if (!this.isBufferedEvent(data)) { + return null; + } + + return { + previewDuringReplay: isStreamLifecycle(data) || isStreamAbort(data) || isRuntimeStatus(data), + }; + } + + private previewBufferedEventDuringReplay( + workspaceId: string, + aggregator: StreamingMessageAggregator, + data: WorkspaceChatMessage + ): void { + applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false }); + this.states.bump(workspaceId); + } + private handleChatMessage(workspaceId: string, data: WorkspaceChatMessage): void { // Aggregator must exist - workspaces are initialized in addWorkspace() before subscriptions run. const aggregator = this.assertGet(workspaceId); @@ -3561,20 +3587,15 @@ export class WorkspaceStore { // // This is especially important for workspaces with long histories (100+ messages), // where unbuffered rendering would cause visible lag and UI stutter. - if (!transient.caughtUp && isStreamError(data) && data.replay === true) { - // Show replayed terminal errors immediately so reconnect UIs preserve the same - // failure classification/copy as the live session, then replay them again after - // history loads so full-replay replacement does not wipe the error back out. - applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false }); - this.states.bump(workspaceId); - transient.pendingStreamEvents.push(data); - return; - } - - if (!transient.caughtUp && this.isBufferedEvent(data)) { - if (isStreamLifecycle(data) || isStreamAbort(data) || isRuntimeStatus(data)) { - applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false }); - this.states.bump(workspaceId); + const bufferedReplayEventBehavior = !transient.caughtUp + ? this.getBufferedReplayEventBehavior(data) + : null; + if (bufferedReplayEventBehavior) { + if (bufferedReplayEventBehavior.previewDuringReplay) { + // Preview replayed startup/terminal state immediately so reconnect UI preserves the + // live session's barrier/error classification until buffered events are replayed again + // after transcript hydration completes. + this.previewBufferedEventDuringReplay(workspaceId, aggregator, data); } transient.pendingStreamEvents.push(data); diff --git a/src/browser/utils/messages/StreamingMessageAggregator.ts b/src/browser/utils/messages/StreamingMessageAggregator.ts index 4ce71c6edf..b8c3c59364 100644 --- a/src/browser/utils/messages/StreamingMessageAggregator.ts +++ b/src/browser/utils/messages/StreamingMessageAggregator.ts @@ -8,7 +8,10 @@ import type { import { createMuxMessage, getCompactionFollowUpContent } from "@/common/types/message"; import { + copyRuntimeStatusEvent, copyStreamLifecycleSnapshot, + hasInFlightStreamLifecycle, + isTerminalRuntimeStatusPhase, type StreamStartEvent, type StreamDeltaEvent, type UsageDeltaEvent, @@ -1209,11 +1212,7 @@ export class StreamingMessageAggregator { } private clearInFlightStreamLifecycle(): void { - if ( - this.streamLifecycle?.phase === "preparing" || - this.streamLifecycle?.phase === "streaming" || - this.streamLifecycle?.phase === "completing" - ) { + if (hasInFlightStreamLifecycle(this.streamLifecycle)) { this.streamLifecycle = null; } } @@ -1224,12 +1223,12 @@ export class StreamingMessageAggregator { */ handleRuntimeStatus(status: RuntimeStatusEvent): void { // Keep stream lifecycle code focused on when runtime status becomes irrelevant. - if (status.phase === "ready" || status.phase === "error") { + if (isTerminalRuntimeStatusPhase(status.phase)) { this.clearRuntimeStatus(); return; } - this.runtimeStatus = status; + this.runtimeStatus = copyRuntimeStatusEvent(status); } private clearRuntimeStatus(): void { diff --git a/src/common/types/stream.test.ts b/src/common/types/stream.test.ts new file mode 100644 index 0000000000..b75ec38c82 --- /dev/null +++ b/src/common/types/stream.test.ts @@ -0,0 +1,75 @@ +import { describe, expect, test } from "bun:test"; + +import { + areRuntimeStatusEventsEqual, + areStreamLifecycleSnapshotsEqual, + copyRuntimeStatusEvent, + hasInFlightStreamLifecycle, + isTerminalRuntimeStatusPhase, +} from "./stream"; + +describe("stream shared helpers", () => { + test("compare stream lifecycle snapshots with nullish abort reasons", () => { + expect( + areStreamLifecycleSnapshotsEqual( + { phase: "failed", hadAnyOutput: false }, + { phase: "failed", hadAnyOutput: false } + ) + ).toBe(true); + + expect( + areStreamLifecycleSnapshotsEqual( + { phase: "failed", hadAnyOutput: false, abortReason: "user" }, + { phase: "failed", hadAnyOutput: false } + ) + ).toBe(false); + }); + + test("copy runtime-status events and compare optional fields nullishly", () => { + const copied = copyRuntimeStatusEvent({ + type: "runtime-status", + workspaceId: "ws-1", + phase: "starting", + runtimeType: "ssh", + detail: "Checking workspace runtime...", + }); + + expect(copied).toEqual({ + type: "runtime-status", + workspaceId: "ws-1", + phase: "starting", + runtimeType: "ssh", + detail: "Checking workspace runtime...", + }); + expect(copied).not.toBe(copyRuntimeStatusEvent(copied)); + expect("source" in copied).toBe(false); + + expect( + areRuntimeStatusEventsEqual(copied, { + phase: "starting", + runtimeType: "ssh", + detail: "Checking workspace runtime...", + }) + ).toBe(true); + expect( + areRuntimeStatusEventsEqual(copied, { + phase: "starting", + runtimeType: "ssh", + detail: "Loading tools...", + }) + ).toBe(false); + }); + + test("share terminal runtime-status and in-flight lifecycle semantics", () => { + expect(isTerminalRuntimeStatusPhase("ready")).toBe(true); + expect(isTerminalRuntimeStatusPhase("error")).toBe(true); + expect(isTerminalRuntimeStatusPhase("starting")).toBe(false); + + expect(hasInFlightStreamLifecycle({ phase: "preparing" })).toBe(true); + expect(hasInFlightStreamLifecycle({ phase: "streaming" })).toBe(true); + expect(hasInFlightStreamLifecycle({ phase: "completing" })).toBe(true); + expect(hasInFlightStreamLifecycle({ phase: "failed" })).toBe(false); + expect(hasInFlightStreamLifecycle({ phase: "interrupted" })).toBe(false); + expect(hasInFlightStreamLifecycle(null)).toBe(false); + }); +}); diff --git a/src/common/types/stream.ts b/src/common/types/stream.ts index 7b94432cee..63d3665519 100644 --- a/src/common/types/stream.ts +++ b/src/common/types/stream.ts @@ -54,6 +54,30 @@ export function copyStreamLifecycleSnapshot( }; } +export function areStreamLifecycleSnapshotsEqual( + left: Pick | null, + right: Pick | null +): boolean { + return ( + left === right || + (left !== null && + right !== null && + left.phase === right.phase && + left.hadAnyOutput === right.hadAnyOutput && + (left.abortReason ?? null) === (right.abortReason ?? null)) + ); +} + +export function isInFlightStreamLifecyclePhase(phase: StreamLifecyclePhase): boolean { + return phase === "preparing" || phase === "streaming" || phase === "completing"; +} + +export function hasInFlightStreamLifecycle( + snapshot: Pick | null | undefined +): boolean { + return snapshot != null && isInFlightStreamLifecyclePhase(snapshot.phase); +} + export interface StreamAbortReasonSnapshot { reason: StreamAbortReason; at: number; @@ -89,3 +113,42 @@ export type AutoRetryAbandonedEvent = z.infer; + +/** + * Shared runtime-status helpers used by both the backend session replay path and the + * renderer aggregator so startup breadcrumbs follow the same lifecycle semantics. + */ +export function copyRuntimeStatusEvent( + status: Pick< + RuntimeStatusEvent, + "type" | "workspaceId" | "phase" | "runtimeType" | "source" | "detail" + > +): RuntimeStatusEvent { + return { + type: status.type, + workspaceId: status.workspaceId, + phase: status.phase, + runtimeType: status.runtimeType, + ...(status.source != null ? { source: status.source } : {}), + ...(status.detail != null ? { detail: status.detail } : {}), + }; +} + +export function areRuntimeStatusEventsEqual( + left: Pick | null, + right: Pick | null +): boolean { + return ( + left === right || + (left !== null && + right !== null && + left.phase === right.phase && + left.runtimeType === right.runtimeType && + (left.source ?? null) === (right.source ?? null) && + (left.detail ?? null) === (right.detail ?? null)) + ); +} + +export function isTerminalRuntimeStatusPhase(phase: RuntimeStatusEvent["phase"]): boolean { + return phase === "ready" || phase === "error"; +} diff --git a/src/node/services/agentSession.ts b/src/node/services/agentSession.ts index 754c5afd7f..c97290c4b9 100644 --- a/src/node/services/agentSession.ts +++ b/src/node/services/agentSession.ts @@ -75,7 +75,11 @@ import { isAgentEffectivelyDisabled } from "@/node/services/agentDefinitions/age import { resolveAgentInheritanceChain } from "@/node/services/agentDefinitions/resolveAgentInheritanceChain"; import { MessageQueue } from "./messageQueue"; import { + areRuntimeStatusEventsEqual, + areStreamLifecycleSnapshotsEqual, + copyRuntimeStatusEvent, copyStreamLifecycleSnapshot, + isTerminalRuntimeStatusPhase, type RuntimeStatusEvent, type StreamAbortReason, type StreamEndEvent, @@ -165,6 +169,12 @@ interface AutoRetryResumeRequest { agentInitiated?: boolean; } +interface ReplayStatusSnapshotState { + replayedTerminalStreamError: boolean; + replayedStreamLifecycle: StreamLifecycleSnapshot | null; + replayedRuntimeStatus: RuntimeStatusEvent | null; +} + interface SwitchAgentResult { agentId: string; reason?: string; @@ -620,38 +630,13 @@ export class AgentSession { return this.terminalStreamLifecycle ?? { phase: "idle", hadAnyOutput: false }; } - private hasSameStreamLifecycle( - left: StreamLifecycleSnapshot | null, - right: StreamLifecycleSnapshot - ): boolean { - return ( - left !== null && - left.phase === right.phase && - left.hadAnyOutput === right.hadAnyOutput && - (left.abortReason ?? null) === (right.abortReason ?? null) - ); - } - - private hasSameRuntimeStatus( - left: RuntimeStatusEvent | null, - right: RuntimeStatusEvent - ): boolean { - return ( - left !== null && - left.phase === right.phase && - left.runtimeType === right.runtimeType && - (left.source ?? null) === (right.source ?? null) && - (left.detail ?? null) === (right.detail ?? null) - ); - } - private emitStreamLifecycleIfChanged(): void { if (this.disposed) { return; } const snapshot = this.getCurrentStreamLifecycleSnapshot(); - if (this.hasSameStreamLifecycle(this.lastEmittedStreamLifecycle, snapshot)) { + if (areStreamLifecycleSnapshotsEqual(this.lastEmittedStreamLifecycle, snapshot)) { return; } @@ -684,18 +669,59 @@ export class AgentSession { } private updatePreparingRuntimeStatus(status: RuntimeStatusEvent): void { - if (status.phase === "ready" || status.phase === "error") { + if (isTerminalRuntimeStatusPhase(status.phase)) { this.clearPreparingRuntimeStatus(); return; } - this.preparingRuntimeStatus = status; + this.preparingRuntimeStatus = copyRuntimeStatusEvent(status); } private clearPreparingRuntimeStatus(): void { this.preparingRuntimeStatus = null; } + private emitReplayStatusSnapshot( + listener: (event: AgentSessionChatEvent) => void, + replayState: ReplayStatusSnapshotState + ): void { + if (!replayState.replayedTerminalStreamError && this.terminalStreamError) { + replayState.replayedTerminalStreamError = true; + listener({ + workspaceId: this.workspaceId, + message: { + ...this.terminalStreamError, + replay: true, + }, + }); + } + + const lifecycle = this.getCurrentStreamLifecycleSnapshot(); + if (!areStreamLifecycleSnapshotsEqual(replayState.replayedStreamLifecycle, lifecycle)) { + replayState.replayedStreamLifecycle = copyStreamLifecycleSnapshot(lifecycle); + listener({ + workspaceId: this.workspaceId, + message: { + type: "stream-lifecycle", + workspaceId: this.workspaceId, + ...lifecycle, + }, + }); + } + + const runtimeStatus = this.preparingRuntimeStatus; + if ( + runtimeStatus && + !areRuntimeStatusEventsEqual(replayState.replayedRuntimeStatus, runtimeStatus) + ) { + replayState.replayedRuntimeStatus = copyRuntimeStatusEvent(runtimeStatus); + listener({ + workspaceId: this.workspaceId, + message: replayState.replayedRuntimeStatus, + }); + } + } + private emitRetryEvent(event: RetryStatusEvent): void { if (this.disposed) { return; @@ -1591,36 +1617,10 @@ export class AgentSession { listener({ workspaceId: this.workspaceId, message }); }; - let replayedTerminalStreamError = false; - let replayedStreamLifecycle: StreamLifecycleSnapshot | null = null; - let replayedRuntimeStatus: RuntimeStatusEvent | null = null; - const emitReplayStatusMessage = (message: WorkspaceChatMessage): void => { - listener({ workspaceId: this.workspaceId, message }); - }; - const emitCurrentReplayTerminalState = (): void => { - if (!replayedTerminalStreamError && this.terminalStreamError) { - replayedTerminalStreamError = true; - emitReplayStatusMessage({ - ...this.terminalStreamError, - replay: true, - }); - } - - const lifecycle = this.getCurrentStreamLifecycleSnapshot(); - if (!this.hasSameStreamLifecycle(replayedStreamLifecycle, lifecycle)) { - replayedStreamLifecycle = copyStreamLifecycleSnapshot(lifecycle); - emitReplayStatusMessage({ - type: "stream-lifecycle", - workspaceId: this.workspaceId, - ...lifecycle, - }); - } - - const runtimeStatus = this.preparingRuntimeStatus; - if (runtimeStatus && !this.hasSameRuntimeStatus(replayedRuntimeStatus, runtimeStatus)) { - replayedRuntimeStatus = { ...runtimeStatus }; - emitReplayStatusMessage(runtimeStatus); - } + const replayStatusState: ReplayStatusSnapshotState = { + replayedTerminalStreamError: false, + replayedStreamLifecycle: null, + replayedRuntimeStatus: null, }; let emittedReplayStreamEvents = false; @@ -1650,7 +1650,7 @@ export class AgentSession { if (shouldReplayTerminalState) { // Rehydrate the current terminal/preparing state immediately so reconnect clients do not // regress to transcript heuristics while the rest of replay is still streaming in. - emitCurrentReplayTerminalState(); + this.emitReplayStatusSnapshot(listener, replayStatusState); } if (mode?.type === "live") { @@ -1892,7 +1892,7 @@ export class AgentSession { if (shouldReplayTerminalState) { // Replay the latest terminal/preparing state one last time before caught-up in case the // stream changed while history was replaying (for example PREPARING -> failed/idle). - emitCurrentReplayTerminalState(); + this.emitReplayStatusSnapshot(listener, replayStatusState); } // Replay queued-message snapshot before caught-up so reconnect clients can