diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a5de9d..2191456 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ We [keep a changelog.](http://keepachangelog.com/) ## [Unreleased] -## [1.7.0] - 2026-04-XX +## [1.7.0] - 2026-05-01 ### Added @@ -20,6 +20,9 @@ We [keep a changelog.](http://keepachangelog.com/) - Native dynamic routing support for Mailgun Optimize, Validations Service, and Email Preview APIs without requiring new custom handlers. - Explicit support for raw MIME string (`multipart/form-data`) uploads via the `files` parameter in the `.create()` method (essential for `client.mimemessage`). - Advanced path interpolation in `handle_default` to automatically inject inline URL parameters (e.g., `/v2/x509/{domain}/status`). +- Added `MailgunTimeoutError` (inheriting from `ApiError` and `TimeoutError`) to cleanly distinguish API connection timeouts from standard system timeouts. +- Implemented `ROUTE_ALIASES` in the configuration engine to safely route virtual SDK properties (e.g., `domains_webhooks`) without hardcoding intercept logic. +- Added `TimeoutType` type alias for cleaner and more robust type hinting across the HTTP client. - Added a new "Logging & Debugging" section to `README.md`. - An intelligent live meta-testing suite (`test_routing_meta_live.py`) to strictly verify SDK endpoint aliases against live Mailgun servers. - PEP 561 Compliance: Added a `py.typed` marker to expose the SDK's strict type hints to downstream users (`mypy`, `pyright`). @@ -38,6 +41,9 @@ We [keep a changelog.](http://keepachangelog.com/) - **Performance**: Memoized internal route resolution logic using `@lru_cache` in `_get_cached_route_data`, eliminating redundant string splitting and dictionary lookups during repeated API calls. - Updated `DOMAIN_ENDPOINTS` mapping to reflect Mailgun's latest architecture, officially moving `tracking`, `click`, `open`, `unsubscribe`, and `webhooks` from `v1` to `v3`. 
- Modernized the codebase using modern Python idioms (e.g., `contextlib.suppress`) and resolved strict typing errors for `pyright`. +- Abstracted HTTP header manipulation into a centralized `_merge_headers` method in `BaseEndpoint`, eliminating DRY violations across all sync and async HTTP verbs. +- Hardened all URL handlers (`domains`, `ips`, `keys`, `mailinglists`, `metrics`, `routes`, `suppressions`, `tags`) to use `.rstrip("/")` and safe `.get("keys", [])` dictionary lookups, preventing `404 Not Found` and `KeyError` crashes from malformed internal configurations. +- Replaced `pass` blocks with explicit `logging.warning` in integration tests to surface ignored 404s gracefully. - **Documentation**: Migrated all internal and public docstrings from legacy Sphinx/reST format to modern Google Style for cleaner readability and better IDE hover-hints. - Updated Dependabot configuration to group minor and patch updates and limit open PRs. - CI/CD Optimization: Grouped Dependabot updates (`minor-and-patch`) to reduce Pull Request noise and optimized `.editorconfig`. @@ -63,6 +69,12 @@ We [keep a changelog.](http://keepachangelog.com/) - Fixed DKIM selector test names to strictly comply with RFC 6376 formatting (replaced underscores with hyphens). - Python Data Model Integrity: The Catch-All router (`__getattr__`) now strictly rejects Python magic methods (`__dunder__`), preventing crashes when using `hasattr()`, `pickle`, or `copy.deepcopy()`. - Version Drift: Corrected endpoints for `spamtraps` and `ip_whitelist` to route to their modern `v2` Mailgun backends. +- Fixed a `TypeError: got multiple values for keyword argument 'headers'` crash when passing custom headers to `.get()`, `.put()`, `.patch()`, and `.delete()` methods by safely popping headers from `kwargs` before argument unpacking. 
+- Fixed a routing bug where the greedy `domains` router swallowed the `domains_webhooks` identifier, causing webhook payload updates to drop the `webhook_name` and hit the wrong API endpoint. +- Fixed a bug where `AsyncClient` transports were permanently closed after exiting an `async with` context manager, allowing safe client reuse across multiple blocks. +- Fixed `AttributeError` traceback leakage by strictly suppressing internal `KeyError`s from the dynamic router (`raise ... from None`). +- Fixed a silent string-concatenation bug that could generate invalid double-slashes (`//`) in base URLs during the `Config` engine initialization. +- Fixed `email_validation_examples.py` to correctly `raise` the `ValueError` on empty files instead of failing silently. ### Security @@ -70,9 +82,12 @@ We [keep a changelog.](http://keepachangelog.com/) - OWASP Input Validation: Added strict sanitization in `Client._validate_auth` to strip trailing whitespace and block HTTP Header Injection attacks (rejecting `\n` and `\r` characters in API keys). - CWE-113 (HTTP Header Injection): Implemented strict CRLF (`\r\n`) sanitization inside `SecurityGuard.sanitize_headers` to block malicious header manipulation. - Supply Chain Security: Patched a potential OS Command Injection vulnerability in GitHub Actions (`publish.yml`) by safely routing `github.*` contexts through environment variables. +- CWE-22 (Path Traversal): Enforced strict URL-encoding via `sanitize_path_segment` on `webhook_name` parameters to neutralize path traversal injection attempts in the `handle_webhooks` router. 
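The CWE-22 guard above boils down to percent-encoding user-supplied path segments before they are interpolated into a URL. A minimal sketch of that idea, using a simplified stand-in for the real `sanitize_path_segment` in `mailgun.handlers.utils` (whose exact behavior may differ):

```python
# Illustrative stand-in for sanitize_path_segment, not the actual
# mailgun implementation described in this changelog.
from urllib.parse import quote


def sanitize_path_segment(segment: str) -> str:
    # safe="" also encodes "/", so a webhook_name such as "../../admin"
    # cannot traverse out of its intended URL path component.
    return quote(str(segment), safe="")


print(sanitize_path_segment("../../admin"))  # ..%2F..%2Fadmin
print(sanitize_path_segment("clicked"))      # clicked
```

Because the segment is encoded rather than rejected, legitimate webhook names round-trip unchanged while traversal sequences become inert literals.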
### Pull Requests Merged +- [PR_39](https://github.com/mailgun/mailgun-python/pull/39) - Release 1.7.0 +- [PR_38](https://github.com/mailgun/mailgun-python/pull/38) - build(deps): Bump conda-incubator/setup-miniconda from 3.3.0 to 4.0.1 - [PR_36](https://github.com/mailgun/mailgun-python/pull/36) - Improve client, update & fix tests - [PR_35](https://github.com/mailgun/mailgun-python/pull/35) - Removed \_prepare_files logic - [PR_34](https://github.com/mailgun/mailgun-python/pull/34) - Improve the Config class and routes diff --git a/README.md b/README.md index 5e49060..eae1529 100644 --- a/README.md +++ b/README.md @@ -140,11 +140,11 @@ To build the `mailgun` package from the sources you need `setuptools` (as a build ### Runtime dependencies -At runtime the package requires only `requests >=2.32.5`. For async support, it uses `httpx` and `typing-extensions >=4.7.1` for Python `<3.11`. +At runtime the package requires only `requests >=2.33.0`. For async support, it uses `httpx >=0.24` and `typing-extensions >=4.7.1` for Python `<3.11`. ### Test dependencies -For running test you need `pytest >=7.0.0` at least. Make sure to provide the environment variables from +For running tests you need at least `pytest >=9.0.3`, `pytest-asyncio`, and `responses`. Make sure to provide the environment variables from [Authentication](#authentication).
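The `typing-extensions >=4.7.1` requirement for Python `<3.11` mentioned above corresponds to the usual version-gated import that `client.py` performs (`if sys.version_info >= (3, 11): ...` in the diff below). A hedged sketch of the pattern; the concrete name is illustrative, since this diff elides what `client.py` actually imports:

```python
# Version-gated backport import; `Self` is an assumed example name,
# not necessarily what client.py pulls from typing_extensions.
import sys

if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self  # backport for Python < 3.11
```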
## Installation diff --git a/mailgun/_version.py b/mailgun/_version.py index bcbf3fb..14d9d2f 100644 --- a/mailgun/_version.py +++ b/mailgun/_version.py @@ -1 +1 @@ -__version__ = "1.6.0.post1.dev77" \ No newline at end of file +__version__ = "1.7.0" diff --git a/mailgun/client.py b/mailgun/client.py index 166b963..f00d4e2 100644 --- a/mailgun/client.py +++ b/mailgun/client.py @@ -24,7 +24,7 @@ from enum import Enum from functools import lru_cache from types import MappingProxyType -from typing import TYPE_CHECKING, Any, Final +from typing import TYPE_CHECKING, Any, Final, TypeAlias from urllib.parse import unquote, urlparse import httpx @@ -36,8 +36,7 @@ from urllib3.util.retry import Retry from mailgun import routes -from mailgun._version import __version__ -from mailgun.handlers.error_handler import ApiError +from mailgun.handlers.error_handler import ApiError, MailgunTimeoutError if sys.version_info >= (3, 11): @@ -82,6 +81,8 @@ _TIMEOUT_TUPLE_LEN: Final[int] = 2 _DEFAULT_TIMEOUT = 60.0 +# Type Aliases for SDK Signatures +TimeoutType: TypeAlias = float | tuple[float, float] | None # ============================================================================== # 2. CORE TYPES & SECURITY GUARDRAILS @@ -257,9 +258,7 @@ def sanitize_http_method(cls, method: str) -> str: return safe_method @classmethod - def sanitize_timeout( - cls, timeout: float | tuple[float, float] | None - ) -> float | tuple[float, float] | None: + def sanitize_timeout(cls, timeout: TimeoutType) -> TimeoutType: """Prevent Infinite Timeout Thread Exhaustion (DoS). 
Args: @@ -285,7 +284,7 @@ def _ensure_positive(val: Any) -> float: return f_val if isinstance(timeout, tuple) and len(timeout) == _TIMEOUT_TUPLE_LEN: - return (_ensure_positive(timeout[0]), _ensure_positive(timeout[1])) + return _ensure_positive(timeout[0]), _ensure_positive(timeout[1]) return _ensure_positive(timeout) @classmethod @@ -451,6 +450,9 @@ def _get_cached_route_data(clean_key: str) -> dict[str, Any]: Returns: A dictionary containing versioning and path data for the route. """ + # Resolve virtual property aliases before processing + clean_key = routes.ROUTE_ALIASES.get(clean_key, clean_key) + if clean_key in routes.EXACT_ROUTES: version, route_keys = routes.EXACT_ROUTES[clean_key] return {"version": version, "keys": tuple(route_keys)} @@ -529,8 +531,8 @@ def _build_base_url(self, version: APIVersion | str, suffix: str = "") -> str: The fully constructed base URL string. """ ver_str: str = version.value if isinstance(version, APIVersion) else version - # O(1) access instead of dynamic concatenation - base: str = self._baked_urls.get(ver_str, f"{self.api_url}/{ver_str}") + # O(1) access instead of dynamic concatenation, ensuring no trailing slash + base: str = self._baked_urls.get(ver_str, f"{self.api_url}/{ver_str}").rstrip("/") if suffix: path: str = f"{suffix}/" if suffix == self._DOMAINS_RESOURCE else suffix @@ -697,7 +699,7 @@ def __init__( url: dict[str, Any], headers: dict[str, str], auth: tuple[str, str] | None, - timeout: float | tuple[float, float] | None = 60, + timeout: TimeoutType = 60, ) -> None: """Initialize a new BaseEndpoint instance. @@ -774,15 +776,29 @@ def build_url( return handler(url, domain, method, **kwargs) # type: ignore[no-untyped-call] + def _merge_headers(self, kwargs: dict[str, Any]) -> dict[str, str]: + """Safely extract and merge custom headers from kwargs. + + Returns: + A dictionary containing the safely merged headers. 
+ """ + custom_headers = kwargs.pop("headers", {}) + req_headers = self.headers.copy() + + if custom_headers and isinstance(custom_headers, dict): + req_headers.update(custom_headers) + + return req_headers + def _prepare_request( self, method: str, url: dict[str, Any], domain: str | None, - timeout: float | tuple[float, float] | None, + timeout: TimeoutType, headers: dict[str, str], kwargs: dict[str, Any], - ) -> tuple[str, str, str, float | tuple[float, float] | None, dict[str, str], dict[str, Any]]: + ) -> tuple[str, str, str, TimeoutType, dict[str, str], dict[str, Any]]: """Security and routing preparation logic. Args: @@ -887,10 +903,10 @@ def __getattr__(self, name: str) -> Any: session=self._session, timeout=self.timeout, ) - except KeyError as e: + except KeyError: # __getattr__ must return AttributeError msg = f"'{self.__class__.__name__}' object has no attribute '{name}'" - raise AttributeError(msg) from e + raise AttributeError(msg) from None def close(self) -> None: """Close the underlying requests.Session connection pool and purge memory.""" @@ -928,7 +944,7 @@ def __init__( headers: dict[str, str], auth: tuple[str, str] | None = None, session: requests.Session | None = None, - timeout: float | tuple[float, float] | None = 60, + timeout: TimeoutType = 60, ) -> None: """Initialize a new Endpoint instance for synchronous API interaction. @@ -949,7 +965,7 @@ def api_call( headers: dict[str, str], data: Any | None = None, filters: Mapping[str, str | Any] | None = None, - timeout: float | tuple[float, float] | None = None, + timeout: TimeoutType = None, files: Any | None = None, domain: str | None = None, **kwargs: Any, @@ -972,13 +988,22 @@ def api_call( The HTTP response object from the server. Raises: - TimeoutError: If the request times out. + MailgunTimeoutError: If the request times out. ApiError: If the server returns a 4xx or 5xx status code or a network error occurs. 
""" safe_method, target_url, safe_url_for_log, safe_timeout, safe_headers, safe_kwargs = ( self._prepare_request(method, url, domain, timeout, headers, kwargs) ) + # Case-insensitive validation for Content-Type to conform with RFC 7230 + is_json_request = any( + k.lower() == "content-type" and "application/json" in str(v).lower() + for k, v in safe_headers.items() + ) + + if is_json_request and data is not None and not isinstance(data, (str, bytes)): + data = json.dumps(data, separators=(",", ":")) + req_method = getattr(self._session, safe_method.lower()) sys.audit("mailgun.api.request", safe_method.upper(), safe_url_for_log) @@ -1015,7 +1040,7 @@ def api_call( except requests.exceptions.Timeout as e: logger.exception("Timeout Error: %s %s", safe_method.upper(), safe_url_for_log) - raise TimeoutError from e + raise MailgunTimeoutError("Request timed out") from e except RequestsConnectionError as e: logger.critical("Connection Failed (DNS/Network): %s | URL: %s", e, safe_url_for_log) msg = f"Network routing failed: {e}" @@ -1042,12 +1067,13 @@ def get( Returns: The HTTP response object. """ + merged_headers = self._merge_headers(kwargs) return self.api_call( self._auth, "get", self._url, domain=domain, - headers=self.headers, + headers=merged_headers, filters=filters, **kwargs, ) @@ -1074,16 +1100,9 @@ def create( Returns: The HTTP response object. 
""" - req_headers = self.headers.copy() - if headers and isinstance(headers, dict): - req_headers.update(headers) - - if ( - req_headers.get("Content-Type") == "application/json" - and data is not None - and not isinstance(data, (str, bytes)) - ): - data = json.dumps(data, separators=(",", ":")) + if headers is not None: + kwargs["headers"] = headers + merged_headers = self._merge_headers(kwargs) return self.api_call( self._auth, @@ -1091,7 +1110,7 @@ def create( self._url, files=files, domain=domain, - headers=req_headers, + headers=merged_headers, data=data, filters=filters, **kwargs, @@ -1110,8 +1129,15 @@ def put( Returns: The HTTP response object. """ + merged_headers = self._merge_headers(kwargs) return self.api_call( - self._auth, "put", self._url, headers=self.headers, data=data, filters=filters, **kwargs + self._auth, + "put", + self._url, + headers=merged_headers, + data=data, + filters=filters, + **kwargs, ) def patch( @@ -1127,12 +1153,13 @@ def patch( Returns: The HTTP response object. """ + merged_headers = self._merge_headers(kwargs) return self.api_call( self._auth, "patch", self._url, - headers=self.headers, data=data, + headers=merged_headers, filters=filters, **kwargs, ) @@ -1150,20 +1177,15 @@ def update( Returns: The HTTP response object. 
""" - custom_headers = kwargs.pop("headers", {}) - req_headers = self.headers.copy() - if custom_headers and isinstance(custom_headers, dict): - req_headers.update(custom_headers) - - if ( - req_headers.get("Content-Type") == "application/json" - and data is not None - and not isinstance(data, (str, bytes)) - ): - data = json.dumps(data, separators=(",", ":")) - + merged_headers = self._merge_headers(kwargs) return self.api_call( - self._auth, "put", self._url, headers=req_headers, data=data, filters=filters, **kwargs + self._auth, + "put", + self._url, + headers=merged_headers, + data=data, + filters=filters, + **kwargs, ) def delete(self, domain: str | None = None, **kwargs: Any) -> Response: @@ -1176,8 +1198,9 @@ def delete(self, domain: str | None = None, **kwargs: Any) -> Response: Returns: The HTTP response object. """ + merged_headers = self._merge_headers(kwargs) return self.api_call( - self._auth, "delete", self._url, headers=self.headers, domain=domain, **kwargs + self._auth, "delete", self._url, headers=merged_headers, domain=domain, **kwargs ) @@ -1197,7 +1220,7 @@ def __init__( headers: dict[str, str], auth: tuple[str, str] | None, client: httpx.AsyncClient | None = None, - timeout: float | tuple[float, float] | None = None, + timeout: TimeoutType = 60, ) -> None: """Initialize a new AsyncEndpoint instance for asynchronous API interaction. @@ -1218,7 +1241,7 @@ async def api_call( headers: dict[str, str], data: Any | None = None, filters: Mapping[str, str | Any] | None = None, - timeout: float | tuple[float, float] = 60, + timeout: TimeoutType = None, files: Any | None = None, domain: str | None = None, **kwargs: Any, @@ -1241,13 +1264,25 @@ async def api_call( The HTTP response object from the server. Raises: - TimeoutError: If the request times out. + MailgunTimeoutError: If the request times out. ApiError: If the server returns a 4xx or 5xx status code or a network error occurs. 
""" safe_method, target_url, safe_url_for_log, safe_timeout, safe_headers, safe_kwargs = ( self._prepare_request(method, url, domain, timeout, headers, kwargs) ) + if isinstance(safe_timeout, tuple): + safe_timeout = httpx.Timeout(safe_timeout[1], connect=safe_timeout[0]) + + # Case-insensitive validation for Content-Type to conform with RFC 7230 + is_json_request = any( + k.lower() == "content-type" and "application/json" in str(v).lower() + for k, v in safe_headers.items() + ) + + if is_json_request and data is not None and not isinstance(data, (str, bytes)): + data = json.dumps(data, separators=(",", ":")) + request_kwargs: dict[str, Any] = { "method": safe_method.upper(), "url": target_url, @@ -1291,7 +1326,7 @@ async def api_call( except httpx.TimeoutException as e: logger.exception("Timeout Error: %s %s", safe_method.upper(), safe_url_for_log) - raise TimeoutError from e + raise MailgunTimeoutError("Request timed out") from e except httpx.ConnectError as e: logger.critical( "Async Connection Failed (DNS/Network): %s | URL: %s", e, safe_url_for_log @@ -1320,12 +1355,13 @@ async def get( Returns: The HTTP response object. """ + merged_headers = self._merge_headers(kwargs) return await self.api_call( self._auth, "get", self._url, domain=domain, - headers=self.headers, + headers=merged_headers, filters=filters, **kwargs, ) @@ -1338,7 +1374,7 @@ async def create( headers: Any = None, files: Any | None = None, **kwargs: Any, - ) -> httpx.Response: + ) -> HttpxResponse: """Send an asynchronous POST request to create a new resource or execute an action. Args: @@ -1352,16 +1388,9 @@ async def create( Returns: The HTTP response object. 
""" - req_headers = self.headers.copy() - if headers and isinstance(headers, dict): - req_headers.update(headers) - - if ( - req_headers.get("Content-Type") == "application/json" - and data is not None - and not isinstance(data, (str, bytes)) - ): - data = json.dumps(data, separators=(",", ":")) + if headers is not None: + kwargs["headers"] = headers + merged_headers = self._merge_headers(kwargs) return await self.api_call( self._auth, @@ -1369,7 +1398,7 @@ async def create( self._url, files=files, domain=domain, - headers=req_headers, + headers=merged_headers, data=data, filters=filters, **kwargs, @@ -1377,7 +1406,7 @@ async def create( async def put( self, data: Any | None = None, filters: Mapping[str, str | Any] | None = None, **kwargs: Any - ) -> httpx.Response: + ) -> HttpxResponse: """Send an asynchronous PUT request to update or replace a resource. Args: @@ -1388,13 +1417,20 @@ async def put( Returns: The HTTP response object. """ + merged_headers = self._merge_headers(kwargs) return await self.api_call( - self._auth, "put", self._url, headers=self.headers, data=data, filters=filters, **kwargs + self._auth, + "put", + self._url, + headers=merged_headers, + data=data, + filters=filters, + **kwargs, ) async def patch( self, data: Any | None = None, filters: Mapping[str, str | Any] | None = None, **kwargs: Any - ) -> httpx.Response: + ) -> HttpxResponse: """Send an asynchronous PATCH request to partially update a resource. Args: @@ -1405,11 +1441,12 @@ async def patch( Returns: The HTTP response object. 
""" + merged_headers = self._merge_headers(kwargs) return await self.api_call( self._auth, "patch", self._url, - headers=self.headers, + headers=merged_headers, data=data, filters=filters, **kwargs, @@ -1417,7 +1454,7 @@ async def patch( async def update( self, data: Any | None, filters: Mapping[str, str | Any] | None = None, **kwargs: Any - ) -> httpx.Response: + ) -> HttpxResponse: """Send an asynchronous PUT request specifically structured for updating resources with dynamic headers. Args: @@ -1428,20 +1465,16 @@ async def update( Returns: The HTTP response object. """ - custom_headers = kwargs.pop("headers", {}) - req_headers = self.headers.copy() - if custom_headers and isinstance(custom_headers, dict): - req_headers.update(custom_headers) - - if ( - req_headers.get("Content-Type") == "application/json" - and data is not None - and not isinstance(data, (str, bytes)) - ): - data = json.dumps(data, separators=(",", ":")) + merged_headers = self._merge_headers(kwargs) return await self.api_call( - self._auth, "put", self._url, headers=req_headers, data=data, filters=filters, **kwargs + self._auth, + "put", + self._url, + headers=merged_headers, + data=data, + filters=filters, + **kwargs, ) async def delete(self, domain: str | None = None, **kwargs: Any) -> httpx.Response: @@ -1454,8 +1487,9 @@ async def delete(self, domain: str | None = None, **kwargs: Any) -> httpx.Respon Returns: The HTTP response object. 
""" + merged_headers = self._merge_headers(kwargs) return await self.api_call( - self._auth, "delete", self._url, headers=self.headers, domain=domain, **kwargs + self._auth, "delete", self._url, headers=merged_headers, domain=domain, **kwargs ) @@ -1508,9 +1542,9 @@ def __getattr__(self, name: str) -> Any: client=self._client, timeout=self.timeout, ) - except KeyError as e: + except KeyError: msg = f"'{self.__class__.__name__}' object has no attribute '{name}'" - raise AttributeError(msg) from e + raise AttributeError(msg) from None @property def _client(self) -> httpx.AsyncClient: @@ -1521,14 +1555,12 @@ def _client(self) -> httpx.AsyncClient: """ if not self._httpx_client or self._httpx_client.is_closed: # Check if the user already provided a custom transport (e.g. for mocking) - if "transport" not in self._client_kwargs: - # Expand connection pool for async high-throughput batching + kwargs = self._client_kwargs.copy() + if "transport" not in kwargs: limits = httpx.Limits(max_keepalive_connections=100, max_connections=100) - # Note: httpx retries only apply to connection errors, not 5xx HTTP statuses. 
- transport = httpx.AsyncHTTPTransport(retries=3, limits=limits) - self._client_kwargs["transport"] = transport + kwargs["transport"] = httpx.AsyncHTTPTransport(retries=3, limits=limits) - self._httpx_client = httpx.AsyncClient(**self._client_kwargs) + self._httpx_client = httpx.AsyncClient(**kwargs) return self._httpx_client async def aclose(self) -> None: diff --git a/mailgun/examples/email_validation_examples.py b/mailgun/examples/email_validation_examples.py index 25ac863..c4554bc 100644 --- a/mailgun/examples/email_validation_examples.py +++ b/mailgun/examples/email_validation_examples.py @@ -66,7 +66,7 @@ def post_bulk_list_validate() -> None: csv_data = csv_filepath.read_bytes() if not csv_data.startswith(b"") and not csv_data: - ValueError("File is empty.") + raise ValueError("File is empty.") files = {"file": csv_data} req = client.addressvalidate_bulk.create(domain=domain, files=files, list_name="python2_list") print(req.json()) diff --git a/mailgun/handlers/domains_handler.py b/mailgun/handlers/domains_handler.py index d4ae14b..9458242 100644 --- a/mailgun/handlers/domains_handler.py +++ b/mailgun/handlers/domains_handler.py @@ -52,7 +52,7 @@ def handle_domains( Raises: ApiError: If the domain is missing. """ - keys = list(url["keys"]) + keys = list(url.get("keys", [])) if "domains" in keys: keys.remove("domains") @@ -110,12 +110,12 @@ def handle_sending_queues( url: Incoming URL configuration dictionary. domain: Target domain name. _method: Incoming request method (unused in this handler). - **kwargs: Additional keyword arguments (e.g., 'domain_name'). + **_kwargs: Additional keyword arguments (e.g., 'domain_name'). Returns: The final URL for the sending queues endpoint. 
""" - keys = url["keys"] + keys = url.get("keys", []) if "sending_queues" in keys or "sendingqueues" in keys: base_clean = str(url["base"]).replace("domains/", "").replace("domains", "").rstrip("/") return f"{base_clean}/{domain}/sending_queues" @@ -142,15 +142,21 @@ def handle_mailboxes_credentials( Raises: ApiError: If the domain is missing. """ - keys = list(url["keys"]) + keys = list(url.get("keys", [])) + if "domains" in keys: keys.remove("domains") base_url = str(url["base"]).rstrip("/") - target_domain = kwargs.get("domain_name", domain) + + # Sanitize the target domain + raw_target_domain = kwargs.get("domain_name", domain) + target_domain = sanitize_path_segment(raw_target_domain) if raw_target_domain else None if not target_domain: - raise ApiError("Domain is missing!") + if keys: + raise ApiError("Domain is missing!") + return base_url path_segments = [target_domain, *keys] constructed_url = f"{base_url}/{'/'.join(path_segments)}" @@ -243,6 +249,7 @@ def handle_webhooks( if not is_v4 and webhook_name: # v3 API requires webhook name in the URL - return f"{domain_path}/{webhook_name}" + safe_webhook_name = sanitize_path_segment(webhook_name) + return f"{domain_path}/{safe_webhook_name}" return domain_path diff --git a/mailgun/handlers/email_validation_handler.py b/mailgun/handlers/email_validation_handler.py index 1fecd39..ded6e06 100644 --- a/mailgun/handlers/email_validation_handler.py +++ b/mailgun/handlers/email_validation_handler.py @@ -7,7 +7,7 @@ from typing import Any -from mailgun.handlers.utils import sanitize_path_segment +from mailgun.handlers.utils import build_path_from_keys, sanitize_path_segment def handle_address_validate( @@ -27,7 +27,7 @@ def handle_address_validate( Returns: The final URL for the email validation endpoint. 
""" - final_keys = "/" + "/".join(url["keys"][1:]) if url["keys"][1:] else "" + final_keys = build_path_from_keys(url.get("keys", [])[1:]) base_url = str(url["base"]).rstrip("/") if "list_name" in kwargs: diff --git a/mailgun/handlers/error_handler.py b/mailgun/handlers/error_handler.py index cf89f67..77a0450 100644 --- a/mailgun/handlers/error_handler.py +++ b/mailgun/handlers/error_handler.py @@ -16,6 +16,10 @@ class ApiError(Exception): """ +class MailgunTimeoutError(ApiError, TimeoutError): + """Raised when a request to the Mailgun API times out.""" + + class RouteNotFoundError(ApiError): """Raised when the requested Mailgun endpoint cannot be resolved.""" diff --git a/mailgun/handlers/ips_handler.py b/mailgun/handlers/ips_handler.py index e0fb41b..599ecea 100644 --- a/mailgun/handlers/ips_handler.py +++ b/mailgun/handlers/ips_handler.py @@ -28,7 +28,7 @@ def handle_ips( The final URL for the IPs endpoint. """ final_keys = build_path_from_keys(url.get("keys", [])) - base_url = url["base"][:-1] + final_keys + base_url = str(url["base"]).rstrip("/") + final_keys if "ip" in kwargs: safe_ip = sanitize_path_segment(kwargs["ip"]) return f"{base_url}/{safe_ip}" diff --git a/mailgun/handlers/keys_handler.py b/mailgun/handlers/keys_handler.py index d3d7e19..cd13f23 100644 --- a/mailgun/handlers/keys_handler.py +++ b/mailgun/handlers/keys_handler.py @@ -28,7 +28,7 @@ def handle_keys( The final URL for the Keys endpoint. 
""" final_keys = build_path_from_keys(url.get("keys", [])) - base_url = url["base"][:-1] + final_keys + base_url = str(url["base"]).rstrip("/") + final_keys if "key_id" in kwargs: safe_key = sanitize_path_segment(kwargs["key_id"]) return f"{base_url}/{safe_key}" diff --git a/mailgun/handlers/mailinglists_handler.py b/mailgun/handlers/mailinglists_handler.py index d9ef978..209799f 100644 --- a/mailgun/handlers/mailinglists_handler.py +++ b/mailgun/handlers/mailinglists_handler.py @@ -28,7 +28,7 @@ def handle_lists( The final URL for the mailing list endpoint. """ final_keys = build_path_from_keys(url.get("keys", [])) - base = url["base"][:-1] + base = str(url["base"]).rstrip("/") if "address" not in kwargs: return f"{base}{final_keys}" diff --git a/mailgun/handlers/metrics_handler.py b/mailgun/handlers/metrics_handler.py index 3f1338d..bd012f1 100644 --- a/mailgun/handlers/metrics_handler.py +++ b/mailgun/handlers/metrics_handler.py @@ -28,7 +28,7 @@ def handle_metrics( The final URL for the Metrics and Tags New endpoints. """ final_keys = build_path_from_keys(url.get("keys", [])) - base = url["base"][:-1] + base = str(url["base"]).rstrip("/") if "usage" in kwargs: safe_usage = sanitize_path_segment(kwargs["usage"]) diff --git a/mailgun/handlers/routes_handler.py b/mailgun/handlers/routes_handler.py index 323e0cf..4095b0b 100644 --- a/mailgun/handlers/routes_handler.py +++ b/mailgun/handlers/routes_handler.py @@ -28,7 +28,7 @@ def handle_routes( The final URL for the Routes endpoint. 
""" final_keys = build_path_from_keys(url.get("keys", [])) - base_url = url["base"][:-1] + final_keys + base_url = str(url["base"]).rstrip("/") + final_keys if "route_id" in kwargs: safe_route = sanitize_path_segment(kwargs["route_id"]) diff --git a/mailgun/handlers/suppressions_handler.py b/mailgun/handlers/suppressions_handler.py index 642919e..a327bac 100644 --- a/mailgun/handlers/suppressions_handler.py +++ b/mailgun/handlers/suppressions_handler.py @@ -28,7 +28,9 @@ def handle_bounces( The final URL for the Bounces endpoint. """ final_keys = build_path_from_keys(url.get("keys", [])) - base = f"{url['base']}{domain}{final_keys}" + base_url = str(url.get("base", "")).rstrip("/") + base = f"{base_url}/{domain}{final_keys}" + if "bounce_address" in kwargs: safe_addr = sanitize_path_segment(kwargs["bounce_address"]) return f"{base}/{safe_addr}" @@ -53,7 +55,9 @@ def handle_unsubscribes( The final URL for the Unsubscribes endpoint. """ final_keys = build_path_from_keys(url.get("keys", [])) - base = f"{url['base']}{domain}{final_keys}" + base_url = str(url.get("base", "")).rstrip("/") + base = f"{base_url}/{domain}{final_keys}" + if "unsubscribe_address" in kwargs: safe_addr = sanitize_path_segment(kwargs["unsubscribe_address"]) return f"{base}/{safe_addr}" @@ -78,7 +82,9 @@ def handle_complaints( The final URL for the Complaints endpoint. """ final_keys = build_path_from_keys(url.get("keys", [])) - base = f"{url['base']}{domain}{final_keys}" + base_url = str(url.get("base", "")).rstrip("/") + base = f"{base_url}/{domain}{final_keys}" + if "complaint_address" in kwargs: safe_addr = sanitize_path_segment(kwargs["complaint_address"]) return f"{base}/{safe_addr}" @@ -103,7 +109,9 @@ def handle_whitelists( The final URL for the Whitelists endpoint. 
""" final_keys = build_path_from_keys(url.get("keys", [])) - base = f"{url['base']}{domain}{final_keys}" + base_url = str(url.get("base", "")).rstrip("/") + base = f"{base_url}/{domain}{final_keys}" + if "whitelist_address" in kwargs: safe_addr = sanitize_path_segment(kwargs["whitelist_address"]) return f"{base}/{safe_addr}" diff --git a/mailgun/handlers/tags_handler.py b/mailgun/handlers/tags_handler.py index 84ad0f4..0220d84 100644 --- a/mailgun/handlers/tags_handler.py +++ b/mailgun/handlers/tags_handler.py @@ -28,10 +28,12 @@ def handle_tags( The final URL for the Tags endpoint. """ final_keys = build_path_from_keys(url.get("keys", [])) - base = f"{url['base']}{domain}/" + base_url = str(url.get("base", "")).rstrip("/") + + base = f"{base_url}/{domain}/" keys_without_tags = url.get("keys", [])[1:] - result_url = f"{url['base']}{domain}{final_keys}" + result_url = f"{base_url}/{domain}{final_keys}" if "tag_name" in kwargs: safe_tag = sanitize_path_segment(kwargs["tag_name"]) diff --git a/mailgun/routes.py b/mailgun/routes.py index 73e7aea..ec12308 100644 --- a/mailgun/routes.py +++ b/mailgun/routes.py @@ -154,6 +154,16 @@ DOMAIN_ENDPOINTS: Final = MappingProxyType(_DOMAIN_ENDPOINTS) +# --- ROUTE_ALIASES --- +# Maps virtual SDK properties to their actual routing resources. +# This prevents the greedy 'domains' router from swallowing specialized endpoints +# that require their own complex handlers (like webhooks). +_ROUTE_ALIASES: dict[str, str] = { + "domains_webhooks": "webhooks", +} +ROUTE_ALIASES: Final = MappingProxyType(_ROUTE_ALIASES) + + # --- DEPRECATED_ROUTES --- # String patterns to identify deprecated paths and their corresponding messages. # Defined as strings to prevent expensive regex compilation on cold boot. diff --git a/manage.sh b/manage.sh index 600b58a..4bb0df0 100644 --- a/manage.sh +++ b/manage.sh @@ -176,8 +176,8 @@ clean() { find . -type d -name '*.egg-info' -exec rm -rf {} + find . 
-type f -name '*.egg' -exec rm -f {} + - # Temp logs and profilers - rm -f *.prof profile.html profile.json tmp.txt wget-log +# Temp logs and profilers + rm -f ./*.prof ./profile.html ./profile.json ./tmp.txt ./wget-log success "Workspace cleaned!" } diff --git a/py.typed b/py.typed deleted file mode 100644 index e69de29..0000000 diff --git a/tests/integration/tests.py b/tests/integration/tests.py index 07b4645..e6ff256 100644 --- a/tests/integration/tests.py +++ b/tests/integration/tests.py @@ -6,11 +6,10 @@ import json import os import email.utils -import string +import logging +import unittest import subprocess import time -import unittest -import random from pathlib import Path from typing import Any, Callable from datetime import datetime, timedelta @@ -1512,8 +1511,8 @@ def test_maillists_lists_members_create(self) -> None: address=self.maillist_address, member_address=self.messages_to ) - except Exception: - pass # If it doesn't exist (404), that's perfectly fine + except Exception as e: + logging.getLogger(__name__).warning(f"Ignored integration error: {e}") # If it doesn't exist (404), that's perfectly fine # 2. Execute the actual creation test data = {"address": self.messages_to, "name": "Bob", "subscribed": True} @@ -2774,7 +2773,8 @@ def _safe_execute( try: return req.json() - except Exception: + except Exception as e: + logging.getLogger(__name__).warning(f"Ignored integration error: {e}") self.fail(f"API did not return JSON. Route: {req.url}. 
Response: {req.text}") # --- SUCCESSFUL ENDPOINTS --- @@ -2824,7 +2824,8 @@ def test_mtls_and_dkim(self) -> None: self._safe_execute(self.client.domains_tracking.get, domain=self.domain) try: self.client.x509_status.get(domain=self.domain) - except Exception: + except Exception as e: + logging.getLogger(__name__).warning(f"Ignored integration error: {e}") self.skipTest("x509 status returns 500 Server Error for accounts without active TLS certs") @@ -2928,9 +2929,6 @@ async def asyncSetUp(self) -> None: ) self.client: AsyncClient = AsyncClient(auth=self.auth) self.domain: str = os.environ["DOMAIN"] - random_domain_name = "".join( - random.choice(string.ascii_lowercase + string.digits) for _ in range(10) - ) self.test_domain: str = "python.test.com" self.post_domain_data: dict[str, str] = { "name": self.test_domain, @@ -4085,8 +4083,8 @@ async def test_maillists_lists_members_create(self) -> None: address=self.maillist_address, member_address=self.messages_to ) - except Exception: - pass + except Exception as e: + logging.getLogger(__name__).warning(f"Ignored integration error: {e}") data = {"address": self.messages_to, "name": "Bob", "subscribed": True} req = await self.client.lists_members.create(address=self.maillist_address, data=data) @@ -5219,7 +5217,8 @@ async def _safe_execute( try: return req.json() - except Exception: + except Exception as e: + logging.getLogger(__name__).warning(f"Ignored integration error: {e}") self.fail(f"Async API did not return JSON. Route: {req.url}. 
Response: {req.text}") # --- SUCCESSFUL ENDPOINTS --- @@ -5266,7 +5265,8 @@ async def test_mtls_and_dkim(self) -> None: await self._safe_execute(self.client.domains_tracking.get, domain=self.domain) try: await self.client.x509_status.get(domain=self.domain) - except Exception: + except Exception as e: + logging.getLogger(__name__).warning(f"Ignored integration error: {e}") self.skipTest("x509 status returns 500 Server Error for accounts without active TLS certs") diff --git a/tests/test_boot.py b/tests/test_boot.py index 76194f0..6ab4191 100644 --- a/tests/test_boot.py +++ b/tests/test_boot.py @@ -6,7 +6,7 @@ def boot_test() -> None: # Placing the import INSIDE the profiled function ensures we capture # the exact cost of Python crawling the disk to compile the modules. import mailgun.client - client = mailgun.client.Client(auth=("api", "key")) + _client = mailgun.client.Client(auth=("api", "key")) if __name__ == "__main__": profiler = cProfile.Profile() diff --git a/tests/test_perf.py b/tests/test_perf.py index e8540ab..489cfa7 100644 --- a/tests/test_perf.py +++ b/tests/test_perf.py @@ -1,7 +1,7 @@ import asyncio from collections.abc import Generator from concurrent.futures import ThreadPoolExecutor -from typing import Any +from typing import Any, Coroutine, cast import httpx import pytest @@ -138,4 +138,5 @@ def dispatch_batch() -> None: # Safely close the async client aclose_method = getattr(client, "aclose", None) if callable(aclose_method): - asyncio.run(aclose_method()) # pyright: ignore[reportArgumentType] + coro = cast(Coroutine[Any, Any, None], cast(object, aclose_method())) + asyncio.run(coro) diff --git a/tests/unit/test_async_client.py b/tests/unit/test_async_client.py index 4077945..8717c48 100644 --- a/tests/unit/test_async_client.py +++ b/tests/unit/test_async_client.py @@ -1,8 +1,7 @@ """Unit tests for mailgun.client (AsyncClient, AsyncEndpoint).""" -import json import copy -from typing import Any +import unittest from unittest.mock import 
AsyncMock, patch, MagicMock import httpx @@ -16,7 +15,8 @@ class TestAsyncEndpointPrepareFiles: """Tests for AsyncEndpoint._prepare_files.""" - def _make_endpoint(self) -> AsyncEndpoint: + @staticmethod + def _make_endpoint() -> AsyncEndpoint: url = {"base": f"{BASE_URL_V3}/", "keys": ["messages"]} return AsyncEndpoint( url=url, @@ -286,3 +286,72 @@ def test_async_client_connection_pooling_configured(self, mock_httpx: MagicMock, assert kwargs["retries"] == 3 assert kwargs["limits"].max_keepalive_connections == 100 assert kwargs["limits"].max_connections == 100 + + @pytest.mark.asyncio + @patch("httpx.AsyncClient.request") + @patch("httpx.AsyncHTTPTransport") + async def test_async_client_global_timeout_not_shadowed(self, mock_transport: MagicMock, mock_request: MagicMock) -> None: + """Verify that the global timeout is not shadowed by the method's default value.""" + + # Set up the mock and create a client with a unique global timeout + mock_request.return_value = MagicMock(status_code=200, spec=httpx.Response) + client = AsyncClient(auth=("api", "key"), timeout=999.0) + + # Make a request without specifying a timeout at the method level + await client.messages.create(domain="test.com", data={"to": "test@test.com"}) + + # Verify that the global timeout 999.0 was actually passed to httpx + mock_request.assert_called_once() + kwargs = mock_request.call_args[1] + + assert "timeout" in kwargs, "Timeout parameter is missing in request kwargs" + assert kwargs["timeout"] == 999.0, f"Expected timeout 999.0, got {kwargs['timeout']} (Shadowing bug detected!)" + + def test_async_client_getattr_suppresses_keyerror(self) -> None: + """Verify that accessing an invalid attribute raises AttributeError from None. + + This ensures internal KeyErrors from the routing dictionary do not leak + into the user's exception traceback (PEP 3134). 
+ """ + client = AsyncClient(auth=("api", "key")) + + # We must use getattr() with illegal characters to bypass the dynamic catch-all router + # and forcefully trigger the internal KeyError inside Config/SecurityGuard. + with pytest.raises(AttributeError, match="'AsyncClient' object has no attribute '!@#'") as exc_info: + _ = getattr(client, "!@#") + + # Assert that 'from None' was used to break the exception chain + assert exc_info.value.__cause__ is None + assert exc_info.value.__suppress_context__ is True, "Internal KeyError is leaking! Use 'from None'." + + +class TestAsyncClientLifecycle(unittest.IsolatedAsyncioTestCase): + + @pytest.mark.asyncio + @patch("httpx.AsyncClient.request") + @patch("httpx.AsyncHTTPTransport") + async def test_async_client_context_manager_reuse(self, mock_transport_class: MagicMock, mock_request: MagicMock) -> None: + """Verify that reusing the AsyncClient creates a new transport.""" + mock_transport_instance = mock_transport_class.return_value + mock_transport_instance.aclose = AsyncMock() + + # Set up a fake response from the server + mock_response = MagicMock() + mock_response.status_code = 200 + mock_request.return_value = mock_response + + # Create a single instance of the client + client = AsyncClient(auth=("api", "key")) + + # First session (Creates transport, makes request, closes transport) + async with client: + await client.domains.get(domain_name="test.com") + + # The second session MUST NOT fail with a "Transport is closed" error + try: + async with client: + await client.domains.get(domain_name="test.com") + except RuntimeError as e: + if "closed" in str(e).lower(): + self.fail(f"Regression caught: Transport was reused after being closed! 
{e}") + raise # Re-raise the error if it's a different, unexpected RuntimeError diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 1f27861..b346c28 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -13,7 +13,7 @@ from mailgun.client import Endpoint from mailgun.client import SecurityGuard from mailgun.handlers.error_handler import ApiError -from tests.conftest import BASE_URL_V4, BASE_URL_V3, BASE_URL_V1 +from tests.conftest import BASE_URL_V4, BASE_URL_V3 class TestSecurityGuard: @@ -31,7 +31,8 @@ def test_sanitize_timeout_valid(self) -> None: assert SecurityGuard.sanitize_timeout(10.0) == 10.0 def test_sanitize_timeout_invalid(self) -> None: - assert SecurityGuard.sanitize_timeout(None) is None + with pytest.warns(DeprecationWarning, match="allows infinite socket blocking \\(CWE-400\\)"): + assert SecurityGuard.sanitize_timeout(None) is None def test_sanitize_domain_valid(self) -> None: assert SecurityGuard.sanitize_domain("test.com") == "test.com" @@ -207,6 +208,23 @@ def test_client_connection_pooling_configured(self) -> None: assert getattr(adapter, "_pool_connections", 10) == 100 assert getattr(adapter, "_pool_maxsize", 10) == 100 + def test_client_getattr_suppresses_keyerror(self) -> None: + """Verify that accessing an invalid attribute raises AttributeError from None. + + This ensures internal KeyErrors from the routing dictionary do not leak + into the user's exception traceback (PEP 3134). + """ + client = Client(auth=("api", "key")) + + # We must use getattr() with illegal characters to bypass the dynamic catch-all router + # and forcefully trigger the internal KeyError inside Config/SecurityGuard. 
+        with pytest.raises(AttributeError, match="'Client' object has no attribute '!@#'") as exc_info:
+            _ = getattr(client, "!@#")
+
+        # Assert that 'from None' was used to break the exception chain
+        assert exc_info.value.__cause__ is None
+        assert exc_info.value.__suppress_context__ is True, "Internal KeyError is leaking! Use 'from None'."
+
 
 class TestBaseEndpointBuildUrl:
     """Tests for BaseEndpoint.build_url."""
@@ -422,7 +440,7 @@ def test_endpoint_slots_usage(self) -> None:
         assert not hasattr(ep, "__dict__"), "Endpoint should use __slots__ to save memory."
 
         with pytest.raises(AttributeError):
-            ep.undefined_attribute = "should_fail"  # type: ignore[attr-defined]
+            setattr(ep, "undefined_attribute", "should_fail")
 
     @patch("requests.Session.request")
     def test_api_call_exception_chaining(self, mock_request: MagicMock) -> None:
diff --git a/tests/unit/test_client_security.py b/tests/unit/test_client_security.py
index 890f92f..334ebd6 100644
--- a/tests/unit/test_client_security.py
+++ b/tests/unit/test_client_security.py
@@ -1,12 +1,10 @@
 """Unit tests for the new Security Guardrails and Performance optimizations in client.py."""
-import sys
 import pytest
 from unittest.mock import patch, AsyncMock, MagicMock
 
 import httpx
 import requests
-from requests.exceptions import ConnectionError as RequestsConnectionError
 
 from mailgun.handlers.error_handler import ApiError
 from mailgun.handlers.utils import validate_mailgun_url
@@ -186,3 +184,26 @@ def test_validate_mailgun_url_blocked() -> None:
     for url in invalid_urls:
         with pytest.raises(ValueError, match="CWE-918"):
             validate_mailgun_url(url)
+
+# ==========================================
+# 6. CWE-22: Path traversal prevention
+# ==========================================
+
+@patch("requests.Session.request")
+def test_client_webhook_path_traversal_prevention(mock_request: MagicMock) -> None:
+    """Ensure the high-level Client API sanitizes malicious webhook names (CWE-22)."""
+    client = Client(auth=("api", "key"))
+
+    # The user (or an attacker exploiting a user's script) passes a malicious ID
+    client.domains_webhooks.delete(
+        domain="test.com",
+        webhook_name="clicked/../../delete"
+    )
+
+    # Intercept the exact URL about to be sent over the wire
+    mock_request.assert_called_once()
+    target_url = mock_request.call_args[0][1]  # request(method, url, ...)
+
+    # The SDK must neutralize the payload to prevent escaping the /webhooks/ scope
+    assert "clicked%2F..%2F..%2Fdelete" in target_url
+    assert "clicked/../../delete" not in target_url, "Critical CWE-22 Vuln: Unsanitized path segment sent to network!"
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index c3f1fac..44158f3 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -193,3 +193,21 @@ def test_validate_api_url_warns_on_unrecognized_host(self, mock_warn: MagicMock)
         assert "SECURITY WARNING: Invalid API host 'custom.corporate.proxy'" in warning_msg
         assert "Ensure this is a trusted proxy" in warning_msg
+
+    def test_build_base_url_prevents_double_slash(self) -> None:
+        """Verify that _build_base_url strips trailing slashes to prevent // in paths."""
+        config = Config(api_url="https://api.mailgun.net")
+
+        # Simulate a scenario where the baked URL accidentally has a trailing slash
+        config._baked_urls["v3"] = "https://api.mailgun.net/v3/"
+
+        # Request a URL with a suffix
+        result_with_suffix = config._build_base_url("v3", suffix="domains")
+
+        # Request a URL without a suffix
+        result_no_suffix = config._build_base_url("v3")
+
+        assert result_with_suffix == "https://api.mailgun.net/v3/domains/"
+        assert result_no_suffix == "https://api.mailgun.net/v3/"
+        # The critical check: ensure no double slashes were formed
+        assert "//domains" not in result_with_suffix
diff --git a/tests/unit/test_handlers.py b/tests/unit/test_handlers.py
index 4a5dbc2..df07417 100644
--- a/tests/unit/test_handlers.py
+++ b/tests/unit/test_handlers.py
@@ -34,7 +34,6 @@
 from mailgun.handlers.templates_handler import handle_templates
 from mailgun.handlers.users_handler import handle_users
 from tests.conftest import (
-    parse_domain_name,
     TEST_DOMAIN,
     BASE_URL_V3,
     BASE_URL_V4,
@@ -480,3 +479,16 @@ def test_domain_webhooks_v3_delete_fluent(self) -> None:
 
     def test_domain_webhooks_v4_delete_bulk(self) -> None:
         url = {"base": f"{BASE_URL_V3}/domains/", "keys": ["webhooks"]}
         assert handle_webhooks(url, TEST_DOMAIN, "delete", filters={"url": "https://hook.com"}) == f"{BASE_URL_V4}/domains/{TEST_DOMAIN}/webhooks"
+
+    def test_domain_webhooks_path_traversal_prevention(self) -> None:
+        """Verify CWE-22 Path Traversal is blocked when a malicious webhook name is passed."""
+        url = {"base": "https://api.mailgun.net/v3/domains/", "keys": ["webhooks"]}
+        malicious_name = "../../../delete_all"
+
+        # Simulate a v3 webhook deletion call
+        result = handle_webhooks(url, "example.com", "delete", webhook_name=malicious_name)
+
+        # The path traversal attempt MUST be URL-encoded, preventing directory escape.
+        # sanitize_path_segment will convert "../../../delete_all" into "..%2F..%2F..%2Fdelete_all"
+        assert result == "https://api.mailgun.net/v3/domains/example.com/webhooks/..%2F..%2F..%2Fdelete_all"
+        assert "../" not in result, "Path traversal characters were not sanitized!"