Skip to content

Commit debfd7d

Browse files
feat: add idle timeout for StreamableHTTP sessions
Add session_idle_timeout parameter to StreamableHTTPSessionManager that enables automatic cleanup of idle sessions, fixing the memory leak described in issue #1283. Key design decisions: - Idle timeout logic lives in the session manager, not the transport. The manager already owns the session lifecycle, so this is the natural place for reaping. The transport stays unaware of timeout logic. - A background reaper task scans _server_instances periodically and terminates sessions that have been idle longer than the threshold. - When retry_interval (SSE polling) is configured, the effective timeout is at least retry_interval_seconds * 3 to avoid reaping sessions that are simply between polling reconnections. - terminate() on the transport is now idempotent (early return if already terminated), making it safe to call from both the reaper and explicit DELETE requests. Based on the approach from PR #1159 by @hopeful0, reworked to: - Move idle tracking to the session manager instead of the transport - Remove __aenter__/__aexit__ context manager and condition variables - Account for retry_interval in the idle threshold - Add comprehensive tests Github-Issue: #1283 Reported-by: hopeful0
1 parent bac2789 commit debfd7d

File tree

3 files changed

+295
-0
lines changed

3 files changed

+295
-0
lines changed

src/mcp/server/streamable_http.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,8 +773,12 @@ async def terminate(self) -> None:
773773
"""Terminate the current session, closing all streams.
774774
775775
Once terminated, all requests with this session ID will receive 404 Not Found.
776+
Calling this method multiple times is safe (idempotent).
776777
"""
777778

779+
if self._terminated:
780+
return
781+
778782
self._terminated = True
779783
logger.info(f"Terminating session: {self.mcp_session_id}")
780784

