From 6c44f2f8b4d520236c48864b6e04d2c9659965d4 Mon Sep 17 00:00:00 2001 From: g97iulio1609 Date: Sat, 28 Feb 2026 18:15:51 +0100 Subject: [PATCH] fix: gracefully terminate active sessions on shutdown During StreamableHTTPSessionManager shutdown, iterate through all active transports and call terminate() before cancelling the task group. This allows SSE connections to receive a proper HTTP response instead of being abruptly dropped, which caused Uvicorn to log 'ASGI callable returned without completing response'. Fixes #2150 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/mcp/server/streamable_http_manager.py | 9 ++++++++ tests/server/test_streamable_http_manager.py | 22 ++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index 50bcd5e79..0f9b34831 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -130,6 +130,15 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]: yield # Let the application run finally: logger.info("StreamableHTTP session manager shutting down") + # Gracefully terminate all active sessions before cancelling + # the task group so that SSE connections receive a proper + # HTTP response instead of being abruptly dropped. + for session_id, transport in list(self._server_instances.items()): + try: + await transport.terminate() + logger.debug(f"Terminated session {session_id} during shutdown") + except Exception: # pragma: no cover + logger.debug(f"Error terminating session {session_id} during shutdown", exc_info=True) # Cancel task group to stop all spawned tasks tg.cancel_scope.cancel() self._task_group = None diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py index 54a898cc5..32e07e702 100644 --- a/tests/server/test_streamable_http_manager.py +++ b/tests/server/test_streamable_http_manager.py @@ -410,3 +410,25 @@ def test_session_idle_timeout_rejects_non_positive(): def test_session_idle_timeout_rejects_stateless(): with pytest.raises(RuntimeError, match="not supported in stateless"): StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) + + +@pytest.mark.anyio +async def test_shutdown_terminates_active_sessions(): + """Test that run() shutdown terminates active transports before cancelling tasks.""" + app = Server("test-shutdown-terminate") + manager = StreamableHTTPSessionManager(app=app) + + # We'll manually inject a mock transport into _server_instances + # and verify terminate() is called during shutdown. + mock_transport = AsyncMock(spec=StreamableHTTPServerTransport) + mock_transport.mcp_session_id = "test-session-1" + mock_transport.is_terminated = False + + async with manager.run(): + # Inject mock transport as if a session was created + manager._server_instances["test-session-1"] = mock_transport + + # After exiting run(), terminate should have been called + mock_transport.terminate.assert_awaited_once() + # Server instances should be cleared + assert len(manager._server_instances) == 0