From ba341eb57734f2e2e4bf01cf182e860af53bb128 Mon Sep 17 00:00:00 2001
From: giulio-leone
Date: Fri, 13 Mar 2026 11:39:12 +0100
Subject: [PATCH] fix: handle missing usage metadata on premature Anthropic
 stream termination

When the Anthropic API stream terminates before sending the message_stop
event (e.g. network timeout, connection reset), the code crashes with
AttributeError because event.message.usage is None.

The stream() method unconditionally accessed event.message.usage after
the async iteration loop, assuming a complete stream. Two failure modes:

1. Empty stream: 'event' variable is never assigned (UnboundLocalError)
2. Premature termination: event.message or event.message.usage is None

Fix: Initialize event=None before the loop, use safe attribute access
via getattr() chain, and emit zero-usage metadata with a warning log
when usage data is unavailable.

Added two regression tests:
- test_stream_premature_termination: stream ends without message.usage
- test_stream_empty_no_events: completely empty stream

Fixes #1868
---
 src/strands/models/anthropic.py        | 14 ++++++--
 tests/strands/models/test_anthropic.py | 54 ++++++++++++++++++++++++++++++-
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/src/strands/models/anthropic.py b/src/strands/models/anthropic.py
index b5f6fcf91..89a7eefd4 100644
--- a/src/strands/models/anthropic.py
+++ b/src/strands/models/anthropic.py
@@ -405,12 +405,22 @@ async def stream(
         try:
             async with self.client.messages.stream(**request) as stream:
                 logger.debug("got response from model")
+                event = None
                 async for event in stream:
                     if event.type in AnthropicModel.EVENT_TYPES:
                         yield self.format_chunk(event.model_dump())
 
-                usage = event.message.usage  # type: ignore
-                yield self.format_chunk({"type": "metadata", "usage": usage.model_dump()})
+                usage = getattr(getattr(event, "message", None), "usage", None) if event else None
+                if usage is not None:
+                    yield self.format_chunk({"type": "metadata", "usage": usage.model_dump()})
+                else:
+                    logger.warning("stream ended without usage metadata (possible premature termination)")
+                    yield self.format_chunk(
+                        {
+                            "type": "metadata",
+                            "usage": {"input_tokens": 0, "output_tokens": 0},
+                        }
+                    )
 
         except anthropic.RateLimitError as error:
             raise ModelThrottledException(str(error)) from error
diff --git a/tests/strands/models/test_anthropic.py b/tests/strands/models/test_anthropic.py
index c5aff8062..8612ed1b6 100644
--- a/tests/strands/models/test_anthropic.py
+++ b/tests/strands/models/test_anthropic.py
@@ -739,7 +739,59 @@ async def test_stream(anthropic_client, model, agenerator, alist):
 
 
 @pytest.mark.asyncio
-async def test_stream_rate_limit_error(anthropic_client, model, alist):
+async def test_stream_premature_termination(anthropic_client, model, agenerator, alist):
+    """Test that stream handles premature termination without crashing.
+
+    When the Anthropic API stream ends before message_stop (e.g. network
+    timeout), event.message.usage may be None. The code must not crash
+    with AttributeError.
+
+    Regression test for #1868.
+    """
+    mock_event_1 = unittest.mock.Mock(
+        type="message_start",
+        model_dump=lambda: {"type": "message_start"},
+    )
+    # Last event has no .message attribute (simulating premature termination)
+    mock_event_2 = unittest.mock.Mock(
+        type="content_block_stop",
+        model_dump=lambda: {"type": "content_block_stop", "index": 0},
+        spec=["type", "model_dump"],
+    )
+
+    mock_context = unittest.mock.AsyncMock()
+    mock_context.__aenter__.return_value = agenerator([mock_event_1, mock_event_2])
+    anthropic_client.messages.stream.return_value = mock_context
+
+    messages = [{"role": "user", "content": [{"text": "hello"}]}]
+    response = model.stream(messages, None, None)
+
+    # Should not raise AttributeError
+    tru_events = await alist(response)
+
+    # Should still yield a metadata event with zero usage
+    assert any("metadata" in str(e) for e in tru_events)
+
+
+@pytest.mark.asyncio
+async def test_stream_empty_no_events(anthropic_client, model, agenerator, alist):
+    """Test that stream handles an empty event sequence without crashing."""
+    mock_context = unittest.mock.AsyncMock()
+    mock_context.__aenter__.return_value = agenerator([])
+    anthropic_client.messages.stream.return_value = mock_context
+
+    messages = [{"role": "user", "content": [{"text": "hello"}]}]
+    response = model.stream(messages, None, None)
+
+    # Should not raise UnboundLocalError or AttributeError
+    tru_events = await alist(response)
+
+    # Should still yield a metadata event with zero usage
+    assert any("metadata" in str(e) for e in tru_events)
+
+
+@pytest.mark.asyncio
+async def test_stream_rate_limit_error(anthropic_client, model, alist):
     anthropic_client.messages.stream.side_effect = anthropic.RateLimitError(
         "rate limit", response=unittest.mock.Mock(), body=None
     )