From 5a6fa0ebce2b717ae54cdfadfd8bb6dad883080d Mon Sep 17 00:00:00 2001 From: "haozhe.yang" Date: Fri, 12 Jun 2026 14:32:16 +0800 Subject: [PATCH] feat(acp-adapter): steer and queue client prompts during busy turns - forward agent-initiated background turns to ACP clients - merge plain-text prompts into active turns via steer when agent is busy - queue skill activations until the active turn ends - move tool-call streaming state to session level to survive turn hand-offs - add unit tests for steering behavior --- packages/acp-adapter/src/session.ts | 657 ++++++++++++------ .../acp-adapter/test/session-steer.test.ts | 513 ++++++++++++++ 2 files changed, 965 insertions(+), 205 deletions(-) create mode 100644 packages/acp-adapter/test/session-steer.test.ts diff --git a/packages/acp-adapter/src/session.ts b/packages/acp-adapter/src/session.ts index 53c50beb5..8aa068194 100644 --- a/packages/acp-adapter/src/session.ts +++ b/packages/acp-adapter/src/session.ts @@ -95,6 +95,60 @@ export class AcpSession { */ private currentTurnId: number | undefined = undefined; + /** + * Per-tool-call streaming args accumulator, shared by every event + * forwarder for this session (the per-prompt subscriptions in + * {@link runTurnBody} and the constructor-level agent-initiated-turn + * forwarder). Session-level — rather than per-prompt — so a turn that + * straddles a forwarder hand-off (e.g. a background-notification turn + * that a client prompt steers into mid-tool-call) keeps its cumulative + * args instead of restarting from an empty string. Keyed on the **SDK** + * `toolCallId` (not the ACP-prefixed one) because the SDK delta events + * only carry the raw id. Cleared on every main-agent `turn.ended`; + * cross-turn collisions are additionally impossible at the wire level + * because wire ids are turn-prefixed. 
+ */ + private readonly argsByToolCall = new Map(); + + /** + * Set of **wire-level** (turn-prefixed) tool-call ids for which the + * `tool_call` CREATE notification has already been sent. The + * agent-core actually emits `tool.call.delta` events BEFORE + * `tool.call.started` (deltas come from the model's args stream; + * the started event comes from the loop dispatching the call + * afterwards). Without this set, the naive "started → tool_call, + * delta → tool_call_update" mapping puts updates on the wire + * ahead of the create, and clients such as Zed surface "Tool + * call not found" until the create eventually lands. We instead + * lazy-create the wire `tool_call` on the first delta and + * downgrade the eventual started event into a `tool_call_update` + * carrying the canonical title/kind/rawInput (and any + * `display`-derived diff). + * + * Keyed on the wire id (`${turnId}:${rawToolCallId}`) — not the + * raw SDK `toolCallId` — because providers may legitimately + * reuse the same raw id across turns within one prompt, and + * each turn produces a distinct wire-level tool call that needs + * its own CREATE. Session-level for the same hand-off reason as + * {@link argsByToolCall}; cleared on every main-agent `turn.ended`. + */ + private readonly startedToolCalls = new Set(); + + /** + * Tokens of in-flight `session/prompt` requests, in arrival order. + * Each {@link runTurnBody} invocation registers a token for its + * lifetime. Serves two purposes: + * + * - Non-empty ⇒ a client prompt is in flight, so the constructor-level + * agent-initiated-turn forwarder stays silent (the prompt's own + * subscription forwards the stream). + * - Only the OLDEST token forwards stream events. When a second + * `session/prompt` arrives mid-turn and is steered into the active + * turn, two subscriptions observe every event; without this gate + * each chunk would be pushed to the client twice. 
+ */ + private readonly promptSubscribers = new Set(); + /** * The adapter-side authoritative current BASE model id (no * `,thinking` suffix) for the `configOptions` model picker (PLAN D11). @@ -226,6 +280,19 @@ export class AcpSession { if (typeof this.session.setQuestionHandler === 'function') { this.session.setQuestionHandler(async (req) => this.handleQuestion(req)); } + // Forward output of agent-initiated turns (background-task / cron + // notifications steered into the agent while idle — see + // `agent-core/src/agent/background/index.ts` `notifyBackgroundTask`) + // to the client. Those turns have no originating `session/prompt`, + // so without this subscription their entire response is invisible + // to ACP clients. Registered once at construction (same lifetime + // pattern as the approval handler); muted whenever a client prompt + // is in flight because that prompt's own `runTurnBody` subscription + // forwards the stream. The `typeof` guard keeps partial-stub + // `Session` instances used in adapter-level unit tests working. + if (typeof this.session.onEvent === 'function') { + this.session.onEvent((event) => this.forwardAgentInitiatedEvent(event)); + } } /** ACP-level session identifier — matches the underlying SDK session id. */ @@ -735,12 +802,20 @@ export class AcpSession { this.emitTelemetry('acp_skill_activated', { skill_name: intent.skillName }); const skillName = intent.skillName; const skillArgs = intent.args; - return this.runTurnBody(sessionId, conn, () => - // `activateSkill` accepts `args?: string | undefined`; pass the - // empty string through verbatim — the SDK's - // `normalizeOptionalString` converts `''` to `undefined`, which - // is the canonical "no args" form for the skill renderer. - this.session.activateSkill(skillName, skillArgs.length > 0 ? 
skillArgs : undefined), + return this.runTurnBody( + sessionId, + conn, + () => + // `activateSkill` accepts `args?: string | undefined`; pass the + // empty string through verbatim — the SDK's + // `normalizeOptionalString` converts `''` to `undefined`, which + // is the canonical "no args" form for the skill renderer. + this.session.activateSkill(skillName, skillArgs.length > 0 ? skillArgs : undefined), + // A skill activation cannot be merged into a running turn the + // way a plain prompt can (the `` block must + // open its own turn), so when the agent is busy the activation + // is queued and re-issued once the active turn ends. + { kind: 'queue' }, ); } if (intent.kind === 'builtin') { @@ -750,7 +825,18 @@ export class AcpSession { return this.runUnknownSlashCommand(intent.name); } - return this.runTurnBody(sessionId, conn, () => this.session.prompt(parts)); + // Steer fallback: when the SDK reports `turn.agent_busy` (an + // agent-initiated turn — e.g. a background-task notification — is + // already running), inject this prompt into the active turn via + // `steer` instead of failing the request. Plain text prompts are + // safe to merge; skill activations (above) queue until idle instead. + // The `typeof` guard tolerates partial-stub sessions in + // adapter-level unit tests and pre-steer SDK builds. + const busyFallback = + typeof this.session.steer === 'function' + ? { kind: 'steer' as const, steer: () => this.session.steer(parts) } + : undefined; + return this.runTurnBody(sessionId, conn, () => this.session.prompt(parts), busyFallback); } private async runBuiltInCommand( @@ -883,48 +969,52 @@ export class AcpSession { * `packages/agent-core/src/agent/skill/index.ts`), so the event * subscription's `turn.started` / `turn.ended` semantics apply * uniformly. 
+ * + * When `busyFallback` is provided and the SDK answers the kick with a + * `turn.agent_busy` error (another turn — typically an agent-initiated + * background-task notification turn — is active), the request is not + * rejected; instead: + * + * - `kind: 'steer'` (plain text prompts): the input is steered into + * the active turn. The subscription keeps forwarding that turn's + * stream and the prompt resolves on its `turn.ended`, so from the + * client's perspective the message was merged into the ongoing + * response. + * - `kind: 'queue'` (skill activations, which cannot be merged into + * a running turn): the kick is re-issued once the active turn ends, + * so the activation runs as the next turn. A `cancelled` end of the + * turn we are queued behind settles the queued request as cancelled + * too — the user pressed stop and expects the session to go quiet. */ private runTurnBody( sessionId: string, conn: AgentSideConnection, kick: () => Promise, + busyFallback?: + | { kind: 'steer'; steer: () => Promise } + | { kind: 'queue' }, ): Promise { return new Promise((resolve, reject) => { let settled = false; - const isFromMainAgent = (event: { agentId?: string }): boolean => - event.agentId === undefined || event.agentId === MAIN_AGENT_ID; - // Per-tool-call streaming args accumulator. Lives in the Promise - // executor closure so each `prompt()` invocation gets its own - // map and no state leaks across concurrent or sequential turns. - // Keyed on the **SDK** `toolCallId` (not the ACP-prefixed one) - // because the SDK delta events only carry the raw id. - const argsByToolCall = new Map(); - // Set of **wire-level** (turn-prefixed) tool-call ids for which - // we have already sent the `tool_call` CREATE notification. The - // agent-core actually emits `tool.call.delta` events BEFORE - // `tool.call.started` (deltas come from the model's args stream; - // the started event comes from the loop dispatching the call - // afterwards). 
Without this set, the naive "started → tool_call, - // delta → tool_call_update" mapping puts updates on the wire - // ahead of the create, and clients such as Zed surface "Tool - // call not found" until the create eventually lands. We instead - // lazy-create the wire `tool_call` on the first delta and - // downgrade the eventual started event into a `tool_call_update` - // carrying the canonical title/kind/rawInput (and any - // `display`-derived diff). - // - // Keyed on the wire id (`${turnId}:${rawToolCallId}`) — not the - // raw SDK `toolCallId` — because providers may legitimately - // reuse the same raw id across turns within one prompt, and - // each turn produces a distinct wire-level tool call that needs - // its own CREATE. - const startedToolCalls = new Set(); + let steered = false; + let waitingForIdle = false; + // Registered for the lifetime of this prompt. Mutes the + // constructor-level agent-initiated-turn forwarder, and — via + // `ownsStream` (oldest token wins) — elects exactly one live + // prompt subscription as the stream forwarder so a steered + // second prompt never duplicates chunks. Registration happens + // synchronously before `kick()`, so no event can slip through + // to the background forwarder in between. 
+ const token = Symbol('acp-prompt'); + this.promptSubscribers.add(token); + const ownsStream = (): boolean => + this.promptSubscribers.values().next().value === token; const initialActiveTurnId = this.currentTurnId; let hasReceivedOwnTurnStarted = false; const unsub = this.session.onEvent((event) => { if ( event.type === 'turn.started' && - isFromMainAgent(event) && + isMainAgentEvent(event) && (initialActiveTurnId === undefined || event.turnId !== initialActiveTurnId) ) { hasReceivedOwnTurnStarted = true; @@ -941,20 +1031,57 @@ export class AcpSession { if ( 'turnId' in event && typeof event.turnId === 'number' && - isFromMainAgent(event) + isMainAgentEvent(event) ) { this.currentTurnId = event.turnId; } if (event.type === 'error') { if (settled) return; - if (!isFromMainAgent(event)) return; + if (!isMainAgentEvent(event)) return; if (event.code !== ErrorCodes.TURN_AGENT_BUSY) return; if (hasReceivedOwnTurnStarted) return; + // An agent-initiated turn (background-task / cron notification) + // is already running. Merge this prompt into it via `steer` + // rather than failing the request: the steered input is + // consumed within the active turn (agent-core flushes the + // steer buffer before each step and before ending the turn), + // this subscription keeps streaming the turn's output, and the + // prompt resolves on the turn's `turn.ended`. Attempted at + // most once; if the steer RPC itself fails the prompt fails. + if (busyFallback?.kind === 'steer' && !steered) { + steered = true; + log.info('acp: another turn is active; steering prompt into it', { + sessionId, + details: event.details, + }); + busyFallback.steer().catch((err: unknown) => { + if (settled) return; + settled = true; + cleanup(); + reject(mapPromptError(err, sessionId)); + }); + return; + } + // Queue mode: hold the request and re-issue the kick when the + // active turn ends (see the `turn.ended` branch below). 
Marked + // idempotently so a re-kick that races into yet another + // agent-initiated turn simply queues again. + if (busyFallback?.kind === 'queue') { + if (!waitingForIdle) { + waitingForIdle = true; + log.info('acp: another turn is active; queueing activation until idle', { + sessionId, + details: event.details, + }); + } + return; + } settled = true; - argsByToolCall.clear(); - startedToolCalls.clear(); - this.currentTurnId = undefined; - unsub(); + // Intentionally no `currentTurnId` reset and no shared-state + // clear here: the busy turn is still running, and the + // constructor-level forwarder (plus any approval prompt it + // serves) still needs both. + cleanup(); log.warn('acp: prompt rejected because another turn is active', { sessionId, details: event.details, @@ -967,167 +1094,48 @@ export class AcpSession { ); return; } - if (event.type === 'assistant.delta') { - if (!isFromMainAgent(event)) return; - // `sessionUpdate` is itself async (it serializes onto the - // ndjson stream). The text deltas form a strictly ordered - // single-producer/single-consumer pipeline, so each await - // would force the next delta to wait for the previous flush. - // Fire-and-forget keeps the stream pumping; we log push - // failures rather than dropping them silently. - conn - .sessionUpdate(assistantDeltaToSessionUpdate(sessionId, event)) - .catch((err) => { - log.warn('acp: failed to push agent_message_chunk', { - sessionId, - error: err instanceof Error ? err.message : String(err), - }); - }); - return; - } - if (event.type === 'thinking.delta') { - if (!isFromMainAgent(event)) return; - conn - .sessionUpdate(thinkingDeltaToSessionUpdate(sessionId, event)) - .catch((err) => { - log.warn('acp: failed to push agent_thought_chunk', { - sessionId, - error: err instanceof Error ? 
err.message : String(err), - }); - }); - return; - } - if (event.type === 'tool.call.started') { - if (!isFromMainAgent(event)) return; - // Seed the accumulator with the **stringified initial args**. - // The wire-level `tool_call_update` is REPLACE-content (not - // append) so each subsequent delta emits the cumulative args - // string; if we seeded with an empty string the first delta - // would silently drop the initial args from the rendered card. - argsByToolCall.set(event.toolCallId, { args: stringifyArgs(event.args) }); - // Branch on whether a streaming delta already lazy-created - // the wire `tool_call` for this id: - // - YES → we cannot send a second `tool_call` CREATE; emit a - // `tool_call_update` (the "upgrade") so `title`/`kind`/ - // `rawInput`/`display`-derived diff land on the existing - // card and `status` flips to `'in_progress'`. - // - NO → no prior deltas (e.g. provider doesn't stream args); - // take the original path and emit the `tool_call` CREATE. - const startedWireId = acpToolCallId(event.turnId, event.toolCallId); - if (startedToolCalls.has(startedWireId)) { - conn - .sessionUpdate(toolCallStartedUpgradeToSessionUpdate(sessionId, event)) - .catch((err) => { - log.warn('acp: failed to push tool_call_update (start upgrade)', { - sessionId, - toolCallId: event.toolCallId, - error: err instanceof Error ? err.message : String(err), - }); - }); - } else { - startedToolCalls.add(startedWireId); - conn - .sessionUpdate(toolCallStartToSessionUpdate(sessionId, event)) - .catch((err) => { - log.warn('acp: failed to push tool_call', { - sessionId, - toolCallId: event.toolCallId, - error: err instanceof Error ? err.message : String(err), - }); - }); + // Streaming turn output (assistant / thinking deltas and the + // tool-call lifecycle). The event → `session/update` mapping + // lives in `forwardStreamEvent` so the constructor-level + // agent-initiated-turn forwarder shares it. 
Only the oldest + // live prompt subscription forwards: when a second prompt has + // been steered into the same turn, both subscriptions observe + // every event and forwarding from both would duplicate chunks. + if (isTurnStreamEvent(event)) { + if (isMainAgentEvent(event) && ownsStream()) { + this.forwardStreamEvent(event); } - // Phase 9.3: when the tool exposed a structured TodoList - // display, additionally fire a `plan` session_update so ACP - // clients can render the agent's evolving TODO list. Other - // display kinds (diff/file_io/command/…) are already folded - // into the tool_call card; only `todo_list` becomes a plan. - // The emission is fire-and-forget under the same idle-stream - // discipline as the assistant deltas above. - if (event.display) { - const planNote = planFromDisplayBlock(sessionId, event.turnId, event.display); - if (planNote !== null) { - conn.sessionUpdate(planNote).catch((err) => { - log.warn('acp: failed to push plan', { - sessionId, - error: err instanceof Error ? err.message : String(err), - }); - }); - } - } - return; - } - if (event.type === 'tool.call.delta') { - if (!isFromMainAgent(event)) return; - // The agent-core emits these args-stream deltas BEFORE the - // `tool.call.started` event (deltas come from the provider's - // streaming phase; started is dispatched afterwards). If we - // haven't yet sent a `tool_call` CREATE for this id, do so now - // from the delta — Zed otherwise sees a `tool_call_update` - // for an unknown id and surfaces "Tool call not found" until - // the start eventually lands. - const deltaWireId = acpToolCallId(event.turnId, event.toolCallId); - if (!startedToolCalls.has(deltaWireId)) { - const initial = event.argumentsPart ?? 
''; - argsByToolCall.set(event.toolCallId, { args: initial }); - startedToolCalls.add(deltaWireId); - conn - .sessionUpdate(toolCallLazyCreateToSessionUpdate(sessionId, event)) - .catch((err) => { - log.warn('acp: failed to push tool_call (lazy create from delta)', { - sessionId, - toolCallId: event.toolCallId, - error: err instanceof Error ? err.message : String(err), - }); - }); - return; - } - // Subsequent delta — accumulate then emit an update with the - // cumulative args text (REPLACE-content semantics). - let acc = argsByToolCall.get(event.toolCallId); - if (!acc) { - acc = { args: '' }; - argsByToolCall.set(event.toolCallId, acc); - } - conn - .sessionUpdate(toolCallDeltaToSessionUpdate(sessionId, event, acc)) - .catch((err) => { - log.warn('acp: failed to push tool_call_update (delta)', { - sessionId, - toolCallId: event.toolCallId, - error: err instanceof Error ? err.message : String(err), - }); - }); - return; - } - if (event.type === 'tool.progress') { - if (!isFromMainAgent(event)) return; - const note = toolProgressToSessionUpdate(sessionId, event); - if (note === null) return; - conn.sessionUpdate(note).catch((err) => { - log.warn('acp: failed to push tool_call_update (progress)', { - sessionId, - toolCallId: event.toolCallId, - error: err instanceof Error ? err.message : String(err), - }); - }); - return; - } - if (event.type === 'tool.result') { - if (!isFromMainAgent(event)) return; - conn - .sessionUpdate(toolResultToSessionUpdate(sessionId, event)) - .catch((err) => { - log.warn('acp: failed to push tool_call_update (result)', { - sessionId, - toolCallId: event.toolCallId, - error: err instanceof Error ? err.message : String(err), - }); - }); return; } if (event.type === 'turn.ended') { if (settled) return; - if (!isFromMainAgent(event)) return; + if (!isMainAgentEvent(event)) return; + if (waitingForIdle) { + // The turn we were queued behind has finished. 
A cancel + // settles the queued request as cancelled too: the user + // pressed stop and expects the session to go quiet, not a + // deferred activation springing to life. + if (event.reason === 'cancelled') { + settled = true; + cleanup(); + resolve({ stopReason: 'cancelled' }); + return; + } + waitingForIdle = false; + // The foreign turn is over — reset the shared stream state + // exactly like the settle path does — then launch the + // queued activation as the next turn. + this.argsByToolCall.clear(); + this.startedToolCalls.clear(); + this.currentTurnId = undefined; + kick().catch((err: unknown) => { + if (settled) return; + settled = true; + cleanup(); + reject(mapPromptError(err, sessionId)); + }); + return; + } settled = true; if (event.reason === 'failed') { // Failures bubble up via the SDK `error` payload. Phase 11.1 @@ -1141,37 +1149,237 @@ export class AcpSession { sessionId, error: event.error, }); - argsByToolCall.clear(); - startedToolCalls.clear(); + this.argsByToolCall.clear(); + this.startedToolCalls.clear(); this.currentTurnId = undefined; - unsub(); + cleanup(); const authErr = authRequiredFromPayload(event.error); if (authErr) { reject(authErr); return; } } else { - argsByToolCall.clear(); - startedToolCalls.clear(); + this.argsByToolCall.clear(); + this.startedToolCalls.clear(); // Drop the turnId so a late-arriving approval (e.g. an SDK // reverse-RPC racing the turn boundary) falls back to the raw // SDK id rather than re-prefixing with a stale value. 
this.currentTurnId = undefined; - unsub(); + cleanup(); } resolve({ stopReason: turnEndReasonToStopReason(event.reason) }); } }); + const cleanup = (): void => { + this.promptSubscribers.delete(token); + unsub(); + }; kick().catch((err) => { if (settled) return; settled = true; - unsub(); + cleanup(); reject(mapPromptError(err, sessionId)); }); }); } + /** + * Constructor-level event forwarder for turns the agent starts on its + * own (background-task / cron notifications steered into an idle + * agent — `agent.turn.steer` launches a fresh turn when nothing is + * active). Those turns have no originating `session/prompt`, so this + * is the only path that makes their output visible to the ACP client. + * + * Muted whenever a client prompt is in flight ({@link promptSubscribers} + * non-empty): the prompt's own `runTurnBody` subscription forwards the + * stream then, including the case where the prompt was steered into an + * agent-initiated turn. Both gates flip synchronously with prompt + * registration, so no event is double-forwarded or dropped at the + * hand-off. + */ + private forwardAgentInitiatedEvent(event: Event): void { + if (this.promptSubscribers.size > 0) return; + if (!isMainAgentEvent(event)) return; + // Keep `currentTurnId` tracking alive for agent-initiated turns so + // an approval reverse-RPC raised by such a turn still composes the + // prefixed `${turnId}:${toolCallId}` wire id (see handleApproval). + if ('turnId' in event && typeof event.turnId === 'number') { + this.currentTurnId = event.turnId; + } + if (event.type === 'turn.ended') { + this.argsByToolCall.clear(); + this.startedToolCalls.clear(); + this.currentTurnId = undefined; + return; + } + if (isTurnStreamEvent(event)) { + this.forwardStreamEvent(event); + } + } + + /** + * Map a single main-agent turn-stream event to its `session/update` + * notification(s) and push them to the client. 
Shared by the + * per-prompt subscriptions in {@link runTurnBody} and by + * {@link forwardAgentInitiatedEvent}; callers gate on main-agent + * origin and forwarder ownership, this method only does the mapping. + * + * All pushes are fire-and-forget: `sessionUpdate` is itself async (it + * serializes onto the ndjson stream) and the deltas form a strictly + * ordered single-producer/single-consumer pipeline, so each await + * would force the next delta to wait for the previous flush. Push + * failures are logged rather than dropped silently. + */ + private forwardStreamEvent(event: TurnStreamEvent): void { + const sessionId = this.id; + const conn = this.conn; + if (event.type === 'assistant.delta') { + conn + .sessionUpdate(assistantDeltaToSessionUpdate(sessionId, event)) + .catch((err) => { + log.warn('acp: failed to push agent_message_chunk', { + sessionId, + error: err instanceof Error ? err.message : String(err), + }); + }); + return; + } + if (event.type === 'thinking.delta') { + conn + .sessionUpdate(thinkingDeltaToSessionUpdate(sessionId, event)) + .catch((err) => { + log.warn('acp: failed to push agent_thought_chunk', { + sessionId, + error: err instanceof Error ? err.message : String(err), + }); + }); + return; + } + if (event.type === 'tool.call.started') { + // Seed the accumulator with the **stringified initial args**. + // The wire-level `tool_call_update` is REPLACE-content (not + // append) so each subsequent delta emits the cumulative args + // string; if we seeded with an empty string the first delta + // would silently drop the initial args from the rendered card. 
+ this.argsByToolCall.set(event.toolCallId, { args: stringifyArgs(event.args) }); + // Branch on whether a streaming delta already lazy-created + // the wire `tool_call` for this id: + // - YES → we cannot send a second `tool_call` CREATE; emit a + // `tool_call_update` (the "upgrade") so `title`/`kind`/ + // `rawInput`/`display`-derived diff land on the existing + // card and `status` flips to `'in_progress'`. + // - NO → no prior deltas (e.g. provider doesn't stream args); + // take the original path and emit the `tool_call` CREATE. + const startedWireId = acpToolCallId(event.turnId, event.toolCallId); + if (this.startedToolCalls.has(startedWireId)) { + conn + .sessionUpdate(toolCallStartedUpgradeToSessionUpdate(sessionId, event)) + .catch((err) => { + log.warn('acp: failed to push tool_call_update (start upgrade)', { + sessionId, + toolCallId: event.toolCallId, + error: err instanceof Error ? err.message : String(err), + }); + }); + } else { + this.startedToolCalls.add(startedWireId); + conn + .sessionUpdate(toolCallStartToSessionUpdate(sessionId, event)) + .catch((err) => { + log.warn('acp: failed to push tool_call', { + sessionId, + toolCallId: event.toolCallId, + error: err instanceof Error ? err.message : String(err), + }); + }); + } + // Phase 9.3: when the tool exposed a structured TodoList + // display, additionally fire a `plan` session_update so ACP + // clients can render the agent's evolving TODO list. Other + // display kinds (diff/file_io/command/…) are already folded + // into the tool_call card; only `todo_list` becomes a plan. + // The emission is fire-and-forget under the same idle-stream + // discipline as the assistant deltas above. + if (event.display) { + const planNote = planFromDisplayBlock(sessionId, event.turnId, event.display); + if (planNote !== null) { + conn.sessionUpdate(planNote).catch((err) => { + log.warn('acp: failed to push plan', { + sessionId, + error: err instanceof Error ? 
err.message : String(err), + }); + }); + } + } + return; + } + if (event.type === 'tool.call.delta') { + // The agent-core emits these args-stream deltas BEFORE the + // `tool.call.started` event (deltas come from the provider's + // streaming phase; started is dispatched afterwards). If we + // haven't yet sent a `tool_call` CREATE for this id, do so now + // from the delta — Zed otherwise sees a `tool_call_update` + // for an unknown id and surfaces "Tool call not found" until + // the start eventually lands. + const deltaWireId = acpToolCallId(event.turnId, event.toolCallId); + if (!this.startedToolCalls.has(deltaWireId)) { + const initial = event.argumentsPart ?? ''; + this.argsByToolCall.set(event.toolCallId, { args: initial }); + this.startedToolCalls.add(deltaWireId); + conn + .sessionUpdate(toolCallLazyCreateToSessionUpdate(sessionId, event)) + .catch((err) => { + log.warn('acp: failed to push tool_call (lazy create from delta)', { + sessionId, + toolCallId: event.toolCallId, + error: err instanceof Error ? err.message : String(err), + }); + }); + return; + } + // Subsequent delta — accumulate then emit an update with the + // cumulative args text (REPLACE-content semantics). + let acc = this.argsByToolCall.get(event.toolCallId); + if (!acc) { + acc = { args: '' }; + this.argsByToolCall.set(event.toolCallId, acc); + } + conn + .sessionUpdate(toolCallDeltaToSessionUpdate(sessionId, event, acc)) + .catch((err) => { + log.warn('acp: failed to push tool_call_update (delta)', { + sessionId, + toolCallId: event.toolCallId, + error: err instanceof Error ? err.message : String(err), + }); + }); + return; + } + if (event.type === 'tool.progress') { + const note = toolProgressToSessionUpdate(sessionId, event); + if (note === null) return; + conn.sessionUpdate(note).catch((err) => { + log.warn('acp: failed to push tool_call_update (progress)', { + sessionId, + toolCallId: event.toolCallId, + error: err instanceof Error ? 
err.message : String(err), + }); + }); + return; + } + // tool.result + conn + .sessionUpdate(toolResultToSessionUpdate(sessionId, event)) + .catch((err) => { + log.warn('acp: failed to push tool_call_update (result)', { + sessionId, + toolCallId: event.toolCallId, + error: err instanceof Error ? err.message : String(err), + }); + }); + } + /** * Bridge an SDK {@link ApprovalRequest} through the ACP reverse-RPC * `session/request_permission`. @@ -1566,6 +1774,45 @@ const THINKING_OFF_LEVEL = 'off'; */ const MAIN_AGENT_ID = 'main'; +/** + * True when the event originates from the main agent (or predates the + * `agentId` field). Subagent events carry their own generated id and + * must never be forwarded as parent-turn output. + */ +const isMainAgentEvent = (event: { agentId?: string }): boolean => + event.agentId === undefined || event.agentId === MAIN_AGENT_ID; + +/** + * The turn-output event types that map to client-visible + * `session/update` notifications in {@link AcpSession.forwardStreamEvent}. + * Lifecycle events (`turn.started` / `turn.ended` / `error` / …) are + * deliberately excluded — they drive subscription state, not the wire. 
+ */ +type TurnStreamEvent = Extract< + Event, + { + type: + | 'assistant.delta' + | 'thinking.delta' + | 'tool.call.started' + | 'tool.call.delta' + | 'tool.progress' + | 'tool.result'; + } +>; + +const TURN_STREAM_EVENT_TYPES: ReadonlySet<string> = new Set([ + 'assistant.delta', + 'thinking.delta', + 'tool.call.started', + 'tool.call.delta', + 'tool.progress', + 'tool.result', +] satisfies TurnStreamEvent['type'][]); + +const isTurnStreamEvent = (event: Event): event is TurnStreamEvent => + TURN_STREAM_EVENT_TYPES.has(event.type); + /** * Parse a tool call's `arguments` field (kosong wire format: a JSON * string or `null`) into the structured object expected by the live diff --git a/packages/acp-adapter/test/session-steer.test.ts b/packages/acp-adapter/test/session-steer.test.ts new file mode 100644 index 000000000..2ee1a78f2 --- /dev/null +++ b/packages/acp-adapter/test/session-steer.test.ts @@ -0,0 +1,513 @@ +import { describe, expect, it } from 'vitest'; + +import { + AgentSideConnection, + ClientSideConnection, + ndJsonStream, + type Client, + type ContentBlock, + type ReadTextFileRequest, + type ReadTextFileResponse, + type RequestPermissionRequest, + type RequestPermissionResponse, + type SessionNotification, + type WriteTextFileRequest, + type WriteTextFileResponse, +} from '@agentclientprotocol/sdk'; +import type { Event, KimiHarness, Session } from '@moonshot-ai/kimi-code-sdk'; + +import { AcpServer } from '../src/server'; +import { AUTHED_STATUS } from './_helpers/harness-stubs'; + +class CollectingClient implements Client { + readonly updates: SessionNotification[] = []; + + /** Updates excluding the `available_commands_update` emitted by session/new. 
+ */ + get promptUpdates(): readonly SessionNotification[] { + return this.updates.filter( + (n) => + (n.update as { sessionUpdate?: string }).sessionUpdate !== + 'available_commands_update', + ); + } + + get chunkTexts(): readonly string[] { + return this.promptUpdates + .map((n) => n.update as { sessionUpdate?: string; content?: { text?: string } }) + .filter((u) => u.sessionUpdate === 'agent_message_chunk') + .map((u) => u.content?.text ?? ''); + } + + async requestPermission(_p: RequestPermissionRequest): Promise<RequestPermissionResponse> { + throw new Error('CollectingClient.requestPermission should not be called in steer test'); + } + async sessionUpdate(n: SessionNotification): Promise<void> { + this.updates.push(n); + } + async writeTextFile(_p: WriteTextFileRequest): Promise<WriteTextFileResponse> { + throw new Error('CollectingClient.writeTextFile should not be called in steer test'); + } + async readTextFile(_p: ReadTextFileRequest): Promise<ReadTextFileResponse> { + throw new Error('CollectingClient.readTextFile should not be called in steer test'); + } +} + +function makeInMemoryStreamPair(): { + agentStream: ReturnType<typeof ndJsonStream>; + clientStream: ReturnType<typeof ndJsonStream>; +} { + const clientToAgent = new TransformStream(); + const agentToClient = new TransformStream(); + const agentStream = ndJsonStream(agentToClient.writable, clientToAgent.readable); + const clientStream = ndJsonStream(clientToAgent.writable, agentToClient.readable); + return { agentStream, clientStream }; +} + +const textBlock = (text: string): ContentBlock => ({ type: 'text', text }); + +function makeHarness(session: Session): KimiHarness { + return { + auth: { status: async () => AUTHED_STATUS }, + createSession: async () => session, + } as unknown as KimiHarness; +} + +/** + * A steer-capable Session stub modelling an agent-initiated turn + * (e.g. a background-task notification turn) that is already active: + * `prompt()` answers with `turn.agent_busy`, and `steer()` emits the + * remainder of the active turn followed by `turn.ended`. 
+ */
+function makeBusyThenSteerSession(sessionId: string): {
+  session: Session;
+  steerCalls: unknown[];
+  emit: (event: Event) => void;
+  unsubscribeCount: () => number;
+} {
+  const listeners = new Set<(event: Event) => void>();
+  let unsubCount = 0;
+  const steerCalls: unknown[] = [];
+  const emit = (event: Event): void => {
+    for (const fn of listeners) fn(event);
+  };
+  const session = {
+    id: sessionId,
+    prompt: async (_input: unknown) => {
+      emit({
+        type: 'error',
+        sessionId,
+        agentId: 'main',
+        code: 'turn.agent_busy',
+        message: 'Cannot launch a new turn while another turn (ID 16) is active',
+        details: { turnId: 16 },
+        retryable: true,
+      } as unknown as Event);
+    },
+    steer: async (input: unknown) => {
+      steerCalls.push(input);
+      emit({ type: 'assistant.delta', sessionId, agentId: 'main', turnId: 16, delta: 'merged ' } as Event);
+      emit({ type: 'assistant.delta', sessionId, agentId: 'main', turnId: 16, delta: 'reply' } as Event);
+      emit({ type: 'turn.ended', sessionId, agentId: 'main', turnId: 16, reason: 'completed' } as Event);
+    },
+    cancel: async () => undefined,
+    onEvent: (fn: (event: Event) => void) => {
+      listeners.add(fn);
+      return () => {
+        unsubCount += 1;
+        listeners.delete(fn);
+      };
+    },
+  } as unknown as Session;
+  return { session, steerCalls, emit, unsubscribeCount: () => unsubCount };
+}
+
+describe('AcpSession steer support', () => {
+  it('steers a prompt into an active agent-initiated turn and resolves on its turn.ended', async () => {
+    const sessionId = 'sess-steer';
+    const { session, steerCalls, unsubscribeCount } = makeBusyThenSteerSession(sessionId);
+
+    const { agentStream, clientStream } = makeInMemoryStreamPair();
+    new AgentSideConnection((c) => new AcpServer(makeHarness(session), c), agentStream);
+    const collecting = new CollectingClient();
+    const client = new ClientSideConnection(() => collecting, clientStream);
+
+    await client.newSession({ cwd: '/tmp/x', mcpServers: [] });
+
+    const response = await client.prompt({ sessionId, prompt: [textBlock('hi there')] });
+
+    expect(response.stopReason).toBe('end_turn');
+    expect(steerCalls).toHaveLength(1);
+    expect(steerCalls[0]).toMatchObject([{ type: 'text', text: 'hi there' }]);
+
+    await new Promise((resolve) => setTimeout(resolve, 20));
+    // The active turn's output after the steer streams back as the
+    // response to this prompt — exactly once.
+    expect(collecting.chunkTexts).toEqual(['merged ', 'reply']);
+    expect(unsubscribeCount()).toBe(1);
+  });
+
+  it('rejects the prompt when the steer fallback itself rejects', async () => {
+    const sessionId = 'sess-steer-fail';
+    const listeners = new Set<(event: Event) => void>();
+    const session = {
+      id: sessionId,
+      prompt: async (_input: unknown) => {
+        for (const fn of listeners) {
+          fn({
+            type: 'error',
+            sessionId,
+            agentId: 'main',
+            code: 'turn.agent_busy',
+            message: 'Cannot launch a new turn while another turn (ID 3) is active',
+            details: { turnId: 3 },
+            retryable: true,
+          } as unknown as Event);
+        }
+      },
+      steer: async (_input: unknown) => {
+        throw new Error('steer transport down');
+      },
+      cancel: async () => undefined,
+      onEvent: (fn: (event: Event) => void) => {
+        listeners.add(fn);
+        return () => {
+          listeners.delete(fn);
+        };
+      },
+    } as unknown as Session;
+
+    const { agentStream, clientStream } = makeInMemoryStreamPair();
+    new AgentSideConnection((c) => new AcpServer(makeHarness(session), c), agentStream);
+    const client = new ClientSideConnection(() => new CollectingClient(), clientStream);
+
+    await client.newSession({ cwd: '/tmp/x', mcpServers: [] });
+
+    await expect(
+      client.prompt({ sessionId, prompt: [textBlock('hi')] }),
+    ).rejects.toBeDefined();
+  });
+
+  it('merges a second prompt into the first turn without duplicating chunks', async () => {
+    const sessionId = 'sess-steer-overlap';
+    const listeners = new Set<(event: Event) => void>();
+    let unsubCount = 0;
+    let promptCall = 0;
+    const emit = (event: Event): void => {
+      for (const fn of listeners) fn(event);
+    };
+    let releaseFirstTurn: (() => void) | undefined;
+    const firstTurnGate = new Promise<void>((resolve) => {
+      releaseFirstTurn = resolve;
+    });
+    const session = {
+      id: sessionId,
+      prompt: async (_input: unknown) => {
+        promptCall += 1;
+        if (promptCall === 1) {
+          emit({
+            type: 'turn.started',
+            sessionId,
+            agentId: 'main',
+            turnId: 1,
+            origin: { kind: 'user' },
+          } as unknown as Event);
+          emit({ type: 'assistant.delta', sessionId, agentId: 'main', turnId: 1, delta: 'a' } as Event);
+          await firstTurnGate;
+          return;
+        }
+        emit({
+          type: 'error',
+          sessionId,
+          agentId: 'main',
+          code: 'turn.agent_busy',
+          message: 'Cannot launch a new turn while another turn (ID 1) is active',
+          details: { turnId: 1 },
+          retryable: true,
+        } as unknown as Event);
+      },
+      steer: async (_input: unknown) => {
+        // The steered message is consumed by turn 1, which continues
+        // and then ends once.
+        emit({ type: 'assistant.delta', sessionId, agentId: 'main', turnId: 1, delta: 'b' } as Event);
+        emit({ type: 'turn.ended', sessionId, agentId: 'main', turnId: 1, reason: 'completed' } as Event);
+      },
+      cancel: async () => undefined,
+      onEvent: (fn: (event: Event) => void) => {
+        listeners.add(fn);
+        return () => {
+          unsubCount += 1;
+          listeners.delete(fn);
+        };
+      },
+    } as unknown as Session;
+
+    const { agentStream, clientStream } = makeInMemoryStreamPair();
+    new AgentSideConnection((c) => new AcpServer(makeHarness(session), c), agentStream);
+    const collecting = new CollectingClient();
+    const client = new ClientSideConnection(() => collecting, clientStream);
+
+    await client.newSession({ cwd: '/tmp/x', mcpServers: [] });
+
+    const firstPrompt = client.prompt({ sessionId, prompt: [textBlock('first')] });
+    await new Promise((resolve) => setTimeout(resolve, 10));
+
+    const secondPrompt = client.prompt({ sessionId, prompt: [textBlock('second')] });
+    await new Promise((resolve) => setTimeout(resolve, 10));
+    releaseFirstTurn?.();
+
+    await expect(firstPrompt).resolves.toMatchObject({ stopReason: 'end_turn' });
+    await expect(secondPrompt).resolves.toMatchObject({ stopReason: 'end_turn' });
+
+    await new Promise((resolve) => setTimeout(resolve, 20));
+    // Both subscriptions observed every delta; only the oldest one
+    // forwards, so each chunk reaches the client exactly once.
+    expect(collecting.chunkTexts).toEqual(['a', 'b']);
+    expect(unsubCount).toBe(2);
+  });
+});
+
+describe('AcpSession skill activation queueing', () => {
+  /**
+   * Fake Session modelling: an agent-initiated turn (ID 16) is active,
+   * so the FIRST `activateSkill` gets `turn.agent_busy`; the test then
+   * ends the foreign turn via `emit`, and the SECOND (re-issued)
+   * `activateSkill` runs its own turn (ID 17). `listSkills` seeds the
+   * server's skillCommandMap so `/skill:foo` routes to activateSkill.
+   */
+  function makeBusySkillSession(sessionId: string): {
+    session: Session;
+    activateCalls: Array<{ name: string; args?: string | undefined }>;
+    emit: (event: Event) => void;
+  } {
+    const listeners = new Set<(event: Event) => void>();
+    const activateCalls: Array<{ name: string; args?: string | undefined }> = [];
+    const emit = (event: Event): void => {
+      for (const fn of listeners) fn(event);
+    };
+    const session = {
+      id: sessionId,
+      prompt: async (_input: unknown) => undefined,
+      activateSkill: async (name: string, args?: string | undefined) => {
+        activateCalls.push({ name, args });
+        if (activateCalls.length === 1) {
+          emit({
+            type: 'error',
+            sessionId,
+            agentId: 'main',
+            code: 'turn.agent_busy',
+            message: 'Cannot launch a new turn while another turn (ID 16) is active',
+            details: { turnId: 16 },
+            retryable: true,
+          } as unknown as Event);
+          return;
+        }
+        emit({
+          type: 'turn.started',
+          sessionId,
+          agentId: 'main',
+          turnId: 17,
+          origin: { kind: 'user' },
+        } as unknown as Event);
+        emit({ type: 'assistant.delta', sessionId, agentId: 'main', turnId: 17, delta: 'skill output' } as Event);
+        emit({ type: 'turn.ended', sessionId, agentId: 'main', turnId: 17, reason: 'completed' } as Event);
+      },
+      cancel: async () => undefined,
+      onEvent: (fn: (event: Event) => void) => {
+        listeners.add(fn);
+        return () => {
+          listeners.delete(fn);
+        };
+      },
+      listSkills: async () => [
+        {
+          name: 'foo',
+          description: 'foo skill',
+          path: '/tmp/foo.md',
+          source: 'user' as const,
+          type: 'prompt',
+        },
+      ],
+    } as unknown as Session;
+    return { session, activateCalls, emit };
+  }
+
+  /**
+   * Mirror the CLI's `slashCommands` resolver wiring so the per-session
+   * `skillCommandMap` is seeded before the prompt fires (same pattern
+   * as session-slash.test.ts).
+   */
+  function makeSkillServer(
+    session: Session,
+    agentStream: ReturnType<typeof ndJsonStream>,
+  ): void {
+    new AgentSideConnection(
+      (c) =>
+        new AcpServer(makeHarness(session), c, {
+          slashCommands: async (s) => {
+            const skills = await s.listSkills();
+            const map = new Map<string, string>();
+            const commands = skills.map((sk) => {
+              const name = `skill:${sk.name}`;
+              map.set(name, sk.name);
+              return { name, description: sk.description };
+            });
+            return { commands, skillCommandMap: map };
+          },
+        }),
+      agentStream,
+    );
+  }
+
+  /** Wait until the server's available_commands_update seeded the skill map. */
+  async function waitForAvailableCommands(
+    collecting: CollectingClient,
+    timeoutMs = 200,
+  ): Promise<void> {
+    const deadline = Date.now() + timeoutMs;
+    while (Date.now() < deadline) {
+      if (
+        collecting.updates.some(
+          (n) =>
+            (n.update as { sessionUpdate?: string }).sessionUpdate ===
+            'available_commands_update',
+        )
+      ) {
+        return;
+      }
+      await new Promise((resolve) => setTimeout(resolve, 5));
+    }
+    throw new Error('available_commands_update never arrived');
+  }
+
+  it('queues a skill activation behind the active turn and re-issues it on idle', async () => {
+    const sessionId = 'sess-skill-queue';
+    const { session, activateCalls, emit } = makeBusySkillSession(sessionId);
+
+    const { agentStream, clientStream } = makeInMemoryStreamPair();
+    makeSkillServer(session, agentStream);
+    const collecting = new CollectingClient();
+    const client = new ClientSideConnection(() => collecting, clientStream);
+
+    await client.newSession({ cwd: '/tmp/x', mcpServers: [] });
+    await waitForAvailableCommands(collecting);
+
+    const promptPromise = client.prompt({ sessionId, prompt: [textBlock('/skill:foo bar')] });
+    await new Promise((resolve) => setTimeout(resolve, 10));
+    expect(activateCalls).toHaveLength(1);
+
+    // The foreign turn streams a bit more and then finishes — the
+    // queued activation must launch right after.
+    emit({ type: 'assistant.delta', sessionId, agentId: 'main', turnId: 16, delta: 'bg tail' } as Event);
+    emit({ type: 'turn.ended', sessionId, agentId: 'main', turnId: 16, reason: 'completed' } as Event);
+
+    await expect(promptPromise).resolves.toMatchObject({ stopReason: 'end_turn' });
+    expect(activateCalls).toHaveLength(2);
+    expect(activateCalls[1]).toMatchObject({ name: 'foo', args: 'bar' });
+
+    await new Promise((resolve) => setTimeout(resolve, 20));
+    // While queued, the pending subscription forwards the foreign
+    // turn's tail; the skill turn's output follows — each exactly once.
+    expect(collecting.chunkTexts).toEqual(['bg tail', 'skill output']);
+  });
+
+  it('settles a queued skill activation as cancelled when the active turn is cancelled', async () => {
+    const sessionId = 'sess-skill-queue-cancel';
+    const { session, activateCalls, emit } = makeBusySkillSession(sessionId);
+
+    const { agentStream, clientStream } = makeInMemoryStreamPair();
+    makeSkillServer(session, agentStream);
+    const collecting = new CollectingClient();
+    const client = new ClientSideConnection(() => collecting, clientStream);
+
+    await client.newSession({ cwd: '/tmp/x', mcpServers: [] });
+    await waitForAvailableCommands(collecting);
+
+    const promptPromise = client.prompt({ sessionId, prompt: [textBlock('/skill:foo')] });
+    await new Promise((resolve) => setTimeout(resolve, 10));
+    expect(activateCalls).toHaveLength(1);
+
+    emit({ type: 'turn.ended', sessionId, agentId: 'main', turnId: 16, reason: 'cancelled' } as Event);
+
+    await expect(promptPromise).resolves.toMatchObject({ stopReason: 'cancelled' });
+    // The queued activation is dropped, not re-issued.
+    expect(activateCalls).toHaveLength(1);
+  });
+});
+
+describe('AcpSession agent-initiated turn forwarding', () => {
+  it('forwards output of a background-notification turn when no prompt is in flight', async () => {
+    const sessionId = 'sess-bg-turn';
+    const listeners = new Set<(event: Event) => void>();
+    const emit = (event: Event): void => {
+      for (const fn of listeners) fn(event);
+    };
+    const session = {
+      id: sessionId,
+      prompt: async (_input: unknown) => undefined,
+      cancel: async () => undefined,
+      onEvent: (fn: (event: Event) => void) => {
+        listeners.add(fn);
+        return () => {
+          listeners.delete(fn);
+        };
+      },
+    } as unknown as Session;
+
+    const { agentStream, clientStream } = makeInMemoryStreamPair();
+    new AgentSideConnection((c) => new AcpServer(makeHarness(session), c), agentStream);
+    const collecting = new CollectingClient();
+    const client = new ClientSideConnection(() => collecting, clientStream);
+
+    await client.newSession({ cwd: '/tmp/x', mcpServers: [] });
+
+    // A background-task notification steers the idle agent, which
+    // launches a turn with no originating session/prompt.
+    emit({
+      type: 'turn.started',
+      sessionId,
+      agentId: 'main',
+      turnId: 16,
+      origin: { kind: 'background_task', taskId: 't1', status: 'completed', notificationId: 'task:t1:completed' },
+    } as unknown as Event);
+    emit({ type: 'assistant.delta', sessionId, agentId: 'main', turnId: 16, delta: 'task t1 ' } as Event);
+    emit({ type: 'assistant.delta', sessionId, agentId: 'main', turnId: 16, delta: 'finished' } as Event);
+    emit({ type: 'turn.ended', sessionId, agentId: 'main', turnId: 16, reason: 'completed' } as Event);
+
+    await new Promise((resolve) => setTimeout(resolve, 20));
+    expect(collecting.chunkTexts).toEqual(['task t1 ', 'finished']);
+  });
+
+  it('ignores subagent events while forwarding an agent-initiated turn', async () => {
+    const sessionId = 'sess-bg-sub';
+    const listeners = new Set<(event: Event) => void>();
+    const emit = (event: Event): void => {
+      for (const fn of listeners) fn(event);
+    };
+    const session = {
+      id: sessionId,
+      prompt: async (_input: unknown) => undefined,
+      cancel: async () => undefined,
+      onEvent: (fn: (event: Event) => void) => {
+        listeners.add(fn);
+        return () => {
+          listeners.delete(fn);
+        };
+      },
+    } as unknown as Session;
+
+    const { agentStream, clientStream } = makeInMemoryStreamPair();
+    new AgentSideConnection((c) => new AcpServer(makeHarness(session), c), agentStream);
+    const collecting = new CollectingClient();
+    const client = new ClientSideConnection(() => collecting, clientStream);
+
+    await client.newSession({ cwd: '/tmp/x', mcpServers: [] });
+
+    emit({ type: 'assistant.delta', sessionId, agentId: 'main', turnId: 7, delta: 'main' } as Event);
+    emit({ type: 'assistant.delta', sessionId, agentId: 'sub-1', turnId: 99, delta: 'leak' } as Event);
+    emit({ type: 'turn.ended', sessionId, agentId: 'sub-1', turnId: 99, reason: 'completed' } as Event);
+    emit({ type: 'turn.ended', sessionId, agentId: 'main', turnId: 7, reason: 'completed' } as Event);
+
+    await new Promise((resolve) => setTimeout(resolve, 20));
+    expect(collecting.chunkTexts).toEqual(['main']);
+  });
+});