Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions src/strands/models/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,12 +405,22 @@ async def stream(
try:
async with self.client.messages.stream(**request) as stream:
logger.debug("got response from model")
event = None
async for event in stream:
if event.type in AnthropicModel.EVENT_TYPES:
yield self.format_chunk(event.model_dump())

usage = event.message.usage # type: ignore
yield self.format_chunk({"type": "metadata", "usage": usage.model_dump()})
usage = getattr(getattr(event, "message", None), "usage", None) if event else None
if usage is not None:
yield self.format_chunk({"type": "metadata", "usage": usage.model_dump()})
else:
logger.warning("stream ended without usage metadata (possible premature termination)")
yield self.format_chunk(
{
"type": "metadata",
"usage": {"input_tokens": 0, "output_tokens": 0},
}
)

except anthropic.RateLimitError as error:
raise ModelThrottledException(str(error)) from error
Expand Down
50 changes: 49 additions & 1 deletion tests/strands/models/test_anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,55 @@ async def test_stream(anthropic_client, model, agenerator, alist):


@pytest.mark.asyncio
async def test_stream_premature_termination(anthropic_client, model, agenerator, alist):
    """Test that stream handles premature termination without crashing.

    When the Anthropic API stream ends before message_stop (e.g. network
    timeout), the last observed event may carry no usable ``message.usage``.
    The model must not crash with AttributeError and must still emit a
    metadata chunk so downstream consumers see a usage record.

    Regression test for #1868.
    """
    mock_event_1 = unittest.mock.Mock(
        type="message_start",
        model_dump=lambda: {"type": "message_start"},
    )
    # Last event has no .message attribute (simulating premature termination).
    # The spec list restricts the Mock so attribute access outside it raises
    # AttributeError, making getattr(event, "message", None) return None.
    mock_event_2 = unittest.mock.Mock(
        type="content_block_stop",
        model_dump=lambda: {"type": "content_block_stop", "index": 0},
        spec=["type", "model_dump"],
    )

    mock_context = unittest.mock.AsyncMock()
    mock_context.__aenter__.return_value = agenerator([mock_event_1, mock_event_2])
    anthropic_client.messages.stream.return_value = mock_context

    messages = [{"role": "user", "content": [{"text": "hello"}]}]
    response = model.stream(messages, None, None)

    # Should not raise AttributeError
    tru_events = await alist(response)

    # Should still yield a metadata event with zero usage
    assert any("metadata" in str(e) for e in tru_events)


@pytest.mark.asyncio
async def test_stream_empty_no_events(anthropic_client, model, agenerator, alist):
    """Test that stream handles an empty event sequence without crashing."""
    # Async context manager whose stream yields no events at all.
    empty_stream_cm = unittest.mock.AsyncMock()
    empty_stream_cm.__aenter__.return_value = agenerator([])
    anthropic_client.messages.stream.return_value = empty_stream_cm

    request_messages = [{"role": "user", "content": [{"text": "hello"}]}]

    # Consuming the stream must not raise UnboundLocalError or AttributeError.
    collected = await alist(model.stream(request_messages, None, None))

    # A metadata event with zero usage is still expected.
    assert any("metadata" in str(event) for event in collected)
anthropic_client.messages.stream.side_effect = anthropic.RateLimitError(
"rate limit", response=unittest.mock.Mock(), body=None
)
Expand Down