From c1da19fcd139e9f6dd4d1ececc4900ffe127cfe1 Mon Sep 17 00:00:00 2001 From: Stefano Amorelli Date: Sun, 15 Mar 2026 20:13:20 +0200 Subject: [PATCH 1/4] feat(types): extend ModelStopReason with optional cost field I'm adding a 5th element to the ModelStopReason tuple to carry per-invocation cost data (in USD [1]) through the streaming pipeline. The field defaults to None so all existing model providers continue to work without changes. The MetadataEvent TypedDict also gets an optional cost field, which is where model providers will inject their cost before it reaches the stop event. Existing consumers of the stop tuple (anthropic, bedrock, and the summarizing conversation manager) now use *_ unpacking so they're forward-compatible with the new element. [1]: https://docs.litellm.ai/docs/completion/token_usage#critical-cost_per_token --- .../summarizing_conversation_manager.py | 2 +- src/strands/models/anthropic.py | 2 +- src/strands/models/bedrock.py | 2 +- src/strands/types/_events.py | 4 +++- src/strands/types/streaming.py | 2 ++ tests/strands/event_loop/test_streaming.py | 6 ++++++ .../event_loop/test_streaming_structured_output.py | 2 +- tests/strands/types/test__events.py | 13 ++++++++++++- 8 files changed, 27 insertions(+), 6 deletions(-) diff --git a/src/strands/agent/conversation_manager/summarizing_conversation_manager.py b/src/strands/agent/conversation_manager/summarizing_conversation_manager.py index abd4d08b5..922bbbd51 100644 --- a/src/strands/agent/conversation_manager/summarizing_conversation_manager.py +++ b/src/strands/agent/conversation_manager/summarizing_conversation_manager.py @@ -284,7 +284,7 @@ async def _call_model() -> Message: result_message: Message | None = None async for event in process_stream(chunks): if "stop" in event: - _, result_message, _, _ = event["stop"] + _, result_message, *_ = event["stop"] if result_message is None: raise RuntimeError("Failed to generate summary: no response from model") diff --git a/src/strands/models/anthropic.py b/src/strands/models/anthropic.py index b5f6fcf91..b71dc8dbc 100644 --- a/src/strands/models/anthropic.py +++ b/src/strands/models/anthropic.py @@ -450,7 +450,7 @@ async def structured_output( async for event in process_stream(response): yield event - stop_reason, messages, _, _ = event["stop"] + stop_reason, messages, *_ = event["stop"] if stop_reason != "tool_use": raise ValueError(f'Model returned stop_reason: {stop_reason} instead of "tool_use".') diff --git a/src/strands/models/bedrock.py b/src/strands/models/bedrock.py index bab4031ed..98d83c69a 100644 --- a/src/strands/models/bedrock.py +++ b/src/strands/models/bedrock.py @@ -1074,7 +1074,7 @@ async def structured_output( async for event in streaming.process_stream(response): yield event - stop_reason, messages, _, _ = event["stop"] + stop_reason, messages, *_ = event["stop"] if stop_reason != "tool_use": raise ValueError(f'Model returned stop_reason: {stop_reason} instead of "tool_use".') diff --git a/src/strands/types/_events.py b/src/strands/types/_events.py index 5b0ae78f6..0b131d010 100644 --- a/src/strands/types/_events.py +++ b/src/strands/types/_events.py @@ -198,6 +198,7 @@ def __init__( message: Message, usage: Usage, metrics: Metrics, + cost: float | None = None, ) -> None: """Initialize with the final execution results. @@ -206,8 +207,9 @@ def __init__( message: Final message from the model usage: Usage information from the model metrics: Execution metrics and performance data + cost: Cost in USD for the model invocation, if available from the provider. """ - super().__init__({"stop": (stop_reason, message, usage, metrics)}) + super().__init__({"stop": (stop_reason, message, usage, metrics, cost)}) @property @override diff --git a/src/strands/types/streaming.py b/src/strands/types/streaming.py index 8ec2e8d7b..3cb69436b 100644 --- a/src/strands/types/streaming.py +++ b/src/strands/types/streaming.py @@ -163,11 +163,13 @@ class MetadataEvent(TypedDict, total=False): metrics: Performance metrics related to the model invocation. trace: Trace information for debugging and monitoring. usage: Resource usage information for the model invocation. + cost: Cost in USD for the model invocation, as calculated by the model provider (e.g. LiteLLM). """ metrics: Metrics trace: Trace | None usage: Usage + cost: float class ExceptionEvent(TypedDict): diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index 6d376450a..80d0c8cd8 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -702,6 +702,7 @@ def test_extract_usage_metrics_empty_metadata(): }, {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, {"latencyMs": 1}, + None, ) }, ], @@ -833,6 +834,7 @@ def test_extract_usage_metrics_empty_metadata(): }, {"inputTokens": 5, "outputTokens": 10, "totalTokens": 15}, {"latencyMs": 100}, + None, ) }, ], @@ -853,6 +855,7 @@ def test_extract_usage_metrics_empty_metadata(): }, {"inputTokens": 0, "outputTokens": 0, "totalTokens": 0}, {"latencyMs": 0, "timeToFirstByteMs": 0}, + None, ), }, ], @@ -938,6 +941,7 @@ async def test_process_stream(response, exp_events, agenerator, alist): {"role": "assistant", "content": [{"text": "REDACTED."}]}, {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, {"latencyMs": 1}, + None, ) }, ], @@ -998,6 +1002,7 @@ async def test_process_stream(response, exp_events, agenerator, alist): }, {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, {"latencyMs": 1}, + None, ) }, ], @@ -1144,6 +1149,7 @@ async def test_stream_messages(agenerator, alist): {"role": "assistant", "content": [{"text": "test"}]}, {"inputTokens": 0, "outputTokens": 0, "totalTokens": 0}, {"latencyMs": 0, "timeToFirstByteMs": 0}, + None, ) }, ] diff --git a/tests/strands/event_loop/test_streaming_structured_output.py b/tests/strands/event_loop/test_streaming_structured_output.py index 4c4082c00..bab3ab792 100644 --- a/tests/strands/event_loop/test_streaming_structured_output.py +++ b/tests/strands/event_loop/test_streaming_structured_output.py @@ -145,7 +145,7 @@ async def test_stream_messages_with_forced_structured_output(agenerator, alist): break assert stop_event is not None - stop_reason, message, usage, metrics = stop_event["stop"] + stop_reason, message, usage, metrics, _cost = stop_event["stop"] assert stop_reason == "tool_use" assert message["role"] == "assistant" diff --git a/tests/strands/types/test__events.py b/tests/strands/types/test__events.py index 6163faeb6..9e02551bd 100644 --- a/tests/strands/types/test__events.py +++ b/tests/strands/types/test__events.py @@ -263,7 +263,18 @@ def test_initialization(self): metrics = Mock(spec=Metrics) event = ModelStopReason(stop_reason, message, usage, metrics) - assert event["stop"] == (stop_reason, message, usage, metrics) + assert event["stop"] == (stop_reason, message, usage, metrics, None) + assert event.is_callback_event is False + + def test_initialization_with_cost(self): + """Test ModelStopReason initialization with cost.""" + stop_reason = Mock(spec=StopReason) + message = Mock(spec=Message) + usage = Mock(spec=Usage) + metrics = Mock(spec=Metrics) + + event = ModelStopReason(stop_reason, message, usage, metrics, cost=0.0025) + assert event["stop"] == (stop_reason, message, usage, metrics, 0.0025) assert event.is_callback_event is False From f4d5aa900bc08e6ffb8b7d767d7de64aa7726676 Mon Sep 17 00:00:00 2001 From: Stefano Amorelli Date: Sun, 15 Mar 2026 20:13:31 +0200 Subject: [PATCH 2/4] feat(metrics): add accumulated_cost tracking to EventLoopMetrics EventLoopMetrics now has an accumulated_cost field (defaults to 0.0) and an update_cost() method that the event loop will call after each model invocation. The cost in USD [1] is included in get_summary() and displayed in the metrics summary output when it's greater than zero. This is the accumulation layer that sits between the streaming pipeline (which provides per-invocation cost) and the user-facing AgentResult. [1]: https://docs.litellm.ai/docs/completion/token_usage#critical-cost_per_token --- src/strands/telemetry/metrics.py | 12 +++++++++++ tests/strands/telemetry/test_metrics.py | 27 +++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/src/strands/telemetry/metrics.py b/src/strands/telemetry/metrics.py index 163df803a..548f0e08d 100644 --- a/src/strands/telemetry/metrics.py +++ b/src/strands/telemetry/metrics.py @@ -201,6 +201,7 @@ class EventLoopMetrics: traces: list[Trace] = field(default_factory=list) accumulated_usage: Usage = field(default_factory=lambda: Usage(inputTokens=0, outputTokens=0, totalTokens=0)) accumulated_metrics: Metrics = field(default_factory=lambda: Metrics(latencyMs=0)) + accumulated_cost: float = 0.0 @property def _metrics_client(self) -> "MetricsClient": @@ -348,6 +349,14 @@ def reset_usage_metrics(self) -> None: """ self.agent_invocations.append(AgentInvocation()) + def update_cost(self, cost: float) -> None: + """Update the accumulated cost with new cost data. + + Args: + cost: The cost in USD to add to the accumulated total. + """ + self.accumulated_cost += cost + def update_metrics(self, metrics: Metrics) -> None: """Update the accumulated performance metrics with new metrics data. @@ -391,6 +400,7 @@ def get_summary(self) -> dict[str, Any]: "traces": [trace.to_dict() for trace in self.traces], "accumulated_usage": self.accumulated_usage, "accumulated_metrics": self.accumulated_metrics, + "accumulated_cost": self.accumulated_cost, "agent_invocations": [ { "usage": invocation.usage, @@ -436,6 +446,8 @@ def _metrics_summary_to_lines(event_loop_metrics: EventLoopMetrics, allowed_name token_parts.append(f"cache_write_input_tokens={summary['accumulated_usage']['cacheWriteInputTokens']}") yield f"├─ Tokens: {', '.join(token_parts)}" + if summary["accumulated_cost"] > 0: + yield f"├─ Cost: ${summary['accumulated_cost']:.6f}" yield f"├─ Bedrock Latency: {summary['accumulated_metrics']['latencyMs']}ms" yield "├─ Tool Usage:" diff --git a/tests/strands/telemetry/test_metrics.py b/tests/strands/telemetry/test_metrics.py index 800bcebc4..fbaa45109 100644 --- a/tests/strands/telemetry/test_metrics.py +++ b/tests/strands/telemetry/test_metrics.py @@ -416,11 +416,38 @@ def test_event_loop_metrics_get_summary(trace, tool, event_loop_metrics, mock_ge "total_cycles": 0, "total_duration": 0, "traces": [], + "accumulated_cost": 0.0, } assert tru_summary == exp_summary +def test_accumulated_cost_default_zero(event_loop_metrics): + """Test that accumulated_cost starts at 0.0.""" + assert event_loop_metrics.accumulated_cost == 0.0 + + +def test_update_cost(event_loop_metrics): + """Test that update_cost adds to accumulated_cost.""" + event_loop_metrics.update_cost(0.0025) + assert event_loop_metrics.accumulated_cost == 0.0025 + + +def test_update_cost_accumulates(event_loop_metrics): + """Test that multiple update_cost calls accumulate.""" + event_loop_metrics.update_cost(0.001) + event_loop_metrics.update_cost(0.002) + event_loop_metrics.update_cost(0.003) + assert event_loop_metrics.accumulated_cost == pytest.approx(0.006) + + +def test_get_summary_includes_accumulated_cost(event_loop_metrics): + """Test that get_summary includes accumulated_cost.""" + event_loop_metrics.update_cost(0.0075) + summary = event_loop_metrics.get_summary() + assert summary["accumulated_cost"] == 0.0075 + + @pytest.mark.parametrize( ("trace", "child_trace", "tool_metrics", "exp_str"), [ From f7b4e7e0b97941720e9fdfa2d3361c256d965f59 Mon Sep 17 00:00:00 2001 From: Stefano Amorelli Date: Sun, 15 Mar 2026 20:13:41 +0200 Subject: [PATCH 3/4] feat(streaming): wire cost through process_stream and event loop process_stream() now extracts the cost field from MetadataEvent (if present) and passes it as the 5th element of the ModelStopReason tuple. On the event loop side, I unpack the cost and call EventLoopMetrics.update_cost() when a value is available. This connects the model layer (which calculates cost) to the metrics layer (which accumulates it), completing the data flow for any model provider that populates MetadataEvent.cost. --- src/strands/event_loop/event_loop.py | 4 +++- src/strands/event_loop/streaming.py | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/strands/event_loop/event_loop.py b/src/strands/event_loop/event_loop.py index 3b1e2d76a..df131471e 100644 --- a/src/strands/event_loop/event_loop.py +++ b/src/strands/event_loop/event_loop.py @@ -340,7 +340,7 @@ async def _handle_model_execution( ): yield event - stop_reason, message, usage, metrics = event["stop"] + stop_reason, message, usage, metrics, cost = event["stop"] invocation_state.setdefault("request_state", {}) after_model_call_event = AfterModelCallEvent( @@ -412,6 +412,8 @@ async def _handle_model_execution( # Update metrics agent.event_loop_metrics.update_usage(usage) agent.event_loop_metrics.update_metrics(metrics) + if cost is not None: + agent.event_loop_metrics.update_cost(cost) except Exception as e: yield ForceStopEvent(reason=e) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index b7d85ca30..128fcd6e6 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -397,6 +397,7 @@ async def process_stream( usage: Usage = Usage(inputTokens=0, outputTokens=0, totalTokens=0) metrics: Metrics = Metrics(latencyMs=0, timeToFirstByteMs=0) + cost: float | None = None async for chunk in chunks: # Check for cancellation during stream processing @@ -433,10 +434,12 @@ async def process_stream( int(1000 * (first_byte_time - start_time)) if (start_time and first_byte_time) else None ) usage, metrics = extract_usage_metrics(chunk["metadata"], time_to_first_byte_ms) + if "cost" in chunk["metadata"]: + cost = chunk["metadata"]["cost"] elif "redactContent" in chunk: handle_redact_content(chunk["redactContent"], state) - yield ModelStopReason(stop_reason=stop_reason, message=state["message"], usage=usage, metrics=metrics) + yield ModelStopReason(stop_reason=stop_reason, message=state["message"], usage=usage, metrics=metrics, cost=cost) async def stream_messages( From 7ee954438dc3a88b1b4716284c0a1e0d9c916117 Mon Sep 17 00:00:00 2001 From: Stefano Amorelli Date: Sun, 15 Mar 2026 20:13:56 +0200 Subject: [PATCH 4/4] feat(litellm): calculate per-invocation cost via litellm.cost_per_token This is the actual cost calculation that makes use of the pipeline built in the previous commits. In format_chunk(), after extracting usage data, I call litellm.cost_per_token() to get prompt and completion costs and attach the total to MetadataEvent. The values returned by cost_per_token() are in USD [1][2], which is what we store in accumulated_cost. The calculation is wrapped in try/except because litellm's pricing database doesn't cover every model. When a model isn't mapped, the cost field is simply omitted and the rest of the pipeline continues as if cost tracking isn't available. I chose cost_per_token() over completion_cost() because it doesn't require constructing a fake ModelResponse object. Cache tokens (both read and creation) are forwarded to the cost function so pricing accounts for cached token discounts on providers like Anthropic. Closes #1216 [1]: https://github.com/strands-agents/sdk-python/issues/1216 [2]: https://docs.litellm.ai/docs/completion/token_usage#critical-cost_per_token "Returns: A tuple containing the cost in USD dollars for prompt tokens and completion tokens, respectively." --- src/strands/models/litellm.py | 61 +++++++++++++++--- tests/strands/models/test_litellm.py | 93 ++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 8 deletions(-) diff --git a/src/strands/models/litellm.py b/src/strands/models/litellm.py index be5337f0d..ab240f325 100644 --- a/src/strands/models/litellm.py +++ b/src/strands/models/litellm.py @@ -201,7 +201,7 @@ def format_chunk(self, event: dict[str, Any], **kwargs: Any) -> StreamEvent: """Format a LiteLLM response event into a standardized message chunk. This method overrides OpenAI's format_chunk to handle the metadata case - with prompt caching support. All other chunk types use the parent implementation. + with prompt caching support and cost tracking. All other chunk types use the parent implementation. Args: event: A response event from the LiteLLM model. @@ -223,23 +223,68 @@ def format_chunk(self, event: dict[str, Any], **kwargs: Any) -> StreamEvent: # Only LiteLLM over Anthropic supports cache write tokens # Waiting until a more general approach is available to set cacheWriteInputTokens + cache_read_tokens = 0 + cache_write_tokens = 0 if tokens_details := getattr(event["data"], "prompt_tokens_details", None): if cached := getattr(tokens_details, "cached_tokens", None): usage_data["cacheReadInputTokens"] = cached + cache_read_tokens = cached if creation := getattr(event["data"], "cache_creation_input_tokens", None): usage_data["cacheWriteInputTokens"] = creation + cache_write_tokens = creation - return StreamEvent( - metadata=MetadataEvent( - metrics={ - "latencyMs": 0, # TODO - }, - usage=usage_data, - ) + metadata_event = MetadataEvent( + metrics={ + "latencyMs": 0, # TODO + }, + usage=usage_data, + ) + + cost = self._calculate_cost( + prompt_tokens=event["data"].prompt_tokens, + completion_tokens=event["data"].completion_tokens, + cache_read_input_tokens=cache_read_tokens, + cache_creation_input_tokens=cache_write_tokens, ) + if cost is not None: + metadata_event["cost"] = cost + + return StreamEvent(metadata=metadata_event) # For all other cases, use the parent implementation return super().format_chunk(event) + def _calculate_cost( + self, + prompt_tokens: int, + completion_tokens: int, + cache_read_input_tokens: int = 0, + cache_creation_input_tokens: int = 0, + ) -> float | None: + """Calculate the cost for a model invocation using LiteLLM's cost tracking. + + Args: + prompt_tokens: Number of input tokens. + completion_tokens: Number of output tokens. + cache_read_input_tokens: Number of tokens read from cache. + cache_creation_input_tokens: Number of tokens written to cache. + + Returns: + Cost in USD, or None if cost calculation is not available for the model. + """ + try: + model_id = self.get_config()["model_id"] + prompt_cost, completion_cost = litellm.cost_per_token( + model=model_id, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + cache_read_input_tokens=cache_read_input_tokens, + cache_creation_input_tokens=cache_creation_input_tokens, + ) + return prompt_cost + completion_cost + except Exception: + logger.debug("model_id=<%s> | could not calculate completion cost", self.get_config().get("model_id")) + return None + @override async def stream( self, diff --git a/tests/strands/models/test_litellm.py b/tests/strands/models/test_litellm.py index 9bb0e09ca..113afec10 100644 --- a/tests/strands/models/test_litellm.py +++ b/tests/strands/models/test_litellm.py @@ -848,3 +848,96 @@ def test_format_request_messages_with_tool_calls_no_content(): }, ] assert tru_result == exp_result + + +def test_format_chunk_metadata_includes_cost(): + """Test that format_chunk includes cost when cost_per_token succeeds.""" + model = LiteLLMModel(model_id="openai/gpt-4o") + + mock_usage = unittest.mock.Mock() + mock_usage.prompt_tokens = 100 + mock_usage.completion_tokens = 50 + mock_usage.total_tokens = 150 + mock_usage.prompt_tokens_details = None + mock_usage.cache_creation_input_tokens = None + + event = {"chunk_type": "metadata", "data": mock_usage} + + with unittest.mock.patch.object(strands.models.litellm.litellm, "cost_per_token", return_value=(0.0025, 0.005)): + result = model.format_chunk(event) + + assert result["metadata"]["cost"] == 0.0075 + + +def test_format_chunk_metadata_omits_cost_on_failure(): + """Test that format_chunk gracefully omits cost when cost_per_token raises.""" + model = LiteLLMModel(model_id="unknown/model") + + mock_usage = unittest.mock.Mock() + mock_usage.prompt_tokens = 100 + mock_usage.completion_tokens = 50 + mock_usage.total_tokens = 150 + mock_usage.prompt_tokens_details = None + mock_usage.cache_creation_input_tokens = None + + event = {"chunk_type": "metadata", "data": mock_usage} + + with unittest.mock.patch.object( + strands.models.litellm.litellm, "cost_per_token", side_effect=Exception("model not mapped") + ): + result = model.format_chunk(event) + + assert "cost" not in result["metadata"] + assert result["metadata"]["usage"]["inputTokens"] == 100 + + +def test_format_chunk_metadata_cost_with_cache_tokens(): + """Test that cache tokens are passed to cost_per_token.""" + model = LiteLLMModel(model_id="anthropic/claude-3-sonnet") + + mock_usage = unittest.mock.Mock() + mock_usage.prompt_tokens = 100 + mock_usage.completion_tokens = 50 + mock_usage.total_tokens = 150 + mock_tokens_details = unittest.mock.Mock() + mock_tokens_details.cached_tokens = 25 + mock_usage.prompt_tokens_details = mock_tokens_details + mock_usage.cache_creation_input_tokens = 10 + + event = {"chunk_type": "metadata", "data": mock_usage} + + with unittest.mock.patch.object( + strands.models.litellm.litellm, "cost_per_token", return_value=(0.001, 0.002) + ) as mock_cost: + result = model.format_chunk(event) + + mock_cost.assert_called_once_with( + model="anthropic/claude-3-sonnet", + prompt_tokens=100, + completion_tokens=50, + cache_read_input_tokens=25, + cache_creation_input_tokens=10, + ) + assert result["metadata"]["cost"] == 0.003 + + +def test_calculate_cost(): + """Test _calculate_cost returns correct total cost.""" + model = LiteLLMModel(model_id="openai/gpt-4o") + + with unittest.mock.patch.object(strands.models.litellm.litellm, "cost_per_token", return_value=(0.01, 0.02)): + cost = model._calculate_cost(prompt_tokens=1000, completion_tokens=500) + + assert cost == 0.03 + + +def test_calculate_cost_returns_none_on_failure(): + """Test _calculate_cost returns None when cost_per_token raises.""" + model = LiteLLMModel(model_id="unknown/model") + + with unittest.mock.patch.object( + strands.models.litellm.litellm, "cost_per_token", side_effect=Exception("not mapped") + ): + cost = model._calculate_cost(prompt_tokens=100, completion_tokens=50) + + assert cost is None