Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/browser/components/ProjectSidebar/ProjectSidebar.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,19 @@ function installProjectSidebarTestDoubles() {
() =>
({
getWorkspaceMetadata: () => undefined,
getWorkspaceSidebarState: () => ({
canInterrupt: false,
isStarting: false,
awaitingUserQuestion: false,
lastAbortReason: null,
currentModel: null,
recencyTimestamp: null,
loadedSkills: [],
skillLoadErrors: [],
agentStatus: undefined,
terminalActiveCount: 0,
terminalSessionCount: 0,
}),
getAggregator: () => undefined,
subscribeKey: () => () => undefined,
}) as unknown as ReturnType<typeof WorkspaceStoreModule.useWorkspaceStoreRaw>
Expand Down
43 changes: 14 additions & 29 deletions src/browser/components/ProjectSidebar/ProjectSidebar.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@ function getWorkspaceAttentionSignal(
}
}

function isWorkspaceWorkingForSidebar(
workspaceStore: WorkspaceStore,
workspaceId: string
): boolean {
return getWorkspaceAttentionSignal(workspaceStore, workspaceId)?.isWorking ?? false;
}

function didWorkspaceAttentionSignalChange(
prev: WorkspaceAttentionSignal | undefined,
next: WorkspaceAttentionSignal
Expand Down Expand Up @@ -990,12 +997,7 @@ const ProjectSidebarInner: React.FC<ProjectSidebarProps> = ({
if (result.success && result.data?.kind === "confirm-lossy-untracked-files") {
const metadata = workspaceStore.getWorkspaceMetadata(workspaceId);
const displayTitle = metadata?.title ?? metadata?.name ?? workspaceId;
const aggregator = workspaceStore.getAggregator(workspaceId);
const hasActiveStreams = (aggregator?.getActiveStreams().length ?? 0) > 0;
const pendingStreamStartTime = aggregator?.getPendingStreamStartTime();
const isStarting = pendingStreamStartTime != null && !hasActiveStreams;
const awaitingUserQuestion = aggregator?.hasAwaitingUserQuestion() ?? false;
const isStreaming = (hasActiveStreams || isStarting) && !awaitingUserQuestion;
const isStreaming = isWorkspaceWorkingForSidebar(workspaceStore, workspaceId);
setArchiveConfirmation({
workspaceId,
displayTitle,
Expand Down Expand Up @@ -1025,15 +1027,7 @@ const ProjectSidebarInner: React.FC<ProjectSidebarProps> = ({
displayTitle,
buttonElement,
untrackedPaths: preflight.data.paths,
isStreaming: (() => {
const aggregator = workspaceStore.getAggregator(workspaceId);
if (!aggregator) return false;
const hasActiveStreams = aggregator.getActiveStreams().length > 0;
const isStarting =
aggregator.getPendingStreamStartTime() !== null && !hasActiveStreams;
const awaitingUserQuestion = aggregator.hasAwaitingUserQuestion();
return (hasActiveStreams || isStarting) && !awaitingUserQuestion;
})(),
isStreaming: isWorkspaceWorkingForSidebar(workspaceStore, workspaceId),
});
return;
}
Expand All @@ -1060,26 +1054,17 @@ const ProjectSidebarInner: React.FC<ProjectSidebarProps> = ({
);

const hasActiveStream = useCallback(
(workspaceId: string) => {
const aggregator = workspaceStore.getAggregator(workspaceId);
if (!aggregator) return false;
const hasActiveStreams = aggregator.getActiveStreams().length > 0;
const isStarting = aggregator.getPendingStreamStartTime() !== null && !hasActiveStreams;
const awaitingUserQuestion = aggregator.hasAwaitingUserQuestion();
return (hasActiveStreams || isStarting) && !awaitingUserQuestion;
},
(workspaceId: string) => isWorkspaceWorkingForSidebar(workspaceStore, workspaceId),
[workspaceStore]
);

const workspaceHasAttention = useCallback(
(workspace: FrontendWorkspaceMetadata) => {
const workspaceId = workspace.id;
const aggregator = workspaceStore.getAggregator(workspaceId);
const hasActiveStreams = aggregator ? aggregator.getActiveStreams().length > 0 : false;
const isStarting = aggregator?.getPendingStreamStartTime() != null && !hasActiveStreams;
const awaitingUserQuestion = aggregator?.hasAwaitingUserQuestion() ?? false;
const isWorking = (hasActiveStreams || isStarting) && !awaitingUserQuestion;
const hasError = aggregator?.getLastAbortReason()?.reason === "system";
const attentionSignal = getWorkspaceAttentionSignal(workspaceStore, workspaceId);
const isWorking = attentionSignal?.isWorking ?? false;
const awaitingUserQuestion = attentionSignal?.awaitingUserQuestion ?? false;
const hasError = attentionSignal?.hasSystemError ?? false;
const isRemoving = workspace.isRemoving === true;
const isArchiving = archivingWorkspaceIds.has(workspaceId);
const isInitializing = workspace.isInitializing === true;
Expand Down
53 changes: 37 additions & 16 deletions src/browser/stores/WorkspaceStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3399,8 +3399,9 @@ export class WorkspaceStore {
}

/**
* Check if data is a buffered event type by checking the handler map.
* This ensures isStreamEvent() and processStreamEvent() can never fall out of sync.
* Check if data is one of the standard buffered event types backed by WorkspaceStore handlers.
* Replayed stream-error opts into the same buffering path through getBufferedReplayEventBehavior()
* so live errors can still bypass buffering and surface immediately.
*/
private isBufferedEvent(data: WorkspaceChatMessage): boolean {
if (!("type" in data)) {
Expand All @@ -3416,6 +3417,31 @@ export class WorkspaceStore {
);
}

private getBufferedReplayEventBehavior(
data: WorkspaceChatMessage
): { previewDuringReplay: boolean } | null {
if (isStreamError(data)) {
return data.replay === true ? { previewDuringReplay: true } : null;
}

if (!this.isBufferedEvent(data)) {
return null;
}

return {
previewDuringReplay: isStreamLifecycle(data) || isStreamAbort(data) || isRuntimeStatus(data),
};
}

private previewBufferedEventDuringReplay(
workspaceId: string,
aggregator: StreamingMessageAggregator,
data: WorkspaceChatMessage
): void {
applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false });
this.states.bump(workspaceId);
}

private handleChatMessage(workspaceId: string, data: WorkspaceChatMessage): void {
// Aggregator must exist - workspaces are initialized in addWorkspace() before subscriptions run.
const aggregator = this.assertGet(workspaceId);
Expand Down Expand Up @@ -3561,20 +3587,15 @@ export class WorkspaceStore {
//
// This is especially important for workspaces with long histories (100+ messages),
// where unbuffered rendering would cause visible lag and UI stutter.
if (!transient.caughtUp && isStreamError(data) && data.replay === true) {
// Show replayed terminal errors immediately so reconnect UIs preserve the same
// failure classification/copy as the live session, then replay them again after
// history loads so full-replay replacement does not wipe the error back out.
applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false });
this.states.bump(workspaceId);
transient.pendingStreamEvents.push(data);
return;
}

if (!transient.caughtUp && this.isBufferedEvent(data)) {
if (isStreamLifecycle(data) || isStreamAbort(data) || isRuntimeStatus(data)) {
applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false });
this.states.bump(workspaceId);
const bufferedReplayEventBehavior = !transient.caughtUp
? this.getBufferedReplayEventBehavior(data)
: null;
if (bufferedReplayEventBehavior) {
if (bufferedReplayEventBehavior.previewDuringReplay) {
// Preview replayed startup/terminal state immediately so reconnect UI preserves the
// live session's barrier/error classification until buffered events are replayed again
// after transcript hydration completes.
this.previewBufferedEventDuringReplay(workspaceId, aggregator, data);
}

transient.pendingStreamEvents.push(data);
Expand Down
13 changes: 6 additions & 7 deletions src/browser/utils/messages/StreamingMessageAggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import type {
import { createMuxMessage, getCompactionFollowUpContent } from "@/common/types/message";

import {
copyRuntimeStatusEvent,
copyStreamLifecycleSnapshot,
hasInFlightStreamLifecycle,
isTerminalRuntimeStatusPhase,
type StreamStartEvent,
type StreamDeltaEvent,
type UsageDeltaEvent,
Expand Down Expand Up @@ -1209,11 +1212,7 @@ export class StreamingMessageAggregator {
}

private clearInFlightStreamLifecycle(): void {
if (
this.streamLifecycle?.phase === "preparing" ||
this.streamLifecycle?.phase === "streaming" ||
this.streamLifecycle?.phase === "completing"
) {
if (hasInFlightStreamLifecycle(this.streamLifecycle)) {
this.streamLifecycle = null;
}
}
Expand All @@ -1224,12 +1223,12 @@ export class StreamingMessageAggregator {
*/
handleRuntimeStatus(status: RuntimeStatusEvent): void {
// Keep stream lifecycle code focused on when runtime status becomes irrelevant.
if (status.phase === "ready" || status.phase === "error") {
if (isTerminalRuntimeStatusPhase(status.phase)) {
this.clearRuntimeStatus();
return;
}

this.runtimeStatus = status;
this.runtimeStatus = copyRuntimeStatusEvent(status);
}

private clearRuntimeStatus(): void {
Expand Down
75 changes: 75 additions & 0 deletions src/common/types/stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { describe, expect, test } from "bun:test";

import {
areRuntimeStatusEventsEqual,
areStreamLifecycleSnapshotsEqual,
copyRuntimeStatusEvent,
hasInFlightStreamLifecycle,
isTerminalRuntimeStatusPhase,
} from "./stream";

describe("stream shared helpers", () => {
test("compare stream lifecycle snapshots with nullish abort reasons", () => {
expect(
areStreamLifecycleSnapshotsEqual(
{ phase: "failed", hadAnyOutput: false },
{ phase: "failed", hadAnyOutput: false }
)
).toBe(true);

expect(
areStreamLifecycleSnapshotsEqual(
{ phase: "failed", hadAnyOutput: false, abortReason: "user" },
{ phase: "failed", hadAnyOutput: false }
)
).toBe(false);
});

test("copy runtime-status events and compare optional fields nullishly", () => {
const copied = copyRuntimeStatusEvent({
type: "runtime-status",
workspaceId: "ws-1",
phase: "starting",
runtimeType: "ssh",
detail: "Checking workspace runtime...",
});

expect(copied).toEqual({
type: "runtime-status",
workspaceId: "ws-1",
phase: "starting",
runtimeType: "ssh",
detail: "Checking workspace runtime...",
});
expect(copied).not.toBe(copyRuntimeStatusEvent(copied));
expect("source" in copied).toBe(false);

expect(
areRuntimeStatusEventsEqual(copied, {
phase: "starting",
runtimeType: "ssh",
detail: "Checking workspace runtime...",
})
).toBe(true);
expect(
areRuntimeStatusEventsEqual(copied, {
phase: "starting",
runtimeType: "ssh",
detail: "Loading tools...",
})
).toBe(false);
});

test("share terminal runtime-status and in-flight lifecycle semantics", () => {
expect(isTerminalRuntimeStatusPhase("ready")).toBe(true);
expect(isTerminalRuntimeStatusPhase("error")).toBe(true);
expect(isTerminalRuntimeStatusPhase("starting")).toBe(false);

expect(hasInFlightStreamLifecycle({ phase: "preparing" })).toBe(true);
expect(hasInFlightStreamLifecycle({ phase: "streaming" })).toBe(true);
expect(hasInFlightStreamLifecycle({ phase: "completing" })).toBe(true);
expect(hasInFlightStreamLifecycle({ phase: "failed" })).toBe(false);
expect(hasInFlightStreamLifecycle({ phase: "interrupted" })).toBe(false);
expect(hasInFlightStreamLifecycle(null)).toBe(false);
});
});
63 changes: 63 additions & 0 deletions src/common/types/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@ export function copyStreamLifecycleSnapshot(
};
}

export function areStreamLifecycleSnapshotsEqual(
left: Pick<StreamLifecycleSnapshot, "phase" | "hadAnyOutput" | "abortReason"> | null,
right: Pick<StreamLifecycleSnapshot, "phase" | "hadAnyOutput" | "abortReason"> | null
): boolean {
return (
left === right ||
(left !== null &&
right !== null &&
left.phase === right.phase &&
left.hadAnyOutput === right.hadAnyOutput &&
(left.abortReason ?? null) === (right.abortReason ?? null))
);
}

export function isInFlightStreamLifecyclePhase(phase: StreamLifecyclePhase): boolean {
return phase === "preparing" || phase === "streaming" || phase === "completing";
}

export function hasInFlightStreamLifecycle(
snapshot: Pick<StreamLifecycleSnapshot, "phase"> | null | undefined
): boolean {
return snapshot != null && isInFlightStreamLifecyclePhase(snapshot.phase);
}

export interface StreamAbortReasonSnapshot {
reason: StreamAbortReason;
at: number;
Expand Down Expand Up @@ -89,3 +113,42 @@ export type AutoRetryAbandonedEvent = z.infer<typeof AutoRetryAbandonedEventSche
* Used for both runtime readiness and generic startup breadcrumbs in the barrier UI.
*/
export type RuntimeStatusEvent = z.infer<typeof RuntimeStatusEventSchema>;

/**
* Shared runtime-status helpers used by both the backend session replay path and the
* renderer aggregator so startup breadcrumbs follow the same lifecycle semantics.
*/
export function copyRuntimeStatusEvent(
status: Pick<
RuntimeStatusEvent,
"type" | "workspaceId" | "phase" | "runtimeType" | "source" | "detail"
>
): RuntimeStatusEvent {
return {
type: status.type,
workspaceId: status.workspaceId,
phase: status.phase,
runtimeType: status.runtimeType,
...(status.source != null ? { source: status.source } : {}),
...(status.detail != null ? { detail: status.detail } : {}),
};
}

export function areRuntimeStatusEventsEqual(
left: Pick<RuntimeStatusEvent, "phase" | "runtimeType" | "source" | "detail"> | null,
right: Pick<RuntimeStatusEvent, "phase" | "runtimeType" | "source" | "detail"> | null
): boolean {
return (
left === right ||
(left !== null &&
right !== null &&
left.phase === right.phase &&
left.runtimeType === right.runtimeType &&
(left.source ?? null) === (right.source ?? null) &&
(left.detail ?? null) === (right.detail ?? null))
);
}

export function isTerminalRuntimeStatusPhase(phase: RuntimeStatusEvent["phase"]): boolean {
return phase === "ready" || phase === "error";
}
Loading
Loading