-
Notifications
You must be signed in to change notification settings - Fork 28
Expand file tree
/
Copy pathshttp.ts
More file actions
186 lines (167 loc) · 6.13 KB
/
shttp.ts
File metadata and controls
186 lines (167 loc) · 6.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import { AuthInfo } from "@modelcontextprotocol/sdk/server/auth/types.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
import { Request, Response } from "express";
import { getShttpTransport, isSessionOwnedBy, redisRelayToMcpServer, ServerRedisTransport, setSessionOwner, shutdownSession } from "../services/redisTransport.js";
import { isInitializeRequest } from "@modelcontextprotocol/sdk/types.js";
import { randomUUID } from "crypto";
import { createMcpServer } from "../services/mcp.js";
import { logger } from "../../shared/logger.js";
// Module augmentation: adds an optional `auth` property to Express's `Request`
// type so that handlers can read `req.auth` without casting.
declare module "express-serve-static-core" {
interface Request {
/**
* Information about the validated access token, if the `requireBearerAuth` middleware was used.
* Left undefined when no such information was attached to the request.
*/
auth?: AuthInfo;
}
}
/**
 * Extracts the user ID from the auth info attached to a request.
 *
 * @param auth - Validated auth info, if any was attached to the request.
 * @returns `auth.extra.userId` when it is a non-empty string; otherwise `null`.
 */
function getUserIdFromAuth(auth?: AuthInfo): string | null {
  const userId: unknown = auth?.extra?.userId;
  // `extra` carries untyped data, so verify the type at runtime rather than
  // asserting `as string` (which would let numbers/objects through as "strings").
  return typeof userId === "string" && userId.length > 0 ? userId : null;
}
// TODO: Document Streamable HTTP implementation choices:
// 1. STATEFUL: Requires clients to initialize sessions and track session IDs
// - First request must be 'initialize' without Mcp-Session-Id header
// - Server returns session ID, client must include it in subsequent requests
// - Alternative: Could implement STATELESS mode (each request independent)
// 2. SSE RESPONSES: Returns results via Server-Sent Events stream, not JSON responses
// - Requires Accept: application/json, text/event-stream header
// - Responses formatted as: event: message\ndata: {...}
// - Alternative: Could use JSON response mode (check StreamableHTTPServerTransport options)
/**
 * Writes a JSON-RPC 2.0 error response (with a null `id`) using the given HTTP status.
 */
function sendJsonRpcError(res: Response, status: number, code: number, message: string): void {
  res.status(status).json({
    "jsonrpc": "2.0",
    "id": null,
    "error": {
      "code": code,
      "message": message
    }
  });
}

/**
 * Express handler for MCP Streamable HTTP requests.
 *
 * Request flow:
 * 1. Requests without a user ID in `req.auth` are rejected with 401 (code -32002).
 * 2. If an `mcp-session-id` header is present, the session must be owned by the
 *    authenticated user (otherwise 401, code -32001); the transport for that
 *    session is then obtained via `getShttpTransport`.
 * 3. Otherwise, if the body is an `initialize` request, a new transport is created
 *    with a fresh random session ID. Once the session is initialized, an MCP server
 *    is created, connected to a `ServerRedisTransport` for that session, and the
 *    session owner is recorded.
 * 4. Any other request is rejected with 400 (code -32600).
 *
 * The selected transport then handles the request. Unexpected errors are logged
 * and answered with 500 (code -32603) if no headers have been sent yet.
 *
 * @param req - Incoming Express request; `req.auth` is expected to carry the user ID.
 * @param res - Express response the transport (or an error payload) is written to.
 */
