forked from modelcontextprotocol/typescript-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimpleSseServer.ts
More file actions
174 lines (151 loc) · 5.67 KB
/
simpleSseServer.ts
File metadata and controls
174 lines (151 loc) · 5.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
import { Request, Response } from 'express';
import { McpServer } from '../../server/mcp.js';
import { SSEServerTransport } from '../../server/sse.js';
import * as z from 'zod/v4';
import { CallToolResult } from '../../types.js';
import { createMcpExpressApp } from '../../server/express.js';
/**
* This example server demonstrates the deprecated HTTP+SSE transport
* (protocol version 2024-11-05). It mainly used for testing backward compatible clients.
*
* The server exposes two endpoints:
* - /mcp: For establishing the SSE stream (GET)
* - /messages: For receiving client messages (POST)
*
*/
// Create an MCP server instance
const getServer = () => {
const server = new McpServer(
{
name: 'simple-sse-server',
version: '1.0.0'
},
{ capabilities: { logging: {} } }
);
server.tool(
'start-notification-stream',
'Starts sending periodic notifications',
{
interval: z.number().describe('Interval in milliseconds between notifications').default(1000),
count: z.number().describe('Number of notifications to send').default(10)
},
async ({ interval, count }, extra): Promise<CallToolResult> => {
const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));
let counter = 0;
// Send the initial notification
await server.sendLoggingMessage(
{
level: 'info',
data: `Starting notification stream with ${count} messages every ${interval}ms`
},
extra.sessionId
);
// Send periodic notifications
while (counter < count) {
counter++;
await sleep(interval);
try {
await server.sendLoggingMessage(
{
level: 'info',
data: `Notification #${counter} at ${new Date().toISOString()}`
},
extra.sessionId
);
} catch (error) {
console.error('Error sending notification:', error);
}
}
return {
content: [
{
type: 'text',
text: `Completed sending ${count} notifications every ${interval}ms`
}
]
};
}
);
return server;
};
const app = createMcpExpressApp();
// Store transports by session ID
const transports: Record<string, SSEServerTransport> = {};
// SSE endpoint for establishing the stream
app.get('/mcp', async (req: Request, res: Response) => {
console.log('Received GET request to /sse (establishing SSE stream)');
try {
// Create a new SSE transport for the client
// The endpoint for POST messages is '/messages'
const transport = new SSEServerTransport('/messages', res);
// Store the transport by session ID
const sessionId = transport.sessionId;
transports[sessionId] = transport;
// Set up onclose handler to clean up transport when closed
transport.onclose = () => {
console.log(`SSE transport closed for session ${sessionId}`);
delete transports[sessionId];
};
// Connect the transport to the MCP server
const server = getServer();
await server.connect(transport);
console.log(`Established SSE stream with session ID: ${sessionId}`);
} catch (error) {
console.error('Error establishing SSE stream:', error);
if (!res.headersSent) {
res.status(500).send('Error establishing SSE stream');
}
}
});
// Messages endpoint for receiving client JSON-RPC requests
app.post('/messages', async (req: Request, res: Response) => {
console.log('Received POST request to /messages');
// Extract session ID from URL query parameter
// In the SSE protocol, this is added by the client based on the endpoint event
const sessionId = req.query.sessionId as string | undefined;
if (!sessionId) {
console.error('No session ID provided in request URL');
res.status(400).send('Missing sessionId parameter');
return;
}
const transport = transports[sessionId];
if (!transport) {
console.error(`No active transport found for session ID: ${sessionId}`);
res.status(404).send('Session not found');
return;
}
try {
// Handle the POST message with the transport
await transport.handlePostMessage(req, res, req.body);
} catch (error) {
console.error('Error handling request:', error);
if (!res.headersSent) {
res.status(500).send('Error handling request');
}
}
});
// Start the server
const PORT = 3000;
app.listen(PORT, error => {
if (error) {
console.error('Failed to start server:', error);
process.exit(1);
}
console.log(`Simple SSE Server (deprecated protocol version 2024-11-05) listening on port ${PORT}`);
});
// Handle server shutdown
process.on('SIGINT', async () => {
console.log('Shutting down server...');
// Close all active transports to properly clean up resources
for (const sessionId in transports) {
try {
console.log(`Closing transport for session ${sessionId}`);
await transports[sessionId].close();
delete transports[sessionId];
} catch (error) {
console.error(`Error closing transport for session ${sessionId}:`, error);
}
}
console.log('Server shutdown complete');
process.exit(0);
});