Skip to content

Commit 988ce33

Browse files
committed
fix(security): stream A2A response body to unblock SSE; keep text/json/arrayBuffer for non-streaming callers
1 parent 5719fd6 commit 988ce33

2 files changed

Lines changed: 66 additions & 57 deletions

File tree

apps/sim/lib/a2a/utils.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,7 @@ export async function createA2AClient(agentUrl: string, apiKey?: string): Promis
107107

108108
const res = await secureFetchWithPinnedIP(url, resolvedIP, { method, headers, body, signal })
109109
const resHeaders = new Headers(res.headers.toRecord())
110-
const resBody = await res.text()
111-
return new Response(resBody, {
110+
return new Response(res.body, {
112111
status: res.status,
113112
statusText: res.statusText,
114113
headers: resHeaders,

apps/sim/lib/core/security/input-validation.server.ts

Lines changed: 65 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ export interface SecureFetchResponse {
251251
status: number
252252
statusText: string
253253
headers: SecureFetchHeaders
254+
body: ReadableStream<Uint8Array> | null
254255
text: () => Promise<string>
255256
json: () => Promise<unknown>
256257
arrayBuffer: () => Promise<ArrayBuffer>
@@ -361,67 +362,76 @@ export async function secureFetchWithPinnedIP(
361362
return
362363
}
363364

364-
const chunks: Buffer[] = []
365-
let totalBytes = 0
366-
let responseTerminated = false
367-
368-
res.on('data', (chunk: Buffer) => {
369-
if (responseTerminated) return
370-
371-
totalBytes += chunk.length
372-
if (
373-
typeof maxResponseBytes === 'number' &&
374-
maxResponseBytes > 0 &&
375-
totalBytes > maxResponseBytes
376-
) {
377-
responseTerminated = true
378-
res.destroy(new Error(`Response exceeded maximum size of ${maxResponseBytes} bytes`))
379-
return
365+
// Parse headers immediately — they're available before any body data arrives
366+
const headersRecord: Record<string, string> = {}
367+
let setCookieArray: string[] = []
368+
for (const [key, value] of Object.entries(res.headers)) {
369+
const lowerKey = key.toLowerCase()
370+
if (lowerKey === 'set-cookie') {
371+
if (Array.isArray(value)) {
372+
setCookieArray = value
373+
headersRecord[lowerKey] = value.join(', ')
374+
} else if (typeof value === 'string') {
375+
setCookieArray = [value]
376+
headersRecord[lowerKey] = value
377+
}
378+
} else if (typeof value === 'string') {
379+
headersRecord[lowerKey] = value
380+
} else if (Array.isArray(value)) {
381+
headersRecord[lowerKey] = value.join(', ')
380382
}
383+
}
381384

382-
chunks.push(chunk)
383-
})
384-
385-
res.on('error', (error) => {
386-
settledReject(error)
385+
let totalBytes = 0
386+
const nodeRes = res
387+
const body = new ReadableStream<Uint8Array>({
388+
start(controller) {
389+
nodeRes.on('data', (chunk: Buffer) => {
390+
totalBytes += chunk.length
391+
if (
392+
typeof maxResponseBytes === 'number' &&
393+
maxResponseBytes > 0 &&
394+
totalBytes > maxResponseBytes
395+
) {
396+
controller.error(
397+
new Error(`Response exceeded maximum size of ${maxResponseBytes} bytes`)
398+
)
399+
nodeRes.destroy()
400+
return
401+
}
402+
controller.enqueue(new Uint8Array(chunk))
403+
})
404+
nodeRes.on('end', () => controller.close())
405+
nodeRes.on('error', (err) => controller.error(err))
406+
},
407+
cancel() {
408+
nodeRes.destroy()
409+
},
387410
})
388411

389-
res.on('end', () => {
390-
if (responseTerminated) return
391-
const bodyBuffer = Buffer.concat(chunks)
392-
const body = bodyBuffer.toString('utf-8')
393-
const headersRecord: Record<string, string> = {}
394-
let setCookieArray: string[] = []
395-
for (const [key, value] of Object.entries(res.headers)) {
396-
const lowerKey = key.toLowerCase()
397-
if (lowerKey === 'set-cookie') {
398-
if (Array.isArray(value)) {
399-
setCookieArray = value
400-
headersRecord[lowerKey] = value.join(', ')
401-
} else if (typeof value === 'string') {
402-
setCookieArray = [value]
403-
headersRecord[lowerKey] = value
404-
}
405-
} else if (typeof value === 'string') {
406-
headersRecord[lowerKey] = value
407-
} else if (Array.isArray(value)) {
408-
headersRecord[lowerKey] = value.join(', ')
409-
}
412+
async function readBodyAsBuffer(): Promise<Buffer> {
413+
const reader = body.getReader()
414+
const buffers: Uint8Array[] = []
415+
while (true) {
416+
const { done, value } = await reader.read()
417+
if (done) break
418+
if (value) buffers.push(value)
410419
}
420+
return Buffer.concat(buffers.map((b) => Buffer.from(b)))
421+
}
411422

412-
settledResolve({
413-
ok: statusCode >= 200 && statusCode < 300,
414-
status: statusCode,
415-
statusText: res.statusMessage || '',
416-
headers: new SecureFetchHeaders(headersRecord, setCookieArray),
417-
text: async () => body,
418-
json: async () => JSON.parse(body),
419-
arrayBuffer: async () =>
420-
bodyBuffer.buffer.slice(
421-
bodyBuffer.byteOffset,
422-
bodyBuffer.byteOffset + bodyBuffer.byteLength
423-
),
424-
})
423+
settledResolve({
424+
ok: statusCode >= 200 && statusCode < 300,
425+
status: statusCode,
426+
statusText: res.statusMessage || '',
427+
headers: new SecureFetchHeaders(headersRecord, setCookieArray),
428+
body,
429+
text: async () => (await readBodyAsBuffer()).toString('utf-8'),
430+
json: async () => JSON.parse((await readBodyAsBuffer()).toString('utf-8')),
431+
arrayBuffer: async () => {
432+
const buf = await readBodyAsBuffer()
433+
return buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength)
434+
},
425435
})
426436
})
427437

0 commit comments

Comments
 (0)