Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
yield # Let the application run
finally:
logger.info("StreamableHTTP session manager shutting down")
# Gracefully terminate all active sessions before cancelling
# the task group so that SSE connections receive a proper
# HTTP response instead of being abruptly dropped.
for session_id, transport in list(self._server_instances.items()):
try:
await transport.terminate()
logger.debug(f"Terminated session {session_id} during shutdown")
except Exception: # pragma: no cover
logger.debug(f"Error terminating session {session_id} during shutdown", exc_info=True)
# Cancel task group to stop all spawned tasks
tg.cancel_scope.cancel()
self._task_group = None
Expand Down
22 changes: 22 additions & 0 deletions tests/server/test_streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,25 @@ def test_session_idle_timeout_rejects_non_positive():
def test_session_idle_timeout_rejects_stateless():
with pytest.raises(RuntimeError, match="not supported in stateless"):
StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True)


@pytest.mark.anyio
async def test_shutdown_terminates_active_sessions():
"""Test that run() shutdown terminates active transports before cancelling tasks."""
app = Server("test-shutdown-terminate")
manager = StreamableHTTPSessionManager(app=app)

# We'll manually inject a mock transport into _server_instances
# and verify terminate() is called during shutdown.
mock_transport = AsyncMock(spec=StreamableHTTPServerTransport)
mock_transport.mcp_session_id = "test-session-1"
mock_transport.is_terminated = False

async with manager.run():
# Inject mock transport as if a session was created
manager._server_instances["test-session-1"] = mock_transport

# After exiting run(), terminate should have been called
mock_transport.terminate.assert_awaited_once()
# Server instances should be cleared
assert len(manager._server_instances) == 0