Skip to content

Commit 2c87708

Browse files
feat: add idle timeout for StreamableHTTP sessions
Add a session_idle_timeout parameter to StreamableHTTPSessionManager that enables automatic cleanup of idle sessions, fixing the memory leak described in #1283. Uses a CancelScope with a deadline inside each session's run_server task. When the deadline passes, app.run() is cancelled and the session cleans up. Incoming requests push the deadline forward. No background tasks needed — each session manages its own lifecycle. - terminate() on the transport is now idempotent - Effective timeout accounts for retry_interval - Default is None (no timeout, fully backwards compatible) Github-Issue: #1283
1 parent bac2789 commit 2c87708

File tree

3 files changed

+346
-7
lines changed

3 files changed

+346
-7
lines changed

src/mcp/server/streamable_http.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,8 @@ def __init__(
180180
] = {}
181181
self._sse_stream_writers: dict[RequestId, MemoryObjectSendStream[dict[str, str]]] = {}
182182
self._terminated = False
183+
# Idle timeout cancel scope; managed by the session manager.
184+
self.idle_scope: anyio.CancelScope | None = None
183185

184186
@property
185187
def is_terminated(self) -> bool:
@@ -773,8 +775,12 @@ async def terminate(self) -> None:
773775
"""Terminate the current session, closing all streams.
774776
775777
Once terminated, all requests with this session ID will receive 404 Not Found.
778+
Calling this method multiple times is safe (idempotent).
776779
"""
777780

781+
if self._terminated:
782+
return
783+
778784
self._terminated = True
779785
logger.info(f"Terminating session: {self.mcp_session_id}")
780786

src/mcp/server/streamable_http_manager.py

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class StreamableHTTPSessionManager:
3838
2. Resumability via an optional event store
3939
3. Connection management and lifecycle
4040
4. Request handling and transport setup
41+
5. Idle session cleanup via optional timeout
4142
4243
Important: Only one StreamableHTTPSessionManager instance should be created
4344
per application. The instance cannot be reused after its run() context has
@@ -55,6 +56,15 @@ class StreamableHTTPSessionManager:
5556
security_settings: Optional transport security settings.
5657
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
5758
retry field. Used for SSE polling behavior.
59+
session_idle_timeout: Optional idle timeout in seconds for stateful sessions.
60+
If set, sessions that receive no HTTP requests for this
61+
duration will be automatically terminated and removed.
62+
When retry_interval is also set, the effective idle
63+
threshold is at least ``retry_interval / 1000 * 3`` to
64+
avoid prematurely reaping sessions that are simply
65+
waiting for SSE polling reconnections. Default is None
66+
(no timeout). A value of 1800 (30 minutes) is
67+
recommended for most deployments.
5868
"""
5969

6070
def __init__(
@@ -65,13 +75,20 @@ def __init__(
6575
stateless: bool = False,
6676
security_settings: TransportSecuritySettings | None = None,
6777
retry_interval: int | None = None,
78+
session_idle_timeout: float | None = None,
6879
):
80+
if session_idle_timeout is not None and session_idle_timeout <= 0:
81+
raise ValueError("session_idle_timeout must be a positive number of seconds")
82+
if stateless and session_idle_timeout is not None:
83+
raise ValueError("session_idle_timeout is not supported in stateless mode")
84+
6985
self.app = app
7086
self.event_store = event_store
7187
self.json_response = json_response
7288
self.stateless = stateless
7389
self.security_settings = security_settings
7490
self.retry_interval = retry_interval
91+
self.session_idle_timeout = session_idle_timeout
7592

7693
# Session tracking (only used if not stateless)
7794
self._session_creation_lock = anyio.Lock()
@@ -114,6 +131,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
114131
# Store the task group for later use
115132
self._task_group = tg
116133
logger.info("StreamableHTTP session manager started")
134+
117135
try:
118136
yield # Let the application run
119137
finally:
@@ -219,6 +237,9 @@ async def _handle_stateful_request(
219237
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
220238
transport = self._server_instances[request_mcp_session_id]
221239
logger.debug("Session already exists, handling request directly")
240+
# Push back idle deadline on activity
241+
if transport.idle_scope is not None:
242+
transport.idle_scope.deadline = anyio.current_time() + self._effective_idle_timeout()
222243
await transport.handle_request(scope, receive, send)
223244
return
224245

@@ -245,19 +266,36 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
245266
read_stream, write_stream = streams
246267
task_status.started()
247268
try:
248-
await self.app.run(
249-
read_stream,
250-
write_stream,
251-
self.app.create_initialization_options(),
252-
stateless=False, # Stateful mode
253-
)
269+
# Use a cancel scope for idle timeout — when the
270+
# deadline passes the scope cancels app.run() and
271+
# execution continues after the ``with`` block.
272+
# Incoming requests push the deadline forward.
273+
idle_scope = anyio.CancelScope()
274+
if self.session_idle_timeout is not None:
275+
timeout = self._effective_idle_timeout()
276+
idle_scope.deadline = anyio.current_time() + timeout
277+
http_transport.idle_scope = idle_scope
278+
279+
with idle_scope:
280+
await self.app.run(
281+
read_stream,
282+
write_stream,
283+
self.app.create_initialization_options(),
284+
stateless=False,
285+
)
286+
287+
if idle_scope.cancelled_caught:
288+
session_id = http_transport.mcp_session_id
289+
logger.info(f"Session {session_id} idle timeout")
290+
if session_id is not None: # pragma: no branch
291+
self._server_instances.pop(session_id, None)
292+
await http_transport.terminate()
254293
except Exception as e:
255294
logger.error(
256295
f"Session {http_transport.mcp_session_id} crashed: {e}",
257296
exc_info=True,
258297
)
259298
finally:
260-
# Only remove from instances if not terminated
261299
if ( # pragma: no branch
262300
http_transport.mcp_session_id
263301
and http_transport.mcp_session_id in self._server_instances
@@ -295,3 +333,12 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
295333
media_type="application/json",
296334
)
297335
await response(scope, receive, send)
336+
337+
def _effective_idle_timeout(self) -> float:
338+
"""Compute the effective idle timeout, accounting for retry_interval."""
339+
assert self.session_idle_timeout is not None
340+
timeout = self.session_idle_timeout
341+
if self.retry_interval is not None:
342+
retry_seconds = self.retry_interval / 1000.0
343+
timeout = max(timeout, retry_seconds * 3)
344+
return timeout

0 commit comments

Comments
 (0)