diff --git a/packages/http/httpx/kiota_http/middleware/options/redirect_handler_option.py b/packages/http/httpx/kiota_http/middleware/options/redirect_handler_option.py index 9f0c43a3..3d619b19 100644 --- a/packages/http/httpx/kiota_http/middleware/options/redirect_handler_option.py +++ b/packages/http/httpx/kiota_http/middleware/options/redirect_handler_option.py @@ -1,5 +1,42 @@ +from typing import Callable, Optional from kiota_abstractions.request_option import RequestOption +import httpx + +# Type alias for the scrub sensitive headers callback +ScrubSensitiveHeadersCallback = Callable[[httpx.Request, httpx.URL], None] + + +def default_scrub_sensitive_headers(new_request: httpx.Request, original_url: httpx.URL) -> None: + """ + The default implementation for scrubbing sensitive headers during redirects. + This method removes Authorization and Cookie headers when the host, scheme, or port changes. + Args: + new_request: The new redirect request to modify + original_url: The original request URL + """ + if not new_request or not original_url: + return + + new_url = new_request.url + if not new_url: + return + + # Remove Authorization and Cookie headers if the request's scheme, host, or port changes + is_different_origin = ( + original_url.host != new_url.host or original_url.scheme != new_url.scheme + or original_url.port != new_url.port + ) + + if is_different_origin: + new_request.headers.pop("Authorization", None) + new_request.headers.pop("Cookie", None) + + # Note: Proxy-Authorization is not handled here as proxy configuration in httpx + # is managed at the transport level and not accessible to middleware. + # In environments where this matters, the proxy configuration should be managed + # at the HTTP client level. + class RedirectHandlerOption(RequestOption): @@ -15,7 +52,8 @@ def __init__( self, max_redirect: int = DEFAULT_MAX_REDIRECT, should_redirect: bool = True, - allow_redirect_on_scheme_change: bool = False + allow_redirect_on_scheme_change: bool = False, + scrub_sensitive_headers: Optional[ScrubSensitiveHeadersCallback] = None ) -> None: if max_redirect > self.MAX_MAX_REDIRECT: @@ -28,6 +66,7 @@ def __init__( self._max_redirect = max_redirect self._should_redirect = should_redirect self._allow_redirect_on_scheme_change = allow_redirect_on_scheme_change + self._scrub_sensitive_headers = scrub_sensitive_headers or default_scrub_sensitive_headers @property def max_redirect(self): @@ -59,6 +98,16 @@ def allow_redirect_on_scheme_change(self): def allow_redirect_on_scheme_change(self, value: bool): self._allow_redirect_on_scheme_change = value + @property + def scrub_sensitive_headers(self) -> ScrubSensitiveHeadersCallback: + """The callback for scrubbing sensitive headers during redirects. + Defaults to default_scrub_sensitive_headers.""" + return self._scrub_sensitive_headers + + @scrub_sensitive_headers.setter + def scrub_sensitive_headers(self, value: ScrubSensitiveHeadersCallback): + self._scrub_sensitive_headers = value + @staticmethod def get_key() -> str: return RedirectHandlerOption.REDIRECT_HANDLER_OPTION_KEY diff --git a/packages/http/httpx/kiota_http/middleware/redirect_handler.py b/packages/http/httpx/kiota_http/middleware/redirect_handler.py index e7a1197f..f0f8a930 100644 --- a/packages/http/httpx/kiota_http/middleware/redirect_handler.py +++ b/packages/http/httpx/kiota_http/middleware/redirect_handler.py @@ -26,7 +26,6 @@ class RedirectHandler(BaseMiddleware): } STATUS_CODE_SEE_OTHER: int = 303 LOCATION_HEADER: str = "Location" - AUTHORIZATION_HEADER: str = "Authorization" def __init__(self, options: RedirectHandlerOption = RedirectHandlerOption()) -> None: super().__init__() @@ -125,15 +124,31 @@ def _build_redirect_request( """ method = self._redirect_method(request, response) url = self._redirect_url(request, response, options) - headers = self._redirect_headers(request, url, method) stream = self._redirect_stream(request, method) + + # Create the new request with the redirect URL and original headers new_request = httpx.Request( method=method, url=url, - headers=headers, + headers=request.headers.copy(), stream=stream, extensions=request.extensions, ) + + # Scrub sensitive headers before following the redirect + options.scrub_sensitive_headers(new_request, request.url) + + # Update the Host header if not same origin + if not self._same_origin(url, request.url): + new_request.headers["Host"] = url.netloc.decode("ascii") + + # Handle 303 See Other and other method changes + if method != request.method and method == "GET": + # If we've switched to a 'GET' request, strip any headers which + # are only relevant to the request body. + new_request.headers.pop("Content-Length", None) + new_request.headers.pop("Transfer-Encoding", None) + if hasattr(request, "context"): new_request.context = request.context #type: ignore new_request.options = {} #type: ignore @@ -201,35 +216,6 @@ def _redirect_url( return url - def _redirect_headers( - self, request: httpx.Request, url: httpx.URL, method: str - ) -> httpx.Headers: - """ - Return the headers that should be used for the redirect request. - """ - headers = httpx.Headers(request.headers) - - if not self._same_origin(url, request.url): - if not self.is_https_redirect(request.url, url): - # Strip Authorization headers when responses are redirected - # away from the origin. (Except for direct HTTP to HTTPS redirects.) - headers.pop("Authorization", None) - - # Update the Host header. - headers["Host"] = url.netloc.decode("ascii") - - if method != request.method and method == "GET": - # If we've switch to a 'GET' request, then strip any headers which - # are only relevant to the request body. - headers.pop("Content-Length", None) - headers.pop("Transfer-Encoding", None) - - # We should use the client cookie store to determine any cookie header, - # rather than whatever was on the original outgoing request. - headers.pop("Cookie", None) - - return headers - def _redirect_stream( self, request: httpx.Request, method: str ) -> typing.Optional[typing.Union[httpx.SyncByteStream, httpx.AsyncByteStream]]: @@ -246,17 +232,3 @@ def _same_origin(self, url: httpx.URL, other: httpx.URL) -> bool: Return 'True' if the given URLs share the same origin. """ return (url.scheme == other.scheme and url.host == other.host) - - def port_or_default(self, url: httpx.URL) -> typing.Optional[int]: - if url.port is not None: - return url.port - return {"http": 80, "https": 443}.get(url.scheme) - - def is_https_redirect(self, url: httpx.URL, location: httpx.URL) -> bool: - """ - Return 'True' if 'location' is a HTTPS upgrade of 'url' - """ - if url.host != location.host: - return False - - return (url.scheme == "http" and location.scheme == "https") diff --git a/packages/http/httpx/tests/middleware_tests/test_redirect_handler.py b/packages/http/httpx/tests/middleware_tests/test_redirect_handler.py index 1f7308b1..4963af5e 100644 --- a/packages/http/httpx/tests/middleware_tests/test_redirect_handler.py +++ b/packages/http/httpx/tests/middleware_tests/test_redirect_handler.py @@ -69,16 +69,6 @@ def test_not_same_origin(mock_redirect_handler): assert not mock_redirect_handler._same_origin(origin1, origin2) -def test_is_https_redirect(mock_redirect_handler): - url = httpx.URL("http://example.com") - location = httpx.URL(BASE_URL) - assert mock_redirect_handler.is_https_redirect(url, location) - - -def test_is_not_https_redirect(mock_redirect_handler): - url = httpx.URL(BASE_URL) - location = httpx.URL("http://www.example.com") - assert not mock_redirect_handler.is_https_redirect(url, location) @pytest.mark.asyncio @@ -281,3 +271,320 @@ def request_handler(request: httpx.Request): with pytest.raises(Exception) as e: await handler.send(request, mock_transport) assert "Too many redirects" in str(e.value) + + +@pytest.mark.asyncio +async def test_redirect_cross_host_removes_auth_and_cookie(): + """Test that cross-host redirects remove both Authorization and Cookie headers""" + + def request_handler(request: httpx.Request): + if request.url == "https://other.example.com/api": + return httpx.Response(200, ) + return httpx.Response( + MOVED_PERMANENTLY, + headers={LOCATION_HEADER: "https://other.example.com/api"}, + ) + + handler = RedirectHandler() + request = httpx.Request( + 'GET', + BASE_URL, + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET" + }, + ) + mock_transport = httpx.MockTransport(request_handler) + resp = await handler.send(request, mock_transport) + assert resp.status_code == 200 + assert AUTHORIZATION_HEADER not in resp.request.headers + assert "Cookie" not in resp.request.headers + + +@pytest.mark.asyncio +async def test_redirect_scheme_change_removes_auth_and_cookie(): + """Test that scheme changes remove both Authorization and Cookie headers""" + + def request_handler(request: httpx.Request): + if request.url == "http://example.com/api": # NOSONAR + return httpx.Response(200, ) + return httpx.Response( + MOVED_PERMANENTLY, + headers={LOCATION_HEADER: "http://example.com/api"}, # NOSONAR + ) + + handler = RedirectHandler(RedirectHandlerOption(allow_redirect_on_scheme_change=True)) + request = httpx.Request( + 'GET', + BASE_URL, + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET" + }, + ) + mock_transport = httpx.MockTransport(request_handler) + resp = await handler.send(request, mock_transport) + assert resp.status_code == 200 + assert AUTHORIZATION_HEADER not in resp.request.headers + assert "Cookie" not in resp.request.headers + + +@pytest.mark.asyncio +async def test_redirect_same_host_and_scheme_keeps_all_headers(): + """Test that same-host and same-scheme redirects keep Authorization and Cookie headers""" + + def request_handler(request: httpx.Request): + if request.url == f"{BASE_URL}/v2/api": + return httpx.Response(200, ) + return httpx.Response( + MOVED_PERMANENTLY, + headers={LOCATION_HEADER: f"{BASE_URL}/v2/api"}, + ) + + handler = RedirectHandler() + request = httpx.Request( + 'GET', + f"{BASE_URL}/v1/api", + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET" + }, + ) + mock_transport = httpx.MockTransport(request_handler) + resp = await handler.send(request, mock_transport) + assert resp.status_code == 200 + assert AUTHORIZATION_HEADER in resp.request.headers + assert resp.request.headers[AUTHORIZATION_HEADER] == "Bearer token" + assert "Cookie" in resp.request.headers + assert resp.request.headers["Cookie"] == "session=SECRET" + + +@pytest.mark.asyncio +async def test_redirect_with_different_port_removes_auth_and_cookie(): + """Test that redirects to a different port remove Authorization and Cookie headers""" + + def request_handler(request: httpx.Request): + if request.url == "http://example.org:9090/bar": # NOSONAR + return httpx.Response(200, ) + return httpx.Response( + MOVED_PERMANENTLY, + headers={LOCATION_HEADER: "http://example.org:9090/bar"}, # NOSONAR + ) + + handler = RedirectHandler() + request = httpx.Request( + 'GET', + "http://example.org:8080/foo", # NOSONAR + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET" + }, + ) + mock_transport = httpx.MockTransport(request_handler) + resp = await handler.send(request, mock_transport) + assert resp.status_code == 200 + assert resp.request != request + assert AUTHORIZATION_HEADER not in resp.request.headers + assert "Cookie" not in resp.request.headers + + +@pytest.mark.asyncio +async def test_redirect_with_same_port_keeps_auth_and_cookie(): + """Test that redirects to the same port keep Authorization and Cookie headers""" + + def request_handler(request: httpx.Request): + if request.url == "http://example.org:8080/bar": # NOSONAR + return httpx.Response(200, ) + return httpx.Response( + FOUND, + headers={LOCATION_HEADER: "http://example.org:8080/bar"}, # NOSONAR + ) + + handler = RedirectHandler() + request = httpx.Request( + 'GET', + "http://example.org:8080/foo", # NOSONAR + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET" + }, + ) + mock_transport = httpx.MockTransport(request_handler) + resp = await handler.send(request, mock_transport) + assert resp.status_code == 200 + assert resp.request != request + assert AUTHORIZATION_HEADER in resp.request.headers + assert resp.request.headers[AUTHORIZATION_HEADER] == "Bearer token" + assert "Cookie" in resp.request.headers + assert resp.request.headers["Cookie"] == "session=SECRET" + + +@pytest.mark.asyncio +async def test_redirect_with_custom_scrubber(): + """Test that custom scrubber can be provided and is used""" + + def custom_scrubber(new_request, original_url): + # Custom logic: never remove headers + pass + + def request_handler(request: httpx.Request): + if request.url == "https://evil.attacker.com/steal": + return httpx.Response(200, ) + return httpx.Response( + MOVED_PERMANENTLY, + headers={LOCATION_HEADER: "https://evil.attacker.com/steal"}, + ) + + options = RedirectHandlerOption(scrub_sensitive_headers=custom_scrubber) + handler = RedirectHandler(options) + request = httpx.Request( + 'GET', + BASE_URL, + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET" + }, + ) + mock_transport = httpx.MockTransport(request_handler) + resp = await handler.send(request, mock_transport) + assert resp.status_code == 200 + # Headers should be kept because custom scrubber doesn't remove them + assert AUTHORIZATION_HEADER in resp.request.headers + assert "Cookie" in resp.request.headers + + +def test_default_scrub_sensitive_headers_removes_on_host_change(): + """Test that default scrubber removes Authorization and Cookie when host changes""" + from kiota_http.middleware.options.redirect_handler_option import default_scrub_sensitive_headers + + original_url = httpx.URL("https://example.com/v1/api") + new_request = httpx.Request( + "GET", + "https://other.com/api", + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET", + "Content-Type": "application/json" + } + ) + + default_scrub_sensitive_headers(new_request, original_url) + + assert AUTHORIZATION_HEADER not in new_request.headers + assert "Cookie" not in new_request.headers + assert "Content-Type" in new_request.headers # Other headers should remain + + +def test_default_scrub_sensitive_headers_removes_on_scheme_change(): + """Test that default scrubber removes Authorization and Cookie when scheme changes""" + from kiota_http.middleware.options.redirect_handler_option import default_scrub_sensitive_headers + + original_url = httpx.URL("https://example.com/v1/api") + new_request = httpx.Request( + "GET", + "http://example.com/v1/api", # NOSONAR + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET", + "Content-Type": "application/json" + } + ) + + default_scrub_sensitive_headers(new_request, original_url) + + assert AUTHORIZATION_HEADER not in new_request.headers + assert "Cookie" not in new_request.headers + assert "Content-Type" in new_request.headers + + +def test_default_scrub_sensitive_headers_keeps_on_same_origin(): + """Test that default scrubber keeps headers when host and scheme are the same""" + from kiota_http.middleware.options.redirect_handler_option import default_scrub_sensitive_headers + + original_url = httpx.URL("https://example.com/v1/api") + new_request = httpx.Request( + "GET", + "https://example.com/v2/api", + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET", + "Content-Type": "application/json" + } + ) + + default_scrub_sensitive_headers(new_request, original_url) + + assert AUTHORIZATION_HEADER in new_request.headers + assert "Cookie" in new_request.headers + assert "Content-Type" in new_request.headers + + +def test_default_scrub_sensitive_headers_removes_on_port_change(): + """Test that default scrubber removes Authorization and Cookie when port changes""" + from kiota_http.middleware.options.redirect_handler_option import default_scrub_sensitive_headers + + original_url = httpx.URL("http://example.org:8080/foo") # NOSONAR + new_request = httpx.Request( + "GET", + "http://example.org:9090/bar", # NOSONAR + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET", + "Content-Type": "application/json" + } + ) + + default_scrub_sensitive_headers(new_request, original_url) + + assert AUTHORIZATION_HEADER not in new_request.headers + assert "Cookie" not in new_request.headers + assert "Content-Type" in new_request.headers # Other headers should remain + + +def test_default_scrub_sensitive_headers_keeps_on_same_port(): + """Test that default scrubber keeps headers when port is the same""" + from kiota_http.middleware.options.redirect_handler_option import default_scrub_sensitive_headers + + original_url = httpx.URL("http://example.org:8080/foo") # NOSONAR + new_request = httpx.Request( + "GET", + "http://example.org:8080/bar", # NOSONAR + headers={ + AUTHORIZATION_HEADER: "Bearer token", + "Cookie": "session=SECRET", + "Content-Type": "application/json" + } + ) + + default_scrub_sensitive_headers(new_request, original_url) + + assert AUTHORIZATION_HEADER in new_request.headers + assert "Cookie" in new_request.headers + assert "Content-Type" in new_request.headers + + +def test_default_scrub_sensitive_headers_handles_none_gracefully(): + """Test that default scrubber handles None/empty inputs gracefully""" + from kiota_http.middleware.options.redirect_handler_option import default_scrub_sensitive_headers + + # Should not raise exceptions + default_scrub_sensitive_headers(None, httpx.URL(BASE_URL)) + default_scrub_sensitive_headers(httpx.Request("GET", BASE_URL), None) + + +def test_custom_scrub_sensitive_headers(): + """Test that custom scrubber can be set on options""" + def custom_scrubber(new_request, original_url): + # Custom logic + pass + + options = RedirectHandlerOption(scrub_sensitive_headers=custom_scrubber) + assert options.scrub_sensitive_headers == custom_scrubber + + +def test_default_options_uses_default_scrubber(): + """Test that default options use the default scrubber""" + from kiota_http.middleware.options.redirect_handler_option import default_scrub_sensitive_headers + + options = RedirectHandlerOption() + assert options.scrub_sensitive_headers == default_scrub_sensitive_headers