Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions src/anthropic/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@
from ._client import Anthropic, AsyncAnthropic
from ._models import FinalRequestOptions

# Mapping from SSE error type to HTTP status code.
# When a mid-stream SSE error arrives, the HTTP response already has status 200.
# We use this map to derive the correct status code for error classification.
_SSE_ERROR_TYPE_TO_STATUS: dict[str, int] = {
"invalid_request_error": 400,
"authentication_error": 401,
"permission_error": 403,
"not_found_error": 404,
"request_too_large": 413,
"rate_limit_error": 429,
"api_error": 500,
"overloaded_error": 529,
}


_T = TypeVar("_T")

Expand Down Expand Up @@ -111,10 +125,24 @@ def __stream__(self) -> Iterator[_T]:
except Exception:
err_msg = sse.data or f"Error code: {response.status_code}"

# Derive the correct status code from the error type when available.
# The HTTP response is already 200 (stream started), so we need to
# look at the error body to determine the right error class.
error_response = self.response
if is_dict(body):
error_type = body.get("error", {}).get("type") if is_dict(body.get("error")) else body.get("type")
mapped_status = _SSE_ERROR_TYPE_TO_STATUS.get(error_type or "", 0)
if mapped_status:
error_response = httpx.Response(
status_code=mapped_status,
headers=self.response.headers,
request=self.response.request,
)

raise self._client._make_status_error(
err_msg,
body=body,
response=self.response,
response=error_response,
)
finally:
# Ensure the response is closed even if the consumer doesn't read all data
Expand Down Expand Up @@ -231,10 +259,22 @@ async def __stream__(self) -> AsyncIterator[_T]:
except Exception:
err_msg = sse.data or f"Error code: {response.status_code}"

# Derive the correct status code from the error type when available.
error_response = self.response
if is_dict(body):
error_type = body.get("error", {}).get("type") if is_dict(body.get("error")) else body.get("type")
mapped_status = _SSE_ERROR_TYPE_TO_STATUS.get(error_type or "", 0)
if mapped_status:
error_response = httpx.Response(
status_code=mapped_status,
headers=self.response.headers,
request=self.response.request,
)

raise self._client._make_status_error(
err_msg,
body=body,
response=self.response,
response=error_response,
)
finally:
# Ensure the response is closed even if the consumer doesn't read all data
Expand Down