forked from RooCodeInc/Roo-Code
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathopenai-native.ts
More file actions
1585 lines (1444 loc) · 56.5 KB
/
openai-native.ts
File metadata and controls
1585 lines (1444 loc) · 56.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import * as os from "os"
import { v7 as uuidv7 } from "uuid"
import { Anthropic } from "@anthropic-ai/sdk"
import OpenAI from "openai"
import { Package } from "../../shared/package"
import {
type ModelInfo,
openAiNativeDefaultModelId,
OpenAiNativeModelId,
openAiNativeModels,
OPENAI_NATIVE_DEFAULT_TEMPERATURE,
type ReasoningEffort,
type VerbosityLevel,
type ReasoningEffortExtended,
type ServiceTier,
ApiProviderError,
} from "@roo-code/types"
import { TelemetryService } from "@roo-code/telemetry"
import type { ApiHandlerOptions } from "../../shared/api"
import { calculateApiCostOpenAI } from "../../shared/cost"
import { ApiStream, ApiStreamUsageChunk } from "../transform/stream"
import { getModelParams } from "../transform/model-params"
import { BaseProvider } from "./base-provider"
import type { SingleCompletionHandler, ApiHandlerCreateMessageMetadata } from "../index"
import { isMcpTool } from "../../utils/mcp-name"
import { sanitizeOpenAiCallId } from "../../utils/tool-id"
export type OpenAiNativeModel = ReturnType<OpenAiNativeHandler["getModel"]>
export class OpenAiNativeHandler extends BaseProvider implements SingleCompletionHandler {
protected options: ApiHandlerOptions
private client: OpenAI
private readonly providerName = "OpenAI Native"
// Session ID for request tracking (persists for the lifetime of the handler)
private readonly sessionId: string
/**
* Some Responses streams emit tool-call argument deltas without stable call id/name.
* Track the last observed tool identity from output_item events so we can still
* emit `tool_call_partial` chunks (tool-call-only streams).
*/
private pendingToolCallId: string | undefined
private pendingToolCallName: string | undefined
// Tracks whether this response already emitted text to avoid duplicate done-event rendering.
private sawTextOutputInCurrentResponse = false
// Tracks whether text arrived through delta events so content_part events can be treated as fallback-only.
private sawTextDeltaInCurrentResponse = false
// Tracks tool call IDs emitted via streaming partial events to prevent done-event duplicates.
private streamedToolCallIds = new Set<string>()
// Resolved service tier from Responses API (actual tier used by OpenAI)
private lastServiceTier: ServiceTier | undefined
// Complete response output array (includes reasoning items with encrypted_content)
private lastResponseOutput: any[] | undefined
// Last top-level response id from Responses API (for troubleshooting)
private lastResponseId: string | undefined
// Abort controller for cancelling ongoing requests
private abortController?: AbortController
// Event types handled by the shared event processor to avoid duplication
private readonly coreHandledEventTypes = new Set<string>([
"response.text.delta",
"response.output_text.delta",
"response.text.done",
"response.output_text.done",
"response.content_part.added",
"response.content_part.done",
"response.reasoning.delta",
"response.reasoning_text.delta",
"response.reasoning_summary.delta",
"response.reasoning_summary_text.delta",
"response.refusal.delta",
"response.output_item.added",
"response.output_item.done",
"response.done",
"response.completed",
"response.tool_call_arguments.delta",
"response.function_call_arguments.delta",
"response.tool_call_arguments.done",
"response.function_call_arguments.done",
])
constructor(options: ApiHandlerOptions) {
super()
this.options = options
// Generate a session ID for request tracking
this.sessionId = uuidv7()
// Default to including reasoning.summary: "auto" for models that support Responses API
// reasoning summaries unless explicitly disabled.
if (this.options.enableResponsesReasoningSummary === undefined) {
this.options.enableResponsesReasoningSummary = true
}
const apiKey = this.options.openAiNativeApiKey ?? "not-provided"
// Include originator, session_id, and User-Agent headers for API tracking and debugging
const userAgent = `roo-code/${Package.version} (${os.platform()} ${os.release()}; ${os.arch()}) node/${process.version.slice(1)}`
this.client = new OpenAI({
baseURL: this.options.openAiNativeBaseUrl || undefined,
apiKey,
defaultHeaders: {
originator: "roo-code",
session_id: this.sessionId,
"User-Agent": userAgent,
},
})
}
private normalizeUsage(usage: any, model: OpenAiNativeModel): ApiStreamUsageChunk | undefined {
if (!usage) return undefined
// Prefer detailed shapes when available (Responses API)
const inputDetails = usage.input_tokens_details ?? usage.prompt_tokens_details
// Extract cache information from details with better readability
const hasCachedTokens = typeof inputDetails?.cached_tokens === "number"
const hasCacheMissTokens = typeof inputDetails?.cache_miss_tokens === "number"
const cachedFromDetails = hasCachedTokens ? inputDetails.cached_tokens : 0
const missFromDetails = hasCacheMissTokens ? inputDetails.cache_miss_tokens : 0
// If total input tokens are missing but we have details, derive from them
let totalInputTokens = usage.input_tokens ?? usage.prompt_tokens ?? 0
if (totalInputTokens === 0 && inputDetails && (cachedFromDetails > 0 || missFromDetails > 0)) {
totalInputTokens = cachedFromDetails + missFromDetails
}
const totalOutputTokens = usage.output_tokens ?? usage.completion_tokens ?? 0
// Note: missFromDetails is NOT used as fallback for cache writes
// Cache miss tokens represent tokens that weren't found in cache (part of input)
// Cache write tokens represent tokens being written to cache for future use
const cacheWriteTokens = usage.cache_creation_input_tokens ?? usage.cache_write_tokens ?? 0
const cacheReadTokens =
usage.cache_read_input_tokens ?? usage.cache_read_tokens ?? usage.cached_tokens ?? cachedFromDetails ?? 0
// Resolve effective tier: prefer actual tier from response; otherwise requested tier
const effectiveTier =
this.lastServiceTier || (this.options.openAiNativeServiceTier as ServiceTier | undefined) || undefined
const effectiveInfo = this.applyServiceTierPricing(model.info, effectiveTier)
// Pass total input tokens directly to calculateApiCostOpenAI
// The function handles subtracting both cache reads and writes internally
const { totalCost } = calculateApiCostOpenAI(
effectiveInfo,
totalInputTokens,
totalOutputTokens,
cacheWriteTokens,
cacheReadTokens,
effectiveTier,
)
const reasoningTokens =
typeof usage.output_tokens_details?.reasoning_tokens === "number"
? usage.output_tokens_details.reasoning_tokens
: undefined
const out: ApiStreamUsageChunk = {
type: "usage",
// Keep inputTokens as TOTAL input to preserve correct context length
inputTokens: totalInputTokens,
outputTokens: totalOutputTokens,
cacheWriteTokens,
cacheReadTokens,
...(typeof reasoningTokens === "number" ? { reasoningTokens } : {}),
totalCost,
}
return out
}
override async *createMessage(
systemPrompt: string,
messages: Anthropic.Messages.MessageParam[],
metadata?: ApiHandlerCreateMessageMetadata,
): ApiStream {
const model = this.getModel()
// Use Responses API for ALL models
yield* this.handleResponsesApiMessage(model, systemPrompt, messages, metadata)
}
private async *handleResponsesApiMessage(
model: OpenAiNativeModel,
systemPrompt: string,
messages: Anthropic.Messages.MessageParam[],
metadata?: ApiHandlerCreateMessageMetadata,
): ApiStream {
// Reset resolved tier for this request; will be set from response if present
this.lastServiceTier = undefined
// Reset output array to capture current response output items
this.lastResponseOutput = undefined
// Reset last response id for this request
this.lastResponseId = undefined
// Reset pending tool identity for this request
this.pendingToolCallId = undefined
this.pendingToolCallName = undefined
this.sawTextOutputInCurrentResponse = false
this.sawTextDeltaInCurrentResponse = false
this.streamedToolCallIds.clear()
// Use Responses API for ALL models
const { verbosity, reasoning } = this.getModel()
// Resolve reasoning effort for models that support it
const reasoningEffort = this.getReasoningEffort(model)
// Format full conversation (messages already include reasoning items from API history)
const formattedInput = this.formatFullConversation(systemPrompt, messages)
// Build request body
const requestBody = this.buildRequestBody(
model,
formattedInput,
systemPrompt,
verbosity,
reasoningEffort,
metadata,
)
// Make the request (pass systemPrompt and messages for potential retry)
yield* this.executeRequest(requestBody, model, metadata, systemPrompt, messages)
}
private buildRequestBody(
model: OpenAiNativeModel,
formattedInput: any,
systemPrompt: string,
verbosity: any,
reasoningEffort: ReasoningEffortExtended | undefined,
metadata?: ApiHandlerCreateMessageMetadata,
): any {
// Ensure all properties are in the required array for OpenAI's strict mode
// This recursively processes nested objects and array items
const ensureAllRequired = (schema: any): any => {
if (!schema || typeof schema !== "object" || schema.type !== "object") {
return schema
}
const result = { ...schema }
// OpenAI Responses API requires additionalProperties: false on all object schemas
// Only add if not already set to false (to avoid unnecessary mutations)
if (result.additionalProperties !== false) {
result.additionalProperties = false
}
if (result.properties) {
const allKeys = Object.keys(result.properties)
result.required = allKeys
// Recursively process nested objects
const newProps = { ...result.properties }
for (const key of allKeys) {
const prop = newProps[key]
if (prop.type === "object") {
newProps[key] = ensureAllRequired(prop)
} else if (prop.type === "array" && prop.items?.type === "object") {
newProps[key] = {
...prop,
items: ensureAllRequired(prop.items),
}
}
}
result.properties = newProps
}
return result
}
// Adds additionalProperties: false to all object schemas recursively
// without modifying required array. Used for MCP tools with strict: false
// to comply with OpenAI Responses API requirements.
const ensureAdditionalPropertiesFalse = (schema: any): any => {
if (!schema || typeof schema !== "object" || schema.type !== "object") {
return schema
}
const result = { ...schema }
// OpenAI Responses API requires additionalProperties: false on all object schemas
// Only add if not already set to false (to avoid unnecessary mutations)
if (result.additionalProperties !== false) {
result.additionalProperties = false
}
if (result.properties) {
// Recursively process nested objects
const newProps = { ...result.properties }
for (const key of Object.keys(result.properties)) {
const prop = newProps[key]
if (prop && prop.type === "object") {
newProps[key] = ensureAdditionalPropertiesFalse(prop)
} else if (prop && prop.type === "array" && prop.items?.type === "object") {
newProps[key] = {
...prop,
items: ensureAdditionalPropertiesFalse(prop.items),
}
}
}
result.properties = newProps
}
return result
}
// Build a request body for the OpenAI Responses API.
// Ensure we explicitly pass max_output_tokens based on Roo's reserved model response calculation
// so requests do not default to very large limits (e.g., 120k).
interface ResponsesRequestBody {
model: string
input: Array<{ role: "user" | "assistant"; content: any[] } | { type: string; content: string }>
stream: boolean
reasoning?: { effort?: ReasoningEffortExtended; summary?: "auto" }
text?: { verbosity: VerbosityLevel }
temperature?: number
max_output_tokens?: number
store?: boolean
instructions?: string
service_tier?: ServiceTier
include?: string[]
/** Prompt cache retention policy: "in_memory" (default) or "24h" for extended caching */
prompt_cache_retention?: "in_memory" | "24h"
tools?: Array<{
type: "function"
name: string
description?: string
parameters?: any
strict?: boolean
}>
tool_choice?: any
parallel_tool_calls?: boolean
}
// Validate requested tier against model support; if not supported, omit.
const requestedTier = (this.options.openAiNativeServiceTier as ServiceTier | undefined) || undefined
const allowedTierNames = new Set(model.info.tiers?.map((t) => t.name).filter(Boolean) || [])
// Decide whether to enable extended prompt cache retention for this request
const promptCacheRetention = this.getPromptCacheRetention(model)
const body: ResponsesRequestBody = {
model: model.id,
input: formattedInput,
stream: true,
// Always use stateless operation with encrypted reasoning
store: false,
// Always include instructions (system prompt) for Responses API.
// Unlike Chat Completions, system/developer roles in input have no special semantics here.
// The official way to set system behavior is the top-level `instructions` field.
instructions: systemPrompt,
// Only include encrypted reasoning content when reasoning effort is set
...(reasoningEffort ? { include: ["reasoning.encrypted_content"] } : {}),
...(reasoningEffort
? {
reasoning: {
...(reasoningEffort ? { effort: reasoningEffort } : {}),
...(this.options.enableResponsesReasoningSummary ? { summary: "auto" as const } : {}),
},
}
: {}),
// Only include temperature if the model supports it
...(model.info.supportsTemperature !== false && {
temperature: this.options.modelTemperature ?? OPENAI_NATIVE_DEFAULT_TEMPERATURE,
}),
// Explicitly include the calculated max output tokens.
// Use the per-request reserved output computed by Roo (params.maxTokens from getModelParams).
...(model.maxTokens ? { max_output_tokens: model.maxTokens } : {}),
// Include tier when selected and supported by the model, or when explicitly "default"
...(requestedTier &&
(requestedTier === "default" || allowedTierNames.has(requestedTier)) && {
service_tier: requestedTier,
}),
// Enable extended prompt cache retention for models that support it.
// This uses the OpenAI Responses API `prompt_cache_retention` parameter.
...(promptCacheRetention ? { prompt_cache_retention: promptCacheRetention } : {}),
tools: (metadata?.tools ?? [])
.filter((tool) => tool.type === "function")
.map((tool) => {
// MCP tools use the 'mcp--' prefix - disable strict mode for them
// to preserve optional parameters from the MCP server schema
// But we still need to add additionalProperties: false for OpenAI Responses API
const isMcp = isMcpTool(tool.function.name)
return {
type: "function",
name: tool.function.name,
description: tool.function.description,
parameters: isMcp
? ensureAdditionalPropertiesFalse(tool.function.parameters)
: ensureAllRequired(tool.function.parameters),
strict: !isMcp,
}
}),
tool_choice: metadata?.tool_choice,
parallel_tool_calls: metadata?.parallelToolCalls ?? true,
}
// Include text.verbosity only when the model explicitly supports it
if (model.info.supportsVerbosity === true) {
body.text = { verbosity: (verbosity || "medium") as VerbosityLevel }
}
return body
}
private async *executeRequest(
requestBody: any,
model: OpenAiNativeModel,
metadata?: ApiHandlerCreateMessageMetadata,
systemPrompt?: string,
messages?: Anthropic.Messages.MessageParam[],
): ApiStream {
// Create AbortController for cancellation
this.abortController = new AbortController()
// Build per-request headers using taskId when available, falling back to sessionId
const taskId = metadata?.taskId
const userAgent = `roo-code/${Package.version} (${os.platform()} ${os.release()}; ${os.arch()}) node/${process.version.slice(1)}`
const requestHeaders: Record<string, string> = {
originator: "roo-code",
session_id: taskId || this.sessionId,
"User-Agent": userAgent,
}
try {
// Use the official SDK with per-request headers
const stream = (await (this.client as any).responses.create(requestBody, {
signal: this.abortController.signal,
headers: requestHeaders,
})) as AsyncIterable<any>
if (typeof (stream as any)[Symbol.asyncIterator] !== "function") {
throw new Error(
"OpenAI SDK did not return an AsyncIterable for Responses API streaming. Falling back to SSE.",
)
}
for await (const event of stream) {
// Check if request was aborted
if (this.abortController.signal.aborted) {
break
}
for await (const outChunk of this.processEvent(event, model)) {
yield outChunk
}
}
} catch (sdkErr: any) {
// For errors, fallback to manual SSE via fetch
yield* this.makeResponsesApiRequest(requestBody, model, metadata, systemPrompt, messages)
} finally {
this.abortController = undefined
}
}
private formatFullConversation(systemPrompt: string, messages: Anthropic.Messages.MessageParam[]): any {
// Format the entire conversation history for the Responses API using structured format
// The Responses API (like Realtime API) accepts a list of items, which can be messages, function calls, or function call outputs.
const formattedInput: any[] = []
// Do NOT embed the system prompt as a developer message in the Responses API input.
// The Responses API treats roles as free-form; use the top-level `instructions` field instead.
// Process each message
for (const message of messages) {
// Check if this is a reasoning item (already formatted in API history)
if ((message as any).type === "reasoning") {
// Pass through reasoning items as-is
formattedInput.push(message)
continue
}
if (message.role === "user") {
const content: any[] = []
const toolResults: any[] = []
if (typeof message.content === "string") {
content.push({ type: "input_text", text: message.content })
} else if (Array.isArray(message.content)) {
for (const block of message.content) {
if (block.type === "text") {
content.push({ type: "input_text", text: block.text })
} else if (block.type === "image") {
const image = block as Anthropic.Messages.ImageBlockParam
const imageUrl = `data:${image.source.media_type};base64,${image.source.data}`
content.push({ type: "input_image", image_url: imageUrl })
} else if (block.type === "tool_result") {
// Map Anthropic tool_result to Responses API function_call_output item
const result =
typeof block.content === "string"
? block.content
: block.content?.map((c) => (c.type === "text" ? c.text : "")).join("") || ""
toolResults.push({
type: "function_call_output",
// Sanitize and truncate call_id to fit OpenAI's 64-char limit
call_id: sanitizeOpenAiCallId(block.tool_use_id),
output: result,
})
}
}
}
// Add user message first
if (content.length > 0) {
formattedInput.push({ role: "user", content })
}
// Add tool results as separate items
if (toolResults.length > 0) {
formattedInput.push(...toolResults)
}
} else if (message.role === "assistant") {
const content: any[] = []
const toolCalls: any[] = []
if (typeof message.content === "string") {
content.push({ type: "output_text", text: message.content })
} else if (Array.isArray(message.content)) {
for (const block of message.content) {
if (block.type === "text") {
content.push({ type: "output_text", text: block.text })
} else if (block.type === "tool_use") {
// Map Anthropic tool_use to Responses API function_call item
toolCalls.push({
type: "function_call",
// Sanitize and truncate call_id to fit OpenAI's 64-char limit
call_id: sanitizeOpenAiCallId(block.id),
name: block.name,
arguments: JSON.stringify(block.input),
})
}
}
}
// Add assistant message if it has content
if (content.length > 0) {
formattedInput.push({ role: "assistant", content })
}
// Add tool calls as separate items
if (toolCalls.length > 0) {
formattedInput.push(...toolCalls)
}
}
}
return formattedInput
}
private async *makeResponsesApiRequest(
requestBody: any,
model: OpenAiNativeModel,
metadata?: ApiHandlerCreateMessageMetadata,
systemPrompt?: string,
messages?: Anthropic.Messages.MessageParam[],
): ApiStream {
const apiKey = this.options.openAiNativeApiKey ?? "not-provided"
const baseUrl = this.options.openAiNativeBaseUrl || "https://api.openai.com"
const url = `${baseUrl}/v1/responses`
// Create AbortController for cancellation
this.abortController = new AbortController()
// Build per-request headers using taskId when available, falling back to sessionId
const taskId = metadata?.taskId
const userAgent = `roo-code/${Package.version} (${os.platform()} ${os.release()}; ${os.arch()}) node/${process.version.slice(1)}`
try {
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${apiKey}`,
originator: "roo-code",
session_id: taskId || this.sessionId,
"User-Agent": userAgent,
},
body: JSON.stringify(requestBody),
signal: this.abortController.signal,
})
if (!response.ok) {
const errorText = await response.text()
let errorMessage = `OpenAI Responses API request failed (${response.status})`
let errorDetails = ""
// Try to parse error as JSON for better error messages
try {
const errorJson = JSON.parse(errorText)
if (errorJson.error?.message) {
errorDetails = errorJson.error.message
} else if (errorJson.message) {
errorDetails = errorJson.message
} else {
errorDetails = errorText
}
} catch {
// If not JSON, use the raw text
errorDetails = errorText
}
// Provide user-friendly error messages based on status code
switch (response.status) {
case 400:
errorMessage = "Invalid request to Responses API. Please check your input parameters."
break
case 401:
errorMessage = "Authentication failed. Please check your OpenAI API key."
break
case 403:
errorMessage = "Access denied. Your API key may not have access to this endpoint."
break
case 404:
errorMessage =
"Responses API endpoint not found. The endpoint may not be available yet or requires a different configuration."
break
case 429:
errorMessage = "Rate limit exceeded. Please try again later."
break
case 500:
case 502:
case 503:
errorMessage = "OpenAI service error. Please try again later."
break
default:
errorMessage = `Responses API error (${response.status})`
}
// Append details if available
if (errorDetails) {
errorMessage += ` - ${errorDetails}`
}
throw new Error(errorMessage)
}
if (!response.body) {
throw new Error("Responses API error: No response body")
}
// Handle streaming response
yield* this.handleStreamResponse(response.body, model)
} catch (error) {
const model = this.getModel()
const errorMessage = error instanceof Error ? error.message : String(error)
const apiError = new ApiProviderError(errorMessage, this.providerName, model.id, "createMessage")
TelemetryService.instance.captureException(apiError)
if (error instanceof Error) {
// Re-throw with the original error message if it's already formatted
if (error.message.includes("Responses API")) {
throw error
}
// Otherwise, wrap it with context
throw new Error(`Failed to connect to Responses API: ${error.message}`)
}
// Handle non-Error objects
throw new Error(`Unexpected error connecting to Responses API`)
} finally {
this.abortController = undefined
}
}
/**
* Handles the streaming response from the Responses API.
*
* This function iterates through the Server-Sent Events (SSE) stream, parses each event,
* and yields structured data chunks (`ApiStream`). It handles a wide variety of event types,
* including text deltas, reasoning, usage data, and various status/tool events.
*/
private async *handleStreamResponse(body: ReadableStream<Uint8Array>, model: OpenAiNativeModel): ApiStream {
const reader = body.getReader()
const decoder = new TextDecoder()
let buffer = ""
let hasContent = false
let totalInputTokens = 0
let totalOutputTokens = 0
try {
while (true) {
// Check if request was aborted
if (this.abortController?.signal.aborted) {
break
}
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split("\n")
buffer = lines.pop() || ""
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6).trim()
if (data === "[DONE]") {
continue
}
try {
const parsed = JSON.parse(data)
// Capture resolved service tier if present
if (parsed.response?.service_tier) {
this.lastServiceTier = parsed.response.service_tier as ServiceTier
}
// Capture complete output array (includes reasoning items with encrypted_content)
if (parsed.response?.output && Array.isArray(parsed.response.output)) {
this.lastResponseOutput = parsed.response.output
}
// Capture top-level response id
if (parsed.response?.id) {
this.lastResponseId = parsed.response.id as string
}
// Delegate standard event types to the shared processor to avoid duplication.
// This applies to both SDK and raw SSE fallback paths.
if (parsed?.type && this.coreHandledEventTypes.has(parsed.type)) {
for await (const outChunk of this.processEvent(parsed, model)) {
// Track whether we've emitted any content so fallback handling can decide appropriately
// Include tool calls so tool-call-only responses aren't treated as empty
if (
outChunk.type === "text" ||
outChunk.type === "reasoning" ||
outChunk.type === "tool_call" ||
outChunk.type === "tool_call_partial"
) {
hasContent = true
}
yield outChunk
}
continue
}
// Check if this is a complete response (non-streaming format)
if (parsed.response && parsed.response.output && Array.isArray(parsed.response.output)) {
// Handle complete response in the initial event
for (const outputItem of parsed.response.output) {
if (outputItem.type === "text" && outputItem.content) {
for (const content of outputItem.content) {
if (content.type === "text" && content.text) {
hasContent = true
yield {
type: "text",
text: content.text,
}
}
}
}
// Additionally handle reasoning summaries if present (non-streaming summary output)
if (outputItem.type === "reasoning" && Array.isArray(outputItem.summary)) {
for (const summary of outputItem.summary) {
if (summary?.type === "summary_text" && typeof summary.text === "string") {
hasContent = true
yield {
type: "reasoning",
text: summary.text,
}
}
}
}
}
// Check for usage in the complete response
if (parsed.response.usage) {
const usageData = this.normalizeUsage(parsed.response.usage, model)
if (usageData) {
yield usageData
}
}
}
// Handle streaming delta events for text content
else if (
parsed.type === "response.text.delta" ||
parsed.type === "response.output_text.delta"
) {
// Primary streaming event for text deltas
if (parsed.delta) {
hasContent = true
yield {
type: "text",
text: parsed.delta,
}
}
} else if (
parsed.type === "response.text.done" ||
parsed.type === "response.output_text.done"
) {
// Text streaming completed - final text already streamed via deltas
}
// Handle reasoning delta events
else if (
parsed.type === "response.reasoning.delta" ||
parsed.type === "response.reasoning_text.delta"
) {
// Streaming reasoning content
if (parsed.delta) {
hasContent = true
yield {
type: "reasoning",
text: parsed.delta,
}
}
} else if (
parsed.type === "response.reasoning.done" ||
parsed.type === "response.reasoning_text.done"
) {
// Reasoning streaming completed
}
// Handle reasoning summary events
else if (
parsed.type === "response.reasoning_summary.delta" ||
parsed.type === "response.reasoning_summary_text.delta"
) {
// Streaming reasoning summary
if (parsed.delta) {
hasContent = true
yield {
type: "reasoning",
text: parsed.delta,
}
}
} else if (
parsed.type === "response.reasoning_summary.done" ||
parsed.type === "response.reasoning_summary_text.done"
) {
// Reasoning summary completed
}
// Handle refusal delta events
else if (parsed.type === "response.refusal.delta") {
// Model is refusing to answer
if (parsed.delta) {
hasContent = true
yield {
type: "text",
text: `[Refusal] ${parsed.delta}`,
}
}
} else if (parsed.type === "response.refusal.done") {
// Refusal completed
}
// Handle audio delta events (for multimodal responses)
else if (parsed.type === "response.audio.delta") {
// Audio streaming - we'll skip for now as we focus on text
// Could be handled in future for voice responses
} else if (parsed.type === "response.audio.done") {
// Audio completed
}
// Handle audio transcript delta events
else if (parsed.type === "response.audio_transcript.delta") {
// Audio transcript streaming
if (parsed.delta) {
hasContent = true
yield {
type: "text",
text: parsed.delta,
}
}
} else if (parsed.type === "response.audio_transcript.done") {
// Audio transcript completed
}
// Handle content part events (for structured content)
else if (parsed.type === "response.content_part.added") {
// New content part added - could be text, image, etc.
if (parsed.part?.type === "text" && parsed.part.text) {
hasContent = true
yield {
type: "text",
text: parsed.part.text,
}
}
} else if (parsed.type === "response.content_part.done") {
// Content part completed
}
// Handle output item events (alternative format)
else if (parsed.type === "response.output_item.added") {
// This is where the actual content comes through in some test cases
if (parsed.item) {
if (parsed.item.type === "text" && parsed.item.text) {
hasContent = true
yield { type: "text", text: parsed.item.text }
} else if (parsed.item.type === "reasoning" && parsed.item.text) {
hasContent = true
yield { type: "reasoning", text: parsed.item.text }
} else if (parsed.item.type === "message" && parsed.item.content) {
// Handle message type items
for (const content of parsed.item.content) {
if (content.type === "text" && content.text) {
hasContent = true
yield { type: "text", text: content.text }
}
}
}
}
} else if (parsed.type === "response.output_item.done") {
// Output item completed
}
// Handle function/tool call events
else if (
parsed.type === "response.function_call_arguments.delta" ||
parsed.type === "response.tool_call_arguments.delta" ||
parsed.type === "response.function_call_arguments.done" ||
parsed.type === "response.tool_call_arguments.done"
) {
// Delegated to processEvent (handles accumulation and completion)
for await (const outChunk of this.processEvent(parsed, model)) {
yield outChunk
}
}
// Handle MCP (Model Context Protocol) tool events
else if (parsed.type === "response.mcp_call_arguments.delta") {
// MCP tool call arguments streaming
} else if (parsed.type === "response.mcp_call_arguments.done") {
// MCP tool call completed
} else if (parsed.type === "response.mcp_call.in_progress") {
// MCP tool call in progress
} else if (
parsed.type === "response.mcp_call.completed" ||
parsed.type === "response.mcp_call.failed"
) {
// MCP tool call status events
} else if (parsed.type === "response.mcp_list_tools.in_progress") {
// MCP list tools in progress
} else if (
parsed.type === "response.mcp_list_tools.completed" ||
parsed.type === "response.mcp_list_tools.failed"
) {
// MCP list tools status events
}
// Handle web search events
else if (parsed.type === "response.web_search_call.searching") {
// Web search in progress
} else if (parsed.type === "response.web_search_call.in_progress") {
// Processing web search results
} else if (parsed.type === "response.web_search_call.completed") {
// Web search completed
}
// Handle code interpreter events
else if (parsed.type === "response.code_interpreter_call_code.delta") {
// Code interpreter code streaming
if (parsed.delta) {
// Could yield as a special code type if needed
}
} else if (parsed.type === "response.code_interpreter_call_code.done") {
// Code interpreter code completed
} else if (parsed.type === "response.code_interpreter_call.interpreting") {
// Code interpreter running
} else if (parsed.type === "response.code_interpreter_call.in_progress") {
// Code execution in progress
} else if (parsed.type === "response.code_interpreter_call.completed") {
// Code interpreter completed
}
// Handle file search events
else if (parsed.type === "response.file_search_call.searching") {
// File search in progress
} else if (parsed.type === "response.file_search_call.in_progress") {
// Processing file search results
} else if (parsed.type === "response.file_search_call.completed") {
// File search completed
}
// Handle image generation events
else if (parsed.type === "response.image_gen_call.generating") {
// Image generation in progress
} else if (parsed.type === "response.image_gen_call.in_progress") {
// Processing image generation
} else if (parsed.type === "response.image_gen_call.partial_image") {
// Image partially generated
} else if (parsed.type === "response.image_gen_call.completed") {
// Image generation completed
}
// Handle computer use events
else if (
parsed.type === "response.computer_tool_call.output_item" ||
parsed.type === "response.computer_tool_call.output_screenshot"
) {
// Computer use tool events
}
// Handle annotation events
else if (
parsed.type === "response.output_text_annotation.added" ||
parsed.type === "response.text_annotation.added"
) {
// Text annotation events - could be citations, references, etc.
}
// Handle error events
else if (parsed.type === "response.error" || parsed.type === "error") {
// Error event from the API
if (parsed.error || parsed.message) {
throw new Error(
`Responses API error: ${parsed.error?.message || parsed.message || "Unknown error"}`,
)
}
}
// Handle incomplete event
else if (parsed.type === "response.incomplete") {
// Response was incomplete - might need to handle specially
}
// Handle queued event
else if (parsed.type === "response.queued") {
// Response is queued