export async function handleStreamableHTTP(req: Request, res: Response): Promise<void> {
  let shttpTransport: StreamableHTTPServerTransport | undefined = undefined;

  // Close the per-request transport once the response has been fully sent.
  // The emitter ignores the listener's returned promise, so a failing close()
  // must be caught here or it surfaces as an unhandled rejection.
  // NOTE(review): 'finish' may not fire if the client disconnects early —
  // confirm whether 'close' should be handled as well.
  res.on('finish', async () => {
    try {
      await shttpTransport?.close();
    } catch (error) {
      logger.error('Error closing SHTTP transport', error as Error, {
        sessionId: req.headers['mcp-session-id'] as string | undefined,
        method: req.method,
        userId: getUserIdFromAuth(req.auth)
      });
    }
  });

  // Invoked by the transport when a session is closed; shuts the session down.
  const onsessionclosed = async (sessionId: string) => {
    logger.info('Session closed callback triggered', {
      sessionId,
      userId: getUserIdFromAuth(req.auth)
    });
    await shutdownSession(sessionId);
  }

  try {
    // Check for existing session ID
    const sessionId = req.headers['mcp-session-id'] as string | undefined;
    const userId = getUserIdFromAuth(req.auth);

    logger.debug('SHTTP request received', {
      method: req.method,
      sessionId,
      userId,
      hasAuth: !!req.auth,
      authExtra: req.auth?.extra
    });

    // if no userid, return 401, we shouldn't get here ideally
    if (!userId) {
      logger.warning('Request without user ID', {
        sessionId,
        hasAuth: !!req.auth
      });
      sendJsonRpcError(res, 401, -32002, "User ID required");
      return;
    }

    const isGetRequest = req.method === 'GET';

    if (sessionId) {
      // incorrect session for the authed user, return 401
      if (!(await isSessionOwnedBy(sessionId, userId))) {
        logger.warning('Session ownership mismatch', {
          sessionId,
          userId,
          requestMethod: req.method
        });
        sendJsonRpcError(res, 401, -32001, "Session not found or access denied");
        return;
      }

      // Reuse existing transport for owned session
      logger.info('Reusing existing session', {
        sessionId,
        userId,
        isGetRequest
      });
      shttpTransport = await getShttpTransport(sessionId, onsessionclosed, isGetRequest);
    } else if (isInitializeRequest(req.body)) {
      // New initialization request: create a fresh transport and session
      const initParams = req.body?.params;
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const capabilities = initParams?.capabilities as Record<string, any> | undefined;
      logger.info('=== MCP INITIALIZE REQUEST ===', {
        userId,
        clientInfo: initParams?.clientInfo,
        protocolVersion: initParams?.protocolVersion,
        capabilities,
        hasExtensions: !!capabilities?.extensions,
        extensions: capabilities?.extensions,
      });

      // Invoked by the transport once the session exists: create the MCP server,
      // connect it to the session's Redis transport, then record the owner.
      const onsessioninitialized = async (sessionId: string) => {
        logger.info('Initializing new session', {
          sessionId,
          userId
        });

        const { server, cleanup: mcpCleanup } = createMcpServer();
        const serverRedisTransport = new ServerRedisTransport(sessionId);
        serverRedisTransport.onclose = mcpCleanup;
        await server.connect(serverRedisTransport)

        // Set session ownership
        await setSessionOwner(sessionId, userId);

        logger.info('Session initialized successfully', {
          sessionId,
          userId
        });
      }

      // Generate the ID up front so the same value can be passed to the relay below.
      const newSessionId = randomUUID();
      shttpTransport = new StreamableHTTPServerTransport({
        sessionIdGenerator: () => newSessionId,
        onsessionclosed,
        onsessioninitialized,
      });
      shttpTransport.onclose = await redisRelayToMcpServer(newSessionId, shttpTransport);
    } else {
      // Invalid request - no session ID and not initialization request
      logger.warning('Invalid request: no session ID and not initialization', {
        hasSessionId: !!sessionId,
        isInitRequest: false,
        userId,
        method: req.method
      });
      sendJsonRpcError(res, 400, -32600, "Invalid request method for existing session");
      return;
    }

    // Handle the request with the selected transport - no need to reconnect
    await shttpTransport.handleRequest(req, res, req.body);
  } catch (error) {
    logger.error('Error handling MCP request', error as Error, {
      sessionId: req.headers['mcp-session-id'] as string | undefined,
      method: req.method,
      userId: getUserIdFromAuth(req.auth)
    });

    // Only send an error payload if the transport has not started responding.
    if (!res.headersSent) {
      sendJsonRpcError(res, 500, -32603, "Internal error during request processing");
    }
  }
}