Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,20 @@ async def handle_get_stream(self, client: httpx.AsyncClient, read_stream_writer:
# Stream ended normally (server closed) - reset attempt counter
attempt = 0

except Exception: # pragma: lax no cover
logger.debug("GET stream error", exc_info=True)
except httpx.HTTPStatusError as exc: # pragma: lax no cover
# Handle HTTP errors that are retryable
if exc.response.status_code == 405:
# Method Not Allowed - server doesn't support GET for SSE
logger.warning(
"Server does not support GET for SSE events (405 Method Not Allowed). "
"Server-initiated messages will not be available."
)
return
# For other HTTP errors, log and retry
logger.debug(f"GET stream HTTP error: {exc.response.status_code} - {exc}")
attempt += 1
except Exception as exc: # pragma: lax no cover
logger.debug("GET stream error: %s", exc, exc_info=True)
attempt += 1

if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover
Expand Down
171 changes: 171 additions & 0 deletions tests/issues/test_streamable_http_405_get_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""Test for streamable_http client handling of 405 Method Not Allowed on GET requests.

This test verifies the fix for the race condition where the client hangs when connecting
to servers (like GitHub MCP) that don't support GET for SSE events.
"""

import logging
from typing import Protocol, cast

import anyio
import httpx
import pytest
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Route

from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client
from mcp.types import InitializeResult


class _ExceptionGroupWithExceptions(Protocol):
exceptions: tuple[BaseException, ...]


async def mock_github_endpoint(request: Request) -> Response:
"""Mock endpoint that returns 405 for GET (like GitHub MCP)."""
if request.method == "GET":
return Response(
content="Method Not Allowed",
status_code=405,
headers={"Allow": "POST, DELETE"},
)
elif request.method == "POST":
body = await request.json()
if body.get("method") == "initialize":
return JSONResponse(
{
"jsonrpc": "2.0",
"id": body.get("id"),
"result": {
"protocolVersion": "2025-03-26",
"serverInfo": {"name": "mock_github_server", "version": "1.0"},
"capabilities": {"tools": {}},
},
},
headers={"mcp-session-id": "test-session"},
)
elif body.get("method") == "notifications/initialized":
return Response(status_code=202)
elif body.get("method") == "tools/list":
return JSONResponse(
{
"jsonrpc": "2.0",
"id": body.get("id"),
"result": {
"tools": [
{
"name": "test_tool",
"description": "A test tool",
"inputSchema": {"type": "object", "properties": {}},
}
]
},
}
)
return Response(status_code=405)


@pytest.mark.anyio
async def test_405_get_stream_does_not_hang(caplog: pytest.LogCaptureFixture) -> None:
"""Test that client handles 405 on GET gracefully and doesn't hang."""
app = Starlette(routes=[Route("/mcp", mock_github_endpoint, methods=["GET", "POST"])])

expected_log = "Server does not support GET for SSE events (405 Method Not Allowed)"
got_405 = anyio.Event()

class _405LogHandler(logging.Handler):
def emit(self, record: logging.LogRecord) -> None:
if expected_log in record.getMessage():
got_405.set()

mcp_logger = logging.getLogger("mcp.client.streamable_http")
handler = _405LogHandler()
mcp_logger.addHandler(handler)
with caplog.at_level(logging.INFO):
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=app), base_url="http://testserver", timeout=5.0
) as http_client:
transport_cm = streamable_http_client("http://testserver/mcp", http_client=http_client)
async with transport_cm as transport_streams: # pragma: no branch
read_stream, write_stream = transport_streams
async with ClientSession(read_stream, write_stream) as session: # pragma: no branch
with anyio.fail_after(5.0):
init_result = await session.initialize()
assert isinstance(init_result, InitializeResult)

with anyio.fail_after(5.0):
await got_405.wait()

with anyio.fail_after(5.0):
tools_result = await session.list_tools()
assert len(tools_result.tools) == 1
assert tools_result.tools[0].name == "test_tool"

log_messages = [record.getMessage() for record in caplog.records]
assert any(expected_log in msg for msg in log_messages) # pragma: no branch

reconnect_messages = [msg for msg in log_messages if "reconnecting" in msg.lower()]
assert len(reconnect_messages) == 0 # pragma: no branch
mcp_logger.removeHandler(handler)


@pytest.mark.anyio
async def test_streamable_http_client_context_manager_exception_exit_is_covered() -> None:
"""Cover the exceptional exit path of the streamable HTTP transport context manager.

Branch coverage can vary across Python versions for `async with` teardown paths.
This test ensures the exception-unwind path is exercised without relying on pragmas.
"""
app = Starlette(routes=[Route("/mcp", mock_github_endpoint, methods=["GET", "POST"])])

async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=app),
base_url="http://testserver",
timeout=5.0,
) as http_client:
transport_cm = streamable_http_client("http://testserver/mcp", http_client=http_client)
with pytest.raises(BaseException) as excinfo:
async with transport_cm:
raise RuntimeError("boom")

exc = excinfo.value
# anyio always wraps exceptions in an ExceptionGroup
assert hasattr(exc, "exceptions")
excs = cast(_ExceptionGroupWithExceptions, exc).exceptions
assert any(isinstance(inner, RuntimeError) and str(inner) == "boom" for inner in excs)


@pytest.mark.anyio
async def test_mock_github_endpoint_other_method_returns_405() -> None:
"""Ensure fallback 405 branch is covered for non-GET/POST methods."""
app = Starlette(routes=[Route("/mcp", mock_github_endpoint, methods=["GET", "POST", "DELETE"])])

async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=app),
base_url="http://testserver",
timeout=5.0,
) as http_client:
response = await http_client.delete("/mcp")

assert response.status_code == 405


@pytest.mark.anyio
async def test_mock_github_endpoint_post_unknown_method_returns_405() -> None:
"""Ensure POST with unknown method hits fallback 405 branch."""
app = Starlette(routes=[Route("/mcp", mock_github_endpoint, methods=["POST"])])

async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=app),
base_url="http://testserver",
timeout=5.0,
) as http_client:
response = await http_client.post(
"/mcp",
json={"jsonrpc": "2.0", "id": 1, "method": "unknown/method"},
)

assert response.status_code == 405
Loading