diff --git a/src/plugin.ts b/src/plugin.ts index 1bec06b..4d1f39f 100644 --- a/src/plugin.ts +++ b/src/plugin.ts @@ -444,6 +444,8 @@ export function extractCompletionFromStream(output: string): { let usage: OpenAiUsage | undefined; let sawAssistantPartials = false; let sawThinkingPartials = false; + let lastPartialText = ""; + let lastPartialThinking = ""; for (const line of lines) { const event = parseStreamJsonLine(line); @@ -457,8 +459,15 @@ export function extractCompletionFromStream(output: string): { const isPartial = typeof (event as any).timestamp_ms === "number"; if (isPartial) { - assistantText += text; sawAssistantPartials = true; + if (text.startsWith(lastPartialText)) { + assistantText += text.slice(lastPartialText.length); + } else if (!lastPartialText.startsWith(text)) { + assistantText += text; + } + if (text.length > lastPartialText.length) { + lastPartialText = text; + } } else if (!sawAssistantPartials) { assistantText = text; } @@ -469,8 +478,15 @@ export function extractCompletionFromStream(output: string): { if (thinking) { const isPartial = typeof (event as any).timestamp_ms === "number"; if (isPartial) { - reasoningText += thinking; sawThinkingPartials = true; + if (thinking.startsWith(lastPartialThinking)) { + reasoningText += thinking.slice(lastPartialThinking.length); + } else if (!lastPartialThinking.startsWith(thinking)) { + reasoningText += thinking; + } + if (thinking.length > lastPartialThinking.length) { + lastPartialThinking = thinking; + } } else if (!sawThinkingPartials) { reasoningText = thinking; } @@ -1415,19 +1431,16 @@ async function ensureCursorProxyServer(workspaceDirectory: string, toolRouter?: } }; - child.stdout.on("data", async (chunk) => { - if (streamTerminated || res.writableEnded) { - return; - } - if (!firstTokenReceived) { perf.mark("first-token"); firstTokenReceived = true; } - for (const line of lineBuffer.push(chunk)) { - if (streamTerminated || res.writableEnded) { - break; - } + const chunkQueue: Buffer[] = []; + let draining = false; + let childClosed = false; + let childExitCode: number | null = null; + + const processLines = async (lines: string[]) => { + for (const line of lines) { + if (streamTerminated || res.writableEnded) break; const event = parseStreamJsonLine(line); - if (!event) { - continue; - } + if (!event) continue; if (isResult(event)) { usage = extractOpenAiUsageFromResult(event) ?? usage; @@ -1469,156 +1482,98 @@ async function ensureCursorProxyServer(workspaceDirectory: string, toolRouter?: if (!result.terminate.silent) { emitTerminalAssistantErrorAndTerminate(result.terminate.message); } else { - // Silent termination: just end the stream without an error message streamTerminated = true; try { child.kill(); } catch { /* ignore */ } } break; } - if (result.intercepted) { - break; - } - if (result.skipConverter) { - continue; - } + if (result.intercepted) break; + if (result.skipConverter) continue; } - if (streamTerminated || res.writableEnded) { - break; - } + if (streamTerminated || res.writableEnded) break; for (const sse of converter.handleEvent(event)) { res.write(sse); } } - }); + }; - child.on("close", async (code) => { - if (streamTerminated || res.writableEnded) { - return; - } - for (const line of lineBuffer.flush()) { - if (streamTerminated || res.writableEnded) { - break; - } - const event = parseStreamJsonLine(line); - if (!event) { - continue; + const drainQueue = async () => { + if (draining) return; + draining = true; + try { + while (chunkQueue.length > 0) { + if (streamTerminated || res.writableEnded) break; + const chunk = chunkQueue.shift()!; + if (!firstTokenReceived) { perf.mark("first-token"); firstTokenReceived = true; } + await processLines(lineBuffer.push(chunk)); } - if (isResult(event)) { - usage = extractOpenAiUsageFromResult(event) ?? usage; - } + if (childClosed && !streamTerminated && !res.writableEnded) { + await processLines(lineBuffer.flush()); + if (streamTerminated || res.writableEnded) return; - if (event.type === "tool_call") { - const result = await handleToolLoopEventWithFallback({ - event: event as any, - boundary: boundaryContext.getBoundary(), - boundaryMode: boundaryContext.getBoundary().mode, - autoFallbackToLegacy: ENABLE_PROVIDER_BOUNDARY_AUTOFALLBACK, - toolLoopMode: TOOL_LOOP_MODE, - allowedToolNames, - toolSchemaMap, - toolLoopGuard, - toolMapper, - toolSessionId, - shouldEmitToolUpdates: SHOULD_EMIT_TOOL_UPDATES, - proxyExecuteToolCalls: PROXY_EXECUTE_TOOL_CALLS, - suppressConverterToolEvents: SUPPRESS_CONVERTER_TOOL_EVENTS, - toolRouter, - responseMeta: { id, created, model }, - passThroughTracker, - onToolUpdate: (update) => { - res.write(formatToolUpdateEvent(update)); - }, - onToolResult: (toolResult) => { - res.write(`data: ${JSON.stringify(toolResult)}\n\n`); - }, - onInterceptedToolCall: (toolCall) => { - emitToolCallAndTerminate(toolCall); - }, - onFallbackToLegacy: (error) => { - boundaryContext.activateLegacyFallback("handleToolLoopEvent.close", error); - }, + perf.mark("request:done"); + perf.summarize(); + const stderrText = Buffer.concat(stderrChunks).toString().trim(); + log.debug("cursor-agent completed (node stream)", { + code: childExitCode, + stderrChars: stderrText.length, }); - if (result.terminate) { - if (!result.terminate.silent) { - emitTerminalAssistantErrorAndTerminate(result.terminate.message); - } else { - // Silent termination: just end the stream without an error message - streamTerminated = true; - try { child.kill(); } catch { /* ignore */ } - } - break; + if (childExitCode !== 0) { + const errSource = + stderrText + || `cursor-agent exited with code ${String(childExitCode ?? "unknown")} and no output`; + const parsed = parseAgentError(errSource); + const msg = formatErrorForUser(parsed); + const errChunk = createChatCompletionChunk(id, created, model, msg, true); + res.write(`data: ${JSON.stringify(errChunk)}\n\n`); + res.write(formatSseDone()); + streamTerminated = true; + res.end(); + return; } - if (result.intercepted) { - break; + + const passThroughSummary = passThroughTracker.getSummary(); + if (passThroughSummary.hasActivity) { + await toastService.showPassThroughSummary(passThroughSummary.tools); } - if (result.skipConverter) { - continue; + if (passThroughSummary.errors.length > 0) { + await toastService.showErrorSummary(passThroughSummary.errors); } - } - if (streamTerminated || res.writableEnded) { - break; + const doneChunk = { + id, + object: "chat.completion.chunk", + created, + model, + choices: [{ index: 0, delta: {}, finish_reason: "stop" }], + }; + res.write(`data: ${JSON.stringify(doneChunk)}\n\n`); + if (usage) { + const usageChunk = createChatCompletionUsageChunk(id, created, model, usage); + res.write(`data: ${JSON.stringify(usageChunk)}\n\n`); + } + res.write(formatSseDone()); + res.end(); } - for (const sse of converter.handleEvent(event)) { - res.write(sse); + } finally { + draining = false; + if (chunkQueue.length > 0 || (childClosed && !streamTerminated && !res.writableEnded)) { + drainQueue(); } } - if (streamTerminated || res.writableEnded) { - return; - } - - perf.mark("request:done"); - perf.summarize(); - const stderrText = Buffer.concat(stderrChunks).toString().trim(); - log.debug("cursor-agent completed (node stream)", { - code, - stderrChars: stderrText.length, - }); - if (code !== 0) { - const errSource = - stderrText - || `cursor-agent exited with code ${String(code ?? "unknown")} and no output`; - const parsed = parseAgentError(errSource); - const msg = formatErrorForUser(parsed); - const errChunk = createChatCompletionChunk(id, created, model, msg, true); - res.write(`data: ${JSON.stringify(errChunk)}\n\n`); - res.write(formatSseDone()); - streamTerminated = true; - res.end(); - return; - } + }; - // Emit toast for passed-through MCP tools - const passThroughSummary = passThroughTracker.getSummary(); - if (passThroughSummary.hasActivity) { - await toastService.showPassThroughSummary(passThroughSummary.tools); - } - if (passThroughSummary.errors.length > 0) { - await toastService.showErrorSummary(passThroughSummary.errors); - } + child.stdout.on("data", (chunk) => { + chunkQueue.push(Buffer.from(chunk)); + drainQueue(); + }); - const doneChunk = { - id, - object: "chat.completion.chunk", - created, - model, - choices: [ - { - index: 0, - delta: {}, - finish_reason: "stop", - }, - ], - }; - res.write(`data: ${JSON.stringify(doneChunk)}\n\n`); - if (usage) { - const usageChunk = createChatCompletionUsageChunk(id, created, model, usage); - res.write(`data: ${JSON.stringify(usageChunk)}\n\n`); - } - res.write(formatSseDone()); - res.end(); + child.on("close", (code) => { + childClosed = true; + childExitCode = code; + drainQueue(); }); } } catch (error) { diff --git a/src/streaming/ai-sdk-parts.ts b/src/streaming/ai-sdk-parts.ts index c1c6194..35aad3d 100644 --- a/src/streaming/ai-sdk-parts.ts +++ b/src/streaming/ai-sdk-parts.ts @@ -8,7 +8,6 @@ import { type StreamJsonEvent, type StreamJsonToolCallEvent, } from "./types.js"; -import { DeltaTracker } from "./delta-tracker.js"; export type AiSdkStreamPart = | { @@ -34,44 +33,23 @@ export type AiSdkStreamPart = }; export class StreamToAiSdkParts { - private readonly tracker = new DeltaTracker(); private readonly toolArgsById = new Map(); private readonly startedToolIds = new Set(); - private sawAssistantPartials = false; - private sawThinkingPartials = false; + private emittedText = ""; + private emittedThinking = ""; handleEvent(event: StreamJsonEvent): AiSdkStreamPart[] { if (isAssistantText(event)) { - const isPartial = typeof event.timestamp_ms === "number"; - if (isPartial) { - const text = extractText(event); - if (text) { - this.sawAssistantPartials = true; - return [{ type: "text-delta", textDelta: text }]; - } - return []; - } - if (this.sawAssistantPartials) { - return []; - } - const delta = this.tracker.nextText(extractText(event)); + const text = extractText(event); + if (!text) return []; + const delta = this.nextDelta(text, "text"); return delta ? [{ type: "text-delta", textDelta: delta }] : []; } if (isThinking(event)) { - const isPartial = typeof event.timestamp_ms === "number"; - if (isPartial) { - const text = extractThinking(event); - if (text) { - this.sawThinkingPartials = true; - return [{ type: "text-delta", textDelta: text }]; - } - return []; - } - if (this.sawThinkingPartials) { - return []; - } - const delta = this.tracker.nextThinking(extractThinking(event)); + const text = extractThinking(event); + if (!text) return []; + const delta = this.nextDelta(text, "thinking"); return delta ? [{ type: "text-delta", textDelta: delta }] : []; } @@ -82,6 +60,35 @@ export class StreamToAiSdkParts { return []; } + /** + * Computes the actual new delta, correctly handling both delta-style and + * accumulated-style partial events from cursor-agent. + */ + private nextDelta(text: string, channel: "text" | "thinking"): string { + const emitted = channel === "text" ? this.emittedText : this.emittedThinking; + + if (!emitted) { + if (channel === "text") this.emittedText = text; + else this.emittedThinking = text; + return text; + } + + if (text.startsWith(emitted)) { + const delta = text.slice(emitted.length); + if (channel === "text") this.emittedText = text; + else this.emittedThinking = text; + return delta; + } + + if (emitted.startsWith(text)) { + return ""; + } + + if (channel === "text") this.emittedText += text; + else this.emittedThinking += text; + return text; + } + private handleToolCall(event: StreamJsonToolCallEvent): AiSdkStreamPart[] { const toolCallId = event.call_id || (event as { tool_call_id?: string }).tool_call_id || "unknown"; const toolName = inferToolName(event) || "tool"; diff --git a/src/streaming/openai-sse.ts b/src/streaming/openai-sse.ts index fe7b53d..c467f82 100644 --- a/src/streaming/openai-sse.ts +++ b/src/streaming/openai-sse.ts @@ -8,7 +8,6 @@ import { type StreamJsonEvent, type StreamJsonToolCallEvent, } from "./types.js"; -import { DeltaTracker } from "./delta-tracker.js"; type OpenAiToolCall = { index: number; @@ -60,12 +59,8 @@ export class StreamToSseConverter { private readonly id: string; private readonly created: number; private readonly model: string; - private readonly tracker = new DeltaTracker(); - // Events with timestamp_ms carry delta text; events without carry accumulated text. - // DeltaTracker handles accumulated text only. When partials (delta) were seen, - // the final accumulated event must be skipped to prevent 2x duplication. - private sawAssistantPartials = false; - private sawThinkingPartials = false; + private emittedText = ""; + private emittedThinking = ""; constructor(model: string, options?: { id?: string; created?: number }) { this.model = model; @@ -75,36 +70,16 @@ export class StreamToSseConverter { handleEvent(event: StreamJsonEvent): string[] { if (isAssistantText(event)) { - const isPartial = typeof event.timestamp_ms === "number"; - if (isPartial) { - const text = extractText(event); - if (text) { - this.sawAssistantPartials = true; - return [this.chunkWith({ content: text })]; - } - return []; - } - if (this.sawAssistantPartials) { - return []; - } - const delta = this.tracker.nextText(extractText(event)); + const text = extractText(event); + if (!text) return []; + const delta = this.nextDelta(text, "text"); return delta ? [this.chunkWith({ content: delta })] : []; } if (isThinking(event)) { - const isPartial = typeof event.timestamp_ms === "number"; - if (isPartial) { - const text = extractThinking(event); - if (text) { - this.sawThinkingPartials = true; - return [this.chunkWith({ reasoning_content: text })]; - } - return []; - } - if (this.sawThinkingPartials) { - return []; - } - const delta = this.tracker.nextThinking(extractThinking(event)); + const text = extractThinking(event); + if (!text) return []; + const delta = this.nextDelta(text, "thinking"); return delta ? [this.chunkWith({ reasoning_content: delta })] : []; } @@ -115,6 +90,35 @@ export class StreamToSseConverter { return []; } + /** + * Computes the actual new delta, correctly handling both delta-style and + * accumulated-style partial events from cursor-agent. + */ + private nextDelta(text: string, channel: "text" | "thinking"): string { + const emitted = channel === "text" ? this.emittedText : this.emittedThinking; + + if (!emitted) { + if (channel === "text") this.emittedText = text; + else this.emittedThinking = text; + return text; + } + + if (text.startsWith(emitted)) { + const delta = text.slice(emitted.length); + if (channel === "text") this.emittedText = text; + else this.emittedThinking = text; + return delta; + } + + if (emitted.startsWith(text)) { + return ""; + } + + if (channel === "text") this.emittedText += text; + else this.emittedThinking += text; + return text; + } + private chunkWith(delta: OpenAiDelta): string { return formatSseChunk(createChunk(this.id, this.created, this.model, delta)); }