Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions src/everything/__tests__/inMemoryEventStore.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { describe, it, expect } from "vitest";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
import { InMemoryEventStore } from "../transports/inMemoryEventStore.js";

const message = (method: string): JSONRPCMessage => ({
jsonrpc: "2.0",
method,
});

describe("InMemoryEventStore", () => {
it("replays only events from the same stream after the requested event", async () => {
const eventStore = new InMemoryEventStore();
const firstStreamEvent = await eventStore.storeEvent(
"stream-a",
message("notifications/stream-a/first")
);
const otherStreamEvent = await eventStore.storeEvent(
"stream-b",
message("notifications/stream-b/first")
);
const secondStreamEvent = await eventStore.storeEvent(
"stream-a",
message("notifications/stream-a/second")
);
await eventStore.storeEvent(
"stream-b",
message("notifications/stream-b/second")
);

const replayedEvents: Array<{ eventId: string; message: JSONRPCMessage }> =
[];
const replayedStreamId = await eventStore.replayEventsAfter(
firstStreamEvent,
{
send: async (eventId, replayedMessage) => {
replayedEvents.push({ eventId, message: replayedMessage });
},
}
);

expect(replayedStreamId).toBe("stream-a");
expect(replayedEvents).toEqual([
{
eventId: secondStreamEvent,
message: message("notifications/stream-a/second"),
},
]);
expect(replayedEvents.map(({ eventId }) => eventId)).not.toContain(
otherStreamEvent
);
});

it("returns undefined without replaying events for unknown event ids", async () => {
const eventStore = new InMemoryEventStore();
await eventStore.storeEvent("stream-a", message("notifications/stream-a"));

const replayedEvents: Array<{ eventId: string; message: JSONRPCMessage }> =
[];
const replayedStreamId = await eventStore.replayEventsAfter(
"unknown-event-id",
{
send: async (eventId, replayedMessage) => {
replayedEvents.push({ eventId, message: replayedMessage });
},
}
);

expect(replayedStreamId).toBeUndefined();
expect(replayedEvents).toEqual([]);
});

it("looks up the stream id for a stored event id", async () => {
const eventStore = new InMemoryEventStore();
const eventId = await eventStore.storeEvent(
"stream-a",
message("notifications/stream-a")
);

await expect(eventStore.getStreamIdForEventId(eventId)).resolves.toBe(
"stream-a"
);
await expect(
eventStore.getStreamIdForEventId("unknown-event-id")
).resolves.toBeUndefined();
});
});
62 changes: 62 additions & 0 deletions src/everything/transports/inMemoryEventStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { randomUUID } from "node:crypto";
import type { JSONRPCMessage } from "@modelcontextprotocol/sdk/types.js";
import type {
EventId,
EventStore,
StreamId,
} from "@modelcontextprotocol/sdk/server/streamableHttp.js";

// Simple in-memory event store for SSE resumability.
// Primarily intended for examples and testing, not production use.
export class InMemoryEventStore implements EventStore {
private events: Map<
EventId,
{ streamId: StreamId; message: JSONRPCMessage }
> = new Map();

async storeEvent(
streamId: StreamId,
message: JSONRPCMessage
): Promise<EventId> {
const eventId = randomUUID();
this.events.set(eventId, { streamId, message });
return eventId;
}

async getStreamIdForEventId(eventId: EventId): Promise<StreamId | undefined> {
return this.events.get(eventId)?.streamId;
}

async replayEventsAfter(
lastEventId: EventId,
{
send,
}: { send: (eventId: EventId, message: JSONRPCMessage) => Promise<void> }
): Promise<StreamId> {
const lastEvent = this.events.get(lastEventId);
if (!lastEvent) {
// The SDK type currently requires a StreamId, but unknown event IDs have
// no stream to resume. Return undefined at runtime to match the intended
// EventStore semantics and Python API behavior.
return undefined as unknown as StreamId;
}

const { streamId } = lastEvent;
let foundLastEvent = false;

for (const [eventId, event] of this.events) {
if (eventId === lastEventId) {
foundLastEvent = true;
continue;
}

if (!foundLastEvent || event.streamId !== streamId) {
continue;
}

await send(eventId, event.message);
}

return streamId;
}
}
35 changes: 2 additions & 33 deletions src/everything/transports/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,9 @@
import {
StreamableHTTPServerTransport,
EventStore,
} from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import express, { Request, Response } from "express";
import { createServer } from "../server/index.js";
import { randomUUID } from "node:crypto";
import cors from "cors";

// Simple in-memory event store for SSE resumability
class InMemoryEventStore implements EventStore {
private events: Map<string, { streamId: string; message: unknown }> =
new Map();

async storeEvent(streamId: string, message: unknown): Promise<string> {
const eventId = randomUUID();
this.events.set(eventId, { streamId, message });
return eventId;
}

async replayEventsAfter(
lastEventId: string,
{ send }: { send: (eventId: string, message: unknown) => Promise<void> }
): Promise<string> {
const entries = Array.from(this.events.entries());
const startIndex = entries.findIndex(([id]) => id === lastEventId);
if (startIndex === -1) return lastEventId;

let lastId: string = lastEventId;
for (let i = startIndex + 1; i < entries.length; i++) {
const [eventId, { message }] = entries[i];
await send(eventId, message);
lastId = eventId;
}
return lastId;
}
}
import { InMemoryEventStore } from "./inMemoryEventStore.js";

console.log("Starting Streamable HTTP server...");

Expand Down
Loading