From 57bf2b1402b65b7be7b48a98b8b7daeeb7fce131 Mon Sep 17 00:00:00 2001
From: giulio-leone
Date: Fri, 13 Mar 2026 03:38:54 +0100
Subject: [PATCH] fix: emit complete signal to callback handler when text
 content block stops

When PrintingCallbackHandler checks kwargs.get('complete', False) to
determine whether to print a trailing newline, no event in the streaming
pipeline ever sets complete=True. This causes text output to never have
proper line termination.

Root cause: TextStreamEvent emits {data, delta} but never signals
completion. contentBlockStop triggers handle_content_block_stop() which
clears the text state, but no event is yielded to notify the callback.

Fix: Before calling handle_content_block_stop(), check if text was being
accumulated (state['text'] is truthy). If so, yield a
ModelStreamEvent({'complete': True}) after the stop handler runs. This
event reaches the callback handler via is_callback_event (non-empty
dict) and enables PrintingCallbackHandler to print the trailing newline.

The complete signal is only emitted for text content blocks, not for
toolUse or reasoningContent blocks, matching the callback handler's
expected behavior.

Closes #826 --- src/strands/event_loop/streaming.py | 3 + .../strands/agent/hooks/test_agent_events.py | 6 + tests/strands/agent/test_agent.py | 1 + tests/strands/event_loop/test_streaming.py | 124 ++++++++++++++++++ 4 files changed, 134 insertions(+) diff --git a/src/strands/event_loop/streaming.py b/src/strands/event_loop/streaming.py index b7d85ca30..f56f0e455 100644 --- a/src/strands/event_loop/streaming.py +++ b/src/strands/event_loop/streaming.py @@ -425,7 +425,10 @@ async def process_stream( state, typed_event = handle_content_block_delta(chunk["contentBlockDelta"], state) yield typed_event elif "contentBlockStop" in chunk: + had_text = bool(state.get("text")) state = handle_content_block_stop(state) + if had_text: + yield ModelStreamEvent({"complete": True}) elif "messageStop" in chunk: stop_reason = handle_message_stop(chunk["messageStop"]) elif "metadata" in chunk: diff --git a/tests/strands/agent/hooks/test_agent_events.py b/tests/strands/agent/hooks/test_agent_events.py index 02c367ccc..483109612 100644 --- a/tests/strands/agent/hooks/test_agent_events.py +++ b/tests/strands/agent/hooks/test_agent_events.py @@ -129,6 +129,7 @@ async def test_stream_e2e_success(alist): "delta": {"text": "Okay invoking normal tool"}, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"contentBlockStart": {"start": {"toolUse": {"name": "normal_tool", "toolUseId": "123"}}}}}, {"event": {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}}, { @@ -182,6 +183,7 @@ async def test_stream_e2e_success(alist): "tool_config": tool_config, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"contentBlockStart": {"start": {"toolUse": {"name": "async_tool", "toolUseId": "1234"}}}}}, {"event": {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}}, { @@ -240,6 +242,7 @@ async def test_stream_e2e_success(alist): "tool_config": tool_config, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": 
{"contentBlockStart": {"start": {"toolUse": {"name": "streaming_tool", "toolUseId": "12345"}}}}}, {"event": {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}}, { @@ -306,6 +309,7 @@ async def test_stream_e2e_success(alist): "tool_config": tool_config, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "end_turn"}}}, {"message": {"content": [{"text": "I invoked the tools!"}], "role": "assistant"}}, { @@ -370,6 +374,7 @@ async def test_stream_e2e_throttle_and_redact(alist, mock_sleep): "delta": {"text": "INPUT BLOCKED!"}, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "guardrail_intervened"}}}, {"message": {"content": [{"text": "INPUT BLOCKED!"}], "role": "assistant"}}, { @@ -434,6 +439,7 @@ async def test_stream_e2e_reasoning_redacted_content(alist): "delta": {"text": "Response with redacted reasoning"}, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "end_turn"}}}, { "message": { diff --git a/tests/strands/agent/test_agent.py b/tests/strands/agent/test_agent.py index 967a0dafb..93982e2fe 100644 --- a/tests/strands/agent/test_agent.py +++ b/tests/strands/agent/test_agent.py @@ -767,6 +767,7 @@ def test_agent__call__callback(mock_model, agent, callback_handler, agenerator): request_state={}, ), unittest.mock.call(event={"contentBlockStop": {}}), + unittest.mock.call(complete=True), unittest.mock.call( message={ "role": "assistant", diff --git a/tests/strands/event_loop/test_streaming.py b/tests/strands/event_loop/test_streaming.py index 6d376450a..53b2f4274 100644 --- a/tests/strands/event_loop/test_streaming.py +++ b/tests/strands/event_loop/test_streaming.py @@ -795,6 +795,7 @@ def test_extract_usage_metrics_empty_metadata(): }, }, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "end_turn"}}}, { "event": { @@ -911,6 +912,7 @@ async def 
test_process_stream(response, exp_events, agenerator, alist): {"event": {"contentBlockDelta": {"delta": {"text": "Hello!"}}}}, {"data": "Hello!", "delta": {"text": "Hello!"}}, {"event": {"contentBlockStop": {}}}, + {"complete": True}, {"event": {"messageStop": {"stopReason": "guardrail_intervened"}}}, { "event": { @@ -1138,6 +1140,9 @@ async def test_stream_messages(agenerator, alist): "contentBlockStop": {}, }, }, + { + "complete": True, + }, { "stop": ( "end_turn", @@ -1334,3 +1339,122 @@ async def test_stream_messages_normalizes_messages(agenerator, alist): {"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"}, {"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"}, ] + + +@pytest.mark.asyncio +async def test_complete_event_emitted_for_text_block(agenerator, alist): + """Test that a complete=True event is emitted when a text content block finishes. + + This ensures callback handlers like PrintingCallbackHandler can detect the end + of text streaming and format output accordingly (e.g., printing a trailing newline). 
+ See: https://github.com/strands-agents/sdk-python/issues/826 + """ + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"text": "Hello"}}}, + {"contentBlockDelta": {"delta": {"text": " world"}}}, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "end_turn"}}, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ] + + stream = strands.event_loop.streaming.process_stream(agenerator(response)) + events = await alist(stream) + + # Find the complete event + complete_events = [e for e in events if e.get("complete") is True] + assert len(complete_events) == 1, f"Expected exactly 1 complete event, got {len(complete_events)}" + + # Verify it appears after contentBlockStop + complete_idx = next(i for i, e in enumerate(events) if e.get("complete") is True) + stop_idx = next(i for i, e in enumerate(events) if e.get("event", {}).get("contentBlockStop") is not None) + assert complete_idx == stop_idx + 1, "complete event must immediately follow contentBlockStop" + + +@pytest.mark.asyncio +async def test_no_complete_event_for_tool_use_block(agenerator, alist): + """Test that no complete event is emitted for toolUse content blocks.""" + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {"toolUse": {"toolUseId": "t1", "name": "my_tool"}}}}, + {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "tool_use"}}, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ] + + stream = strands.event_loop.streaming.process_stream(agenerator(response)) + events = await alist(stream) + + complete_events = [e for e in events if e.get("complete") is True] + assert len(complete_events) == 0, "No complete event should be emitted for tool_use 
blocks" + + +@pytest.mark.asyncio +async def test_no_complete_event_for_reasoning_block(agenerator, alist): + """Test that no complete event is emitted for reasoning content blocks.""" + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"reasoningContent": {"text": "thinking..."}}}}, + {"contentBlockDelta": {"delta": {"reasoningContent": {"signature": "sig123"}}}}, + {"contentBlockStop": {}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"text": "answer"}}}, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "end_turn"}}, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ] + + stream = strands.event_loop.streaming.process_stream(agenerator(response)) + events = await alist(stream) + + complete_events = [e for e in events if e.get("complete") is True] + # Only 1 complete event for the text block, none for the reasoning block + assert len(complete_events) == 1, f"Expected 1 complete event (text only), got {len(complete_events)}" + + +@pytest.mark.asyncio +async def test_complete_event_with_multiple_text_blocks(agenerator, alist): + """Test that complete events are emitted for each text content block.""" + response = [ + {"messageStart": {"role": "assistant"}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"text": "first"}}}, + {"contentBlockStop": {}}, + {"contentBlockStart": {"start": {"toolUse": {"toolUseId": "t1", "name": "tool"}}}}, + {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}, + {"contentBlockStop": {}}, + {"contentBlockStart": {"start": {}}}, + {"contentBlockDelta": {"delta": {"text": "second"}}}, + {"contentBlockStop": {}}, + {"messageStop": {"stopReason": "end_turn"}}, + { + "metadata": { + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1}, + "metrics": {"latencyMs": 1}, + } + }, + ] + + stream = 
strands.event_loop.streaming.process_stream(agenerator(response)) + events = await alist(stream) + + complete_events = [e for e in events if e.get("complete") is True] + assert len(complete_events) == 2, f"Expected 2 complete events (one per text block), got {len(complete_events)}"