diff --git a/src/strands/models/anthropic.py b/src/strands/models/anthropic.py
index b5f6fcf91..89a7eefd4 100644
--- a/src/strands/models/anthropic.py
+++ b/src/strands/models/anthropic.py
@@ -405,12 +405,22 @@ async def stream(
         try:
             async with self.client.messages.stream(**request) as stream:
                 logger.debug("got response from model")
+                event = None
                 async for event in stream:
                     if event.type in AnthropicModel.EVENT_TYPES:
                         yield self.format_chunk(event.model_dump())

-                usage = event.message.usage  # type: ignore
-                yield self.format_chunk({"type": "metadata", "usage": usage.model_dump()})
+                # The stream may terminate before message_stop (e.g. network timeout),
+                # in which case the last event has no usable .message.usage.
+                usage = getattr(getattr(event, "message", None), "usage", None) if event else None
+                if usage is not None:
+                    yield self.format_chunk({"type": "metadata", "usage": usage.model_dump()})
+                else:
+                    logger.warning("stream ended without usage metadata (possible premature termination)")
+                    yield self.format_chunk(
+                        {
+                            "type": "metadata",
+                            "usage": {"input_tokens": 0, "output_tokens": 0},
+                        }
+                    )

         except anthropic.RateLimitError as error:
             raise ModelThrottledException(str(error)) from error
diff --git a/tests/strands/models/test_anthropic.py b/tests/strands/models/test_anthropic.py
index c5aff8062..8612ed1b6 100644
--- a/tests/strands/models/test_anthropic.py
+++ b/tests/strands/models/test_anthropic.py
@@ -739,7 +739,59 @@ async def test_stream(anthropic_client, model, agenerator, alist):


 @pytest.mark.asyncio
-async def test_stream_rate_limit_error(anthropic_client, model, alist):
+async def test_stream_premature_termination(anthropic_client, model, agenerator, alist):
+    """Test that stream handles premature termination without crashing.
+
+    When the Anthropic API stream ends before message_stop (e.g. network
+    timeout), event.message.usage may be None. The code must not crash
+    with AttributeError.
+
+    Regression test for #1868.
+    """
+    mock_event_1 = unittest.mock.Mock(
+        type="message_start",
+        model_dump=lambda: {"type": "message_start"},
+    )
+    # Last event has no .message attribute (simulating premature termination)
+    mock_event_2 = unittest.mock.Mock(
+        type="content_block_stop",
+        model_dump=lambda: {"type": "content_block_stop", "index": 0},
+        spec=["type", "model_dump"],
+    )
+
+    mock_context = unittest.mock.AsyncMock()
+    mock_context.__aenter__.return_value = agenerator([mock_event_1, mock_event_2])
+    anthropic_client.messages.stream.return_value = mock_context
+
+    messages = [{"role": "user", "content": [{"text": "hello"}]}]
+    response = model.stream(messages, None, None)
+
+    # Should not raise AttributeError
+    tru_events = await alist(response)
+
+    # Should still yield a metadata event with zero usage
+    assert any("metadata" in str(e) for e in tru_events)
+
+
+@pytest.mark.asyncio
+async def test_stream_empty_no_events(anthropic_client, model, agenerator, alist):
+    """Test that stream handles an empty event sequence without crashing."""
+    mock_context = unittest.mock.AsyncMock()
+    mock_context.__aenter__.return_value = agenerator([])
+    anthropic_client.messages.stream.return_value = mock_context
+
+    messages = [{"role": "user", "content": [{"text": "hello"}]}]
+    response = model.stream(messages, None, None)
+
+    # Should not raise UnboundLocalError or AttributeError
+    tru_events = await alist(response)
+
+    # Should still yield a metadata event with zero usage
+    assert any("metadata" in str(e) for e in tru_events)
+
+
+@pytest.mark.asyncio
+async def test_stream_rate_limit_error(anthropic_client, model, alist):
     anthropic_client.messages.stream.side_effect = anthropic.RateLimitError(
         "rate limit", response=unittest.mock.Mock(), body=None
     )