From 5426aa8bc5e8d265d3d6df22b8713bd9faede70b Mon Sep 17 00:00:00 2001 From: cyphercodes Date: Mon, 4 May 2026 06:58:04 +0300 Subject: [PATCH] fix(everything): replay streamable HTTP events by stream --- .../__tests__/inMemoryEventStore.test.ts | 86 +++++++++++++++++++ .../transports/inMemoryEventStore.ts | 62 +++++++++++++ src/everything/transports/streamableHttp.ts | 35 +------- 3 files changed, 150 insertions(+), 33 deletions(-) create mode 100644 src/everything/__tests__/inMemoryEventStore.test.ts create mode 100644 src/everything/transports/inMemoryEventStore.ts diff --git a/src/everything/__tests__/inMemoryEventStore.test.ts b/src/everything/__tests__/inMemoryEventStore.test.ts new file mode 100644 index 0000000000..dc06e15c11 --- /dev/null +++ b/src/everything/__tests__/inMemoryEventStore.test.ts @@ -0,0 +1,86 @@ +import { describe, it, expect } from "vitest"; +import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js"; +import { InMemoryEventStore } from "../transports/inMemoryEventStore.js"; + +const message = (method: string): JSONRPCMessage => ({ + jsonrpc: "2.0", + method, +}); + +describe("InMemoryEventStore", () => { + it("replays only events from the same stream after the requested event", async () => { + const eventStore = new InMemoryEventStore(); + const firstStreamEvent = await eventStore.storeEvent( + "stream-a", + message("notifications/stream-a/first") + ); + const otherStreamEvent = await eventStore.storeEvent( + "stream-b", + message("notifications/stream-b/first") + ); + const secondStreamEvent = await eventStore.storeEvent( + "stream-a", + message("notifications/stream-a/second") + ); + await eventStore.storeEvent( + "stream-b", + message("notifications/stream-b/second") + ); + + const replayedEvents: Array<{ eventId: string; message: JSONRPCMessage }> = + []; + const replayedStreamId = await eventStore.replayEventsAfter( + firstStreamEvent, + { + send: async (eventId, replayedMessage) => { + replayedEvents.push({ eventId, message: replayedMessage }); + }, + } + ); + + expect(replayedStreamId).toBe("stream-a"); + expect(replayedEvents).toEqual([ + { + eventId: secondStreamEvent, + message: message("notifications/stream-a/second"), + }, + ]); + expect(replayedEvents.map(({ eventId }) => eventId)).not.toContain( + otherStreamEvent + ); + }); + + it("returns undefined without replaying events for unknown event ids", async () => { + const eventStore = new InMemoryEventStore(); + await eventStore.storeEvent("stream-a", message("notifications/stream-a")); + + const replayedEvents: Array<{ eventId: string; message: JSONRPCMessage }> = + []; + const replayedStreamId = await eventStore.replayEventsAfter( + "unknown-event-id", + { + send: async (eventId, replayedMessage) => { + replayedEvents.push({ eventId, message: replayedMessage }); + }, + } + ); + + expect(replayedStreamId).toBeUndefined(); + expect(replayedEvents).toEqual([]); + }); + + it("looks up the stream id for a stored event id", async () => { + const eventStore = new InMemoryEventStore(); + const eventId = await eventStore.storeEvent( + "stream-a", + message("notifications/stream-a") + ); + + await expect(eventStore.getStreamIdForEventId(eventId)).resolves.toBe( + "stream-a" + ); + await expect( + eventStore.getStreamIdForEventId("unknown-event-id") + ).resolves.toBeUndefined(); + }); +}); diff --git a/src/everything/transports/inMemoryEventStore.ts b/src/everything/transports/inMemoryEventStore.ts new file mode 100644 index 0000000000..48bf99106e --- /dev/null +++ b/src/everything/transports/inMemoryEventStore.ts @@ -0,0 +1,62 @@ +import { randomUUID } from "node:crypto"; +import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js"; +import type { + EventId, + EventStore, + StreamId, +} from "@modelcontextprotocol/sdk/server/streamableHttp.js"; + +// Simple in-memory event store for SSE resumability. +// Primarily intended for examples and testing, not production use. +export class InMemoryEventStore implements EventStore { + private events: Map< + EventId, + { streamId: StreamId; message: JSONRPCMessage } + > = new Map(); + + async storeEvent( + streamId: StreamId, + message: JSONRPCMessage + ): Promise { + const eventId = randomUUID(); + this.events.set(eventId, { streamId, message }); + return eventId; + } + + async getStreamIdForEventId(eventId: EventId): Promise { + return this.events.get(eventId)?.streamId; + } + + async replayEventsAfter( + lastEventId: EventId, + { + send, + }: { send: (eventId: EventId, message: JSONRPCMessage) => Promise } + ): Promise { + const lastEvent = this.events.get(lastEventId); + if (!lastEvent) { + // The SDK type currently requires a StreamId, but unknown event IDs have + // no stream to resume. Return undefined at runtime to match the intended + // EventStore semantics and Python API behavior. + return undefined as unknown as StreamId; + } + + const { streamId } = lastEvent; + let foundLastEvent = false; + + for (const [eventId, event] of this.events) { + if (eventId === lastEventId) { + foundLastEvent = true; + continue; + } + + if (!foundLastEvent || event.streamId !== streamId) { + continue; + } + + await send(eventId, event.message); + } + + return streamId; + } +} diff --git a/src/everything/transports/streamableHttp.ts b/src/everything/transports/streamableHttp.ts index 2e79abc554..be94a4162f 100644 --- a/src/everything/transports/streamableHttp.ts +++ b/src/everything/transports/streamableHttp.ts @@ -1,40 +1,9 @@ -import { - StreamableHTTPServerTransport, - EventStore, -} from "@modelcontextprotocol/sdk/server/streamableHttp.js"; +import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js"; import express, { Request, Response } from "express"; import { createServer } from "../server/index.js"; import { randomUUID } from "node:crypto"; import cors from "cors"; - -// Simple in-memory event store for SSE resumability -class InMemoryEventStore implements EventStore { - private events: Map = - new Map(); - - async storeEvent(streamId: string, message: unknown): Promise { - const eventId = randomUUID(); - this.events.set(eventId, { streamId, message }); - return eventId; - } - - async replayEventsAfter( - lastEventId: string, - { send }: { send: (eventId: string, message: unknown) => Promise } - ): Promise { - const entries = Array.from(this.events.entries()); - const startIndex = entries.findIndex(([id]) => id === lastEventId); - if (startIndex === -1) return lastEventId; - - let lastId: string = lastEventId; - for (let i = startIndex + 1; i < entries.length; i++) { - const [eventId, { message }] = entries[i]; - await send(eventId, message); - lastId = eventId; - } - return lastId; - } -} +import { InMemoryEventStore } from "./inMemoryEventStore.js"; console.log("Starting Streamable HTTP server...");