src/mcp/server/streamable_http_manager.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class StreamableHTTPSessionManager:
3838
2. Resumability via an optional event store
3939
3. Connection management and lifecycle
4040
4. Request handling and transport setup
41+
5. Idle session cleanup via optional timeout
4142
4243
Important: Only one StreamableHTTPSessionManager instance should be created
4344
per application. The instance cannot be reused after its run() context has
@@ -55,6 +56,22 @@ class StreamableHTTPSessionManager:
5556
security_settings: Optional transport security settings.
5657
retry_interval: Retry interval in milliseconds to suggest to clients in SSE
5758
retry field. Used for SSE polling behavior.
59+
session_idle_timeout: Optional idle timeout in seconds for stateful sessions.
60+
If set, sessions that receive no HTTP requests for this
61+
duration will be automatically terminated and removed.
62+
When retry_interval is also set, the effective idle
63+
threshold is at least ``retry_interval_seconds * 3`` to
64+
avoid prematurely reaping sessions that are simply
65+
waiting for SSE polling reconnections. Default is None
66+
(no timeout). A value of 1800 (30 minutes) is
67+
recommended for most deployments.
68+
69+
Note: The idle timer is based on incoming HTTP requests
70+
(POST, GET, DELETE), not on whether SSE connections are
71+
open. If clients maintain long-lived GET SSE streams
72+
without sending other requests, set this value higher
73+
than the longest expected SSE connection lifetime to
74+
avoid premature reaping.
5875
"""
5976

6077
def __init__(
@@ -65,17 +82,23 @@ def __init__(
6582
stateless: bool = False,
6683
security_settings: TransportSecuritySettings | None = None,
6784
retry_interval: int | None = None,
85+
session_idle_timeout: float | None = None,
6886
):
87+
if session_idle_timeout is not None and session_idle_timeout <= 0:
88+
raise ValueError("session_idle_timeout must be a positive number of seconds")
89+
6990
self.app = app
7091
self.event_store = event_store
7192
self.json_response = json_response
7293
self.stateless = stateless
7394
self.security_settings = security_settings
7495
self.retry_interval = retry_interval
96+
self.session_idle_timeout = session_idle_timeout
7597

7698
# Session tracking (only used if not stateless)
7799
self._session_creation_lock = anyio.Lock()
78100
self._server_instances: dict[str, StreamableHTTPServerTransport] = {}
101+
self._last_activity: dict[str, float] = {}
79102

80103
# The task group will be set during lifespan
81104
self._task_group = None
@@ -114,6 +137,11 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
114137
# Store the task group for later use
115138
self._task_group = tg
116139
logger.info("StreamableHTTP session manager started")
140+
141+
# Start idle session reaper if timeout is configured
142+
if self.session_idle_timeout is not None:
143+
tg.start_soon(self._idle_session_reaper)
144+
117145
try:
118146
yield # Let the application run
119147
finally:
@@ -123,6 +151,7 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
123151
self._task_group = None
124152
# Clear any remaining server instances
125153
self._server_instances.clear()
154+
self._last_activity.clear()
126155

127156
async def handle_request(
128157
self,
@@ -219,6 +248,8 @@ async def _handle_stateful_request(
219248
if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: # pragma: no cover
220249
transport = self._server_instances[request_mcp_session_id]
221250
logger.debug("Session already exists, handling request directly")
251+
# Update activity timestamp for idle timeout tracking
252+
self._last_activity[request_mcp_session_id] = anyio.current_time()
222253
await transport.handle_request(scope, receive, send)
223254
return
224255

@@ -237,6 +268,7 @@ async def _handle_stateful_request(
237268

238269
assert http_transport.mcp_session_id is not None
239270
self._server_instances[http_transport.mcp_session_id] = http_transport
271+
self._last_activity[http_transport.mcp_session_id] = anyio.current_time()
240272
logger.info(f"Created new transport with session ID: {new_session_id}")
241273

242274
# Define the server runner
@@ -269,6 +301,7 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
269301
"active instances."
270302
)
271303
del self._server_instances[http_transport.mcp_session_id]
304+
self._last_activity.pop(http_transport.mcp_session_id, None)
272305

273306
# Assert task group is not None for type checking
274307
assert self._task_group is not None
@@ -295,3 +328,43 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE
295328
media_type="application/json",
296329
)
297330
await response(scope, receive, send)
331+
332+
def _effective_idle_timeout(self) -> float:
333+
"""Compute the effective idle timeout, accounting for retry_interval.
334+
335+
When SSE polling is configured via ``retry_interval`` (milliseconds),
336+
the client may legitimately go quiet between polls. The idle threshold
337+
must be large enough so that normal polling gaps don't cause premature
338+
session reaping.
339+
"""
340+
assert self.session_idle_timeout is not None
341+
timeout = self.session_idle_timeout
342+
if self.retry_interval is not None:
343+
retry_seconds = self.retry_interval / 1000.0
344+
timeout = max(timeout, retry_seconds * 3)
345+
return timeout
346+
347+
async def _idle_session_reaper(self) -> None:
348+
"""Background task that periodically terminates idle sessions."""
349+
timeout = self._effective_idle_timeout()
350+
scan_interval = min(timeout / 2, 30.0)
351+
logger.info(f"Idle session reaper started (timeout={timeout}s, scan_interval={scan_interval}s)")
352+
353+
while True:
354+
await anyio.sleep(scan_interval)
355+
now = anyio.current_time()
356+
# Snapshot keys to avoid mutation during iteration
357+
for session_id in list(self._server_instances.keys()):
358+
last = self._last_activity.get(session_id)
359+
if last is None:
360+
continue # pragma: no cover
361+
if now - last > timeout:
362+
transport = self._server_instances.get(session_id)
363+
if transport is None:
364+
continue # pragma: no cover
365+
logger.info(
366+
f"Terminating idle session {session_id} (idle for {now - last:.1f}s, timeout={timeout}s)"
367+
)
368+
await transport.terminate()
369+
self._server_instances.pop(session_id, None)
370+
self._last_activity.pop(session_id, None)
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
"""Test for issue #1283 - Memory leak from idle sessions never being cleaned up.
2+
3+
Without an idle timeout mechanism, sessions created via StreamableHTTPSessionManager
4+
persist indefinitely in ``_server_instances`` even after the client disconnects.
5+
Over time this leaks memory.
6+
7+
The ``session_idle_timeout`` parameter on ``StreamableHTTPSessionManager`` allows
8+
the manager to automatically terminate and remove sessions that have been idle for
9+
longer than the configured duration.
10+
"""
11+
12+
import anyio
13+
import pytest
14+
from starlette.types import Message
15+
16+
from mcp.server.lowlevel import Server
17+
from mcp.server.streamable_http import MCP_SESSION_ID_HEADER, StreamableHTTPServerTransport
18+
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
19+
20+
21+
def _make_scope() -> dict:
22+
return {
23+
"type": "http",
24+
"method": "POST",
25+
"path": "/mcp",
26+
"headers": [(b"content-type", b"application/json")],
27+
}
28+
29+
30+
async def _mock_receive() -> Message: # pragma: no cover
31+
return {"type": "http.request", "body": b"", "more_body": False}
32+
33+
34+
def _make_send(sent: list[Message]):
35+
async def mock_send(message: Message) -> None:
36+
sent.append(message)
37+
38+
return mock_send
39+
40+
41+
def _extract_session_id(sent_messages: list[Message]) -> str:
42+
for msg in sent_messages:
43+
if msg["type"] == "http.response.start":
44+
for name, value in msg.get("headers", []):
45+
if name.decode().lower() == MCP_SESSION_ID_HEADER.lower():
46+
return value.decode()
47+
raise AssertionError("Session ID not found in response headers")
48+
49+
50+
def _make_blocking_run(stop_event: anyio.Event):
51+
"""Create a mock app.run that blocks until stop_event is set."""
52+
53+
async def blocking_run(*args, **kwargs): # type: ignore[no-untyped-def]
54+
await stop_event.wait()
55+
56+
return blocking_run
57+
58+
59+
@pytest.mark.anyio
60+
async def test_idle_session_is_reaped():
61+
"""Session should be removed from _server_instances after idle timeout."""
62+
app = Server("test-idle-reap")
63+
stop = anyio.Event()
64+
app.run = _make_blocking_run(stop) # type: ignore[assignment]
65+
66+
manager = StreamableHTTPSessionManager(
67+
app=app,
68+
session_idle_timeout=0.15,
69+
)
70+
71+
async with manager.run():
72+
sent: list[Message] = []
73+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
74+
session_id = _extract_session_id(sent)
75+
76+
assert session_id in manager._server_instances
77+
78+
# Wait long enough for the reaper to fire (scan_interval = timeout/2 = 0.075s)
79+
await anyio.sleep(0.4)
80+
81+
assert session_id not in manager._server_instances
82+
assert session_id not in manager._last_activity
83+
84+
stop.set()
85+
86+
87+
@pytest.mark.anyio
88+
async def test_activity_resets_idle_timer():
89+
"""Requests during the timeout window should prevent the session from being reaped."""
90+
app = Server("test-idle-reset")
91+
stop = anyio.Event()
92+
app.run = _make_blocking_run(stop) # type: ignore[assignment]
93+
94+
manager = StreamableHTTPSessionManager(
95+
app=app,
96+
session_idle_timeout=0.3,
97+
)
98+
99+
async with manager.run():
100+
sent: list[Message] = []
101+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
102+
session_id = _extract_session_id(sent)
103+
104+
# Simulate ongoing activity by updating the activity timestamp periodically
105+
for _ in range(4):
106+
await anyio.sleep(0.1)
107+
manager._last_activity[session_id] = anyio.current_time()
108+
109+
# Session should still be alive because we kept it active
110+
assert session_id in manager._server_instances
111+
112+
# Now stop activity and let the timeout expire
113+
await anyio.sleep(0.6)
114+
115+
assert session_id not in manager._server_instances
116+
117+
stop.set()
118+
119+
120+
@pytest.mark.anyio
121+
async def test_multiple_sessions_reaped_independently():
122+
"""Each session tracks its own idle timeout independently."""
123+
app = Server("test-multi-idle")
124+
stop = anyio.Event()
125+
app.run = _make_blocking_run(stop) # type: ignore[assignment]
126+
127+
manager = StreamableHTTPSessionManager(
128+
app=app,
129+
session_idle_timeout=0.15,
130+
)
131+
132+
async with manager.run():
133+
sent1: list[Message] = []
134+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent1))
135+
session_id_1 = _extract_session_id(sent1)
136+
137+
await anyio.sleep(0.05)
138+
sent2: list[Message] = []
139+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent2))
140+
session_id_2 = _extract_session_id(sent2)
141+
142+
assert session_id_1 in manager._server_instances
143+
assert session_id_2 in manager._server_instances
144+
145+
# After enough time, both should be reaped
146+
await anyio.sleep(0.4)
147+
148+
assert session_id_1 not in manager._server_instances
149+
assert session_id_2 not in manager._server_instances
150+
151+
stop.set()
152+
153+
154+
@pytest.mark.anyio
155+
async def test_terminate_idempotency():
156+
"""Calling terminate() multiple times should be safe."""
157+
transport = StreamableHTTPServerTransport(
158+
mcp_session_id="test-idempotent",
159+
)
160+
161+
async with transport.connect():
162+
await transport.terminate()
163+
assert transport.is_terminated
164+
165+
# Second call should be a no-op (no exception)
166+
await transport.terminate()
167+
assert transport.is_terminated
168+
169+
170+
@pytest.mark.anyio
171+
async def test_idle_timeout_with_retry_interval():
172+
"""When retry_interval is set, effective timeout should account for polling gaps."""
173+
app = Server("test-retry-interval")
174+
175+
# retry_interval = 5000ms = 5s -> retry_seconds * 3 = 15s
176+
# session_idle_timeout = 1s -> effective = max(1, 15) = 15
177+
manager = StreamableHTTPSessionManager(
178+
app=app,
179+
session_idle_timeout=1.0,
180+
retry_interval=5000,
181+
)
182+
assert manager._effective_idle_timeout() == 15.0
183+
184+
# When retry_interval is small, session_idle_timeout should dominate
185+
manager2 = StreamableHTTPSessionManager(
186+
app=app,
187+
session_idle_timeout=10.0,
188+
retry_interval=100, # 0.1s -> 0.3s, less than 10
189+
)
190+
assert manager2._effective_idle_timeout() == 10.0
191+
192+
# No retry_interval -> raw timeout
193+
manager3 = StreamableHTTPSessionManager(
194+
app=app,
195+
session_idle_timeout=5.0,
196+
)
197+
assert manager3._effective_idle_timeout() == 5.0
198+
199+
200+
@pytest.mark.anyio
201+
async def test_no_idle_timeout_no_reaper():
202+
"""When session_idle_timeout is None (default), sessions persist indefinitely."""
203+
app = Server("test-no-timeout")
204+
stop = anyio.Event()
205+
app.run = _make_blocking_run(stop) # type: ignore[assignment]
206+
207+
manager = StreamableHTTPSessionManager(app=app)
208+
209+
async with manager.run():
210+
sent: list[Message] = []
211+
await manager.handle_request(_make_scope(), _mock_receive, _make_send(sent))
212+
session_id = _extract_session_id(sent)
213+
214+
# Wait a while - session should never be reaped
215+
await anyio.sleep(0.3)
216+
assert session_id in manager._server_instances
217+
218+
stop.set()

0 commit comments

Comments
 (0)