Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,42 @@
from typing import Callable, Optional
from kiota_abstractions.request_option import RequestOption

import httpx

# Type alias for the scrub sensitive headers callback
ScrubSensitiveHeadersCallback = Callable[[httpx.Request, httpx.URL], None]


def default_scrub_sensitive_headers(new_request: httpx.Request, original_url: httpx.URL) -> None:
"""
The default implementation for scrubbing sensitive headers during redirects.
This method removes Authorization and Cookie headers when the host, scheme, or port changes.
Args:
new_request: The new redirect request to modify
original_url: The original request URL
"""
if not new_request or not original_url:
return

new_url = new_request.url
if not new_url:
return

# Remove Authorization and Cookie headers if the request's scheme, host, or port changes
is_different_origin = (
original_url.host != new_url.host or original_url.scheme != new_url.scheme
or original_url.port != new_url.port
)

if is_different_origin:
new_request.headers.pop("Authorization", None)
new_request.headers.pop("Cookie", None)

# Note: Proxy-Authorization is not handled here as proxy configuration in httpx
# is managed at the transport level and not accessible to middleware.
# In environments where this matters, the proxy configuration should be managed
# at the HTTP client level.


class RedirectHandlerOption(RequestOption):

Expand All @@ -15,7 +52,8 @@ def __init__(
self,
max_redirect: int = DEFAULT_MAX_REDIRECT,
should_redirect: bool = True,
allow_redirect_on_scheme_change: bool = False
allow_redirect_on_scheme_change: bool = False,
scrub_sensitive_headers: Optional[ScrubSensitiveHeadersCallback] = None
) -> None:

if max_redirect > self.MAX_MAX_REDIRECT:
Expand All @@ -28,6 +66,7 @@ def __init__(
self._max_redirect = max_redirect
self._should_redirect = should_redirect
self._allow_redirect_on_scheme_change = allow_redirect_on_scheme_change
self._scrub_sensitive_headers = scrub_sensitive_headers or default_scrub_sensitive_headers

@property
def max_redirect(self):
Expand Down Expand Up @@ -59,6 +98,16 @@ def allow_redirect_on_scheme_change(self):
def allow_redirect_on_scheme_change(self, value: bool):
self._allow_redirect_on_scheme_change = value

@property
def scrub_sensitive_headers(self) -> ScrubSensitiveHeadersCallback:
"""The callback for scrubbing sensitive headers during redirects.
Defaults to default_scrub_sensitive_headers."""
return self._scrub_sensitive_headers

@scrub_sensitive_headers.setter
def scrub_sensitive_headers(self, value: ScrubSensitiveHeadersCallback):
self._scrub_sensitive_headers = value

@staticmethod
def get_key() -> str:
return RedirectHandlerOption.REDIRECT_HANDLER_OPTION_KEY
64 changes: 18 additions & 46 deletions packages/http/httpx/kiota_http/middleware/redirect_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class RedirectHandler(BaseMiddleware):
}
STATUS_CODE_SEE_OTHER: int = 303
LOCATION_HEADER: str = "Location"
AUTHORIZATION_HEADER: str = "Authorization"

def __init__(self, options: RedirectHandlerOption = RedirectHandlerOption()) -> None:
super().__init__()
Expand Down Expand Up @@ -125,15 +124,31 @@ def _build_redirect_request(
"""
method = self._redirect_method(request, response)
url = self._redirect_url(request, response, options)
headers = self._redirect_headers(request, url, method)
stream = self._redirect_stream(request, method)

# Create the new request with the redirect URL and original headers
new_request = httpx.Request(
method=method,
url=url,
headers=headers,
headers=request.headers.copy(),
stream=stream,
extensions=request.extensions,
)

# Scrub sensitive headers before following the redirect
options.scrub_sensitive_headers(new_request, request.url)

# Update the Host header if not same origin
if not self._same_origin(url, request.url):
new_request.headers["Host"] = url.netloc.decode("ascii")

# Handle 303 See Other and other method changes
if method != request.method and method == "GET":
# If we've switched to a 'GET' request, strip any headers which
# are only relevant to the request body.
new_request.headers.pop("Content-Length", None)
new_request.headers.pop("Transfer-Encoding", None)

if hasattr(request, "context"):
new_request.context = request.context #type: ignore
new_request.options = {} #type: ignore
Expand Down Expand Up @@ -201,35 +216,6 @@ def _redirect_url(

return url

def _redirect_headers(
self, request: httpx.Request, url: httpx.URL, method: str
) -> httpx.Headers:
"""
Return the headers that should be used for the redirect request.
"""
headers = httpx.Headers(request.headers)

if not self._same_origin(url, request.url):
if not self.is_https_redirect(request.url, url):
# Strip Authorization headers when responses are redirected
# away from the origin. (Except for direct HTTP to HTTPS redirects.)
headers.pop("Authorization", None)

# Update the Host header.
headers["Host"] = url.netloc.decode("ascii")

if method != request.method and method == "GET":
# If we've switch to a 'GET' request, then strip any headers which
# are only relevant to the request body.
headers.pop("Content-Length", None)
headers.pop("Transfer-Encoding", None)

# We should use the client cookie store to determine any cookie header,
# rather than whatever was on the original outgoing request.
headers.pop("Cookie", None)

return headers

def _redirect_stream(
self, request: httpx.Request, method: str
) -> typing.Optional[typing.Union[httpx.SyncByteStream, httpx.AsyncByteStream]]:
Expand All @@ -246,17 +232,3 @@ def _same_origin(self, url: httpx.URL, other: httpx.URL) -> bool:
Return 'True' if the given URLs share the same origin.
"""
return (url.scheme == other.scheme and url.host == other.host)

def port_or_default(self, url: httpx.URL) -> typing.Optional[int]:
if url.port is not None:
return url.port
return {"http": 80, "https": 443}.get(url.scheme)

def is_https_redirect(self, url: httpx.URL, location: httpx.URL) -> bool:
"""
Return 'True' if 'location' is a HTTPS upgrade of 'url'
"""
if url.host != location.host:
return False

return (url.scheme == "http" and location.scheme == "https")
Loading