-
Notifications
You must be signed in to change notification settings - Fork 10.6k
Expand file tree
/
Copy pathstreamableHttp.ts
More file actions
240 lines (215 loc) · 7.34 KB
/
streamableHttp.ts
File metadata and controls
240 lines (215 loc) · 7.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import {
StreamableHTTPServerTransport,
EventStore,
} from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import express, { Request, Response } from "express";
import { createServer } from "../server/index.js";
import { randomUUID } from "node:crypto";
import cors from "cors";
// Simple in-memory event store for SSE resumability
class InMemoryEventStore implements EventStore {
private events: Map<string, { streamId: string; message: unknown }> =
new Map();
async storeEvent(streamId: string, message: unknown): Promise<string> {
const eventId = randomUUID();
this.events.set(eventId, { streamId, message });
return eventId;
}
async replayEventsAfter(
lastEventId: string,
{ send }: { send: (eventId: string, message: unknown) => Promise<void> }
): Promise<string> {
const entries = Array.from(this.events.entries());
const startIndex = entries.findIndex(([id]) => id === lastEventId);
if (startIndex === -1) return lastEventId;
let lastId: string = lastEventId;
for (let i = startIndex + 1; i < entries.length; i++) {
const [eventId, { message }] = entries[i];
await send(eventId, message);
lastId = eventId;
}
return lastId;
}
}
console.log("Starting Streamable HTTP server...");
// Express app with permissive CORS for testing with Inspector direct connect mode
const app = express();
app.use(
cors({
origin: "*", // use "*" with caution in production
methods: "GET,POST,DELETE",
preflightContinue: false,
optionsSuccessStatus: 204,
exposedHeaders: ["mcp-session-id", "last-event-id", "mcp-protocol-version"],
})
);
// Map sessionId to server transport for each client
const transports: Map<string, StreamableHTTPServerTransport> = new Map<
string,
StreamableHTTPServerTransport
>();
// Handle POST requests for client messages
app.post("/mcp", async (req: Request, res: Response) => {
console.log("Received MCP POST request");
try {
// Check for existing session ID
const sessionId = req.headers["mcp-session-id"] as string | undefined;
let transport: StreamableHTTPServerTransport;
if (sessionId && transports.has(sessionId)) {
// Reuse existing transport
transport = transports.get(sessionId)!;
} else if (!sessionId) {
const { server, cleanup } = createServer();
// New initialization request
const eventStore = new InMemoryEventStore();
transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => randomUUID(),
eventStore, // Enable resumability
onsessioninitialized: (sessionId: string) => {
// Store the transport by session ID when a session is initialized
// This avoids race conditions where requests might come in before the session is stored
console.log(`Session initialized with ID: ${sessionId}`);
transports.set(sessionId, transport);
},
});
// Set up onclose handler to clean up transport when closed
server.server.onclose = async () => {
const sid = transport.sessionId;
if (sid && transports.has(sid)) {
console.log(
`Transport closed for session ${sid}, removing from transports map`
);
transports.delete(sid);
cleanup(sid);
}
};
// Connect the transport to the MCP server BEFORE handling the request
// so responses can flow back through the same transport
await server.connect(transport);
await transport.handleRequest(req, res);
return;
} else {
// Invalid request - no session ID or not initialization request
res.status(400).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: No valid session ID provided",
},
id: req?.body?.id,
});
return;
}
// Handle the request with existing transport - no need to reconnect
// The existing transport is already connected to the server
await transport.handleRequest(req, res);
} catch (error) {
console.log("Error handling MCP request:", error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: "2.0",
error: {
code: -32603,
message: "Internal server error",
},
id: req?.body?.id,
});
return;
}
}
});
// Handle GET requests for SSE streams
app.get("/mcp", async (req: Request, res: Response) => {
console.log("Received MCP GET request");
const sessionId = req.headers["mcp-session-id"] as string | undefined;
if (!sessionId || !transports.has(sessionId)) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: No valid session ID provided",
},
id: req?.body?.id,
});
return;
}
// Check for Last-Event-ID header for resumability
const lastEventId = req.headers["last-event-id"] as string | undefined;
if (lastEventId) {
console.log(`Client reconnecting with Last-Event-ID: ${lastEventId}`);
} else {
console.log(`Establishing new SSE stream for session ${sessionId}`);
}
const transport = transports.get(sessionId);
await transport!.handleRequest(req, res);
});
// Handle DELETE requests for session termination
app.delete("/mcp", async (req: Request, res: Response) => {
const sessionId = req.headers["mcp-session-id"] as string | undefined;
if (!sessionId || !transports.has(sessionId)) {
res.status(400).json({
jsonrpc: "2.0",
error: {
code: -32000,
message: "Bad Request: No valid session ID provided",
},
id: req?.body?.id,
});
return;
}
console.log(`Received session termination request for session ${sessionId}`);
try {
const transport = transports.get(sessionId);
await transport!.handleRequest(req, res);
} catch (error) {
console.log("Error handling session termination:", error);
if (!res.headersSent) {
res.status(500).json({
jsonrpc: "2.0",
error: {
code: -32603,
message: "Error handling session termination",
},
id: req?.body?.id,
});
return;
}
}
});
// Start the server
const PORT = process.env.PORT || 3001;
const server = app.listen(PORT, () => {
console.error(`MCP Streamable HTTP Server listening on port ${PORT}`);
});
// Handle server errors
server.on("error", (err: unknown) => {
const code =
typeof err === "object" && err !== null && "code" in err
? (err as { code?: unknown }).code
: undefined;
if (code === "EADDRINUSE") {
console.error(
`Failed to start: Port ${PORT} is already in use. Set PORT to a free port or stop the conflicting process.`
);
} else {
console.error("HTTP server encountered an error while starting:", err);
}
// Ensure a non-zero exit so npm reports the failure instead of silently exiting
process.exit(1);
});
// Handle server shutdown
process.on("SIGINT", async () => {
console.log("Shutting down server...");
// Close all active transports to properly clean up resources
for (const [sessionId, transport] of transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transport.close();
transports.delete(sessionId);
} catch (error) {
console.log(`Error closing transport for session ${sessionId}:`, error);
}
}
console.log("Server shutdown complete");
process.exit(0);
});