Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 94 additions & 139 deletions src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,8 @@ export function extractCompletionFromStream(output: string): {
let usage: OpenAiUsage | undefined;
let sawAssistantPartials = false;
let sawThinkingPartials = false;
let lastPartialText = "";
let lastPartialThinking = "";

for (const line of lines) {
const event = parseStreamJsonLine(line);
Expand All @@ -457,8 +459,15 @@ export function extractCompletionFromStream(output: string): {

const isPartial = typeof (event as any).timestamp_ms === "number";
if (isPartial) {
assistantText += text;
sawAssistantPartials = true;
if (text.startsWith(lastPartialText)) {
assistantText += text.slice(lastPartialText.length);
} else if (!lastPartialText.startsWith(text)) {
assistantText += text;
}
if (text.length > lastPartialText.length) {
lastPartialText = text;
}
} else if (!sawAssistantPartials) {
assistantText = text;
}
Expand All @@ -469,8 +478,15 @@ export function extractCompletionFromStream(output: string): {
if (thinking) {
const isPartial = typeof (event as any).timestamp_ms === "number";
if (isPartial) {
reasoningText += thinking;
sawThinkingPartials = true;
if (thinking.startsWith(lastPartialThinking)) {
reasoningText += thinking.slice(lastPartialThinking.length);
} else if (!lastPartialThinking.startsWith(thinking)) {
reasoningText += thinking;
}
if (thinking.length > lastPartialThinking.length) {
lastPartialThinking = thinking;
}
} else if (!sawThinkingPartials) {
reasoningText = thinking;
}
Expand Down Expand Up @@ -1415,19 +1431,16 @@ async function ensureCursorProxyServer(workspaceDirectory: string, toolRouter?:
}
};

child.stdout.on("data", async (chunk) => {
if (streamTerminated || res.writableEnded) {
return;
}
if (!firstTokenReceived) { perf.mark("first-token"); firstTokenReceived = true; }
for (const line of lineBuffer.push(chunk)) {
if (streamTerminated || res.writableEnded) {
break;
}
const chunkQueue: Buffer[] = [];
let draining = false;
let childClosed = false;
let childExitCode: number | null = null;

const processLines = async (lines: string[]) => {
for (const line of lines) {
if (streamTerminated || res.writableEnded) break;
const event = parseStreamJsonLine(line);
if (!event) {
continue;
}
if (!event) continue;

if (isResult(event)) {
usage = extractOpenAiUsageFromResult(event) ?? usage;
Expand Down Expand Up @@ -1469,156 +1482,98 @@ async function ensureCursorProxyServer(workspaceDirectory: string, toolRouter?:
if (!result.terminate.silent) {
emitTerminalAssistantErrorAndTerminate(result.terminate.message);
} else {
// Silent termination: just end the stream without an error message
streamTerminated = true;
try { child.kill(); } catch { /* ignore */ }
}
break;
}
if (result.intercepted) {
break;
}
if (result.skipConverter) {
continue;
}
if (result.intercepted) break;
if (result.skipConverter) continue;
}

if (streamTerminated || res.writableEnded) {
break;
}
if (streamTerminated || res.writableEnded) break;
for (const sse of converter.handleEvent(event)) {
res.write(sse);
}
}
});
};

child.on("close", async (code) => {
if (streamTerminated || res.writableEnded) {
return;
}
for (const line of lineBuffer.flush()) {
if (streamTerminated || res.writableEnded) {
break;
}
const event = parseStreamJsonLine(line);
if (!event) {
continue;
const drainQueue = async () => {
if (draining) return;
draining = true;
try {
while (chunkQueue.length > 0) {
if (streamTerminated || res.writableEnded) break;
const chunk = chunkQueue.shift()!;
if (!firstTokenReceived) { perf.mark("first-token"); firstTokenReceived = true; }
await processLines(lineBuffer.push(chunk));
}

if (isResult(event)) {
usage = extractOpenAiUsageFromResult(event) ?? usage;
}
if (childClosed && !streamTerminated && !res.writableEnded) {
await processLines(lineBuffer.flush());
if (streamTerminated || res.writableEnded) return;

if (event.type === "tool_call") {
const result = await handleToolLoopEventWithFallback({
event: event as any,
boundary: boundaryContext.getBoundary(),
boundaryMode: boundaryContext.getBoundary().mode,
autoFallbackToLegacy: ENABLE_PROVIDER_BOUNDARY_AUTOFALLBACK,
toolLoopMode: TOOL_LOOP_MODE,
allowedToolNames,
toolSchemaMap,
toolLoopGuard,
toolMapper,
toolSessionId,
shouldEmitToolUpdates: SHOULD_EMIT_TOOL_UPDATES,
proxyExecuteToolCalls: PROXY_EXECUTE_TOOL_CALLS,
suppressConverterToolEvents: SUPPRESS_CONVERTER_TOOL_EVENTS,
toolRouter,
responseMeta: { id, created, model },
passThroughTracker,
onToolUpdate: (update) => {
res.write(formatToolUpdateEvent(update));
},
onToolResult: (toolResult) => {
res.write(`data: ${JSON.stringify(toolResult)}\n\n`);
},
onInterceptedToolCall: (toolCall) => {
emitToolCallAndTerminate(toolCall);
},
onFallbackToLegacy: (error) => {
boundaryContext.activateLegacyFallback("handleToolLoopEvent.close", error);
},
perf.mark("request:done");
perf.summarize();
const stderrText = Buffer.concat(stderrChunks).toString().trim();
log.debug("cursor-agent completed (node stream)", {
code: childExitCode,
stderrChars: stderrText.length,
});
if (result.terminate) {
if (!result.terminate.silent) {
emitTerminalAssistantErrorAndTerminate(result.terminate.message);
} else {
// Silent termination: just end the stream without an error message
streamTerminated = true;
try { child.kill(); } catch { /* ignore */ }
}
break;
if (childExitCode !== 0) {
const errSource =
stderrText
|| `cursor-agent exited with code ${String(childExitCode ?? "unknown")} and no output`;
const parsed = parseAgentError(errSource);
const msg = formatErrorForUser(parsed);
const errChunk = createChatCompletionChunk(id, created, model, msg, true);
res.write(`data: ${JSON.stringify(errChunk)}\n\n`);
res.write(formatSseDone());
streamTerminated = true;
res.end();
return;
}
if (result.intercepted) {
break;

const passThroughSummary = passThroughTracker.getSummary();
if (passThroughSummary.hasActivity) {
await toastService.showPassThroughSummary(passThroughSummary.tools);
}
if (result.skipConverter) {
continue;
if (passThroughSummary.errors.length > 0) {
await toastService.showErrorSummary(passThroughSummary.errors);
}
}

if (streamTerminated || res.writableEnded) {
break;
const doneChunk = {
id,
object: "chat.completion.chunk",
created,
model,
choices: [{ index: 0, delta: {}, finish_reason: "stop" }],
};
res.write(`data: ${JSON.stringify(doneChunk)}\n\n`);
if (usage) {
const usageChunk = createChatCompletionUsageChunk(id, created, model, usage);
res.write(`data: ${JSON.stringify(usageChunk)}\n\n`);
}
res.write(formatSseDone());
res.end();
}
for (const sse of converter.handleEvent(event)) {
res.write(sse);
} finally {
draining = false;
if (chunkQueue.length > 0 || (childClosed && !streamTerminated && !res.writableEnded)) {
drainQueue();
}
}
if (streamTerminated || res.writableEnded) {
return;
}

perf.mark("request:done");
perf.summarize();
const stderrText = Buffer.concat(stderrChunks).toString().trim();
log.debug("cursor-agent completed (node stream)", {
code,
stderrChars: stderrText.length,
});
if (code !== 0) {
const errSource =
stderrText
|| `cursor-agent exited with code ${String(code ?? "unknown")} and no output`;
const parsed = parseAgentError(errSource);
const msg = formatErrorForUser(parsed);
const errChunk = createChatCompletionChunk(id, created, model, msg, true);
res.write(`data: ${JSON.stringify(errChunk)}\n\n`);
res.write(formatSseDone());
streamTerminated = true;
res.end();
return;
}
};

// Emit toast for passed-through MCP tools
const passThroughSummary = passThroughTracker.getSummary();
if (passThroughSummary.hasActivity) {
await toastService.showPassThroughSummary(passThroughSummary.tools);
}
if (passThroughSummary.errors.length > 0) {
await toastService.showErrorSummary(passThroughSummary.errors);
}
child.stdout.on("data", (chunk) => {
chunkQueue.push(Buffer.from(chunk));
drainQueue();
});

const doneChunk = {
id,
object: "chat.completion.chunk",
created,
model,
choices: [
{
index: 0,
delta: {},
finish_reason: "stop",
},
],
};
res.write(`data: ${JSON.stringify(doneChunk)}\n\n`);
if (usage) {
const usageChunk = createChatCompletionUsageChunk(id, created, model, usage);
res.write(`data: ${JSON.stringify(usageChunk)}\n\n`);
}
res.write(formatSseDone());
res.end();
child.on("close", (code) => {
childClosed = true;
childExitCode = code;
drainQueue();
});
}
} catch (error) {
Expand Down
67 changes: 37 additions & 30 deletions src/streaming/ai-sdk-parts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
type StreamJsonEvent,
type StreamJsonToolCallEvent,
} from "./types.js";
import { DeltaTracker } from "./delta-tracker.js";

export type AiSdkStreamPart =
| {
Expand All @@ -34,44 +33,23 @@ export type AiSdkStreamPart =
};

export class StreamToAiSdkParts {
private readonly tracker = new DeltaTracker();
private readonly toolArgsById = new Map<string, string>();
private readonly startedToolIds = new Set<string>();
private sawAssistantPartials = false;
private sawThinkingPartials = false;
private emittedText = "";
private emittedThinking = "";

handleEvent(event: StreamJsonEvent): AiSdkStreamPart[] {
if (isAssistantText(event)) {
const isPartial = typeof event.timestamp_ms === "number";
if (isPartial) {
const text = extractText(event);
if (text) {
this.sawAssistantPartials = true;
return [{ type: "text-delta", textDelta: text }];
}
return [];
}
if (this.sawAssistantPartials) {
return [];
}
const delta = this.tracker.nextText(extractText(event));
const text = extractText(event);
if (!text) return [];
const delta = this.nextDelta(text, "text");
return delta ? [{ type: "text-delta", textDelta: delta }] : [];
}

if (isThinking(event)) {
const isPartial = typeof event.timestamp_ms === "number";
if (isPartial) {
const text = extractThinking(event);
if (text) {
this.sawThinkingPartials = true;
return [{ type: "text-delta", textDelta: text }];
}
return [];
}
if (this.sawThinkingPartials) {
return [];
}
const delta = this.tracker.nextThinking(extractThinking(event));
const text = extractThinking(event);
if (!text) return [];
const delta = this.nextDelta(text, "thinking");
return delta ? [{ type: "text-delta", textDelta: delta }] : [];
}

Expand All @@ -82,6 +60,35 @@ export class StreamToAiSdkParts {
return [];
}

/**
 * Computes the actual new delta, correctly handling both delta-style and
 * accumulated-style partial events from cursor-agent.
 */
private nextDelta(text: string, channel: "text" | "thinking"): string {
  const isTextChannel = channel === "text";
  const emitted = isTextChannel ? this.emittedText : this.emittedThinking;

  // Record what has been emitted so far on this channel.
  const remember = (value: string): void => {
    if (isTextChannel) this.emittedText = value;
    else this.emittedThinking = value;
  };

  // First partial on this channel: the whole payload is new.
  if (!emitted) {
    remember(text);
    return text;
  }

  // Accumulated-style event: payload repeats everything already emitted,
  // so only the tail beyond the emitted prefix is new.
  if (text.startsWith(emitted)) {
    remember(text);
    return text.slice(emitted.length);
  }

  // Stale or duplicate accumulated event: nothing new to emit.
  if (emitted.startsWith(text)) return "";

  // Delta-style event: the payload is a fresh fragment — append it.
  remember(emitted + text);
  return text;
}

private handleToolCall(event: StreamJsonToolCallEvent): AiSdkStreamPart[] {
const toolCallId = event.call_id || (event as { tool_call_id?: string }).tool_call_id || "unknown";
const toolName = inferToolName(event) || "tool";
Expand Down
Loading