Skip to content

Commit 73f4cf2

Browse files
feat: add idle timeout for StreamableHTTP sessions
Add a `session_idle_timeout` parameter to StreamableHTTPSessionManager that spawns a background reaper task to periodically terminate and clean up sessions that have been idle beyond the configured threshold. - Background reaper scans sessions at half the timeout interval - Accounts for retry_interval to avoid reaping sessions between polling reconnections - Makes transport.terminate() idempotent for safe cleanup - Validates that session_idle_timeout is not used with stateless mode - Reaper removes sessions from tracking dicts before calling terminate() to prevent race conditions - Includes comprehensive test coverage including lifecycle verification Fixes #1283
1 parent bac2789 commit 73f4cf2

File tree

3 files changed

+510
-0
lines changed

3 files changed

+510
-0
lines changed

src/mcp/server/streamable_http.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,8 +773,12 @@ async def terminate(self) -> None:
773773
"""Terminate the current session, closing all streams.
774774
775775
Once terminated, all requests with this session ID will receive 404 Not Found.
776+
Calling this method multiple times is safe (idempotent).
776777
"""
777778

779+
if self._terminated:
780+
return
781+
778782
self._terminated = True
779783
logger.info(f"Terminating session: {self.mcp_session_id}")
780784

src/mcp/server/streamable_http_manager.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class StreamableHTTPSessionManager:
3838
2. Resumability via an optional event store
3939
3. Connection management and lifecycle
4040
4. Request handling and transport setup
41+
5. Idle session cleanup via optional timeout
4142
4243
Important: Only one StreamableHTTPSessionManager instance should be created
4344
per application. The instance cannot be reused after its run() context has
@@ -55,6 +56,22 @@ class StreamableHTTPSessionManager:
5556
security_settings: Optional transport security settings.
5657
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
5758
retry field. Used for SSE polling behavior.
59+
session_idle_timeout: Optional idle timeout in seconds for stateful sessions.
60+
If set, sessions that receive no HTTP requests for this
61+
duration will be automatically terminated and removed.
62+
When retry_interval is also set, the effective idle
63+
threshold is at least ``retry_interval / 1000 * 3`` to
64+
avoid prematurely reaping sessions that are simply
65+
waiting for SSE polling reconnections. Default is None
66+
(no timeout). A value of 1800 (30 minutes) is
67+
recommended for most deployments.
68+
69+
Note: The idle timer is based on incoming HTTP requests
70+
(POST, GET, DELETE), not on whether SSE connections are
71+
open. If clients maintain long-lived GET SSE streams
72+
without sending other requests, set this value higher
73+
than the longest expected SSE connection lifetime to
74+
avoid premature reaping.
5875
"""
5976

6077
def __init__(
@@ -65,17 +82,25 @@ def __init__(
6582
stateless: bool = False,
6683
security_settings: TransportSecuritySettings | None = None,
6784
retry_interval: int | None = None,
85+
session_idle_timeout: float | None = None,
6886
):
87+
if session_idle_timeout is not None and session_idle_timeout <= 0:
88+
raise ValueError("session_idle_timeout must be a positive number of seconds")
89+
if stateless and session_idle_timeout is not None:
90+
raise ValueError("session_idle_timeout is not supported in stateless mode")
91+
6992
self.app = app
7093
self.event_store = event_store
7194
self.json_response = json_response
7295
self.stateless = stateless
7396
self.security_settings = security_settings
7497
self.retry_interval = retry_interval
98+
self.session_idle_timeout = session_idle_timeout
7599

76100
# Session tracking (only used if not stateless)
77101
self._session_creation_lock = anyio.Lock()
78102
self._server_instances: dict[str, StreamableHTTPServerTransport] = {}
103+
self._last_activity: dict[str, float] = {}
79104

80105
# The task group will be set during lifespan
81106
self._task_group = None
@@ -114,6 +139,11 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
114139
# Store the task group for later use
115140
self._task_group = tg
116141
logger.info("StreamableHTTP session manager started")
142+
143+
# Start idle session reaper if timeout is configured
144+
if self.session_idle_timeout is not None:
145+
tg.start_soon(self._idle_session_reaper)
146+
117147
try:
118148
yield # Let the application run
119149
finally:
@@ -123,6 +153,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
123153
self._task_group = None
124154
# Clear any remaining server instances
125155
self._server_instances.clear()
156+
self._last_activity.clear()
126157

127158
async def handle_request(
128159
self,
@@ -219,6 +250,8 @@ async def _handle_stateful_request(
219250
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
220251
transport = self._server_instances[request_mcp_session_id]
221252
logger.debug("Session already exists, handling request directly")
253+
# Update activity timestamp for idle timeout tracking
254+
self._last_activity[request_mcp_session_id] = anyio.current_time()
222255
await transport.handle_request(scope, receive, send)
223256
return
224257

@@ -237,6 +270,7 @@ async def _handle_stateful_request(
237270

238271
assert http_transport.mcp_session_id is not None
239272
self._server_instances[http_transport.mcp_session_id] = http_transport
273+
self._last_activity[http_transport.mcp_session_id] = anyio.current_time()
240274
logger.info(f"Created new transport with session ID: {new_session_id}")
241275

242276
# Define the server runner
@@ -269,6 +303,7 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
269303
"active instances."
270304
)
271305
del self._server_instances[http_transport.mcp_session_id]
306+
self._last_activity.pop(http_transport.mcp_session_id, None)
272307

273308
# Assert task group is not None for type checking
274309
assert self._task_group is not None
@@ -295,3 +330,43 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
295330
media_type="application/json",
296331
)
297332
await response(scope, receive, send)
333+
334+
def _effective_idle_timeout(self) -> float:
335+
"""Compute the effective idle timeout, accounting for retry_interval.
336+
337+
When SSE polling is configured via ``retry_interval`` (milliseconds),
338+
the client may legitimately go quiet between polls. The idle threshold
339+
must be large enough so that normal polling gaps don't cause premature
340+
session reaping.
341+
"""
342+
assert self.session_idle_timeout is not None
343+
timeout = self.session_idle_timeout
344+
if self.retry_interval is not None:
345+
retry_seconds = self.retry_interval / 1000.0
346+
timeout = max(timeout, retry_seconds * 3)
347+
return timeout
348+
349+
async def _idle_session_reaper(self) -> None:
350+
"""Background task that periodically terminates idle sessions."""
351+
timeout = self._effective_idle_timeout()
352+
scan_interval = min(timeout / 2, 30.0)
353+
logger.info(f"Idle session reaper started (timeout={timeout}s, scan_interval={scan_interval}s)")
354+
355+
while True:
356+
await anyio.sleep(scan_interval)
357+
now = anyio.current_time()
358+
# Snapshot keys to avoid mutation during iteration
359+
for session_id in list(self._server_instances.keys()):
360+
last = self._last_activity.get(session_id)
361+
if last is None:
362+
continue # pragma: no cover
363+
if now - last > timeout:
364+
transport = self._server_instances.get(session_id)
365+
if transport is None:
366+
continue # pragma: no cover
367+
logger.info(
368+
f"Terminating idle session {session_id} (idle for {now - last:.1f}s, timeout={timeout}s)"
369+
)
370+
self._server_instances.pop(session_id, None)
371+
self._last_activity.pop(session_id, None)
372+
await transport.terminate()

0 commit comments

Comments
 (0)