From 542396228deef98f15756e4c2209953a46be35cc Mon Sep 17 00:00:00 2001 From: dgenio Date: Sat, 28 Feb 2026 13:28:02 +0000 Subject: [PATCH 1/5] fix: add SSRF redirect protection to httpx client factory Add a RedirectPolicy enum to create_mcp_http_client() that validates redirect targets via httpx event hooks. The default policy (BLOCK_SCHEME_DOWNGRADE) blocks HTTPS-to-HTTP redirect downgrades. ENFORCE_HTTPS restricts all redirects to HTTPS-only destinations. ALLOW_ALL preserves the previous unrestricted behavior. Github-Issue: #2106 --- src/mcp/shared/_httpx_utils.py | 84 +++++++++++++++++- tests/shared/test_httpx_utils.py | 144 ++++++++++++++++++++++++++++++- 2 files changed, 226 insertions(+), 2 deletions(-) diff --git a/src/mcp/shared/_httpx_utils.py b/src/mcp/shared/_httpx_utils.py index 251469eaa..7277b50cb 100644 --- a/src/mcp/shared/_httpx_utils.py +++ b/src/mcp/shared/_httpx_utils.py @@ -1,22 +1,89 @@ """Utilities for creating standardized httpx AsyncClient instances.""" +import logging +from enum import Enum from typing import Any, Protocol import httpx -__all__ = ["create_mcp_http_client", "MCP_DEFAULT_TIMEOUT", "MCP_DEFAULT_SSE_READ_TIMEOUT"] +logger = logging.getLogger(__name__) + +__all__ = [ + "MCP_DEFAULT_SSE_READ_TIMEOUT", + "MCP_DEFAULT_TIMEOUT", + "RedirectPolicy", + "create_mcp_http_client", +] # Default MCP timeout configuration MCP_DEFAULT_TIMEOUT = 30.0 # General operations (seconds) MCP_DEFAULT_SSE_READ_TIMEOUT = 300.0 # SSE streams - 5 minutes (seconds) +class RedirectPolicy(Enum): + """Policy for validating HTTP redirects to protect against SSRF attacks. + + Attributes: + ALLOW_ALL: No restrictions on redirects (legacy behavior). + BLOCK_SCHEME_DOWNGRADE: Block HTTPS-to-HTTP downgrades on redirect (default). + ENFORCE_HTTPS: Only allow HTTPS redirect destinations. + """ + + ALLOW_ALL = "allow_all" + BLOCK_SCHEME_DOWNGRADE = "block_scheme_downgrade" + ENFORCE_HTTPS = "enforce_https" + + +async def _check_redirect(response: httpx.Response, policy: RedirectPolicy) -> None: + """Validate redirect responses against the configured policy. + + This is installed as an httpx response event hook. It inspects redirect + responses (3xx with a ``next_request``) and raises + :class:`httpx.HTTPStatusError` when the redirect violates *policy*. + + Args: + response: The httpx response to check. + policy: The redirect policy to enforce. + """ + if not response.is_redirect or response.next_request is None: + return + + original_url = response.request.url + redirect_url = response.next_request.url + + if policy == RedirectPolicy.BLOCK_SCHEME_DOWNGRADE: + if original_url.scheme == "https" and redirect_url.scheme == "http": + logger.warning( + "Blocked HTTPS-to-HTTP redirect from %s to %s", + original_url, + redirect_url, + ) + raise httpx.HTTPStatusError( + f"HTTPS-to-HTTP redirect blocked: {original_url} -> {redirect_url}", + request=response.request, + response=response, + ) + elif policy == RedirectPolicy.ENFORCE_HTTPS: + if redirect_url.scheme != "https": + logger.warning( + "Blocked non-HTTPS redirect from %s to %s", + original_url, + redirect_url, + ) + raise httpx.HTTPStatusError( + f"Non-HTTPS redirect blocked: {original_url} -> {redirect_url}", + request=response.request, + response=response, + ) + + class McpHttpClientFactory(Protocol): # pragma: no branch def __call__( # pragma: no branch self, headers: dict[str, str] | None = None, timeout: httpx.Timeout | None = None, auth: httpx.Auth | None = None, + redirect_policy: RedirectPolicy = RedirectPolicy.BLOCK_SCHEME_DOWNGRADE, ) -> httpx.AsyncClient: ... @@ -24,18 +91,25 @@ def create_mcp_http_client( headers: dict[str, str] | None = None, timeout: httpx.Timeout | None = None, auth: httpx.Auth | None = None, + redirect_policy: RedirectPolicy = RedirectPolicy.BLOCK_SCHEME_DOWNGRADE, ) -> httpx.AsyncClient: """Create a standardized httpx AsyncClient with MCP defaults. This function provides common defaults used throughout the MCP codebase: - follow_redirects=True (always enabled) - Default timeout of 30 seconds if not specified + - SSRF redirect protection via *redirect_policy* Args: headers: Optional headers to include with all requests. timeout: Request timeout as httpx.Timeout object. Defaults to 30 seconds if not specified. auth: Optional authentication handler. + redirect_policy: Policy controlling which redirects are allowed. + Defaults to ``RedirectPolicy.BLOCK_SCHEME_DOWNGRADE`` which blocks + HTTPS-to-HTTP downgrades. Use ``RedirectPolicy.ENFORCE_HTTPS`` to + only allow HTTPS destinations, or ``RedirectPolicy.ALLOW_ALL`` to + disable redirect validation entirely (legacy behavior). Returns: Configured httpx.AsyncClient instance with MCP defaults. @@ -94,4 +168,12 @@ def create_mcp_http_client( if auth is not None: # pragma: no cover kwargs["auth"] = auth + # Install redirect validation hook + if redirect_policy != RedirectPolicy.ALLOW_ALL: + + async def check_redirect_hook(response: httpx.Response) -> None: + await _check_redirect(response, redirect_policy) + + kwargs["event_hooks"] = {"response": [check_redirect_hook]} + return httpx.AsyncClient(**kwargs) diff --git a/tests/shared/test_httpx_utils.py b/tests/shared/test_httpx_utils.py index dcc6fd003..57f81adf9 100644 --- a/tests/shared/test_httpx_utils.py +++ b/tests/shared/test_httpx_utils.py @@ -1,8 +1,11 @@ """Tests for httpx utility functions.""" import httpx +import pytest -from mcp.shared._httpx_utils import create_mcp_http_client +from mcp.shared._httpx_utils import RedirectPolicy, _check_redirect, create_mcp_http_client + +pytestmark = pytest.mark.anyio def test_default_settings(): @@ -22,3 +25,142 @@ def test_custom_parameters(): assert client.headers["Authorization"] == "Bearer token" assert client.timeout.connect == 60.0 + + +def test_default_redirect_policy(): + """Test that the default redirect policy is BLOCK_SCHEME_DOWNGRADE.""" + client = create_mcp_http_client() + # Event hooks should be installed for the default policy + assert len(client.event_hooks["response"]) == 1 + + +def test_allow_all_policy_no_hooks(): + """Test that ALLOW_ALL does not install event hooks.""" + client = create_mcp_http_client(redirect_policy=RedirectPolicy.ALLOW_ALL) + assert len(client.event_hooks["response"]) == 0 + + +# --- _check_redirect unit tests --- + + +async def test_check_redirect_ignores_non_redirect(): + """Test that non-redirect responses are ignored.""" + response = httpx.Response(200, request=httpx.Request("GET", "https://example.com")) + # Should not raise + await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) + await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS) + + +async def test_check_redirect_ignores_redirect_without_next_request(): + """Test that redirect responses without next_request are ignored.""" + response = httpx.Response( + 302, + headers={"Location": "http://evil.com"}, + request=httpx.Request("GET", "https://example.com"), + ) + # next_request is None on a manually constructed response + assert response.next_request is None + await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) + + +# --- BLOCK_SCHEME_DOWNGRADE tests --- + + +async def test_block_scheme_downgrade_blocks_https_to_http(): + """Test BLOCK_SCHEME_DOWNGRADE blocks HTTPS->HTTP redirect.""" + response = httpx.Response( + 302, + headers={"Location": "http://evil.com"}, + request=httpx.Request("GET", "https://example.com"), + ) + response.next_request = httpx.Request("GET", "http://evil.com") + + with pytest.raises(httpx.HTTPStatusError, match="HTTPS-to-HTTP redirect blocked"): + await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) + + +async def test_block_scheme_downgrade_allows_https_to_https(): + """Test BLOCK_SCHEME_DOWNGRADE allows HTTPS->HTTPS redirect.""" + response = httpx.Response( + 302, + headers={"Location": "https://other.com"}, + request=httpx.Request("GET", "https://example.com"), + ) + response.next_request = httpx.Request("GET", "https://other.com") + await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) + + +async def test_block_scheme_downgrade_allows_http_to_http(): + """Test BLOCK_SCHEME_DOWNGRADE allows HTTP->HTTP redirect.""" + response = httpx.Response( + 302, + headers={"Location": "http://other.com"}, + request=httpx.Request("GET", "http://example.com"), + ) + response.next_request = httpx.Request("GET", "http://other.com") + await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) + + +async def test_block_scheme_downgrade_allows_http_to_https(): + """Test BLOCK_SCHEME_DOWNGRADE allows HTTP->HTTPS upgrade.""" + response = httpx.Response( + 302, + headers={"Location": "https://other.com"}, + request=httpx.Request("GET", "http://example.com"), + ) + response.next_request = httpx.Request("GET", "https://other.com") + await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) + + +# --- ENFORCE_HTTPS tests --- + + +async def test_enforce_https_blocks_http_target(): + """Test ENFORCE_HTTPS blocks any HTTP redirect target.""" + response = httpx.Response( + 302, + headers={"Location": "http://evil.com"}, + request=httpx.Request("GET", "https://example.com"), + ) + response.next_request = httpx.Request("GET", "http://evil.com") + + with pytest.raises(httpx.HTTPStatusError, match="Non-HTTPS redirect blocked"): + await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS) + + +async def test_enforce_https_blocks_http_to_http(): + """Test ENFORCE_HTTPS blocks HTTP->HTTP redirect.""" + response = httpx.Response( + 302, + headers={"Location": "http://other.com"}, + request=httpx.Request("GET", "http://example.com"), + ) + response.next_request = httpx.Request("GET", "http://other.com") + + with pytest.raises(httpx.HTTPStatusError, match="Non-HTTPS redirect blocked"): + await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS) + + +async def test_enforce_https_allows_https_target(): + """Test ENFORCE_HTTPS allows HTTPS redirect target.""" + response = httpx.Response( + 302, + headers={"Location": "https://other.com"}, + request=httpx.Request("GET", "https://example.com"), + ) + response.next_request = httpx.Request("GET", "https://other.com") + await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS) + + +# --- ALLOW_ALL tests --- + + +async def test_allow_all_permits_https_to_http(): + """Test ALLOW_ALL permits HTTPS->HTTP redirect.""" + response = httpx.Response( + 302, + headers={"Location": "http://evil.com"}, + request=httpx.Request("GET", "https://example.com"), + ) + response.next_request = httpx.Request("GET", "http://evil.com") + await _check_redirect(response, RedirectPolicy.ALLOW_ALL) From 1f1efbd91481464f34838b4a74ebabca696066fd Mon Sep 17 00:00:00 2001 From: dgenio Date: Sat, 28 Feb 2026 15:20:22 +0000 Subject: [PATCH 2/5] fix: use Location header for redirect validation instead of next_request response.next_request is not populated by httpx when follow_redirects=True, causing the redirect protection hook to silently bypass all checks. Parse the Location header directly (matching httpx's own _build_redirect_request flow) and add integration tests via MockTransport to exercise the event hook wiring end-to-end. Github-Issue: #2106 --- src/mcp/shared/_httpx_utils.py | 8 +++-- tests/shared/test_httpx_utils.py | 50 +++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/src/mcp/shared/_httpx_utils.py b/src/mcp/shared/_httpx_utils.py index 7277b50cb..a55b389f1 100644 --- a/src/mcp/shared/_httpx_utils.py +++ b/src/mcp/shared/_httpx_utils.py @@ -38,18 +38,20 @@ async def _check_redirect(response: httpx.Response, policy: RedirectPolicy) -> N """Validate redirect responses against the configured policy. This is installed as an httpx response event hook. It inspects redirect - responses (3xx with a ``next_request``) and raises + responses (3xx with a ``Location`` header) and raises :class:`httpx.HTTPStatusError` when the redirect violates *policy*. Args: response: The httpx response to check. policy: The redirect policy to enforce. """ - if not response.is_redirect or response.next_request is None: + if not response.has_redirect_location: return original_url = response.request.url - redirect_url = response.next_request.url + redirect_url = httpx.URL(response.headers["Location"]) + if redirect_url.is_relative_url: + redirect_url = original_url.join(redirect_url) if policy == RedirectPolicy.BLOCK_SCHEME_DOWNGRADE: if original_url.scheme == "https" and redirect_url.scheme == "http": diff --git a/tests/shared/test_httpx_utils.py b/tests/shared/test_httpx_utils.py index 57f81adf9..58dba167c 100644 --- a/tests/shared/test_httpx_utils.py +++ b/tests/shared/test_httpx_utils.py @@ -51,15 +51,14 @@ async def test_check_redirect_ignores_non_redirect(): await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS) -async def test_check_redirect_ignores_redirect_without_next_request(): - """Test that redirect responses without next_request are ignored.""" +async def test_check_redirect_ignores_redirect_without_location_header(): + """Test that redirect responses without a Location header are ignored.""" response = httpx.Response( 302, - headers={"Location": "http://evil.com"}, request=httpx.Request("GET", "https://example.com"), ) - # next_request is None on a manually constructed response - assert response.next_request is None + # No Location header → has_redirect_location is False + assert not response.has_redirect_location await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) @@ -73,7 +72,6 @@ async def test_block_scheme_downgrade_blocks_https_to_http(): headers={"Location": "http://evil.com"}, request=httpx.Request("GET", "https://example.com"), ) - response.next_request = httpx.Request("GET", "http://evil.com") with pytest.raises(httpx.HTTPStatusError, match="HTTPS-to-HTTP redirect blocked"): await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) @@ -86,7 +84,6 @@ async def test_block_scheme_downgrade_allows_https_to_https(): headers={"Location": "https://other.com"}, request=httpx.Request("GET", "https://example.com"), ) - response.next_request = httpx.Request("GET", "https://other.com") await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) @@ -97,7 +94,6 @@ async def test_block_scheme_downgrade_allows_http_to_http(): headers={"Location": "http://other.com"}, request=httpx.Request("GET", "http://example.com"), ) - response.next_request = httpx.Request("GET", "http://other.com") await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) @@ -108,7 +104,6 @@ async def test_block_scheme_downgrade_allows_http_to_https(): headers={"Location": "https://other.com"}, request=httpx.Request("GET", "http://example.com"), ) - response.next_request = httpx.Request("GET", "https://other.com") await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) @@ -122,7 +117,6 @@ async def test_enforce_https_blocks_http_target(): headers={"Location": "http://evil.com"}, request=httpx.Request("GET", "https://example.com"), ) - response.next_request = httpx.Request("GET", "http://evil.com") with pytest.raises(httpx.HTTPStatusError, match="Non-HTTPS redirect blocked"): await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS) @@ -135,7 +129,6 @@ async def test_enforce_https_blocks_http_to_http(): headers={"Location": "http://other.com"}, request=httpx.Request("GET", "http://example.com"), ) - response.next_request = httpx.Request("GET", "http://other.com") with pytest.raises(httpx.HTTPStatusError, match="Non-HTTPS redirect blocked"): await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS) @@ -148,7 +141,6 @@ async def test_enforce_https_allows_https_target(): headers={"Location": "https://other.com"}, request=httpx.Request("GET", "https://example.com"), ) - response.next_request = httpx.Request("GET", "https://other.com") await _check_redirect(response, RedirectPolicy.ENFORCE_HTTPS) @@ -162,5 +154,37 @@ async def test_allow_all_permits_https_to_http(): headers={"Location": "http://evil.com"}, request=httpx.Request("GET", "https://example.com"), ) - response.next_request = httpx.Request("GET", "http://evil.com") await _check_redirect(response, RedirectPolicy.ALLOW_ALL) + + +# --- Integration tests (exercise the event hook wiring end-to-end) --- + + +async def test_redirect_hook_blocks_scheme_downgrade_via_transport(): + """Test that the event hook installed by create_mcp_http_client blocks HTTPS->HTTP.""" + + def mock_handler(request: httpx.Request) -> httpx.Response: + if str(request.url) == "https://example.com/start": + return httpx.Response(302, headers={"Location": "http://evil.com/stolen"}) + return httpx.Response(200, text="OK") + + async with create_mcp_http_client() as client: + client._transport = httpx.MockTransport(mock_handler) + + with pytest.raises(httpx.HTTPStatusError, match="HTTPS-to-HTTP redirect blocked"): + await client.get("https://example.com/start") + + +async def test_redirect_hook_allows_safe_redirect_via_transport(): + """Test that the event hook allows HTTPS->HTTPS redirects through the client.""" + + def mock_handler(request: httpx.Request) -> httpx.Response: + if str(request.url) == "https://example.com/start": + return httpx.Response(302, headers={"Location": "https://example.com/final"}) + return httpx.Response(200, text="OK") + + async with create_mcp_http_client() as client: + client._transport = httpx.MockTransport(mock_handler) + + response = await client.get("https://example.com/start") + assert response.status_code == 200 From 6fc1bc724a813720f9eb69b0911a96470e6f8e29 Mon Sep 17 00:00:00 2001 From: dgenio Date: Sat, 28 Feb 2026 15:22:57 +0000 Subject: [PATCH 3/5] fix: remove redirect_policy from McpHttpClientFactory Protocol The Protocol describes the caller-side contract. No caller passes redirect_policy to the factory (sse_client only passes headers, timeout, auth). Adding it would break downstream code implementing custom factories. Github-Issue: #2106 --- src/mcp/shared/_httpx_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mcp/shared/_httpx_utils.py b/src/mcp/shared/_httpx_utils.py index a55b389f1..2670f40d1 100644 --- a/src/mcp/shared/_httpx_utils.py +++ b/src/mcp/shared/_httpx_utils.py @@ -85,7 +85,6 @@ def __call__( # pragma: no branch headers: dict[str, str] | None = None, timeout: httpx.Timeout | None = None, auth: httpx.Auth | None = None, - redirect_policy: RedirectPolicy = RedirectPolicy.BLOCK_SCHEME_DOWNGRADE, ) -> httpx.AsyncClient: ... From 06d1f1e5664b623d235cba0af6fc61187fb6e820 Mon Sep 17 00:00:00 2001 From: dgenio Date: Sat, 28 Feb 2026 15:26:43 +0000 Subject: [PATCH 4/5] docs: document SSRF redirect protection gap for user-provided clients Add docstring note on the http_client parameter of streamable_http_client() clarifying that user-provided clients do not receive SSRF redirect protection. Also emit a logger.debug when a user-provided client is used. Github-Issue: #2106 --- src/mcp/client/streamable_http.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9f3dd5e0b..3433fbe68 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -523,6 +523,9 @@ async def streamable_http_client( http_client: Optional pre-configured httpx.AsyncClient. If None, a default client with recommended MCP timeouts will be created. To configure headers, authentication, or other HTTP settings, create an httpx.AsyncClient and pass it here. + Note: User-provided clients do not receive SSRF redirect protection. + If redirect validation is required, use ``create_mcp_http_client()`` + or configure redirect hooks manually. terminate_on_close: If True, send a DELETE request to terminate the session when the context exits. Yields: @@ -543,6 +546,8 @@ async def streamable_http_client( if client is None: # Create default client with recommended MCP timeouts client = create_mcp_http_client() + else: + logger.debug("Using user-provided HTTP client; SSRF redirect protection is not applied") transport = StreamableHTTPTransport(url) From 75a4e4520f958e6f88acebc9bfbacfcde80507d5 Mon Sep 17 00:00:00 2001 From: dgenio Date: Sat, 28 Feb 2026 15:30:48 +0000 Subject: [PATCH 5/5] test: fix coverage gaps for relative redirect and unreachable mock branch Add test for relative Location headers (exercises the is_relative_url branch). Mark unreachable 200-response fallback in blocking test with pragma: no cover. Github-Issue: #2106 --- tests/shared/test_httpx_utils.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/shared/test_httpx_utils.py b/tests/shared/test_httpx_utils.py index 58dba167c..148d1532a 100644 --- a/tests/shared/test_httpx_utils.py +++ b/tests/shared/test_httpx_utils.py @@ -107,6 +107,16 @@ async def test_block_scheme_downgrade_allows_http_to_https(): await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) +async def test_block_scheme_downgrade_allows_relative_redirect(): + """Test BLOCK_SCHEME_DOWNGRADE allows relative Location headers.""" + response = httpx.Response( + 302, + headers={"Location": "/other-path"}, + request=httpx.Request("GET", "https://example.com/start"), + ) + await _check_redirect(response, RedirectPolicy.BLOCK_SCHEME_DOWNGRADE) + + # --- ENFORCE_HTTPS tests --- @@ -166,7 +176,7 @@ async def test_redirect_hook_blocks_scheme_downgrade_via_transport(): def mock_handler(request: httpx.Request) -> httpx.Response: if str(request.url) == "https://example.com/start": return httpx.Response(302, headers={"Location": "http://evil.com/stolen"}) - return httpx.Response(200, text="OK") + return httpx.Response(200, text="OK") # pragma: no cover async with create_mcp_http_client() as client: client._transport = httpx.MockTransport(mock_handler)