diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index b7d85ca30..f56f0e455 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -425,7 +425,10 @@ async def process_stream( state, typed_event = handle_content_block_delta(chunk["contentBlockDelta"], state) yield typed_event elif "contentBlockStop" in chunk: + had_text = bool(state.get("text")) state = handle_content_block_stop(state) + if had_text: + yield ModelStreamEvent({"complete": True}) elif "messageStop" in chunk: stop_reason = handle_message_stop(chunk["messageStop"]) elif "metadata" in chunk: diff --git a/tests/strands/agent/hooks/test_agent_events.py b/tests/strands/agent/hooks/test_agent_events.py index 02c367ccc..483109612 100644 --- a/tests/strands/agent/hooks/test_agent_events.py +++ b/tests/strands/agent/hooks/test_agent_events.py @@ -129,6 +129,7 @@ async def test_stream_e2e_success(alist): "delta": {"text": "Okay invoking normal tool"}, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"contentBlockStart": {"start": {"toolUse": {"name": "normal_tool", "toolUseId": "123"}}}}}, {"event": {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}}, { @@ -182,6 +183,7 @@ async def test_stream_e2e_success(alist): "tool_config": tool_config, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"contentBlockStart": {"start": {"toolUse": {"name": "async_tool", "toolUseId": "1234"}}}}}, {"event": {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}}, { @@ -240,6 +242,7 @@ async def test_stream_e2e_success(alist): "tool_config": tool_config, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"contentBlockStart": {"start": {"toolUse": {"name": "streaming_tool", "toolUseId": "12345"}}}}}, {"event": {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}}, { @@ -306,6 +309,7 @@ async def test_stream_e2e_success(alist): "tool_config": tool_config, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "end_turn"}}}, {"message": {"content": [{"text": "I invoked the tools!"}], "role": "assistant"}}, { @@ -370,6 +374,7 @@ async def test_stream_e2e_throttle_and_redact(alist, mock_sleep): "delta": {"text": "INPUT BLOCKED!"}, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "guardrail_intervened"}}}, {"message": {"content": [{"text": "INPUT BLOCKED!"}], "role": "assistant"}}, { @@ -434,6 +439,7 @@ async def test_stream_e2e_reasoning_redacted_content(alist): "delta": {"text": "Response with redacted reasoning"}, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "end_turn"}}}, { "message": { diff --git a/tests/strands/agent/test_agent.py b/tests/strands/agent/test_agent.py index 967a0dafb..93982e2fe 100644 --- a/tests/strands/agent/test_agent.py +++ b/tests/strands/agent/test_agent.py @@ -767,6 +767,7 @@ def test_agent__call__callback(mock_model, agent, callback_handler, agenerator): request_state={}, ), unittest.mock.call(event={"contentBlockStop": {}}), + unittest.mock.call(complete=True), unittest.mock.call( message={ "role": "assistant", diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index 6d376450a..53b2f4274 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -795,6 +795,7 @@ def test_extract_usage_metrics_empty_metadata(): }, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "end_turn"}}}, { "event": { @@ -911,6 +912,7 @@ async def test_process_stream(response, exp_events, agenerator, alist): {"event": {"contentBlockDelta": {"delta": {"text": "Hello!"}}}}, {"data": "Hello!", "delta": {"text": "Hello!"}}, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "guardrail_intervened"}}}, { "event": { @@ -1138,6 +1140,9 @@ async def test_stream_messages(agenerator, alist): "contentBlockStop": {}, }, }, + { + "complete": True, + }, { "stop": ( "end_turn", @@ -1334,3 +1339,122 @@ async def test_stream_messages_normalizes_messages(agenerator, alist): {"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"}, {"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"}, ] + + +@pytest.mark.asyncio +async def test_complete_event_emitted_for_text_block(agenerator, alist): + """Test that a complete=True event is emitted when a text content block finishes. + + This ensures callback handlers like PrintingCallbackHandler can detect the end + of text streaming and format output accordingly (e.g., printing a trailing newline). + See: https://github.com/strands-agents/sdk-python/issues/826 + """ + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"text": "Hello"}}}, + {"contentBlockDelta": {"delta": {"text": " world"}}}, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "end_turn"}}, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ] + + stream = strands.event_loop.streaming.process_stream(agenerator(response)) + events = await alist(stream) + + # Find the complete event + complete_events = [e for e in events if e.get("complete") is True] + assert len(complete_events) == 1, f"Expected exactly 1 complete event, got {len(complete_events)}" + + # Verify it appears after contentBlockStop + complete_idx = next(i for i, e in enumerate(events) if e.get("complete") is True) + stop_idx = next(i for i, e in enumerate(events) if e.get("event", {}).get("contentBlockStop") is not None) + assert complete_idx == stop_idx + 1, "complete event must immediately follow contentBlockStop" + + +@pytest.mark.asyncio +async def test_no_complete_event_for_tool_use_block(agenerator, alist): + """Test that no complete event is emitted for toolUse content blocks.""" + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {"toolUse": {"toolUseId": "t1", "name": "my_tool"}}}}, + {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "tool_use"}}, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ] + + stream = strands.event_loop.streaming.process_stream(agenerator(response)) + events = await alist(stream) + + complete_events = [e for e in events if e.get("complete") is True] + assert len(complete_events) == 0, "No complete event should be emitted for tool_use blocks" + + +@pytest.mark.asyncio +async def test_no_complete_event_for_reasoning_block(agenerator, alist): + """Test that no complete event is emitted for reasoning content blocks.""" + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"reasoningContent": {"text": "thinking..."}}}}, + {"contentBlockDelta": {"delta": {"reasoningContent": {"signature": "sig123"}}}}, + {"contentBlockStop": {}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"text": "answer"}}}, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "end_turn"}}, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ] + + stream = strands.event_loop.streaming.process_stream(agenerator(response)) + events = await alist(stream) + + complete_events = [e for e in events if e.get("complete") is True] + # Only 1 complete event for the text block, none for the reasoning block + assert len(complete_events) == 1, f"Expected 1 complete event (text only), got {len(complete_events)}" + + +@pytest.mark.asyncio +async def test_complete_event_with_multiple_text_blocks(agenerator, alist): + """Test that complete events are emitted for each text content block.""" + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"text": "first"}}}, + {"contentBlockStop": {}}, + {"contentBlockStart": {"start": {"toolUse": {"toolUseId": "t1", "name": "tool"}}}}, + {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}, + {"contentBlockStop": {}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"text": "second"}}}, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "end_turn"}}, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ] + + stream = strands.event_loop.streaming.process_stream(agenerator(response)) + events = await alist(stream) + + complete_events = [e for e in events if e.get("complete") is True] + assert len(complete_events) == 2, f"Expected 2 complete events (one per text block), got {len(complete_events)}"