diff --git a/src/anthropic/_streaming.py b/src/anthropic/_streaming.py index fc6943ee..e55f447c 100644 --- a/src/anthropic/_streaming.py +++ b/src/anthropic/_streaming.py @@ -17,6 +17,20 @@ from ._client import Anthropic, AsyncAnthropic from ._models import FinalRequestOptions +# Mapping from SSE error type to HTTP status code. +# When a mid-stream SSE error arrives, the HTTP response already has status 200. +# We use this map to derive the correct status code for error classification. +_SSE_ERROR_TYPE_TO_STATUS: dict[str, int] = { + "invalid_request_error": 400, + "authentication_error": 401, + "permission_error": 403, + "not_found_error": 404, + "request_too_large": 413, + "rate_limit_error": 429, + "api_error": 500, + "overloaded_error": 529, +} + _T = TypeVar("_T") @@ -111,10 +125,24 @@ def __stream__(self) -> Iterator[_T]: except Exception: err_msg = sse.data or f"Error code: {response.status_code}" + # Derive the correct status code from the error type when available. + # The HTTP response is already 200 (stream started), so we need to + # look at the error body to determine the right error class. + error_response = self.response + if is_dict(body): + error_type = body.get("error", {}).get("type") if is_dict(body.get("error")) else body.get("type") + mapped_status = _SSE_ERROR_TYPE_TO_STATUS.get(error_type or "", 0) + if mapped_status: + error_response = httpx.Response( + status_code=mapped_status, + headers=self.response.headers, + request=self.response.request, + ) + raise self._client._make_status_error( err_msg, body=body, - response=self.response, + response=error_response, ) finally: # Ensure the response is closed even if the consumer doesn't read all data @@ -231,10 +259,22 @@ async def __stream__(self) -> AsyncIterator[_T]: except Exception: err_msg = sse.data or f"Error code: {response.status_code}" + # Derive the correct status code from the error type when available. + error_response = self.response + if is_dict(body): + error_type = body.get("error", {}).get("type") if is_dict(body.get("error")) else body.get("type") + mapped_status = _SSE_ERROR_TYPE_TO_STATUS.get(error_type or "", 0) + if mapped_status: + error_response = httpx.Response( + status_code=mapped_status, + headers=self.response.headers, + request=self.response.request, + ) + raise self._client._make_status_error( err_msg, body=body, - response=self.response, + response=error_response, ) finally: # Ensure the response is closed even if the consumer doesn't read all data