diff --git a/examples/servers/typescript/naive-shared-state-server.ts b/examples/servers/typescript/naive-shared-state-server.ts new file mode 100644 index 0000000..a68633c --- /dev/null +++ b/examples/servers/typescript/naive-shared-state-server.ts @@ -0,0 +1,295 @@ +#!/usr/bin/env node + +/** + * A minimal MCP server built without the SDK. + * + * Implements just enough of the protocol to pass basic conformance tests: + * initialize, tools/list, tools/call (with progress notifications). + * + * Uses a per-session architecture with session IDs, but keeps an in-flight + * request table to route notifications back to the correct SSE stream. + */ + +import { randomUUID } from 'crypto'; +import express from 'express'; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +interface Session { + id: string; + initialized: boolean; +} + +interface InFlightRequest { + streamWriter: (event: string, data: string) => void; + sessionId: string; +} + +// --------------------------------------------------------------------------- +// State +// --------------------------------------------------------------------------- + +const sessions = new Map(); + +// Track in-flight requests so we can send notifications (progress, logging) +// back on the correct SSE stream while a tool is still executing. +// +// Key: JSON-RPC request id (number | string) +// Value: the SSE writer + session context for that request +const inFlightRequests = new Map(); + +// --------------------------------------------------------------------------- +// Tool implementations +// --------------------------------------------------------------------------- + +const TEST_IMAGE_BASE64 = + 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=='; + +async function handleToolCall( + toolName: string, + _args: Record, + meta: Record | undefined, + requestId: number | string +): Promise { + switch (toolName) { + case 'test_simple_text': + return { + content: [ + { type: 'text', text: 'This is a simple text response for testing.' } + ] + }; + + case 'test_image_content': + return { + content: [ + { type: 'image', data: TEST_IMAGE_BASE64, mimeType: 'image/png' } + ] + }; + + case 'test_tool_with_progress': { + const progressToken = meta?.progressToken ?? 
0; + + for (let i = 0; i <= 2; i++) { + // Look up the stream for this request on each iteration + const entry = inFlightRequests.get(requestId); + if (entry) { + const notification = { + jsonrpc: '2.0', + method: 'notifications/progress', + params: { + progressToken, + progress: i * 50, + total: 100 + } + }; + entry.streamWriter('message', JSON.stringify(notification)); + } + await new Promise((r) => setTimeout(r, 50)); + } + + return { + content: [{ type: 'text', text: String(progressToken) }] + }; + } + + default: + throw Object.assign(new Error(`Unknown tool: ${toolName}`), { + code: -32601 + }); + } +} + +const TOOLS = [ + { + name: 'test_simple_text', + description: 'Tests simple text content response', + inputSchema: { type: 'object', properties: {} } + }, + { + name: 'test_image_content', + description: 'Tests image content response', + inputSchema: { type: 'object', properties: {} } + }, + { + name: 'test_tool_with_progress', + description: 'Tests tool that reports progress notifications', + inputSchema: { type: 'object', properties: {} } + } +]; + +// --------------------------------------------------------------------------- +// JSON-RPC message handling +// --------------------------------------------------------------------------- + +function handleMessage( + message: any, + session: Session, + sseWriter: (event: string, data: string) => void +): Promise | undefined { + // Notification (no id) — fire and forget + if (message.id === undefined) { + if (message.method === 'notifications/initialized') { + session.initialized = true; + } + return; + } + + const requestId = message.id; + + switch (message.method) { + case 'initialize': { + const response = { + jsonrpc: '2.0', + id: requestId, + result: { + protocolVersion: '2025-11-25', + capabilities: { tools: {}, logging: {} }, + serverInfo: { name: 'naive-shared-state-server', version: '1.0.0' } + } + }; + sseWriter('message', JSON.stringify(response)); + return; + } + + case 'tools/list': { + sseWriter( + 'message', + JSON.stringify({ + jsonrpc: '2.0', + id: requestId, + result: { tools: TOOLS } + }) + ); + return; + } + + case 'tools/call': { + const toolName = message.params?.name; + const toolArgs = message.params?.arguments ?? {}; + const meta = message.params?._meta; + + // Register this request so in-flight notifications can find its stream + inFlightRequests.set(requestId, { + streamWriter: sseWriter, + sessionId: session.id + }); + + // Return a promise that resolves when the tool handler is done. + // The caller uses this to know when to close the SSE stream. + return handleToolCall(toolName, toolArgs, meta, requestId) + .then((result) => { + // Send final response directly on this request's own sseWriter, + // NOT through the inFlightRequests map (final response is always + // routed correctly since we captured sseWriter at request time) + sseWriter( + 'message', + JSON.stringify({ jsonrpc: '2.0', id: requestId, result }) + ); + }) + .catch((err) => { + sseWriter( + 'message', + JSON.stringify({ + jsonrpc: '2.0', + id: requestId, + error: { + code: err.code ?? -32603, + message: err.message ?? 
'Internal error' + } + }) + ); + }) + .finally(() => { + inFlightRequests.delete(requestId); + }); + } + + case 'ping': { + sseWriter( + 'message', + JSON.stringify({ jsonrpc: '2.0', id: requestId, result: {} }) + ); + return; + } + + default: { + sseWriter( + 'message', + JSON.stringify({ + jsonrpc: '2.0', + id: requestId, + error: { + code: -32601, + message: `Method not found: ${message.method}` + } + }) + ); + } + } +} + +// --------------------------------------------------------------------------- +// Express app +// --------------------------------------------------------------------------- + +const app = express(); +app.use(express.json()); + +app.post('/mcp', async (req, res) => { + const sessionIdHeader = req.headers['mcp-session-id'] as string | undefined; + const body = req.body; + + // Resolve or create session + let session: Session; + + if (sessionIdHeader && sessions.has(sessionIdHeader)) { + session = sessions.get(sessionIdHeader)!; + } else if (!sessionIdHeader && body?.method === 'initialize') { + session = { id: randomUUID(), initialized: false }; + sessions.set(session.id, session); + } else { + res.status(400).json({ + jsonrpc: '2.0', + error: { code: -32000, message: 'Invalid or missing session ID' }, + id: body?.id ?? null + }); + return; + } + + // Set up SSE response + res.writeHead(200, { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + 'Mcp-Session-Id': session.id + }); + + const sseWriter = (event: string, data: string) => { + if (!res.writableEnded) { + res.write(`event: ${event}\ndata: ${data}\n\n`); + } + }; + + // Handle the message(s) + const messages = Array.isArray(body) ? body : [body]; + const promises: Promise[] = []; + + for (const message of messages) { + const p = handleMessage(message, session, sseWriter); + if (p) promises.push(p); + } + + if (promises.length > 0) { + // Wait for all async handlers (tool calls) to complete + await Promise.all(promises); + } + + res.end(); +}); + +const PORT = parseInt(process.env.PORT || '3007', 10); +app.listen(PORT, '127.0.0.1', () => { + console.log(`MCP server running on http://localhost:${PORT}/mcp`); +}); diff --git a/examples/servers/typescript/no-notification-isolation.ts b/examples/servers/typescript/no-notification-isolation.ts new file mode 100644 index 0000000..9e67546 --- /dev/null +++ b/examples/servers/typescript/no-notification-isolation.ts @@ -0,0 +1,160 @@ +#!/usr/bin/env node + +/** + * MCP Server WITHOUT Notification Isolation - Negative Test Case (Issue 2) + * + * This server is intentionally vulnerable to GHSA-345p-7cg4-v4c7 Issue 2: + * it creates a new transport per request but shares a single McpServer instance. + * Each call to server.connect(transport) overwrites this._transport, causing + * in-request notifications (progress, logging) to be routed to the wrong client. + * + * This mimics the pattern used by Cloudflare's createMcpHandler (stateless Workers path). + * + * DO NOT use this pattern in production. This exists solely for negative + * conformance testing of the notification-isolation scenario. + * + * This server should FAIL the notification-isolation conformance test. 
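+ *
+ * For contrast, a minimal sketch of the pattern that avoids this issue,
+ * using only the SDK calls already present in this file (buildServer() is a
+ * hypothetical factory that performs the same tool registrations as below):
+ *
+ *   app.post('/mcp', async (req, res) => {
+ *     // Fresh server AND fresh transport per request — nothing is shared,
+ *     // so no this._transport reference can be overwritten mid-handler.
+ *     const server = buildServer();
+ *     const transport = new StreamableHTTPServerTransport({
+ *       sessionIdGenerator: undefined
+ *     });
+ *     await server.connect(transport);
+ *     await transport.handleRequest(req, res, req.body);
+ *   });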
+ */ + +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import express from 'express'; + +// Sample base64 encoded 1x1 red PNG pixel for testing +const TEST_IMAGE_BASE64 = + 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=='; + +// === VULNERABLE: Single shared McpServer across all requests === +const server = new McpServer( + { + name: 'no-notification-isolation-server', + version: '1.0.0' + }, + { + capabilities: { + tools: {}, + logging: {} + } + } +); + +// Simple text tool (no delay) +server.tool( + 'test_simple_text', + 'Tests simple text content response', + {}, + async () => { + return { + content: [ + { type: 'text', text: 'This is a simple text response for testing.' } + ] + }; + } +); + +// Image content tool (no delay) +server.registerTool( + 'test_image_content', + { description: 'Tests image content response' }, + async () => { + return { + content: [ + { type: 'image', data: TEST_IMAGE_BASE64, mimeType: 'image/png' } + ] + }; + } +); + +// Tool with progress notifications — the key tool for Issue 2 testing. +// Sends progress notifications via sendNotification which goes through +// this._transport (the vulnerable path). +server.registerTool( + 'test_tool_with_progress', + { + description: 'Tests tool that reports progress notifications', + inputSchema: {} + }, + async (_args, { sendNotification, _meta }) => { + const progressToken = _meta?.progressToken ?? 0; + + await sendNotification({ + method: 'notifications/progress', + params: { + progressToken, + progress: 0, + total: 100, + message: `Completed step 0 of 100` + } + }); + await new Promise((r) => setTimeout(r, 50)); + + await sendNotification({ + method: 'notifications/progress', + params: { + progressToken, + progress: 50, + total: 100, + message: `Completed step 50 of 100` + } + }); + await new Promise((r) => setTimeout(r, 50)); + + await sendNotification({ + method: 'notifications/progress', + params: { + progressToken, + progress: 100, + total: 100, + message: `Completed step 100 of 100` + } + }); + + return { + content: [{ type: 'text', text: String(progressToken) }] + }; + } +); + +const app = express(); +app.use(express.json()); + +// === VULNERABLE PATTERN === +// New transport per request, but server.connect() overwrites this._transport +// on the shared server. This is the pattern used by Cloudflare's createMcpHandler. +app.post('/mcp', async (req, res) => { + try { + const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined + }); + + // This overwrites this._transport on the shared server! + // If another request is in-flight, its in-request notifications + // will now be routed through this new transport instead. + await server.connect(transport); + await transport.handleRequest(req, res, req.body); + } catch (error) { + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: `Internal error: ${error instanceof Error ? error.message : String(error)}` + }, + id: null + }); + } + } +}); + +const PORT = parseInt(process.env.PORT || '3006', 10); +app.listen(PORT, '127.0.0.1', () => { + console.log( + `Vulnerable server (no notification isolation) running on http://localhost:${PORT}/mcp` + ); + console.log( + 'WARNING: Shared McpServer with per-request transports — vulnerable to Issue 2!' 
+ ); + console.log( + 'In-request notifications will be routed through the last-connected transport.' + ); +}); diff --git a/examples/servers/typescript/no-session-isolation.ts b/examples/servers/typescript/no-session-isolation.ts new file mode 100644 index 0000000..8010c15 --- /dev/null +++ b/examples/servers/typescript/no-session-isolation.ts @@ -0,0 +1,118 @@ +#!/usr/bin/env node + +/** + * MCP Server WITHOUT Session Isolation - Negative Test Case + * + * This server is intentionally vulnerable to CWE-362 message ID collision. + * It uses a SINGLE SHARED StreamableHTTPServerTransport with sessionIdGenerator + * set to undefined (stateless mode). When two concurrent clients send requests + * with the same JSON-RPC message ID, the server's internal _requestToStreamMapping + * will overwrite entries, causing responses to be routed to the wrong client. + * + * DO NOT use this pattern in production. This exists solely for negative + * conformance testing of the session-isolation scenario. + * + * This server should FAIL the session-isolation conformance test. + */ + +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import express from 'express'; + +// Sample base64 encoded 1x1 red PNG pixel for testing +const TEST_IMAGE_BASE64 = + 'iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z8DwHwAFBQIAX8jx0gAAAABJRU5ErkJggg=='; + +// Create a single MCP server instance +const server = new McpServer({ + name: 'no-session-isolation-server', + version: '1.0.0' +}); + +// Register tools identical to the everything-server, with deliberate timing +// to produce cross-talk (not just a stolen slot). +// +// The conformance test sends test_simple_text (Client A) first, then +// test_image_content (Client B) 100ms later. With these delays: +// +// T=0ms: A arrives → mapping.set(2, streamA), tool starts (150ms) +// T=100ms: B arrives → mapping.set(2, streamB) ← OVERWRITES A's entry +// T=150ms: A's tool returns → mapping.get(2) → streamB → A's text response +// is written to B's HTTP stream. B receives A's data = CROSS-TALK. +// T=600ms: B's tool returns → mapping.get(2) → gone → error, A gets nothing. +// +// Result: B sees content type "text" instead of "image" — actual cross-talk. +server.tool( + 'test_simple_text', + 'Tests simple text content response', + {}, + async () => { + // Slow enough for B to overwrite the mapping, but finishes before B's tool + await new Promise((r) => setTimeout(r, 150)); + return { + content: [ + { type: 'text', text: 'This is a simple text response for testing.' } + ] + }; + } +); + +server.registerTool( + 'test_image_content', + { + description: 'Tests image content response' + }, + async () => { + // Finishes after A, so A's response is routed first (to B's stream) + await new Promise((r) => setTimeout(r, 500)); + return { + content: [ + { type: 'image', data: TEST_IMAGE_BASE64, mimeType: 'image/png' } + ] + }; + } +); + +// === VULNERABLE SERVER SETUP === +// Single shared transport with NO session management. +// This causes message ID collisions between concurrent clients. 
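+//
+// For contrast, a sketch of the non-vulnerable shape (hedged: the option
+// names follow the SDK's stateful example, and `transports` is a
+// hypothetical local map):
+//
+//   const transports: Record<string, StreamableHTTPServerTransport> = {};
+//   // On initialize: create a per-session transport with
+//   //   sessionIdGenerator: () => randomUUID(),
+//   //   onsessioninitialized: (sid) => { transports[sid] = transport; }
+//   // On later requests: look up transports[req.headers['mcp-session-id']]
+//   // and call its handleRequest(req, res, req.body).
+//
+// This file deliberately does NOT do that: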
+const transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: undefined +}); + +// Connect the server to the single shared transport once +await server.connect(transport); + +const app = express(); +app.use(express.json()); + +// Route ALL requests through the single shared transport +app.post('/mcp', async (req, res) => { + try { + await transport.handleRequest(req, res, req.body); + } catch (error) { + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { + code: -32603, + message: `Internal error: ${error instanceof Error ? error.message : String(error)}` + }, + id: null + }); + } + } +}); + +const PORT = parseInt(process.env.PORT || '3005', 10); +app.listen(PORT, '127.0.0.1', () => { + console.log( + `Vulnerable server (no session isolation) running on http://localhost:${PORT}/mcp` + ); + console.log( + 'WARNING: This server uses a single shared transport without session management!' + ); + console.log( + 'It is intentionally vulnerable to CWE-362 message ID collision.' + ); +}); diff --git a/examples/servers/typescript/package-lock.json b/examples/servers/typescript/package-lock.json index 80a2f37..78c7c3c 100644 --- a/examples/servers/typescript/package-lock.json +++ b/examples/servers/typescript/package-lock.json @@ -8,7 +8,7 @@ "name": "mcp-conformance-test-server", "version": "1.0.0", "dependencies": { - "@modelcontextprotocol/sdk": "^1.24.0", + "@modelcontextprotocol/sdk": "^1.25.2", "@types/cors": "^2.8.19", "cors": "^2.8.5", "express": "^5.2.1", @@ -461,12 +461,24 @@ "node": ">=18" } }, - "node_modules/@modelcontextprotocol/sdk": { - "version": "1.24.0", - "resolved": "https://registry.npmjs.org/@modelcontextprotocol/sdk/-/sdk-1.24.0.tgz", - "integrity": "sha512-D8h5KXY2vHFW8zTuxn2vuZGN0HGrQ5No6LkHwlEA9trVgNdPL3TF1dSqKA7Dny6BbBYKSW/rOBDXdC8KJAjUCg==", + "node_modules/@hono/node-server": { + "version": "1.19.9", + "resolved": "https://artifactory.infra.ant.dev:443/artifactory/api/npm/npm-all/@hono/node-server/-/node-server-1.19.9.tgz", + "integrity": "sha512-vHL6w3ecZsky+8P5MD+eFfaGTyCeOHUIFYMGpQGbrBTSmNNoxv0if69rEZ5giu36weC5saFuznL411gRX7bJDw==", "license": "MIT", + "engines": { + "node": ">=18.14.1" + }, + "peerDependencies": { + "hono": "^4" + } + }, + "node_modules/@modelcontextprotocol/sdk": { + "version": "1.26.0", + "resolved": "https://artifactory.infra.ant.dev:443/artifactory/api/npm/npm-all/@modelcontextprotocol/sdk/-/sdk-1.26.0.tgz", + "integrity": "sha512-Y5RmPncpiDtTXDbLKswIJzTqu2hyBKxTNsgKqKclDbhIgg1wgtf1fRuvxgTnRfcnxtvvgbIEcqUOzZrJ6iSReg==", "dependencies": { + "@hono/node-server": "^1.19.9", "ajv": "^8.17.1", "ajv-formats": "^3.0.1", "content-type": "^1.0.5", @@ -474,13 +486,15 @@ "cross-spawn": "^7.0.5", "eventsource": "^3.0.2", "eventsource-parser": "^3.0.0", - "express": "^5.0.1", - "express-rate-limit": "^7.5.0", - "jose": "^6.1.1", + "express": "^5.2.1", + "express-rate-limit": "^8.2.1", + "hono": "^4.11.4", + "jose": "^6.1.3", + "json-schema-typed": "^8.0.2", "pkce-challenge": "^5.0.0", "raw-body": "^3.0.0", "zod": "^3.25 || ^4.0", - "zod-to-json-schema": "^3.25.0" + "zod-to-json-schema": "^3.25.1" }, "engines": { "node": ">=18" @@ -859,7 +873,6 @@ "resolved": "https://registry.npmjs.org/express/-/express-5.2.1.tgz", "integrity": "sha512-hIS4idWWai69NezIdRt2xFVofaF4j+6INOpJlVOLDO8zXGpUVEVzIYk12UUi2JzjEzWL3IOAxcTubgz9Po0yXw==", "license": "MIT", - "peer": true, "dependencies": { "accepts": "^2.0.0", "body-parser": "^2.2.1", @@ -899,10 +912,13 @@ } }, "node_modules/express-rate-limit": { - "version": "7.5.1", - 
"resolved": "https://registry.npmjs.org/express-rate-limit/-/express-rate-limit-7.5.1.tgz", - "integrity": "sha512-7iN8iPMDzOMHPUYllBEsQdWVB6fPDMPqwjBaFrgr4Jgr/+okjvzAy+UHlYYL/Vs0OsOrMkwS6PJDkFlJwoxUnw==", + "version": "8.2.1", + "resolved": "https://artifactory.infra.ant.dev:443/artifactory/api/npm/npm-all/express-rate-limit/-/express-rate-limit-8.2.1.tgz", + "integrity": "sha512-PCZEIEIxqwhzw4KF0n7QF4QqruVTcF73O5kFKUnGOyjbCCgizBBiFaYpd/fnBLUMPw/BWw9OsiN7GgrNYr7j6g==", "license": "MIT", + "dependencies": { + "ip-address": "10.0.1" + }, "engines": { "node": ">= 16" }, @@ -1080,6 +1096,14 @@ "node": ">= 0.4" } }, + "node_modules/hono": { + "version": "4.11.4", + "resolved": "https://artifactory.infra.ant.dev:443/artifactory/api/npm/npm-all/hono/-/hono-4.11.4.tgz", + "integrity": "sha512-U7tt8JsyrxSRKspfhtLET79pU8K+tInj5QZXs1jSugO1Vq5dFj3kmZsRldo29mTBfcjDRVRXrEZ6LS63Cog9ZA==", + "engines": { + "node": ">=16.9.0" + } + }, "node_modules/http-errors": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/http-errors/-/http-errors-2.0.1.tgz", @@ -1122,6 +1146,15 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "license": "ISC" }, + "node_modules/ip-address": { + "version": "10.0.1", + "resolved": "https://artifactory.infra.ant.dev:443/artifactory/api/npm/npm-all/ip-address/-/ip-address-10.0.1.tgz", + "integrity": "sha512-NWv9YLW4PoW2B7xtzaS3NCot75m6nK7Icdv0o3lfMceJVRfSoQwqD4wEH5rLwoKJwUiZ/rfpiVBhnaF0FK4HoA==", + "license": "MIT", + "engines": { + "node": ">= 12" + } + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -1158,6 +1191,12 @@ "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==", "license": "MIT" }, + "node_modules/json-schema-typed": { + "version": "8.0.2", + "resolved": "https://artifactory.infra.ant.dev:443/artifactory/api/npm/npm-all/json-schema-typed/-/json-schema-typed-8.0.2.tgz", + "integrity": "sha512-fQhoXdcvc3V28x7C7BMs4P5+kNlgUURe2jmUT1T//oBRMDrqy1QPelJimwZGo7Hg9VPV3EQV5Bnq4hbFy2vetA==", + "license": "BSD-2-Clause" + }, "node_modules/math-intrinsics": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/math-intrinsics/-/math-intrinsics-1.1.0.tgz", @@ -1652,16 +1691,14 @@ "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz", "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==", "license": "MIT", - "peer": true, "funding": { "url": "https://github.com/sponsors/colinhacks" } }, "node_modules/zod-to-json-schema": { - "version": "3.25.0", - "resolved": "https://registry.npmjs.org/zod-to-json-schema/-/zod-to-json-schema-3.25.0.tgz", - "integrity": "sha512-HvWtU2UG41LALjajJrML6uQejQhNJx+JBO9IflpSja4R03iNWfKXrj6W2h7ljuLyc1nKS+9yDyL/9tD1U/yBnQ==", - "license": "ISC", + "version": "3.25.1", + "resolved": "https://artifactory.infra.ant.dev:443/artifactory/api/npm/npm-all/zod-to-json-schema/-/zod-to-json-schema-3.25.1.tgz", + "integrity": "sha512-pM/SU9d3YAggzi6MtR4h7ruuQlqKtad8e9S0fmxcMi+ueAK5Korys/aWcV9LIIHTVbj01NdzxcnXSN+O74ZIVA==", "peerDependencies": { "zod": "^3.25 || ^4" } diff --git a/src/scenarios/index.ts b/src/scenarios/index.ts index 6e17101..ebad8b6 100644 --- a/src/scenarios/index.ts +++ b/src/scenarios/index.ts @@ -52,6 +52,11 @@ import { } from './server/prompts'; import { DNSRebindingProtectionScenario } from './server/dns-rebinding'; +import { + SessionIsolationScenario, + 
NotificationIsolationScenario, + NotificationIsolationFuzzScenario +} from './server/session-isolation'; import { authScenariosList, @@ -124,7 +129,10 @@ const allClientScenariosList: ClientScenario[] = [ new PromptsGetWithImageScenario(), // Security scenarios - new DNSRebindingProtectionScenario() + new DNSRebindingProtectionScenario(), + new SessionIsolationScenario(), + new NotificationIsolationScenario(), + new NotificationIsolationFuzzScenario() ]; // Active client scenarios (excludes pending) diff --git a/src/scenarios/server/session-isolation.ts b/src/scenarios/server/session-isolation.ts new file mode 100644 index 0000000..174a11d --- /dev/null +++ b/src/scenarios/server/session-isolation.ts @@ -0,0 +1,970 @@ +/** + * Session Isolation conformance test scenarios for MCP servers + * + * Tests that servers correctly isolate responses and notifications between + * concurrent clients. Validates protection against GHSA-345p-7cg4-v4c7: + * + * - Issue 1 (Transport re-use): Shared transport's _requestToStreamMapping + * causes response cross-wiring when message IDs collide. + * → SessionIsolationScenario (deterministic, 2 clients) + * + * - Issue 2 (Server re-use): Shared server's this._transport reference is + * overwritten by new connections, causing in-request notifications to be + * routed to the wrong client. + * → NotificationIsolationScenario (deterministic, 2 clients) + * → NotificationIsolationFuzzScenario (N concurrent clients) + */ + +import { ClientScenario, ConformanceCheck } from '../../types.js'; +import { EventSourceParserStream } from 'eventsource-parser/stream'; + +const SPEC_REFERENCES = [ + { + id: 'CWE-362', + url: 'https://cwe.mitre.org/data/definitions/362.html' + }, + { + id: 'MCP-Transport-Security', + url: 'https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#security-warning' + } +]; + +/** + * Send a raw JSON-RPC request via fetch and parse the response. + * Handles both application/json and text/event-stream response types. + */ +async function sendJsonRpcRequest( + serverUrl: string, + body: object, + sessionId?: string +): Promise<{ status: number; headers: Headers; jsonRpcResponse: any }> { + const headers: Record = { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + 'mcp-protocol-version': '2025-03-26' + }; + + if (sessionId) { + headers['mcp-session-id'] = sessionId; + } + + const response = await fetch(serverUrl, { + method: 'POST', + headers, + body: JSON.stringify(body) + }); + + const contentType = response.headers.get('content-type') || ''; + let jsonRpcResponse: any; + + if (contentType.includes('text/event-stream')) { + // Parse SSE stream to extract the JSON-RPC response + jsonRpcResponse = await parseSSEResponse(response); + } else if (contentType.includes('application/json')) { + // Direct JSON response + jsonRpcResponse = await response.json(); + } else { + // Non-JSON, non-SSE response (e.g. plain text error) + const text = await response.text(); + jsonRpcResponse = { error: { code: -32603, message: text } }; + } + + return { + status: response.status, + headers: response.headers, + jsonRpcResponse + }; +} + +/** + * Parse an SSE response body to extract the JSON-RPC message. + * Returns null if the stream ends or times out without a JSON-RPC response. 
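+ *
+ * Used by sendJsonRpcRequest() above whenever the server answers a POST with
+ * Content-Type: text/event-stream instead of application/json.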
+ */ +async function parseSSEResponse( + response: Response, + timeoutMs = 800 +): Promise { + if (!response.body) { + return null; + } + + const reader = response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + .getReader(); + + const readWithTimeout = async (): Promise => { + const timeout = new Promise((resolve) => + setTimeout(() => resolve(null), timeoutMs) + ); + + const read = (async () => { + try { + while (true) { + const { value, done } = await reader.read(); + if (done) return null; + + if (value && value.data) { + try { + const parsed = JSON.parse(value.data); + if (parsed.result !== undefined || parsed.error !== undefined) { + return parsed; + } + } catch { + // Skip non-JSON events + } + } + } + } catch { + return null; + } + })(); + + return Promise.race([read, timeout]); + }; + + try { + return await readWithTimeout(); + } finally { + await reader.cancel().catch(() => {}); + } +} + +/** + * Collected SSE events from a stream, separated into notifications and the + * final JSON-RPC response. + */ +interface SSEStreamResult { + /** All JSON-RPC notification messages (no id, has method) */ + notifications: any[]; + /** The final JSON-RPC response (has result or error) */ + response: any | null; +} + +/** + * Parse an SSE response body collecting ALL events: notifications and the + * final response. Used by notification isolation tests to verify that + * in-request notifications (progress, logging) arrive on the correct stream. + */ +async function parseSSEStreamFull( + response: Response, + timeoutMs = 5000 +): Promise { + const result: SSEStreamResult = { notifications: [], response: null }; + + if (!response.body) { + return result; + } + + const reader = response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + .getReader(); + + const readAll = async (): Promise => { + try { + while (true) { + const { value, done } = await reader.read(); + if (done) return; + + if (value && value.data) { + try { + const parsed = JSON.parse(value.data); + if (parsed.result !== undefined || parsed.error !== undefined) { + result.response = parsed; + // Keep reading — there may be leaked notifications after + // the response from other clients' handlers + } else if (parsed.method) { + // Notification (has method, no id for response) + result.notifications.push(parsed); + } + } catch { + // Skip non-JSON events + } + } + } + } catch { + // Stream error — return what we have + } + }; + + const timeout = new Promise((resolve) => + setTimeout(resolve, timeoutMs) + ); + + try { + await Promise.race([readAll(), timeout]); + } finally { + await reader.cancel().catch(() => {}); + } + + return result; +} + +/** + * Send a raw JSON-RPC request and return the raw Response object (unconsumed) + * so the caller can parse the SSE stream themselves. + */ +async function sendJsonRpcRequestRaw( + serverUrl: string, + body: object, + sessionId?: string +): Promise { + const headers: Record = { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + 'mcp-protocol-version': '2025-03-26' + }; + + if (sessionId) { + headers['mcp-session-id'] = sessionId; + } + + return fetch(serverUrl, { + method: 'POST', + headers, + body: JSON.stringify(body) + }); +} + +/** + * Initialize a client session and send notifications/initialized. + * Returns the session ID (if any) or throws on failure. 
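+ *
+ * Example (as the scenarios below use it):
+ *   const { sessionId } = await initializeSession(serverUrl, 1, 'client-a');
+ *   // sessionId is undefined when the server does not issue an
+ *   // mcp-session-id header (stateless servers).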
+ */ +async function initializeSession( + serverUrl: string, + messageId: number = 1, + clientName: string = 'conformance-session-isolation-test' +): Promise<{ sessionId: string | undefined }> { + const initBody = { + jsonrpc: '2.0', + id: messageId, + method: 'initialize', + params: { + protocolVersion: '2025-11-25', + capabilities: {}, + clientInfo: { name: clientName, version: '1.0.0' } + } + }; + + const init = await sendJsonRpcRequest(serverUrl, initBody); + if (init.status !== 200 || !init.jsonRpcResponse?.result) { + throw new Error(`Initialize failed: status=${init.status}`); + } + + const sessionId = init.headers.get('mcp-session-id') || undefined; + + await sendNotification(serverUrl, 'notifications/initialized', {}, sessionId); + + return { sessionId }; +} + +/** + * Send a JSON-RPC notification (no id field, no response expected). + */ +async function sendNotification( + serverUrl: string, + method: string, + params: object, + sessionId?: string +): Promise { + const headers: Record = { + 'Content-Type': 'application/json', + Accept: 'application/json, text/event-stream', + 'mcp-protocol-version': '2025-03-26' + }; + + if (sessionId) { + headers['mcp-session-id'] = sessionId; + } + + await fetch(serverUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + jsonrpc: '2.0', + method, + params + }) + }); +} + +export class SessionIsolationScenario implements ClientScenario { + name = 'session-isolation'; + description = + 'Test that concurrent clients with colliding JSON-RPC message IDs receive ' + + 'correctly routed responses (CWE-362 session isolation). Verifies that a ' + + 'server does not cross-wire responses when two clients use identical message IDs.'; + + async run(serverUrl: string): Promise { + const checks: ConformanceCheck[] = []; + const timestamp = new Date().toISOString(); + + // -- Step 1: Initialize two independent sessions -- + + let sessionIdA: string | undefined; + let sessionIdB: string | undefined; + + try { + const initBody = { + jsonrpc: '2.0', + id: 1, // Both clients use the same message ID + method: 'initialize', + params: { + protocolVersion: '2025-11-25', + capabilities: {}, + clientInfo: { + name: 'conformance-session-isolation-test', + version: '1.0.0' + } + } + }; + + // Initialize both clients (sequentially to avoid server-side init races) + const initA = await sendJsonRpcRequest(serverUrl, initBody); + sessionIdA = initA.headers.get('mcp-session-id') || undefined; + + const initB = await sendJsonRpcRequest(serverUrl, initBody); + sessionIdB = initB.headers.get('mcp-session-id') || undefined; + + const bothInitSucceeded = + initA.status === 200 && + initB.status === 200 && + initA.jsonRpcResponse?.result && + initB.jsonRpcResponse?.result; + + checks.push({ + id: 'session-isolation-init', + name: 'SessionIsolationInit', + description: + 'Both clients initialize successfully with the same message ID', + status: bothInitSucceeded ? 'SUCCESS' : 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + details: { + clientA: { + status: initA.status, + sessionId: sessionIdA || '(none)', + hasResult: !!initA.jsonRpcResponse?.result + }, + clientB: { + status: initB.status, + sessionId: sessionIdB || '(none)', + hasResult: !!initB.jsonRpcResponse?.result + } + }, + errorMessage: !bothInitSucceeded + ? 
`Initialization failed: Client A status=${initA.status}, Client B status=${initB.status}` + : undefined + }); + + if (!bothInitSucceeded) { + return checks; + } + + // Send notifications/initialized for both sessions + await sendNotification( + serverUrl, + 'notifications/initialized', + {}, + sessionIdA + ); + await sendNotification( + serverUrl, + 'notifications/initialized', + {}, + sessionIdB + ); + } catch (error) { + checks.push({ + id: 'session-isolation-init', + name: 'SessionIsolationInit', + description: 'Both clients initialize successfully', + status: 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + errorMessage: `Initialization error: ${error instanceof Error ? error.message : String(error)}` + }); + return checks; + } + + // -- Step 2: Send concurrent tools/call with colliding message IDs -- + + try { + // Both requests use the same JSON-RPC id to trigger the collision + const textToolRequest = { + jsonrpc: '2.0', + id: 2, + method: 'tools/call', + params: { + name: 'test_simple_text', + arguments: {} + } + }; + + const imageToolRequest = { + jsonrpc: '2.0', + id: 2, + method: 'tools/call', + params: { + name: 'test_image_content', + arguments: {} + } + }; + + // Send the slow tool first (test_simple_text), wait briefly to ensure + // it is in-flight, then send the fast tool (test_image_content). + // Against a vulnerable stateless server, the second request overwrites + // the first's _requestToStreamMapping entry while the slow tool is still + // processing, causing deterministic cross-wiring. + // + // We resolve eagerly: once Client B's (fast) response arrives, we give + // Client A a very short grace period. If cross-wiring happened, Client A's + // response was stolen so there's nothing to wait for. + const responseAPromise = sendJsonRpcRequest( + serverUrl, + textToolRequest, + sessionIdA + ); + await new Promise((r) => setTimeout(r, 100)); + const responseBPromise = sendJsonRpcRequest( + serverUrl, + imageToolRequest, + sessionIdB + ); + + // Wait for B first (fast tool). If B got the wrong content type, + // A's response was stolen — no point waiting for A at all. + const responseB = await responseBPromise; + const bContentType = + responseB.jsonRpcResponse?.result?.content?.[0]?.type; + const bCorrect = bContentType === 'image'; + + const responseA = bCorrect + ? await Promise.race([ + responseAPromise, + // If A hasn't resolved 200ms after B, it's not coming + new Promise<{ + status: number; + headers: Headers; + jsonRpcResponse: any; + }>((resolve) => + setTimeout( + () => + resolve({ + status: 0, + headers: new Headers(), + jsonRpcResponse: null + }), + 200 + ) + ) + ]) + : { status: 0, headers: new Headers(), jsonRpcResponse: null }; + + // Verify: Client A called test_simple_text -> should get content[0].type === "text" + // Verify: Client B called test_image_content -> should get content[0].type === "image" + const resultA = responseA.jsonRpcResponse?.result; + const resultB = responseB.jsonRpcResponse?.result; + + const contentTypeA = resultA?.content?.[0]?.type; + const contentTypeB = resultB?.content?.[0]?.type; + + // A null/undefined response means the stream timed out waiting for a response, + // which happens when responses are cross-wired and the real response went to + // the other client's stream. 
+ const clientAGotResponse = responseA.jsonRpcResponse != null; + const clientBGotResponse = responseB.jsonRpcResponse != null; + + const clientACorrect = clientAGotResponse && contentTypeA === 'text'; + const clientBCorrect = clientBGotResponse && contentTypeB === 'image'; + const bothCorrect = clientACorrect && clientBCorrect; + + // Detect cross-wiring: any case where a client got the wrong response or + // no response (timeout) indicates the server's internal mapping was corrupted. + const crossWired = !bothCorrect; + + // Check for error responses (which can happen when the mapping is deleted + // before the second response is sent) + const errorA = responseA.jsonRpcResponse?.error; + const errorB = responseB.jsonRpcResponse?.error; + + let errorMessage: string | undefined; + if (!bothCorrect) { + const describeResult = ( + contentType: string | undefined, + error: any, + gotResponse: boolean + ) => { + if (contentType) return `content type "${contentType}"`; + if (error) return `error: ${error.message}`; + if (!gotResponse) return 'no response (timeout)'; + return 'unknown'; + }; + + errorMessage = + 'Responses were not correctly isolated between clients (CWE-362). ' + + `Client A (test_simple_text) received: ${describeResult(contentTypeA, errorA, clientAGotResponse)} (expected "text"), ` + + `Client B (test_image_content) received: ${describeResult(contentTypeB, errorB, clientBGotResponse)} (expected "image").`; + } + + checks.push({ + id: 'session-isolation', + name: 'SessionIsolation', + description: + 'Each client receives the correct tool response when message IDs collide', + status: bothCorrect ? 'SUCCESS' : 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + details: { + clientA: { + toolCalled: 'test_simple_text', + expectedContentType: 'text', + receivedContentType: contentTypeA || '(missing)', + correct: clientACorrect, + sessionId: sessionIdA || '(none)' + }, + clientB: { + toolCalled: 'test_image_content', + expectedContentType: 'image', + receivedContentType: contentTypeB || '(missing)', + correct: clientBCorrect, + sessionId: sessionIdB || '(none)' + }, + crossWired + }, + errorMessage + }); + } catch (error) { + checks.push({ + id: 'session-isolation', + name: 'SessionIsolation', + description: + 'Each client receives the correct tool response when message IDs collide', + status: 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + errorMessage: `Tool call error: ${error instanceof Error ? error.message : String(error)}` + }); + } + + return checks; + } +} + +/** + * Issue 2: Notification Isolation (deterministic, 2 clients) + * + * Tests that in-request notifications (progress) are routed to the correct + * client when a shared server instance has its this._transport overwritten + * by a second client connecting mid-handler. + * + * Requires the server to implement `test_tool_with_progress` which sends + * progress notifications during execution. + */ +export class NotificationIsolationScenario implements ClientScenario { + name = 'notification-isolation'; + description = + 'Test that in-request notifications (progress) are correctly isolated ' + + 'between concurrent clients. Client A calls a slow tool that emits ' + + 'progress notifications. Client B connects while A is still processing. 
' + + "Verifies that A's notifications do not leak to B's stream (Issue 2: " + + 'server re-use / this._transport overwrite).'; + + async run(serverUrl: string): Promise { + const checks: ConformanceCheck[] = []; + const timestamp = new Date().toISOString(); + + // -- Step 1: Initialize two sessions -- + + let sessionIdA: string | undefined; + let sessionIdB: string | undefined; + + try { + const sessA = await initializeSession( + serverUrl, + 1, + 'notification-isolation-A' + ); + sessionIdA = sessA.sessionId; + const sessB = await initializeSession( + serverUrl, + 1, + 'notification-isolation-B' + ); + sessionIdB = sessB.sessionId; + } catch (error) { + checks.push({ + id: 'notification-isolation', + name: 'NotificationIsolation', + description: 'Both clients initialize successfully', + status: 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + errorMessage: `Initialization error: ${error instanceof Error ? error.message : String(error)}` + }); + return checks; + } + + // -- Step 2: Client A calls test_tool_with_progress (slow, emits notifications) -- + // While A is in-flight, Client B sends a request to trigger this._transport overwrite. + + try { + // Client A: call the progress tool. Use a unique progressToken so we + // can identify A's notifications if they leak to B. + const progressRequestA = { + jsonrpc: '2.0', + id: 2, + method: 'tools/call', + params: { + name: 'test_tool_with_progress', + arguments: {}, + _meta: { progressToken: 'client-a-progress' } + } + }; + + // Client B: call a simple tool (fast) with a different progressToken. + const progressRequestB = { + jsonrpc: '2.0', + id: 2, // Same message ID to maximize collision potential + method: 'tools/call', + params: { + name: 'test_tool_with_progress', + arguments: {}, + _meta: { progressToken: 'client-b-progress' } + } + }; + + // Fire A first, then B shortly after to create the race window + const rawA = await sendJsonRpcRequestRaw( + serverUrl, + progressRequestA, + sessionIdA + ); + // Small delay to let A's handler start processing + await new Promise((r) => setTimeout(r, 20)); + const rawB = await sendJsonRpcRequestRaw( + serverUrl, + progressRequestB, + sessionIdB + ); + + // Parse both streams fully, collecting notifications + response + const [streamA, streamB] = await Promise.all([ + parseSSEStreamFull(rawA, 5000), + parseSSEStreamFull(rawB, 5000) + ]); + + // -- Analyze results -- + + // The test_tool_with_progress spec requires 3 notifications (0, 50, 100) + const EXPECTED_NOTIFICATION_COUNT = 3; + + // Categorize notifications on each stream + const analyzeStream = ( + stream: SSEStreamResult, + ownToken: string, + otherToken: string, + label: string + ) => { + const allProgress = stream.notifications.filter( + (n) => n.method === 'notifications/progress' + ); + const ownCount = allProgress.filter( + (t) => t.params?.progressToken === ownToken + ).length; + const foreignTokens = allProgress + .filter((t) => t.params?.progressToken === otherToken) + .map((t) => t.params?.progressToken); + const gotResponse = stream.response != null; + + const errors: string[] = []; + if (foreignTokens.length > 0) { + errors.push( + `${label} received ${foreignTokens.length} notification(s) ` + + `belonging to the other client — the server wrote them to ` + + `the wrong SSE stream` + ); + } + if (!gotResponse) { + errors.push( + `${label} received no final response (response may have been ` + + `sent to the other client's stream)` + ); + } + if (ownCount < EXPECTED_NOTIFICATION_COUNT) { + const missing = 
EXPECTED_NOTIFICATION_COUNT - ownCount; + errors.push( + `${label} received ${ownCount}/${EXPECTED_NOTIFICATION_COUNT} ` + + `expected progress notifications (${missing} likely routed ` + + `to the other client's stream)` + ); + } + + return { + ownCount, + foreignCount: foreignTokens.length, + gotResponse, + ok: + foreignTokens.length === 0 && + gotResponse && + ownCount >= EXPECTED_NOTIFICATION_COUNT, + errors + }; + }; + + const a = analyzeStream( + streamA, + 'client-a-progress', + 'client-b-progress', + 'Client A' + ); + const b = analyzeStream( + streamB, + 'client-b-progress', + 'client-a-progress', + 'Client B' + ); + + const allErrors = [...a.errors, ...b.errors]; + + checks.push({ + id: 'notification-isolation', + name: 'NotificationIsolation', + description: + 'In-request progress notifications are routed to the correct client', + status: a.ok && b.ok ? 'SUCCESS' : 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + details: { + clientA: { + sessionId: sessionIdA || '(none)', + progressToken: 'client-a-progress', + ownNotifications: a.ownCount, + foreignNotifications: a.foreignCount, + expectedNotifications: EXPECTED_NOTIFICATION_COUNT, + gotResponse: a.gotResponse + }, + clientB: { + sessionId: sessionIdB || '(none)', + progressToken: 'client-b-progress', + ownNotifications: b.ownCount, + foreignNotifications: b.foreignCount, + expectedNotifications: EXPECTED_NOTIFICATION_COUNT, + gotResponse: b.gotResponse + } + }, + errorMessage: allErrors.length > 0 ? allErrors.join('; ') : undefined + }); + } catch (error) { + checks.push({ + id: 'notification-isolation', + name: 'NotificationIsolation', + description: + 'In-request progress notifications are routed to the correct client', + status: 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + errorMessage: `Error: ${error instanceof Error ? error.message : String(error)}` + }); + } + + return checks; + } +} + +/** + * Issue 2: Notification Isolation Fuzz Test (N concurrent clients) + * + * Sends N concurrent requests that each emit progress notifications with + * unique tokens. Verifies that every client only receives notifications + * with its own token — no cross-contamination. + */ +export class NotificationIsolationFuzzScenario implements ClientScenario { + name = 'notification-isolation-fuzz'; + description = + 'Fuzz test: N concurrent clients each call test_tool_with_progress with ' + + 'unique progress tokens. Verifies that every notification arrives at the ' + + 'correct client and no cross-contamination occurs.'; + + private clientCount: number; + + constructor(clientCount?: number) { + this.clientCount = + clientCount ?? 
parseInt(process.env.FUZZ_CLIENT_COUNT || '10', 10); + } + + async run(serverUrl: string): Promise { + const checks: ConformanceCheck[] = []; + const timestamp = new Date().toISOString(); + const N = this.clientCount; + + // -- Step 1: Initialize N sessions -- + + const sessions: Array<{ index: number; sessionId: string | undefined }> = + []; + + try { + // Initialize sequentially to avoid overwhelming the server + for (let i = 0; i < N; i++) { + const sess = await initializeSession( + serverUrl, + 1, + `notification-fuzz-client-${i}` + ); + sessions.push({ index: i, sessionId: sess.sessionId }); + } + } catch (error) { + checks.push({ + id: 'notification-isolation-fuzz', + name: 'NotificationIsolationFuzz', + description: `All ${N} clients initialize successfully`, + status: 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + errorMessage: `Initialization error after ${sessions.length}/${N} clients: ${error instanceof Error ? error.message : String(error)}` + }); + return checks; + } + + checks.push({ + id: 'notification-isolation-fuzz-init', + name: 'NotificationIsolationFuzzInit', + description: `All ${N} clients initialize successfully`, + status: 'SUCCESS', + timestamp, + specReferences: SPEC_REFERENCES, + details: { + clientCount: N, + sessionsWithIds: sessions.filter((s) => s.sessionId).length, + sessionsWithoutIds: sessions.filter((s) => !s.sessionId).length + } + }); + + // -- Step 2: Fire N concurrent tool calls with unique progress tokens -- + + try { + // Build requests — each with a unique progressToken and the same + // message ID to maximize collision potential + const requests = sessions.map((sess) => ({ + body: { + jsonrpc: '2.0', + id: 2, + method: 'tools/call', + params: { + name: 'test_tool_with_progress', + arguments: {}, + _meta: { progressToken: `fuzz-client-${sess.index}` } + } + }, + sessionId: sess.sessionId, + index: sess.index + })); + + // Fire all requests concurrently + const streamPromises = requests.map((req) => + (async () => { + const raw = await sendJsonRpcRequestRaw( + serverUrl, + req.body, + req.sessionId + ); + const stream = await parseSSEStreamFull(raw, 8000); + return { index: req.index, stream }; + })() + ); + + const results = await Promise.all(streamPromises); + + // -- Analyze results: one check per client -- + + // The test_tool_with_progress spec requires 3 notifications + const EXPECTED_NOTIFICATION_COUNT = 3; + + for (const { index, stream } of results) { + const expectedToken = `fuzz-client-${index}`; + const allProgress = stream.notifications.filter( + (n) => n.method === 'notifications/progress' + ); + const ownCount = allProgress.filter( + (n) => n.params?.progressToken === expectedToken + ).length; + + // Identify foreign tokens and count how many of each + const foreignTokenCounts = new Map(); + for (const n of allProgress) { + const token = n.params?.progressToken; + if (token !== expectedToken && token !== undefined) { + foreignTokenCounts.set( + token, + (foreignTokenCounts.get(token) || 0) + 1 + ); + } + } + + const gotResponse = stream.response != null; + const foreignCount = allProgress.length - ownCount; + const ok = + foreignCount === 0 && + gotResponse && + ownCount >= EXPECTED_NOTIFICATION_COUNT; + + const errors: string[] = []; + + if (foreignCount > 0) { + const leaked = Array.from(foreignTokenCounts.entries()) + .map(([token, count]) => `${token} (${count}x)`) + .join(', '); + errors.push( + `Received ${foreignCount} notification(s) from other clients: ${leaked} ` + + `— the server wrote them to this client's 
SSE stream` + ); + } + if (!gotResponse) { + errors.push( + 'No final response (response may have been sent to another ' + + "client's stream)" + ); + } + if (ownCount < EXPECTED_NOTIFICATION_COUNT) { + const missing = EXPECTED_NOTIFICATION_COUNT - ownCount; + errors.push( + `Received ${ownCount}/${EXPECTED_NOTIFICATION_COUNT} expected ` + + `progress notifications (${missing} likely routed to another ` + + "client's stream)" + ); + } + + checks.push({ + id: `notification-isolation-fuzz-client-${index}`, + name: `NotificationIsolationFuzz`, + description: `Client ${index}/${N}: notifications correctly isolated (token=${expectedToken})`, + status: ok ? 'SUCCESS' : 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + details: { + clientIndex: index, + expectedToken, + ownNotifications: ownCount, + expectedNotifications: EXPECTED_NOTIFICATION_COUNT, + foreignNotifications: foreignCount, + gotResponse + }, + errorMessage: errors.length > 0 ? errors.join('; ') : undefined + }); + } + } catch (error) { + checks.push({ + id: 'notification-isolation-fuzz', + name: 'NotificationIsolationFuzz', + description: `${N} concurrent clients all receive correctly routed notifications`, + status: 'FAILURE', + timestamp, + specReferences: SPEC_REFERENCES, + errorMessage: `Error: ${error instanceof Error ? error.message : String(error)}` + }); + } + + return checks; + } +}