diff --git a/.changeset/associate-server-requests-with-originating-request.md b/.changeset/associate-server-requests-with-originating-request.md new file mode 100644 index 0000000000..db93430dee --- /dev/null +++ b/.changeset/associate-server-requests-with-originating-request.md @@ -0,0 +1,8 @@ +--- +'@modelcontextprotocol/server': patch +'@modelcontextprotocol/node': patch +--- + +Server-to-client requests made from a tool handler (`ctx.mcpReq.elicitInput()`, `ctx.mcpReq.requestSampling()`) are now associated with the originating request and delivered on its SSE response stream, instead of the standalone GET stream (SEP-2260). When that stream cannot carry +SSE — `enableJsonResponse: true`, or the stream has closed — the call fails with a clear error instead of being silently dropped or delivered unassociated. Also adds an opt-in `keepAliveInterval` option to the streamable HTTP server transport, which writes periodic SSE keepalive +comments so proxies and load balancers do not drop idle connections during long-running requests. diff --git a/docs/server.md b/docs/server.md index 3b173af4e0..04da586754 100644 --- a/docs/server.md +++ b/docs/server.md @@ -48,7 +48,9 @@ const transport = new NodeStreamableHTTPServerTransport({ await server.connect(transport); ``` -**Options:** Set `sessionIdGenerator` to a function (shown above) for stateful sessions. Set it to `undefined` for stateless mode (simpler, but does not support resumability). Set `enableJsonResponse: true` to return plain JSON instead of SSE streams. +**Options:** Set `sessionIdGenerator` to a function (shown above) for stateful sessions. Set it to `undefined` for stateless mode (simpler, but does not support resumability). Set `enableJsonResponse: true` to return plain JSON instead of SSE streams; server-initiated requests +from tool handlers (`ctx.mcpReq.elicitInput()`, `ctx.mcpReq.requestSampling()`) require an SSE response stream and throw in this mode. Set `keepAliveInterval` (milliseconds) to write periodic SSE keepalive comments on open streams, so proxies and load balancers do not drop idle +connections during long-running requests. For a complete server with sessions, logging, and CORS mounted on Express, see [`simpleStreamableHttp.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/simpleStreamableHttp.ts). @@ -64,7 +66,8 @@ await server.connect(transport); ## Server instructions -Instructions describe how to use the server and its features — cross-tool relationships, workflow patterns, and constraints (see [Instructions](https://modelcontextprotocol.io/specification/latest/basic/lifecycle#instructions) in the MCP specification). Clients may add them to the system prompt. Instructions should not duplicate information already in tool descriptions. +Instructions describe how to use the server and its features — cross-tool relationships, workflow patterns, and constraints (see [Instructions](https://modelcontextprotocol.io/specification/latest/basic/lifecycle#instructions) in the MCP specification). Clients may add them to +the system prompt. Instructions should not duplicate information already in tool descriptions. ```ts source="../examples/server/src/serverGuide.examples.ts#instructions_basic" const server = new McpServer( @@ -104,12 +107,13 @@ server.registerTool( ); ``` -> [!NOTE] -> When defining a named type for `structuredContent`, use a `type` alias rather than an `interface`. Named interfaces lack implicit index signatures in TypeScript, so they aren't assignable to `{ [key: string]: unknown }`: +> [!NOTE] When defining a named type for `structuredContent`, use a `type` alias rather than an `interface`. Named interfaces lack implicit index signatures in TypeScript, so they aren't assignable to `{ [key: string]: unknown }`: > > ```ts -> type BmiResult = { bmi: number }; // assignable -> interface BmiResult { bmi: number } // type error +> type BmiResult = { bmi: number }; // assignable +> interface BmiResult { +> bmi: number; +> } // type error > ``` > > Alternatively, spread the value: `structuredContent: { ...result }`. @@ -204,7 +208,8 @@ If a handler throws instead of returning `isError`, the SDK catches the exceptio ## Resources -Resources expose read-only data — files, database schemas, configuration — that the host application can retrieve and attach as context for the model (see [Resources](https://modelcontextprotocol.io/docs/learn/server-concepts#resources) in the MCP overview). Unlike [tools](#tools), which the LLM invokes on its own, resources are application-controlled: the host decides which resources to fetch and how to present them. +Resources expose read-only data — files, database schemas, configuration — that the host application can retrieve and attach as context for the model (see [Resources](https://modelcontextprotocol.io/docs/learn/server-concepts#resources) in the MCP overview). Unlike +[tools](#tools), which the LLM invokes on its own, resources are application-controlled: the host decides which resources to fetch and how to present them. A static resource at a fixed URI: @@ -254,7 +259,8 @@ server.registerResource( ## Prompts -Prompts are reusable templates that help structure interactions with models (see [Prompts](https://modelcontextprotocol.io/docs/learn/server-concepts#prompts) in the MCP overview). Use a prompt when you want to offer a canned interaction pattern that users invoke explicitly; use a [tool](#tools) when the LLM should decide when to call it. +Prompts are reusable templates that help structure interactions with models (see [Prompts](https://modelcontextprotocol.io/docs/learn/server-concepts#prompts) in the MCP overview). Use a prompt when you want to offer a canned interaction pattern that users invoke explicitly; use +a [tool](#tools) when the LLM should decide when to call it. ```ts source="../examples/server/src/serverGuide.examples.ts#registerPrompt_basic" server.registerPrompt( @@ -380,11 +386,14 @@ server.registerTool( ## Server-initiated requests -MCP is bidirectional — servers can send requests *to* the client during tool execution, as long as the client declares matching capabilities (see [Architecture](https://modelcontextprotocol.io/docs/learn/architecture) in the MCP overview). +MCP is bidirectional — servers can send requests _to_ the client during tool execution, as long as the client declares matching capabilities (see [Architecture](https://modelcontextprotocol.io/docs/learn/architecture) in the MCP overview). + +On streamable HTTP, these requests are delivered on the originating request's SSE response stream. They are not available with `enableJsonResponse: true` — the call throws. ### Sampling -Sampling lets a tool handler request an LLM completion from the connected client — the handler describes a prompt and the client returns the model's response (see [Sampling](https://modelcontextprotocol.io/docs/learn/client-concepts#sampling) in the MCP overview). Use sampling when a tool needs the model to generate or transform text mid-execution. +Sampling lets a tool handler request an LLM completion from the connected client — the handler describes a prompt and the client returns the model's response (see [Sampling](https://modelcontextprotocol.io/docs/learn/client-concepts#sampling) in the MCP overview). Use sampling +when a tool needs the model to generate or transform text mid-execution. Call `ctx.mcpReq.requestSampling(params)` (from {@linkcode @modelcontextprotocol/server!index.ServerContext | ServerContext}) inside a tool handler: @@ -429,8 +438,7 @@ Elicitation lets a tool handler request direct input from the user — form fiel - **Form** (`mode: 'form'`) — collects non-sensitive data via a schema-driven form. - **URL** (`mode: 'url'`) — opens a browser URL for sensitive data or secure flows (API keys, payments, OAuth). -> [!IMPORTANT] -> Sensitive information must not be collected via form elicitation; always use URL elicitation or out-of-band flows for secrets. +> [!IMPORTANT] Sensitive information must not be collected via form elicitation; always use URL elicitation or out-of-band flows for secrets. Call `ctx.mcpReq.elicitInput(params)` (from {@linkcode @modelcontextprotocol/server!index.ServerContext | ServerContext}) inside a tool handler: @@ -474,11 +482,13 @@ server.registerTool( ); ``` -For runnable examples, see [`elicitationFormExample.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/elicitationFormExample.ts) (form) and [`elicitationUrlExample.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/elicitationUrlExample.ts) (URL). +For runnable examples, see [`elicitationFormExample.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/elicitationFormExample.ts) (form) and +[`elicitationUrlExample.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/elicitationUrlExample.ts) (URL). ### Roots -Roots let a tool handler discover the client's workspace directories — for example, to scope a file search or identify project boundaries (see [Roots](https://modelcontextprotocol.io/docs/learn/client-concepts#roots) in the MCP overview). Call {@linkcode @modelcontextprotocol/server!server/server.Server#listRoots | server.server.listRoots()} (requires the client to declare the `roots` capability): +Roots let a tool handler discover the client's workspace directories — for example, to scope a file search or identify project boundaries (see [Roots](https://modelcontextprotocol.io/docs/learn/client-concepts#roots) in the MCP overview). Call {@linkcode +@modelcontextprotocol/server!server/server.Server#listRoots | server.server.listRoots()} (requires the client to declare the `roots` capability): ```ts source="../examples/server/src/serverGuide.examples.ts#registerTool_roots" server.registerTool( @@ -497,10 +507,10 @@ server.registerTool( ## Tasks (experimental) -> [!WARNING] -> The tasks API is experimental and may change without notice. +> [!WARNING] The tasks API is experimental and may change without notice. -Task-based execution enables "call-now, fetch-later" patterns for long-running operations (see [Tasks](https://modelcontextprotocol.io/specification/latest/basic/utilities/tasks) in the MCP specification). Instead of returning a result immediately, a tool creates a task that can be polled or resumed later. To use tasks: +Task-based execution enables "call-now, fetch-later" patterns for long-running operations (see [Tasks](https://modelcontextprotocol.io/specification/latest/basic/utilities/tasks) in the MCP specification). Instead of returning a result immediately, a tool creates a task that can +be polled or resumed later. To use tasks: - Provide a {@linkcode @modelcontextprotocol/server!index.TaskStore | TaskStore} implementation that persists task metadata and results (see {@linkcode @modelcontextprotocol/server!index.InMemoryTaskStore | InMemoryTaskStore} for reference). - Enable the `tasks` capability when constructing the server. @@ -545,9 +555,11 @@ For a complete multi-session server with shutdown handling, see [`simpleStreamab ### DNS rebinding protection -Under normal circumstances, cross-origin browser restrictions limit what a malicious website can do to your localhost server. [DNS rebinding attacks](https://en.wikipedia.org/wiki/DNS_rebinding) get around those restrictions entirely by making the requests appear as same-origin, since the attacking domain resolves to localhost. Validating the host header on the server side protects against this scenario. **All localhost MCP servers should use DNS rebinding protection.** +Under normal circumstances, cross-origin browser restrictions limit what a malicious website can do to your localhost server. [DNS rebinding attacks](https://en.wikipedia.org/wiki/DNS_rebinding) get around those restrictions entirely by making the requests appear as same-origin, +since the attacking domain resolves to localhost. Validating the host header on the server side protects against this scenario. **All localhost MCP servers should use DNS rebinding protection.** -The recommended approach is to use {@linkcode @modelcontextprotocol/express!express.createMcpExpressApp | createMcpExpressApp()} (from `@modelcontextprotocol/express`) or {@linkcode @modelcontextprotocol/hono!hono.createMcpHonoApp | createMcpHonoApp()} (from `@modelcontextprotocol/hono`), which enable Host header validation by default: +The recommended approach is to use {@linkcode @modelcontextprotocol/express!express.createMcpExpressApp | createMcpExpressApp()} (from `@modelcontextprotocol/express`) or {@linkcode @modelcontextprotocol/hono!hono.createMcpHonoApp | createMcpHonoApp()} (from +`@modelcontextprotocol/hono`), which enable Host header validation by default: ```ts source="../examples/server/src/serverGuide.examples.ts#dnsRebinding_basic" // Default: DNS rebinding protection auto-enabled (host is 127.0.0.1) @@ -571,7 +583,8 @@ const app = createMcpExpressApp({ `createMcpHonoApp()` from `@modelcontextprotocol/hono` provides the same protection for Hono-based servers and Web Standard runtimes (Cloudflare Workers, Deno, Bun). -If you use `NodeStreamableHTTPServerTransport` directly with your own HTTP framework, you must implement Host header validation yourself. See the [`hostHeaderValidation`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/packages/middleware/express/src/express.ts) middleware source for reference. +If you use `NodeStreamableHTTPServerTransport` directly with your own HTTP framework, you must implement Host header validation yourself. See the [`hostHeaderValidation`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/packages/middleware/express/src/express.ts) +middleware source for reference. ## See also @@ -583,10 +596,10 @@ If you use `NodeStreamableHTTPServerTransport` directly with your own HTTP frame ### Additional examples -| Feature | Description | Example | -|---------|-------------|---------| -| Web Standard transport | Deploy on Cloudflare Workers, Deno, or Bun | [`honoWebStandardStreamableHttp.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/honoWebStandardStreamableHttp.ts) | -| Session management | Per-session transport routing, initialization, and cleanup | [`simpleStreamableHttp.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/simpleStreamableHttp.ts) | -| Resumability | Replay missed SSE events via an event store | [`inMemoryEventStore.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/inMemoryEventStore.ts) | -| CORS | Expose MCP headers for browser clients | [`simpleStreamableHttp.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/simpleStreamableHttp.ts) | -| Multi-node deployment | Stateless, persistent-storage, and distributed routing patterns | [`examples/server/README.md`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/README.md#multi-node-deployment-patterns) | +| Feature | Description | Example | +| ---------------------- | --------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- | +| Web Standard transport | Deploy on Cloudflare Workers, Deno, or Bun | [`honoWebStandardStreamableHttp.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/honoWebStandardStreamableHttp.ts) | +| Session management | Per-session transport routing, initialization, and cleanup | [`simpleStreamableHttp.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/simpleStreamableHttp.ts) | +| Resumability | Replay missed SSE events via an event store | [`inMemoryEventStore.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/inMemoryEventStore.ts) | +| CORS | Expose MCP headers for browser clients | [`simpleStreamableHttp.ts`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/src/simpleStreamableHttp.ts) | +| Multi-node deployment | Stateless, persistent-storage, and distributed routing patterns | [`examples/server/README.md`](https://github.com/modelcontextprotocol/typescript-sdk/blob/main/examples/server/README.md#multi-node-deployment-patterns) | diff --git a/packages/server/src/server/server.ts b/packages/server/src/server/server.ts index 45ee84eb66..7f974975d6 100644 --- a/packages/server/src/server/server.ts +++ b/packages/server/src/server/server.ts @@ -161,8 +161,8 @@ export class Server extends Protocol { mcpReq: { ...ctx.mcpReq, log: (level, data, logger) => this.sendLoggingMessage({ level, data, logger }), - elicitInput: (params, options) => this.elicitInput(params, options), - requestSampling: (params, options) => this.createMessage(params, options) + elicitInput: (params, options) => this.elicitInput(params, { ...options, relatedRequestId: ctx.mcpReq.id }), + requestSampling: (params, options) => this.createMessage(params, { ...options, relatedRequestId: ctx.mcpReq.id }) }, http: hasHttpInfo ? { diff --git a/packages/server/src/server/streamableHttp.ts b/packages/server/src/server/streamableHttp.ts index fd3563a077..4ba7f8bce5 100644 --- a/packages/server/src/server/streamableHttp.ts +++ b/packages/server/src/server/streamableHttp.ts @@ -18,6 +18,9 @@ import { SUPPORTED_PROTOCOL_VERSIONS } from '@modelcontextprotocol/core'; +/** Placeholder stop-function used until a stream's keepalive is started (or when keepalive is disabled). */ +const noKeepAlive = (): void => {}; + export type StreamId = string; export type EventId = string; @@ -141,6 +144,19 @@ export interface WebStandardStreamableHTTPServerTransportOptions { */ retryInterval?: number; + /** + * Interval in milliseconds between SSE keepalive comment frames. + * + * When set, the transport periodically writes an SSE comment (`: keepalive`) on every + * open SSE stream (POST response streams and the standalone GET stream) so idle + * connections are not torn down by intermediaries — for example while a request handler + * is waiting on a nested server→client request such as `elicitation/create` + * (human-in-the-loop delays). + * + * Disabled by default. Non-positive values (`<= 0`) are treated as disabled. + */ + keepAliveInterval?: number; + /** * List of protocol versions that this transport will accept. * Used to validate the `mcp-protocol-version` header in incoming requests. @@ -239,6 +255,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { private _allowedOrigins?: string[]; private _enableDnsRebindingProtection: boolean; private _retryInterval?: number; + private _keepAliveInterval?: number; private _supportedProtocolVersions: string[]; sessionId?: string; @@ -256,6 +273,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { this._allowedOrigins = options.allowedOrigins; this._enableDnsRebindingProtection = options.enableDnsRebindingProtection ?? false; this._retryInterval = options.retryInterval; + this._keepAliveInterval = options.keepAliveInterval; this._supportedProtocolVersions = options.supportedProtocolVersions ?? SUPPORTED_PROTOCOL_VERSIONS; } @@ -438,11 +456,13 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const encoder = new TextEncoder(); let streamController: ReadableStreamDefaultController; + let stopKeepAlive: () => void = noKeepAlive; // Create a ReadableStream with a controller we can use to push SSE events const readable = new ReadableStream({ start: controller => { streamController = controller; + stopKeepAlive = this.startKeepAlive(controller, encoder); }, cancel: () => { // Stream was cancelled by client @@ -466,6 +486,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { controller: streamController!, encoder, cleanup: () => { + stopKeepAlive(); this._streamMapping.delete(this._standaloneSseStreamId); try { streamController!.close(); @@ -488,6 +509,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { return this.createJsonErrorResponse(400, -32_000, 'Event store not configured'); } + let stopKeepAlive: () => void = noKeepAlive; try { // If getStreamIdForEventId is available, use it for conflict checking let streamId: string | undefined; @@ -523,6 +545,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const readable = new ReadableStream({ start: controller => { streamController = controller; + stopKeepAlive = this.startKeepAlive(controller, encoder); }, cancel: () => { // Stream was cancelled by client @@ -548,6 +571,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { controller: streamController!, encoder, cleanup: () => { + stopKeepAlive(); this._streamMapping.delete(replayedStreamId); try { streamController!.close(); @@ -559,11 +583,34 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { return new Response(readable, { headers }); } catch (error) { + // The keepalive was armed when the stream was constructed; without a mapping entry + // there is no cleanup() to stop it, so stop it here. + stopKeepAlive(); this.onerror?.(error as Error); return this.createJsonErrorResponse(500, -32_000, 'Error replaying events'); } } + /** + * Starts the SSE keepalive timer for a stream, writing comment frames until stopped. + * Returns a stop function (a no-op when keepalive is not configured or disabled). + */ + private startKeepAlive(controller: ReadableStreamDefaultController, encoder: InstanceType): () => void { + const interval = this._keepAliveInterval; + if (interval === undefined || !Number.isFinite(interval) || interval <= 0) { + return noKeepAlive; + } + const timer = setInterval(() => { + try { + controller.enqueue(encoder.encode(': keepalive\n\n')); + } catch { + // Stream already closed or cancelled + clearInterval(timer); + } + }, interval); + return () => clearInterval(timer); + } + /** * Writes an event to an SSE stream via controller with proper formatting */ @@ -747,10 +794,12 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { // SSE streaming mode - use ReadableStream with controller for more reliable data pushing const encoder = new TextEncoder(); let streamController: ReadableStreamDefaultController; + let stopKeepAlive: () => void = noKeepAlive; const readable = new ReadableStream({ start: controller => { streamController = controller; + stopKeepAlive = this.startKeepAlive(controller, encoder); }, cancel: () => { // Stream was cancelled by client @@ -777,6 +826,7 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { controller: streamController!, encoder, cleanup: () => { + stopKeepAlive(); this._streamMapping.delete(streamId); try { streamController!.close(); @@ -985,6 +1035,17 @@ export class WebStandardStreamableHTTPServerTransport implements Transport { const stream = this._streamMapping.get(streamId); + // Server→client requests must ride the originating request's response stream (SEP-2260). + // When that stream cannot carry SSE — JSON-response mode, or the stream is gone — fail fast + // instead of delivering the request unassociated. The standalone GET stream is not a valid + // fallback: clients are not required to open one. + if (isJSONRPCRequest(message) && (this._enableJsonResponse || !stream?.controller)) { + throw new Error( + `Cannot deliver server-to-client request (${message.method}): the originating request's response cannot carry SSE ` + + (this._enableJsonResponse ? '(JSON response mode)' : '(response stream closed)') + ); + } + if (!this._enableJsonResponse && stream?.controller && stream?.encoder) { // For SSE responses, generate event ID if event store is provided let eventId: string | undefined; diff --git a/packages/server/test/server/server.test.ts b/packages/server/test/server/server.test.ts index 0edcfd3af0..7d4b7527bf 100644 --- a/packages/server/test/server/server.test.ts +++ b/packages/server/test/server/server.test.ts @@ -1,4 +1,4 @@ -import type { JSONRPCMessage, JSONRPCRequest } from '@modelcontextprotocol/core'; +import type { JSONRPCMessage, JSONRPCRequest, ServerContext } from '@modelcontextprotocol/core'; import { InitializeResultSchema, InMemoryTransport, @@ -86,6 +86,110 @@ describe('Server', () => { }); }); + describe('ctx-scoped request association', () => { + const ELICIT_PARAMS = { + message: 'Need input', + requestedSchema: { type: 'object' as const, properties: { ok: { type: 'boolean' as const } } } + }; + + const SAMPLING_PARAMS = { + messages: [{ role: 'user' as const, content: { type: 'text' as const, text: 'hi' } }], + maxTokens: 5 + }; + + /** + * Connects a Server (with a tools/call handler) to an in-memory transport pair, runs the + * initialize handshake declaring elicitation + sampling client capabilities, and records + * every transport-level send so tests can assert on the options passed to transport.send(). + * + * The fake client auto-responds to elicitation/create and sampling/createMessage requests + * so handlers can run to completion. + */ + async function setup(onToolCall: (ctx: ServerContext) => Promise) { + const server = new Server({ name: 'test', version: '1.0.0' }, { capabilities: { tools: {} } }); + + server.setRequestHandler('tools/call', async (_request, ctx) => { + await onToolCall(ctx); + return { content: [{ type: 'text' as const, text: 'done' }] }; + }); + + const [clientTransport, serverTransport] = InMemoryTransport.createLinkedPair(); + + // Records every [message, options] pair the server passes to transport.send() + const sendSpy = vi.spyOn(serverTransport, 'send'); + + const clientMessages: JSONRPCMessage[] = []; + clientTransport.onmessage = message => { + clientMessages.push(message); + if ('method' in message && 'id' in message) { + const request = message as JSONRPCRequest; + if (request.method === 'elicitation/create') { + void clientTransport.send({ jsonrpc: '2.0', id: request.id, result: { action: 'decline' } }); + } else if (request.method === 'sampling/createMessage') { + void clientTransport.send({ + jsonrpc: '2.0', + id: request.id, + result: { role: 'assistant', content: { type: 'text', text: 'ok' }, model: 'test-model' } + }); + } + } + }; + + await server.connect(serverTransport); + await clientTransport.start(); + + // Initialize handshake declaring elicitation + sampling client capabilities + await clientTransport.send({ + jsonrpc: '2.0', + id: 'init-1', + method: 'initialize', + params: { + protocolVersion: LATEST_PROTOCOL_VERSION, + capabilities: { elicitation: { form: {} }, sampling: {} }, + clientInfo: { name: 'test-client', version: '1.0.0' } + } + } as JSONRPCMessage); + await vi.waitFor(() => expect(clientMessages.some(m => 'id' in m && m.id === 'init-1')).toBe(true)); + await clientTransport.send({ jsonrpc: '2.0', method: 'notifications/initialized' } as JSONRPCMessage); + + /** Sends a tools/call request and waits for its response to come back to the client. */ + async function callTool(id: string): Promise { + await clientTransport.send({ + jsonrpc: '2.0', + id, + method: 'tools/call', + params: { name: 'test-tool', arguments: {} } + } as JSONRPCMessage); + await vi.waitFor(() => + expect(clientMessages.some(m => 'id' in m && m.id === id && ('result' in m || 'error' in m))).toBe(true) + ); + } + + /** Returns the transport.send() options for the first sent request with the given method. */ + function sentOptionsFor(method: string) { + const call = sendSpy.mock.calls.find(([message]) => 'method' in message && message.method === method); + expect(call).toBeDefined(); + return call![1]; + } + + return { server, clientMessages, callTool, sentOptionsFor }; + } + + it('handler-supplied relatedRequestId cannot override the association', async () => { + const { server, callTool, sentOptionsFor } = await setup(async ctx => { + await ctx.mcpReq.elicitInput(ELICIT_PARAMS, { relatedRequestId: 'attempted-override' }); + await ctx.mcpReq.requestSampling(SAMPLING_PARAMS, { relatedRequestId: 'attempted-override' }); + }); + + await callTool('tools-call-3'); + + expect(sentOptionsFor('elicitation/create')?.relatedRequestId).toBe('tools-call-3'); + expect(sentOptionsFor('sampling/createMessage')?.relatedRequestId).toBe('tools-call-3'); + + await server.close(); + }); + }); + describe('getNegotiatedProtocolVersion', () => { it('returns undefined before initialization', () => { const server = new Server({ name: 'test', version: '1.0.0' }, { capabilities: {} }); diff --git a/packages/server/test/server/streamableHttp.test.ts b/packages/server/test/server/streamableHttp.test.ts index 7a23dd56bb..c9f205aff7 100644 --- a/packages/server/test/server/streamableHttp.test.ts +++ b/packages/server/test/server/streamableHttp.test.ts @@ -4,7 +4,7 @@ import type { CallToolResult, JSONRPCErrorResponse, JSONRPCMessage } from '@mode import * as z from 'zod/v4'; import { McpServer } from '../../src/server/mcp.js'; -import type { EventId, EventStore, StreamId } from '../../src/server/streamableHttp.js'; +import type { EventId, EventStore, StreamId, WebStandardStreamableHTTPServerTransportOptions } from '../../src/server/streamableHttp.js'; import { WebStandardStreamableHTTPServerTransport } from '../../src/server/streamableHttp.js'; /** @@ -559,6 +559,103 @@ describe('Zod v4', () => { id: 'call-1' }); }); + + it('rejects server-to-client requests when the response stream cannot carry SSE', async () => { + // Case 1: JSON response mode — the originating POST's response is JSON and cannot carry SSE + let releaseTool!: () => void; + const toolBlocked = new Promise(resolve => { + releaseTool = resolve; + }); + let signalToolStarted!: () => void; + const toolStarted = new Promise(resolve => { + signalToolStarted = resolve; + }); + + mcpServer.registerTool('block', { description: 'Blocks until released', inputSchema: z.object({}) }, async () => { + signalToolStarted(); + await toolBlocked; + return { content: [{ type: 'text', text: 'released' }] }; + }); + + sessionId = await initializeServer(); + + // Hold a tools/call in flight; its JSON-mode response stream cannot carry SSE + const toolCall: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'block', arguments: {} }, + id: 'call-block' + }; + const toolCallPromise = transport.handleRequest(createRequest('POST', toolCall, { sessionId })); + await toolStarted; + + const elicitRequest: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'elicitation/create', + params: { mode: 'form', message: 'Need input', requestedSchema: { type: 'object', properties: {} } }, + id: 'elicit-1' + }; + await expect(transport.send(elicitRequest, { relatedRequestId: 'call-block' })).rejects.toThrow( + "Cannot deliver server-to-client request (elicitation/create): the originating request's response cannot carry SSE (JSON response mode)" + ); + + // The tool call still completes as JSON + releaseTool(); + const toolResponse = await toolCallPromise; + expect(toolResponse.status).toBe(200); + expect(toolResponse.headers.get('content-type')).toBe('application/json'); + + // Case 2: SSE mode, but the originating request's response stream has closed (client disconnected) + let releaseSseTool!: () => void; + const sseToolBlocked = new Promise(resolve => { + releaseSseTool = resolve; + }); + let signalSseToolStarted!: () => void; + const sseToolStarted = new Promise(resolve => { + signalSseToolStarted = resolve; + }); + + const sseServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: {} }); + sseServer.registerTool('block', { description: 'Blocks until released', inputSchema: z.object({}) }, async () => { + signalSseToolStarted(); + await sseToolBlocked; + return { content: [{ type: 'text', text: 'released' }] }; + }); + const sseTransport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID() + }); + await sseServer.connect(sseTransport); + + const initResponse = await sseTransport.handleRequest(createRequest('POST', TEST_MESSAGES.initialize)); + expect(initResponse.status).toBe(200); + const sseSessionId = initResponse.headers.get('mcp-session-id') as string; + + const sseToolCall: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'block', arguments: {} }, + id: 'call-block-sse' + }; + const sseToolResponse = await sseTransport.handleRequest(createRequest('POST', sseToolCall, { sessionId: sseSessionId })); + expect(sseToolResponse.status).toBe(200); + await sseToolStarted; + + // The client disconnects from the originating request's response stream + await sseToolResponse.body!.cancel(); + + const sseElicitRequest: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'elicitation/create', + params: { mode: 'form', message: 'Need input', requestedSchema: { type: 'object', properties: {} } }, + id: 'elicit-2' + }; + await expect(sseTransport.send(sseElicitRequest, { relatedRequestId: 'call-block-sse' })).rejects.toThrow( + "Cannot deliver server-to-client request (elicitation/create): the originating request's response cannot carry SSE (response stream closed)" + ); + + releaseSseTool(); + await sseTransport.close(); + }); }); describe('HTTPServerTransport - Session Callbacks', () => { @@ -957,6 +1054,286 @@ describe('Zod v4', () => { }); }); + describe('HTTPServerTransport - SSE Keepalive', () => { + let transport: WebStandardStreamableHTTPServerTransport; + let mcpServer: McpServer; + + beforeEach(() => { + // Only fake interval timers (what keepalive uses) so promise/stream plumbing, + // request parsing, and real-time waits keep working. + vi.useFakeTimers({ toFake: ['setInterval', 'clearInterval'] }); + }); + + afterEach(async () => { + await transport?.close(); + vi.useRealTimers(); + }); + + /** + * Creates a transport + server, registers a 'greet' tool, runs initialization, + * and drains the initialization response stream so it completes and cleans up. + */ + async function setupTransport( + options: Partial = {}, + configureServer?: (server: McpServer) => void + ): Promise { + mcpServer = new McpServer({ name: 'test-server', version: '1.0.0' }, { capabilities: { logging: {} } }); + + mcpServer.registerTool( + 'greet', + { description: 'Greeting tool', inputSchema: z.object({ name: z.string() }) }, + async ({ name }): Promise => { + return { content: [{ type: 'text', text: `Hello, ${name}!` }] }; + } + ); + configureServer?.(mcpServer); + + transport = new WebStandardStreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + ...options + }); + + await mcpServer.connect(transport); + + const initResponse = await transport.handleRequest(createRequest('POST', TEST_MESSAGES.initialize)); + expect(initResponse.status).toBe(200); + const newSessionId = initResponse.headers.get('mcp-session-id') as string; + // Drain the initialize response so its POST stream completes (and stops any keepalive) + await readSSEEvent(initResponse); + return newSessionId; + } + + /** Starts a background reader that collects decoded chunks from an SSE response body. */ + function collectChunks(response: Response): string[] { + const received: string[] = []; + const decoder = new TextDecoder(); + const reader = response.body!.getReader(); + void (async () => { + for (;;) { + const { done, value } = await reader.read(); + if (done) { + break; + } + received.push(decoder.decode(value)); + } + })(); + return received; + } + + /** Counts keepalive comment frames across collected chunks (robust to chunk boundaries). */ + function keepaliveCount(chunks: string[]): number { + return chunks.join('').split(': keepalive\n\n').length - 1; + } + + /** Yields real time so background stream readers observe chunks enqueued by fake timers. */ + function drainStream(): Promise { + return new Promise(resolve => setTimeout(resolve, 10)); + } + + /** Polls (in real time) until the predicate holds. */ + async function waitForCondition(predicate: () => boolean, timeoutMs = 2000): Promise { + const start = Date.now(); + while (!predicate()) { + if (Date.now() - start > timeoutMs) { + throw new Error('Timed out waiting for condition'); + } + await new Promise(resolve => setTimeout(resolve, 5)); + } + } + + it('does not schedule a keepalive timer when keepAliveInterval is not set', async () => { + const sessionId = await setupTransport(); + + const response = await transport.handleRequest(createRequest('GET', undefined, { sessionId })); + expect(response.status).toBe(200); + const chunks = collectChunks(response); + + // Default off: no timer scheduled for the open standalone stream + expect(vi.getTimerCount()).toBe(0); + + // ... and nothing is ever written to the stream + vi.advanceTimersByTime(60_000); + await drainStream(); + expect(chunks).toHaveLength(0); + }); + + it.each([0, -1000, NaN, Infinity])('treats keepAliveInterval=%d as disabled', async interval => { + const sessionId = await setupTransport({ keepAliveInterval: interval }); + + const response = await transport.handleRequest(createRequest('GET', undefined, { sessionId })); + expect(response.status).toBe(200); + const chunks = collectChunks(response); + + expect(vi.getTimerCount()).toBe(0); + + vi.advanceTimersByTime(60_000); + await drainStream(); + expect(chunks).toHaveLength(0); + }); + + it('writes keepalive comments at the configured interval while a stream is open', async () => { + const sessionId = await setupTransport({ keepAliveInterval: 1000 }); + + const response = await transport.handleRequest(createRequest('GET', undefined, { sessionId })); + expect(response.status).toBe(200); + const chunks = collectChunks(response); + + // Nothing before the interval elapses + vi.advanceTimersByTime(999); + await drainStream(); + expect(keepaliveCount(chunks)).toBe(0); + + // First interval elapses + vi.advanceTimersByTime(1); + await drainStream(); + expect(keepaliveCount(chunks)).toBe(1); + + // Two more intervals + vi.advanceTimersByTime(2000); + await drainStream(); + expect(keepaliveCount(chunks)).toBe(3); + + // Frames are pure SSE comments — nothing else is written + expect(chunks.join('')).toBe(': keepalive\n\n'.repeat(3)); + }); + + it('stops the keepalive timer when a request response stream completes', async () => { + let releaseTool!: () => void; + const toolBlocked = new Promise(resolve => { + releaseTool = resolve; + }); + + const sessionId = await setupTransport({ keepAliveInterval: 1000 }, server => { + server.registerTool('block', { description: 'Blocks until released', inputSchema: z.object({}) }, async () => { + await toolBlocked; + return { content: [{ type: 'text', text: 'released' }] }; + }); + }); + + // The initialization response stream has already completed — its keepalive must be gone + expect(vi.getTimerCount()).toBe(0); + + const toolCall: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'block', arguments: {} }, + id: 'call-block' + }; + const response = await transport.handleRequest(createRequest('POST', toolCall, { sessionId })); + expect(response.status).toBe(200); + const chunks = collectChunks(response); + + // While the handler is in flight, the keepalive timer is active and writes frames + expect(vi.getTimerCount()).toBe(1); + vi.advanceTimersByTime(2000); + await drainStream(); + expect(keepaliveCount(chunks)).toBe(2); + + // Complete the handler: the response is written and the stream completes + releaseTool(); + await waitForCondition(() => chunks.some(chunk => chunk.includes('call-block'))); + + // No timer leak after stream completion + expect(vi.getTimerCount()).toBe(0); + + // No further keepalives are written after completion + vi.advanceTimersByTime(10_000); + await drainStream(); + expect(keepaliveCount(chunks)).toBe(2); + }); + + it('stops keepalive timers when the transport is closed mid-stream', async () => { + const sessionId = await setupTransport({ keepAliveInterval: 1000 }); + + const response = await transport.handleRequest(createRequest('GET', undefined, { sessionId })); + expect(response.status).toBe(200); + const chunks = collectChunks(response); + + expect(vi.getTimerCount()).toBe(1); + vi.advanceTimersByTime(1000); + await drainStream(); + expect(keepaliveCount(chunks)).toBe(1); + + await transport.close(); + + // Closing the transport mid-stream clears the timer and writes nothing further + expect(vi.getTimerCount()).toBe(0); + vi.advanceTimersByTime(10_000); + await drainStream(); + expect(keepaliveCount(chunks)).toBe(1); + }); + + it('runs independent keepalives for concurrent streams and cleans each up independently', async () => { + let releaseTool!: () => void; + const toolBlocked = new Promise(resolve => { + releaseTool = resolve; + }); + + const sessionId = await setupTransport({ keepAliveInterval: 1000 }, server => { + server.registerTool('block', { description: 'Blocks until released', inputSchema: z.object({}) }, async () => { + await toolBlocked; + return { content: [{ type: 'text', text: 'released' }] }; + }); + }); + + // Stream A: standalone GET stream + const getResponse = await transport.handleRequest(createRequest('GET', undefined, { sessionId })); + expect(getResponse.status).toBe(200); + const getChunks = collectChunks(getResponse); + + // Stream B: POST response stream held open by the blocked tool + const toolCall: JSONRPCMessage = { + jsonrpc: '2.0', + method: 'tools/call', + params: { name: 'block', arguments: {} }, + id: 'call-block' + }; + const postResponse = await transport.handleRequest(createRequest('POST', toolCall, { sessionId })); + expect(postResponse.status).toBe(200); + const postChunks = collectChunks(postResponse); + + // Each open stream has its own keepalive timer + expect(vi.getTimerCount()).toBe(2); + + // Both streams receive keepalives + vi.advanceTimersByTime(1000); + await drainStream(); + expect(keepaliveCount(getChunks)).toBe(1); + expect(keepaliveCount(postChunks)).toBe(1); + + // Completing the tool call cleans up only the POST stream's keepalive + releaseTool(); + await waitForCondition(() => postChunks.some(chunk => chunk.includes('call-block'))); + expect(vi.getTimerCount()).toBe(1); + + // The GET stream keepalive continues; the completed POST stream gets nothing further + vi.advanceTimersByTime(1000); + await drainStream(); + expect(keepaliveCount(getChunks)).toBe(2); + expect(keepaliveCount(postChunks)).toBe(1); + }); + + it('stops the keepalive timer when event replay fails', async () => { + const failingEventStore: EventStore = { + async storeEvent(): Promise { + return 'stored-event'; + }, + async replayEventsAfter(): Promise { + throw new Error('unknown event id'); + } + }; + const sessionId = await setupTransport({ keepAliveInterval: 1000, eventStore: failingEventStore }); + + const response = await transport.handleRequest( + createRequest('GET', undefined, { sessionId, extraHeaders: { 'last-event-id': 'unknown-event' } }) + ); + expect(response.status).toBe(500); + + // The keepalive armed when the replay stream was constructed must not outlive the failed replay + expect(vi.getTimerCount()).toBe(0); + }); + }); + describe('close() re-entrancy guard', () => { it('should not recurse when onclose triggers a second close()', async () => { const transport = new WebStandardStreamableHTTPServerTransport({ sessionIdGenerator: randomUUID }); diff --git a/test/e2e/requirements.ts b/test/e2e/requirements.ts index 092abb1a7c..9353428397 100644 --- a/test/e2e/requirements.ts +++ b/test/e2e/requirements.ts @@ -766,6 +766,30 @@ export const REQUIREMENTS: Record = { note: 'Stateless hosting creates a fresh server per request and has no standalone GET stream, so there is no server→client channel to deliver/observe these.' }, + // Server-request association (SEP-2260) + + 'protocol:assoc:nested-on-originating-stream': { + source: 'https://modelcontextprotocol.io/seps/2260-Require-Server-requests-to-be-associated-with-Client-requests#specification-changes', + behavior: + "[SEP-2260] Server-to-client requests issued while handling a client request (such as roots/list, sampling/createMessage, or elicitation/create from a tool handler) are associated with that originating request — on streamable HTTP they ride that request's SSE response stream, not a separate channel.", + transports: ['streamableHttp'], + note: 'SHOULD-strength on the published 2025-11-25 spec (the transports page only recommends delivering requests on the originating stream); SEP-2260 is the citation for the association itself. The stream association is observable only on the HTTP hosting layer; the matrix transport arg is ignored, so it runs as a single streamableHttp-labelled cell to avoid duplicate runs.' + }, + 'protocol:assoc:keepalive-during-nested': { + source: 'https://modelcontextprotocol.io/seps/2260-Require-Server-requests-to-be-associated-with-Client-requests#timeout-considerations', + behavior: + '[SEP-2260] During a nested server→client wait (e.g. an elicitation pending inside tools/call) the server keeps the originating SSE response stream alive — keepalive frames are observed on that stream when the transport-level SSE keepalive is enabled — and the parent call does not fail.', + transports: ['streamableHttp'], + note: 'SHOULD-strength transport guidance (SEP Timeout Considerations); doubles as a regression guard against idle-stream drops during human-in-the-loop waits. This exercises the HTTP hosting layer; the matrix transport arg is ignored, so it runs as a single streamableHttp-labelled cell to avoid duplicate runs.' + }, + 'protocol:assoc:ping-exempt': { + source: 'https://modelcontextprotocol.io/seps/2260-Require-Server-requests-to-be-associated-with-Client-requests#1-add-warning-blocks-to-feature-documentation', + behavior: + '[SEP-2260] ping is exempt from the request-association rule: either party MAY send ping at any time on an established connection — including the server with no client request in flight — and receives an empty result.', + transports: STATEFUL_TRANSPORTS, + note: 'Server→client ping with nothing in flight needs a server→client channel; stateless hosting has none.' + }, + // Sampling 'sampling:capability:declare': { diff --git a/test/e2e/scenarios/hosting-http.test.ts b/test/e2e/scenarios/hosting-http.test.ts index e055672e47..96a5e7fe21 100644 --- a/test/e2e/scenarios/hosting-http.test.ts +++ b/test/e2e/scenarios/hosting-http.test.ts @@ -1036,3 +1036,249 @@ verifies('hosting:http:send-no-listener-noop', async (_args: TestArgs) => { await close(); } }); + +verifies('protocol:assoc:nested-on-originating-stream', async (_args: TestArgs) => { + const makeServer = () => { + const s = new McpServer({ name: 's', version: '0' }); + s.registerTool('nested', { inputSchema: z.object({}) }, async (_a, ctx) => { + const elicited = await ctx.mcpReq.elicitInput({ + mode: 'form', + message: 'Need input', + requestedSchema: { type: 'object', properties: { ok: { type: 'boolean' } } } + }); + const sampled = await ctx.mcpReq.requestSampling({ + messages: [{ role: 'user', content: { type: 'text', text: 'hi' } }], + maxTokens: 5 + }); + const roots = await ctx.mcpReq.send({ method: 'roots/list' }); + return { content: [{ type: 'text', text: `${elicited.action}|${sampled.model}|${roots.roots.length}` }] }; + }); + return s; + }; + const { handleRequest, close } = hostPerSession(makeServer); + + const base = { + 'mcp-protocol-version': LATEST_PROTOCOL_VERSION, + 'content-type': 'application/json', + accept: 'application/json, text/event-stream' + }; + + try { + const initRes = await handleRequest( + new Request('http://in-process/mcp', { + method: 'POST', + headers: base, + body: JSON.stringify({ + jsonrpc: '2.0', + id: 1, + method: 'initialize', + params: { + protocolVersion: LATEST_PROTOCOL_VERSION, + capabilities: { elicitation: { form: {} }, sampling: {}, roots: {} }, + clientInfo: { name: 'probe', version: '0' } + } + }) + }) + ); + expect(initRes.status).toBe(200); + const sessionId = initRes.headers.get('mcp-session-id')!; + const sessionHeaders = { ...base, 'mcp-session-id': sessionId }; + + // A standalone GET stream is open as the alternative channel the nested requests must NOT use. + const sse = await handleRequest( + new Request('http://in-process/mcp', { + method: 'GET', + headers: { accept: 'text/event-stream', 'mcp-protocol-version': LATEST_PROTOCOL_VERSION, 'mcp-session-id': sessionId } + }) + ); + expect(sse.status).toBe(200); + const getTap = sseTap(sse.body!); + + const callRes = await handleRequest( + new Request('http://in-process/mcp', { + method: 'POST', + headers: sessionHeaders, + body: JSON.stringify({ jsonrpc: '2.0', id: 2, method: 'tools/call', params: { name: 'nested', arguments: {} } }) + }) + ); + expect(callRes.status).toBe(200); + expect(callRes.headers.get('content-type')).toMatch(/text\/event-stream/); + const postTap = sseTap(callRes.body!); + + const answers: Record = { + 'elicitation/create': { action: 'accept', content: { ok: true } }, + 'sampling/createMessage': { + model: 'stub-model', + role: 'assistant', + content: { type: 'text', text: 'ok' }, + stopReason: 'endTurn' + }, + 'roots/list': { roots: [] } + }; + const answerServerRequest = async (msg: JSONRPCMessage) => { + if (!('method' in msg) || !('id' in msg) || msg.id === undefined) return; + const result = answers[msg.method]; + if (result === undefined) return; + const res = await handleRequest( + new Request('http://in-process/mcp', { + method: 'POST', + headers: sessionHeaders, + body: JSON.stringify({ jsonrpc: '2.0', id: msg.id, result }) + }) + ); + expect(res.status).toBe(202); + }; + + const onPostStream: string[] = []; + const onGetStream: string[] = []; + let toolResponse: JSONRPCMessage | undefined; + + try { + for (let i = 0; i < 100 && toolResponse === undefined; i++) { + for (const msg of await postTap.poll(50)) { + if ('method' in msg && 'id' in msg) { + onPostStream.push(msg.method); + await answerServerRequest(msg); + } else if ('id' in msg && msg.id === 2) { + toolResponse = msg; + } + } + for (const msg of await getTap.poll(10)) { + if ('method' in msg && 'id' in msg) { + onGetStream.push(msg.method); + // Answer misrouted requests too, so the loop terminates deterministically either way. + await answerServerRequest(msg); + } + } + } + + // All nested server→client requests rode the originating request's response stream... + expect(onPostStream).toEqual(['elicitation/create', 'sampling/createMessage', 'roots/list']); + // ...and none of them used the standalone GET stream. + expect(onGetStream).toEqual([]); + expect(toolResponse).toMatchObject({ + jsonrpc: '2.0', + id: 2, + result: { content: [{ type: 'text', text: 'accept|stub-model|0' }] } + }); + } finally { + await getTap.cancel(); + await postTap.cancel(); + } + } finally { + await close(); + } +}); + +verifies('protocol:assoc:keepalive-during-nested', async (_args: TestArgs) => { + const server = new McpServer({ name: 's', version: '0' }); + server.registerTool('ask', { inputSchema: z.object({}) }, async (_a, ctx) => { + const ans = await ctx.mcpReq.elicitInput({ + mode: 'form', + message: 'Take your time', + requestedSchema: { type: 'object', properties: { ok: { type: 'boolean' } } } + }); + return { content: [{ type: 'text', text: `done:${ans.action}` }] }; + }); + const tx = new WebStandardStreamableHTTPServerTransport({ sessionIdGenerator: () => randomUUID(), keepAliveInterval: 25 }); + await server.connect(tx); + const handleRequest = (req: Request) => tx.handleRequest(req); + + const base = { + 'mcp-protocol-version': LATEST_PROTOCOL_VERSION, + 'content-type': 'application/json', + accept: 'application/json, text/event-stream' + }; + + try { + const initRes = await handleRequest( + new Request('http://in-process/mcp', { + method: 'POST', + headers: base, + body: JSON.stringify({ + jsonrpc: '2.0', + id: 1, + method: 'initialize', + params: { + protocolVersion: LATEST_PROTOCOL_VERSION, + capabilities: { elicitation: { form: {} } }, + clientInfo: { name: 'probe', version: '0' } + } + }) + }) + ); + expect(initRes.status).toBe(200); + const sessionId = initRes.headers.get('mcp-session-id')!; + const sessionHeaders = { ...base, 'mcp-session-id': sessionId }; + + const callRes = await handleRequest( + new Request('http://in-process/mcp', { + method: 'POST', + headers: sessionHeaders, + body: JSON.stringify({ jsonrpc: '2.0', id: 2, method: 'tools/call', params: { name: 'ask', arguments: {} } }) + }) + ); + expect(callRes.status).toBe(200); + expect(callRes.headers.get('content-type')).toMatch(/text\/event-stream/); + + // Raw tap (keeps comment frames, which sseTap would discard). + const reader = callRes.body!.getReader(); + const decoder = new TextDecoder(); + let raw = ''; + let pending: ReturnType | null = null; + const pollRaw = async (timeoutMs: number): Promise => { + pending ??= reader.read(); + const result = await Promise.race([pending, new Promise(resolve => setTimeout(resolve, timeoutMs, null))]); + if (result === null) return; + pending = null; + if (result.done || !result.value) return; + raw += decoder.decode(result.value, { stream: true }); + }; + const dataMessages = (): JSONRPCMessage[] => + raw + .split('\n') + .filter(l => l.startsWith('data: ')) + .map((l): JSONRPCMessage => JSON.parse(l.slice(6))); + + try { + // Wait for the nested elicitation/create request to arrive on the originating response stream. + let elicitRequest: JSONRPCMessage | undefined; + for (let i = 0; i < 100 && elicitRequest === undefined; i++) { + await pollRaw(50); + elicitRequest = dataMessages().find(m => 'method' in m && m.method === 'elicitation/create' && 'id' in m); + } + expect(elicitRequest).toBeDefined(); + if (!elicitRequest || !('id' in elicitRequest)) throw new Error('expected an elicitation/create request'); + + // While the handler is parked on the elicitation, keepalive comment frames keep the stream alive. + const sizeAtElicit = raw.length; + let sawKeepAlive = false; + for (let i = 0; i < 40 && !sawKeepAlive; i++) { + await pollRaw(50); + sawKeepAlive = /^: /m.test(raw.slice(sizeAtElicit)); + } + expect(sawKeepAlive).toBe(true); + + // Answer the elicitation; the parent tools/call completes normally on the same stream. + const answerRes = await handleRequest( + new Request('http://in-process/mcp', { + method: 'POST', + headers: sessionHeaders, + body: JSON.stringify({ jsonrpc: '2.0', id: elicitRequest.id, result: { action: 'accept', content: { ok: true } } }) + }) + ); + expect(answerRes.status).toBe(202); + + let toolResponse: JSONRPCMessage | undefined; + for (let i = 0; i < 100 && toolResponse === undefined; i++) { + await pollRaw(50); + toolResponse = dataMessages().find(m => 'id' in m && m.id === 2 && !('method' in m)); + } + expect(toolResponse).toMatchObject({ jsonrpc: '2.0', id: 2, result: { content: [{ type: 'text', text: 'done:accept' }] } }); + } finally { + await reader.cancel(); + } + } finally { + await server.close(); + } +}); diff --git a/test/e2e/scenarios/protocol.test.ts b/test/e2e/scenarios/protocol.test.ts index 40b5a20af3..44d2fe702c 100644 --- a/test/e2e/scenarios/protocol.test.ts +++ b/test/e2e/scenarios/protocol.test.ts @@ -1678,3 +1678,32 @@ verifies('protocol:transport-callbacks:wrappable-after-connect', async ({ transp await closingServer.close(); } }); + +verifies('protocol:assoc:ping-exempt', async ({ transport }: TestArgs) => { + let server!: McpServer; + const makeServer = () => { + server = new McpServer({ name: 's', version: '0' }, { capabilities: { logging: {} } }); + return server; + }; + const logs: unknown[] = []; + const client = newClient(); + client.setNotificationHandler('notifications/message', n => { + logs.push(n.params); + }); + + await using _ = await wire(transport, makeServer, client); + + // Wait for the server→client channel to be up (on streamable HTTP the standalone GET + // stream opens asynchronously after connect); the probe notification is re-sent until seen. + await vi.waitFor( + async () => { + await server.server.sendLoggingMessage({ level: 'info', data: 'channel-probe' }); + expect(logs.length).toBeGreaterThan(0); + }, + { timeout: 5000, interval: 50 } + ); + + // No client request is in flight: ping is exempt from the request-association rule in both directions. + await expect(server.server.ping()).resolves.toEqual({}); + await expect(client.ping()).resolves.toEqual({}); +});