diff --git a/apps/mobile/src/app/task/[id].tsx b/apps/mobile/src/app/task/[id].tsx index 6657bdffc..485cc79ba 100644 --- a/apps/mobile/src/app/task/[id].tsx +++ b/apps/mobile/src/app/task/[id].tsx @@ -30,6 +30,7 @@ import { modelSupportsReasoning, type ReasoningEffort, } from "@/features/tasks/composer/options"; +import { QueuedMessagesDock } from "@/features/tasks/composer/QueuedMessagesDock"; import { TaskChatComposer } from "@/features/tasks/composer/TaskChatComposer"; import { useMessagingMode, @@ -37,7 +38,10 @@ import { useToggleMessagingMode, } from "@/features/tasks/hooks/useMessagingMode"; import { taskKeys } from "@/features/tasks/hooks/useTasks"; -import { useMessageQueueStore } from "@/features/tasks/stores/messageQueueStore"; +import { + type QueuedMessage, + useMessageQueueStore, +} from "@/features/tasks/stores/messageQueueStore"; import { pendingTaskPromptStoreApi, usePendingTaskPrompt, @@ -96,6 +100,7 @@ export default function TaskDetailScreen() { setConfigOption, getSessionForTask, setFocusedTaskId, + steerQueuedMessage, } = useTaskSessionStore(); useEffect(() => { @@ -386,6 +391,47 @@ export default function TaskDetailScreen() { ], ); + const [restoredDraft, setRestoredDraft] = useState<{ + text: string; + attachments: PendingAttachment[]; + }>(); + + const handleSteerQueued = useCallback( + (message: QueuedMessage) => { + if (!taskId) return; + steerQueuedMessage(taskId, message.id) + .then(() => trackPromptSent(message.content, true)) + .catch((err) => { + log.error("Failed to steer queued message", err); + Alert.alert( + "Couldn't steer", + "This message is still queued. Please try again.", + ); + }); + }, + [taskId, steerQueuedMessage, trackPromptSent], + ); + + const handleReturnQueuedToComposer = useCallback( + (message: QueuedMessage) => { + if (!taskId) return; + useMessageQueueStore.getState().remove(taskId, message.id); + setRestoredDraft({ + text: message.content, + attachments: message.attachments, + }); + }, + [taskId], + ); + + const handleDiscardQueued = useCallback( + (message: QueuedMessage) => { + if (!taskId) return; + useMessageQueueStore.getState().remove(taskId, message.id); + }, + [taskId], + ); + const handleModeChange = useCallback( (value: ExecutionMode) => { if (!taskId) return; @@ -619,8 +665,22 @@ export default function TaskDetailScreen() { last message can never sit behind the input. Stays visible on terminal runs so the user can send a follow-up that resumes. */} + {taskId ? ( + + ) : null} void; + onReturnToComposer: (message: QueuedMessage) => void; + onDiscard: (message: QueuedMessage) => void; +} + +function previewText(message: QueuedMessage): string { + if (message.content.trim().length > 0) return message.content; + const count = message.attachments.length; + return count === 1 ? "1 attachment" : `${count} attachments`; +} + +export function QueuedMessagesDock({ + taskId, + canSteer, + onSteer, + onReturnToComposer, + onDiscard, +}: QueuedMessagesDockProps) { + const themeColors = useThemeColors(); + const queued = useMessageQueueStore((s) => s.queuesByTaskId[taskId]); + const [activeId, setActiveId] = useState(null); + + if (!queued || queued.length === 0) return null; + const active = queued.find((m) => m.id === activeId) ?? null; + + return ( + <> + + {queued.map((message) => ( + setActiveId(message.id)} + accessibilityRole="button" + accessibilityLabel="Queued message actions" + className="flex-row items-center gap-2 rounded-xl border border-gray-6 bg-card px-3 py-2 active:opacity-70" + > + + + {previewText(message)} + + {message.attachments.length > 0 ? ( + + ) : null} + Queued + + ))} + + + setActiveId(null)}> + {active ? ( + <> + + + {previewText(active)} + + + {canSteer ? ( + + } + label="Steer now" + description="Interrupt the current turn and send this now" + onPress={() => { + onSteer(active); + setActiveId(null); + }} + /> + ) : null} + } + label="Edit in composer" + description="Pull it back into the composer to revise" + onPress={() => { + onReturnToComposer(active); + setActiveId(null); + }} + /> + } + label="Discard" + destructive + onPress={() => { + onDiscard(active); + setActiveId(null); + }} + /> + + ) : null} + + + ); +} + +function ActionRow({ + icon, + label, + description, + destructive = false, + onPress, +}: { + icon: ReactNode; + label: string; + description?: string; + destructive?: boolean; + onPress: () => void; +}) { + return ( + + + {icon} + + + + {label} + + {description ? ( + {description} + ) : null} + + + ); +} diff --git a/apps/mobile/src/features/tasks/composer/TaskChatComposer.tsx b/apps/mobile/src/features/tasks/composer/TaskChatComposer.tsx index 216dce2fd..a8fafb7fc 100644 --- a/apps/mobile/src/features/tasks/composer/TaskChatComposer.tsx +++ b/apps/mobile/src/features/tasks/composer/TaskChatComposer.tsx @@ -79,6 +79,8 @@ interface TaskChatComposerProps { messagingMode: MessagingMode; queuedCount: number; onToggleMessagingMode: () => void; + /** A queued message pulled back for editing; pass a fresh object to restore. */ + restoredDraft?: { text: string; attachments: PendingAttachment[] }; } function modeIcon(mode: ExecutionMode, color: string, size = 14): ReactNode { @@ -164,6 +166,7 @@ export function TaskChatComposer({ messagingMode, queuedCount, onToggleMessagingMode, + restoredDraft, }: TaskChatComposerProps) { const themeColors = useThemeColors(); const [message, setMessage] = useState(() => initialMessage ?? ""); @@ -175,6 +178,12 @@ export function TaskChatComposer({ setMessage(initialMessage); }, [initialMessage]); + useEffect(() => { + if (!restoredDraft) return; + setMessage(restoredDraft.text); + setAttachments(restoredDraft.attachments); + }, [restoredDraft]); + const appendTranscript = useCallback((transcript: string) => { setMessage((prev) => (prev ? `${prev} ${transcript}` : transcript)); }, []); diff --git a/apps/mobile/src/features/tasks/stores/messageQueueStore.test.ts b/apps/mobile/src/features/tasks/stores/messageQueueStore.test.ts index 068a14900..a0196b2c7 100644 --- a/apps/mobile/src/features/tasks/stores/messageQueueStore.test.ts +++ b/apps/mobile/src/features/tasks/stores/messageQueueStore.test.ts @@ -65,6 +65,33 @@ describe("messageQueueStore", () => { expect(getQueue("t1").map((m) => m.content)).toEqual(["a", "b", "c"]); }); + + it.each([ + { + name: "removes exactly the targeted message", + contents: ["a", "b", "c"], + removeIndex: 1, + expected: ["a", "c"], + }, + { + name: "clears the entry once the last message is removed", + contents: ["only"], + removeIndex: 0, + expected: [], + }, + ])("$name", ({ contents, removeIndex, expected }) => { + const { enqueue, remove, getQueue } = useMessageQueueStore.getState(); + for (const content of contents) enqueue("t1", content, []); + remove("t1", getQueue("t1")[removeIndex].id); + expect(getQueue("t1").map((m) => m.content)).toEqual(expected); + }); + + it("ignores removal of an unknown id", () => { + const { enqueue, remove, getQueue } = useMessageQueueStore.getState(); + enqueue("t1", "a", []); + remove("t1", "nope"); + expect(getQueue("t1").map((m) => m.content)).toEqual(["a"]); + }); }); describe("combineQueuedMessages", () => { @@ -72,7 +99,7 @@ describe("combineQueuedMessages", () => { content: string, attachments: PendingAttachment[], ): QueuedMessage { - return { content, attachments }; + return { id: content, content, attachments }; } it("joins text in order with a blank line and concatenates attachments", () => { diff --git a/apps/mobile/src/features/tasks/stores/messageQueueStore.ts b/apps/mobile/src/features/tasks/stores/messageQueueStore.ts index d6b066e46..b4d43b4a8 100644 --- a/apps/mobile/src/features/tasks/stores/messageQueueStore.ts +++ b/apps/mobile/src/features/tasks/stores/messageQueueStore.ts @@ -2,12 +2,19 @@ import { create } from "zustand"; import type { PendingAttachment } from "../composer/attachments/types"; export interface QueuedMessage { + id: string; content: string; attachments: PendingAttachment[]; } const EMPTY: QueuedMessage[] = []; +let queueIdCounter = 0; +function nextQueueId(): string { + queueIdCounter += 1; + return `queue-${queueIdCounter}`; +} + interface MessageQueueState { queuesByTaskId: Record; enqueue: ( @@ -19,6 +26,8 @@ interface MessageQueueState { drain: (taskId: string) => QueuedMessage[]; /** Restore messages at the head of the queue, e.g. after a failed flush. */ prepend: (taskId: string, messages: QueuedMessage[]) => void; + /** Drop a single queued message by id. */ + remove: (taskId: string, messageId: string) => void; getQueue: (taskId: string) => QueuedMessage[]; } @@ -30,7 +39,7 @@ export const useMessageQueueStore = create((set, get) => ({ ...state.queuesByTaskId, [taskId]: [ ...(state.queuesByTaskId[taskId] ?? []), - { content, attachments }, + { id: nextQueueId(), content, attachments }, ], }, })), @@ -50,6 +59,20 @@ export const useMessageQueueStore = create((set, get) => ({ [taskId]: [...messages, ...(state.queuesByTaskId[taskId] ?? [])], }, })), + remove: (taskId, messageId) => + set((state) => { + const queue = state.queuesByTaskId[taskId]; + if (!queue) return state; + const next = queue.filter((m) => m.id !== messageId); + if (next.length === queue.length) return state; + if (next.length === 0) { + const { [taskId]: _emptied, ...rest } = state.queuesByTaskId; + return { queuesByTaskId: rest }; + } + return { + queuesByTaskId: { ...state.queuesByTaskId, [taskId]: next }, + }; + }), getQueue: (taskId) => get().queuesByTaskId[taskId] ?? EMPTY, })); diff --git a/apps/mobile/src/features/tasks/stores/taskSessionStore.test.ts b/apps/mobile/src/features/tasks/stores/taskSessionStore.test.ts new file mode 100644 index 000000000..d6b1c8aaf --- /dev/null +++ b/apps/mobile/src/features/tasks/stores/taskSessionStore.test.ts @@ -0,0 +1,179 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +vi.mock("expo-haptics", () => ({ + impactAsync: vi.fn(), + notificationAsync: vi.fn(), + ImpactFeedbackStyle: { Light: "light", Medium: "medium" }, + NotificationFeedbackType: { Success: "success" }, +})); +vi.mock("../lib/cloudTaskStream", () => ({ watchCloudTask: vi.fn() })); +vi.mock("../composer/attachments/buildCloudPrompt", () => ({ + buildCloudPromptBlocks: vi.fn(() => Promise.resolve([])), +})); +vi.mock("../utils/sounds", () => ({ + playMeepSound: vi.fn(() => Promise.resolve()), +})); +vi.mock("@/features/notifications/lib/notifications", () => ({ + presentLocalNotification: vi.fn(() => Promise.resolve()), +})); +vi.mock("../api", () => ({ + CloudCommandError: class CloudCommandError extends Error {}, + getTask: vi.fn(), + runTaskInCloud: vi.fn(), + sendCloudCommand: vi.fn(), +})); + +import type { CloudTaskUpdatePayload, StoredLogEntry } from "../types"; +import { useMessageQueueStore } from "./messageQueueStore"; +import { type TaskSession, useTaskSessionStore } from "./taskSessionStore"; + +function seedSession(overrides: Partial = {}): void { + const session: TaskSession = { + taskRunId: "run-1", + taskId: "t1", + events: [], + status: "connected", + isPromptPending: true, + ...overrides, + }; + useTaskSessionStore.setState({ sessions: { "run-1": session } }); +} + +describe("steerQueuedMessage", () => { + beforeEach(() => { + useMessageQueueStore.setState({ queuesByTaskId: {} }, false); + useTaskSessionStore.setState({ sessions: {} }); + }); + + it("removes the message and resends it as a steer", async () => { + seedSession(); + const sendInterrupting = vi.fn(() => Promise.resolve()); + useTaskSessionStore.setState({ sendInterrupting }); + + useMessageQueueStore.getState().enqueue("t1", "first", []); + useMessageQueueStore.getState().enqueue("t1", "second", []); + const target = useMessageQueueStore.getState().getQueue("t1")[0]; + + await useTaskSessionStore.getState().steerQueuedMessage("t1", target.id); + + expect(sendInterrupting).toHaveBeenCalledWith("t1", "first", []); + expect( + useMessageQueueStore + .getState() + .getQueue("t1") + .map((m) => m.content), + ).toEqual(["second"]); + }); + + it("rolls the message back onto the head when the resend fails", async () => { + seedSession(); + const sendInterrupting = vi.fn(() => Promise.reject(new Error("boom"))); + useTaskSessionStore.setState({ sendInterrupting }); + + useMessageQueueStore.getState().enqueue("t1", "first", []); + useMessageQueueStore.getState().enqueue("t1", "second", []); + const target = useMessageQueueStore.getState().getQueue("t1")[0]; + + await expect( + useTaskSessionStore.getState().steerQueuedMessage("t1", target.id), + ).rejects.toThrow("boom"); + + expect( + useMessageQueueStore + .getState() + .getQueue("t1") + .map((m) => m.content), + ).toEqual(["first", "second"]); + }); + + it("no-ops while the session is compacting", async () => { + seedSession({ isCompacting: true }); + const sendInterrupting = vi.fn(() => Promise.resolve()); + useTaskSessionStore.setState({ sendInterrupting }); + + useMessageQueueStore.getState().enqueue("t1", "first", []); + const target = useMessageQueueStore.getState().getQueue("t1")[0]; + + await useTaskSessionStore.getState().steerQueuedMessage("t1", target.id); + + expect(sendInterrupting).not.toHaveBeenCalled(); + expect( + useMessageQueueStore + .getState() + .getQueue("t1") + .map((m) => m.content), + ).toEqual(["first"]); + }); + + it("no-ops for an unknown message id", async () => { + seedSession(); + const sendInterrupting = vi.fn(() => Promise.resolve()); + useTaskSessionStore.setState({ sendInterrupting }); + + useMessageQueueStore.getState().enqueue("t1", "first", []); + + await useTaskSessionStore.getState().steerQueuedMessage("t1", "missing"); + + expect(sendInterrupting).not.toHaveBeenCalled(); + expect(useMessageQueueStore.getState().getQueue("t1")).toHaveLength(1); + }); + + it("no-ops when no turn is running", async () => { + seedSession({ isPromptPending: false }); + const sendInterrupting = vi.fn(() => Promise.resolve()); + useTaskSessionStore.setState({ sendInterrupting }); + + useMessageQueueStore.getState().enqueue("t1", "first", []); + const target = useMessageQueueStore.getState().getQueue("t1")[0]; + + await useTaskSessionStore.getState().steerQueuedMessage("t1", target.id); + + expect(sendInterrupting).not.toHaveBeenCalled(); + expect(useMessageQueueStore.getState().getQueue("t1")).toHaveLength(1); + }); +}); + +describe("compaction tracking from the log stream", () => { + beforeEach(() => { + useTaskSessionStore.setState({ sessions: {} }); + }); + + function statusEntry(isComplete: boolean): StoredLogEntry { + return { + type: "notification", + notification: { + method: "_posthog/status", + params: { status: "compacting", isComplete }, + }, + }; + } + + function logsUpdate(entries: StoredLogEntry[]): CloudTaskUpdatePayload { + return { + kind: "logs", + taskId: "t1", + runId: "run-1", + newEntries: entries, + totalEntryCount: entries.length, + }; + } + + it("sets isCompacting on a compacting status and clears it on the boundary", () => { + seedSession({ isCompacting: false }); + const store = useTaskSessionStore.getState(); + + store._handleCloudUpdate("run-1", logsUpdate([statusEntry(false)])); + expect(store.getSessionForTask("t1")?.isCompacting).toBe(true); + + store._handleCloudUpdate( + "run-1", + logsUpdate([ + { + type: "notification", + notification: { method: "_posthog/compact_boundary" }, + }, + ]), + ); + expect(store.getSessionForTask("t1")?.isCompacting).toBe(false); + }); +}); diff --git a/apps/mobile/src/features/tasks/stores/taskSessionStore.ts b/apps/mobile/src/features/tasks/stores/taskSessionStore.ts index b82bc33a0..d66fe7ffe 100644 --- a/apps/mobile/src/features/tasks/stores/taskSessionStore.ts +++ b/apps/mobile/src/features/tasks/stores/taskSessionStore.ts @@ -160,6 +160,8 @@ interface BatchAnalysis { hasVisibleAgentOutput: boolean; externalUserMessageCount: number; agentMessageFinalized: boolean; + // Latest compaction state seen in the batch (undefined = no change). + compacting?: boolean; } function analyzeEntries( @@ -173,6 +175,7 @@ function analyzeEntries( let hasVisibleAgentOutput = false; let externalUserMessageCount = 0; let agentMessageFinalized = false; + let compacting: boolean | undefined; for (const entry of entries) { const method = entry.notification?.method; @@ -192,6 +195,18 @@ function analyzeEntries( } } + if (method === "_posthog/status") { + const params = entry.notification?.params as + | { status?: string; isComplete?: boolean } + | undefined; + if (params?.status === "compacting") { + compacting = !params.isComplete; + } + } + if (method === "_posthog/compact_boundary") { + compacting = false; + } + if ( entry.type === "notification" && method === "session/update" && @@ -222,6 +237,7 @@ function analyzeEntries( hasVisibleAgentOutput, externalUserMessageCount, agentMessageFinalized, + compacting, }; } @@ -284,6 +300,10 @@ export interface TaskSession { // here so the response can be routed back to the awaiting tool call. cloudPermissionRequestIds?: Record; pendingPermissions?: Record; + // True while the agent is compacting context. Steering cancels and resends + // the running turn, which would abort an in-flight compaction, so queued + // messages are held until compaction ends. + isCompacting?: boolean; } interface TaskSessionStore { @@ -317,6 +337,8 @@ interface TaskSessionStore { attachments?: PendingAttachment[], ) => Promise; flushQueuedMessages: (taskId: string) => Promise; + /** Drop one queued message and resend it now as a steer (interrupt + resend). */ + steerQueuedMessage: (taskId: string, messageId: string) => Promise; setConfigOption: ( taskId: string, configId: string, @@ -794,6 +816,33 @@ export const useTaskSessionStore = create((set, get) => ({ } }, + steerQueuedMessage: async (taskId: string, messageId: string) => { + const session = get().getSessionForTask(taskId); + // Steering only makes sense against a live turn. Mid-compaction it would + // abort the compaction; with no turn running there is nothing to interrupt + // and the message drains via the normal turn-end flush. + if (!session || !session.isPromptPending || session.isCompacting) return; + + const message = useMessageQueueStore + .getState() + .getQueue(taskId) + .find((m) => m.id === messageId); + if (!message) return; + + useMessageQueueStore.getState().remove(taskId, messageId); + try { + await get().sendInterrupting( + taskId, + message.content, + message.attachments, + ); + } catch (err) { + // Restore at the head so a failed steer never silently drops the message. + useMessageQueueStore.getState().prepend(taskId, [message]); + throw err; + } + }, + getSessionForTask: (taskId: string) => { return Object.values(get().sessions).find((s) => s.taskId === taskId); }, @@ -951,6 +1000,7 @@ export const useTaskSessionStore = create((set, get) => ({ isPromptPending: nextIsPromptPending, awaitingPing: nextAwaitingPing, awaitingAgentOutput: nextAwaitingAgentOutput, + isCompacting: analysis.compacting ?? current.isCompacting, localUserEchoes: echoSet.size > 0 ? echoSet : undefined, lastEventAt: events.length > 0 ? Date.now() : current.lastEventAt, },