Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/strands/event_loop/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,10 @@ async def process_stream(
state, typed_event = handle_content_block_delta(chunk["contentBlockDelta"], state)
yield typed_event
elif "contentBlockStop" in chunk:
had_text = bool(state.get("text"))
state = handle_content_block_stop(state)
if had_text:
yield ModelStreamEvent({"complete": True})
elif "messageStop" in chunk:
stop_reason = handle_message_stop(chunk["messageStop"])
elif "metadata" in chunk:
Expand Down
6 changes: 6 additions & 0 deletions tests/strands/agent/hooks/test_agent_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ async def test_stream_e2e_success(alist):
"delta": {"text": "Okay invoking normal tool"},
},
{"event": {"contentBlockStop": {}}},
{"complete": True},
{"event": {"contentBlockStart": {"start": {"toolUse": {"name": "normal_tool", "toolUseId": "123"}}}}},
{"event": {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}},
{
Expand Down Expand Up @@ -182,6 +183,7 @@ async def test_stream_e2e_success(alist):
"tool_config": tool_config,
},
{"event": {"contentBlockStop": {}}},
{"complete": True},
{"event": {"contentBlockStart": {"start": {"toolUse": {"name": "async_tool", "toolUseId": "1234"}}}}},
{"event": {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}},
{
Expand Down Expand Up @@ -240,6 +242,7 @@ async def test_stream_e2e_success(alist):
"tool_config": tool_config,
},
{"event": {"contentBlockStop": {}}},
{"complete": True},
{"event": {"contentBlockStart": {"start": {"toolUse": {"name": "streaming_tool", "toolUseId": "12345"}}}}},
{"event": {"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}}},
{
Expand Down Expand Up @@ -306,6 +309,7 @@ async def test_stream_e2e_success(alist):
"tool_config": tool_config,
},
{"event": {"contentBlockStop": {}}},
{"complete": True},
{"event": {"messageStop": {"stopReason": "end_turn"}}},
{"message": {"content": [{"text": "I invoked the tools!"}], "role": "assistant"}},
{
Expand Down Expand Up @@ -370,6 +374,7 @@ async def test_stream_e2e_throttle_and_redact(alist, mock_sleep):
"delta": {"text": "INPUT BLOCKED!"},
},
{"event": {"contentBlockStop": {}}},
{"complete": True},
{"event": {"messageStop": {"stopReason": "guardrail_intervened"}}},
{"message": {"content": [{"text": "INPUT BLOCKED!"}], "role": "assistant"}},
{
Expand Down Expand Up @@ -434,6 +439,7 @@ async def test_stream_e2e_reasoning_redacted_content(alist):
"delta": {"text": "Response with redacted reasoning"},
},
{"event": {"contentBlockStop": {}}},
{"complete": True},
{"event": {"messageStop": {"stopReason": "end_turn"}}},
{
"message": {
Expand Down
1 change: 1 addition & 0 deletions tests/strands/agent/test_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,7 @@ def test_agent__call__callback(mock_model, agent, callback_handler, agenerator):
request_state={},
),
unittest.mock.call(event={"contentBlockStop": {}}),
unittest.mock.call(complete=True),
unittest.mock.call(
message={
"role": "assistant",
Expand Down
124 changes: 124 additions & 0 deletions tests/strands/event_loop/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,7 @@ def test_extract_usage_metrics_empty_metadata():
},
},
{"event": {"contentBlockStop": {}}},
{"complete": True},
{"event": {"messageStop": {"stopReason": "end_turn"}}},
{
"event": {
Expand Down Expand Up @@ -911,6 +912,7 @@ async def test_process_stream(response, exp_events, agenerator, alist):
{"event": {"contentBlockDelta": {"delta": {"text": "Hello!"}}}},
{"data": "Hello!", "delta": {"text": "Hello!"}},
{"event": {"contentBlockStop": {}}},
{"complete": True},
{"event": {"messageStop": {"stopReason": "guardrail_intervened"}}},
{
"event": {
Expand Down Expand Up @@ -1138,6 +1140,9 @@ async def test_stream_messages(agenerator, alist):
"contentBlockStop": {},
},
},
{
"complete": True,
},
{
"stop": (
"end_turn",
Expand Down Expand Up @@ -1334,3 +1339,122 @@ async def test_stream_messages_normalizes_messages(agenerator, alist):
{"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"},
{"content": [{"toolUse": {"name": "INVALID_TOOL_NAME"}}], "role": "assistant"},
]


@pytest.mark.asyncio
async def test_complete_event_emitted_for_text_block(agenerator, alist):
"""Test that a complete=True event is emitted when a text content block finishes.

This ensures callback handlers like PrintingCallbackHandler can detect the end
of text streaming and format output accordingly (e.g., printing a trailing newline).
See: https://github.com/strands-agents/sdk-python/issues/826
"""
response = [
{"messageStart": {"role": "assistant"}},
{"contentBlockStart": {"start": {}}},
{"contentBlockDelta": {"delta": {"text": "Hello"}}},
{"contentBlockDelta": {"delta": {"text": " world"}}},
{"contentBlockStop": {}},
{"messageStop": {"stopReason": "end_turn"}},
{
"metadata": {
"usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1},
"metrics": {"latencyMs": 1},
}
},
]

stream = strands.event_loop.streaming.process_stream(agenerator(response))
events = await alist(stream)

# Find the complete event
complete_events = [e for e in events if e.get("complete") is True]
assert len(complete_events) == 1, f"Expected exactly 1 complete event, got {len(complete_events)}"

# Verify it appears after contentBlockStop
complete_idx = next(i for i, e in enumerate(events) if e.get("complete") is True)
stop_idx = next(i for i, e in enumerate(events) if e.get("event", {}).get("contentBlockStop") is not None)
assert complete_idx == stop_idx + 1, "complete event must immediately follow contentBlockStop"


@pytest.mark.asyncio
async def test_no_complete_event_for_tool_use_block(agenerator, alist):
"""Test that no complete event is emitted for toolUse content blocks."""
response = [
{"messageStart": {"role": "assistant"}},
{"contentBlockStart": {"start": {"toolUse": {"toolUseId": "t1", "name": "my_tool"}}}},
{"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}},
{"contentBlockStop": {}},
{"messageStop": {"stopReason": "tool_use"}},
{
"metadata": {
"usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1},
"metrics": {"latencyMs": 1},
}
},
]

stream = strands.event_loop.streaming.process_stream(agenerator(response))
events = await alist(stream)

complete_events = [e for e in events if e.get("complete") is True]
assert len(complete_events) == 0, "No complete event should be emitted for tool_use blocks"


@pytest.mark.asyncio
async def test_no_complete_event_for_reasoning_block(agenerator, alist):
"""Test that no complete event is emitted for reasoning content blocks."""
response = [
{"messageStart": {"role": "assistant"}},
{"contentBlockStart": {"start": {}}},
{"contentBlockDelta": {"delta": {"reasoningContent": {"text": "thinking..."}}}},
{"contentBlockDelta": {"delta": {"reasoningContent": {"signature": "sig123"}}}},
{"contentBlockStop": {}},
{"contentBlockStart": {"start": {}}},
{"contentBlockDelta": {"delta": {"text": "answer"}}},
{"contentBlockStop": {}},
{"messageStop": {"stopReason": "end_turn"}},
{
"metadata": {
"usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1},
"metrics": {"latencyMs": 1},
}
},
]

stream = strands.event_loop.streaming.process_stream(agenerator(response))
events = await alist(stream)

complete_events = [e for e in events if e.get("complete") is True]
# Only 1 complete event for the text block, none for the reasoning block
assert len(complete_events) == 1, f"Expected 1 complete event (text only), got {len(complete_events)}"


@pytest.mark.asyncio
async def test_complete_event_with_multiple_text_blocks(agenerator, alist):
"""Test that complete events are emitted for each text content block."""
response = [
{"messageStart": {"role": "assistant"}},
{"contentBlockStart": {"start": {}}},
{"contentBlockDelta": {"delta": {"text": "first"}}},
{"contentBlockStop": {}},
{"contentBlockStart": {"start": {"toolUse": {"toolUseId": "t1", "name": "tool"}}}},
{"contentBlockDelta": {"delta": {"toolUse": {"input": "{}"}}}},
{"contentBlockStop": {}},
{"contentBlockStart": {"start": {}}},
{"contentBlockDelta": {"delta": {"text": "second"}}},
{"contentBlockStop": {}},
{"messageStop": {"stopReason": "end_turn"}},
{
"metadata": {
"usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 1},
"metrics": {"latencyMs": 1},
}
},
]

stream = strands.event_loop.streaming.process_stream(agenerator(response))
events = await alist(stream)

complete_events = [e for e in events if e.get("complete") is True]
assert len(complete_events) == 2, f"Expected 2 complete events (one per text block), got {len(complete_events)}"