From dd6881a5b0a7ac447bb0a3492dfd66d638c66f62 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Mon, 9 Mar 2026 12:31:36 +0900 Subject: [PATCH 1/4] Extract function_approval_response from workflow messages (#4546) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _extract_responses_from_messages now handles function_approval_response content in addition to function_result content. Previously, approval responses sent via the messages field were silently dropped because the function only checked for content.type == "function_result". The approval response is keyed by content.id and includes the approved status, id, and serialized function_call — consistent with how _coerce_content identifies approval response payloads. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../agent_framework_ag_ui/_workflow_run.py | 16 +++- .../ag-ui/tests/ag_ui/test_workflow_run.py | 77 +++++++++++++++++++ 2 files changed, 89 insertions(+), 4 deletions(-) diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py b/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py index 81e4a27302..596686e2fb 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py @@ -128,10 +128,18 @@ def _extract_responses_from_messages(messages: list[Message]) -> dict[str, Any]: responses: dict[str, Any] = {} for message in messages: for content in message.contents: - if content.type != "function_result" or not content.call_id: - continue - value = _coerce_json_value(content.result) - responses[str(content.call_id)] = value + if content.type == "function_result" and content.call_id: + value = _coerce_json_value(content.result) + responses[str(content.call_id)] = value + elif content.type == "function_approval_response" and getattr(content, "id", None): + approval_value: dict[str, Any] = { + "approved": getattr(content, "approved", False), + "id": str(content.id), # type: ignore[union-attr] + } + func_call = getattr(content, "function_call", None) + if func_call is not None: + approval_value["function_call"] = make_json_safe(func_call.to_dict()) + responses[str(content.id)] = approval_value # type: ignore[union-attr] return responses diff --git a/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py b/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py index 8ebd8fcaaa..bdcbc8fffa 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py +++ b/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py @@ -33,6 +33,7 @@ _custom_event_value, _details_code, _details_message, + _extract_responses_from_messages, _interrupt_entry_for_request_event, _latest_assistant_contents, _latest_user_text, @@ -1172,6 +1173,82 @@ def test_details_without_error_type(self): assert _details_code(details) is None +class TestExtractResponsesFromMessages: + """Tests for _extract_responses_from_messages helper.""" + + def test_function_result_extracted(self): + """function_result content is extracted keyed by call_id.""" + result = Content.from_function_result(call_id="call-1", result="ok") + messages = [Message(role="tool", contents=[result])] + responses = _extract_responses_from_messages(messages) + assert responses == {"call-1": "ok"} + + def test_function_result_without_call_id_skipped(self): + """function_result with no call_id is ignored.""" + result = Content.from_function_result(call_id="", result="ok") + messages = [Message(role="tool", contents=[result])] + responses = _extract_responses_from_messages(messages) + assert responses == {} + + def test_function_approval_response_extracted(self): + """function_approval_response content is extracted keyed by id.""" + func_call = Content.from_function_call( + call_id="call-1", name="do_action", arguments={"x": 1}, + ) + approval = Content.from_function_approval_response( + approved=True, id="approval-1", function_call=func_call, + ) + messages = [Message(role="user", contents=[approval])] + responses = _extract_responses_from_messages(messages) + assert "approval-1" in responses + assert responses["approval-1"]["approved"] is True + assert responses["approval-1"]["id"] == "approval-1" + assert "function_call" in responses["approval-1"] + + def test_denied_approval_response_extracted(self): + """Denied function_approval_response is extracted with approved=False.""" + func_call = Content.from_function_call( + call_id="call-2", name="delete_item", arguments={}, + ) + approval = Content.from_function_approval_response( + approved=False, id="approval-2", function_call=func_call, + ) + messages = [Message(role="user", contents=[approval])] + responses = _extract_responses_from_messages(messages) + assert "approval-2" in responses + assert responses["approval-2"]["approved"] is False + + def test_mixed_result_and_approval(self): + """Both function_result and function_approval_response are extracted.""" + result = Content.from_function_result(call_id="call-1", result="done") + func_call = Content.from_function_call( + call_id="call-2", name="submit", arguments={}, + ) + approval = Content.from_function_approval_response( + approved=True, id="approval-1", function_call=func_call, + ) + messages = [ + Message(role="tool", contents=[result]), + Message(role="user", contents=[approval]), + ] + responses = _extract_responses_from_messages(messages) + assert "call-1" in responses + assert responses["call-1"] == "done" + assert "approval-1" in responses + assert responses["approval-1"]["approved"] is True + + def test_text_content_skipped(self): + """Non-result, non-approval content is ignored.""" + text = Content.from_text(text="hello") + messages = [Message(role="user", contents=[text])] + responses = _extract_responses_from_messages(messages) + assert responses == {} + + def test_empty_messages(self): + """Empty message list returns empty responses.""" + assert _extract_responses_from_messages([]) == {} + + # ── Stream integration tests ── From 2095d79fa82e464709755cea5550df96e33092a8 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Mon, 9 Mar 2026 12:31:57 +0900 Subject: [PATCH 2/4] Apply pre-commit auto-fixes --- .../ag-ui/tests/ag_ui/test_workflow_run.py | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py b/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py index bdcbc8fffa..b19ca76cb7 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py +++ b/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py @@ -1193,10 +1193,14 @@ def test_function_result_without_call_id_skipped(self): def test_function_approval_response_extracted(self): """function_approval_response content is extracted keyed by id.""" func_call = Content.from_function_call( - call_id="call-1", name="do_action", arguments={"x": 1}, + call_id="call-1", + name="do_action", + arguments={"x": 1}, ) approval = Content.from_function_approval_response( - approved=True, id="approval-1", function_call=func_call, + approved=True, + id="approval-1", + function_call=func_call, ) messages = [Message(role="user", contents=[approval])] responses = _extract_responses_from_messages(messages) @@ -1208,10 +1212,14 @@ def test_function_approval_response_extracted(self): def test_denied_approval_response_extracted(self): """Denied function_approval_response is extracted with approved=False.""" func_call = Content.from_function_call( - call_id="call-2", name="delete_item", arguments={}, + call_id="call-2", + name="delete_item", + arguments={}, ) approval = Content.from_function_approval_response( - approved=False, id="approval-2", function_call=func_call, + approved=False, + id="approval-2", + function_call=func_call, ) messages = [Message(role="user", contents=[approval])] responses = _extract_responses_from_messages(messages) @@ -1222,10 +1230,14 @@ def test_mixed_result_and_approval(self): """Both function_result and function_approval_response are extracted.""" result = Content.from_function_result(call_id="call-1", result="done") func_call = Content.from_function_call( - call_id="call-2", name="submit", arguments={}, + call_id="call-2", + name="submit", + arguments={}, ) approval = Content.from_function_approval_response( - approved=True, id="approval-1", function_call=func_call, + approved=True, + id="approval-1", + function_call=func_call, ) messages = [ Message(role="tool", contents=[result]), From 193429d137c52e8189e82869db0458bc739680da Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Mon, 9 Mar 2026 12:43:41 +0900 Subject: [PATCH 3/4] Fix #4546: Update docstring and add integration tests for message-based approvals - Update _extract_responses_from_messages docstring to reflect that it now handles function_approval_response content in addition to function_result content. - Add integration tests for run_workflow_stream across two turns with approval responses provided via messages (function_approvals) rather than resume.interrupts, covering both approved and denied scenarios. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../agent_framework_ag_ui/_workflow_run.py | 8 +- .../ag-ui/tests/ag_ui/test_workflow_run.py | 136 ++++++++++++++++++ 2 files changed, 143 insertions(+), 1 deletion(-) diff --git a/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py b/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py index 596686e2fb..a75d29abc4 100644 --- a/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py +++ b/python/packages/ag-ui/agent_framework_ag_ui/_workflow_run.py @@ -124,7 +124,13 @@ def _request_payload_from_request_event(request_event: Any) -> dict[str, Any] | def _extract_responses_from_messages(messages: list[Message]) -> dict[str, Any]: - """Extract request-info responses from incoming tool/function-result messages.""" + """Extract request-info responses from incoming messages. + + Handles both ``function_result`` content (keyed by ``call_id``) and + ``function_approval_response`` content (keyed by ``id``), so that + approval decisions sent via messages are forwarded into the workflow + responses map. + """ responses: dict[str, Any] = {} for message in messages: for content in message.contents: diff --git a/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py b/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py index b19ca76cb7..3d154680e0 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py +++ b/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py @@ -1264,6 +1264,142 @@ def test_empty_messages(self): # ── Stream integration tests ── +async def test_workflow_run_approval_via_messages_approved() -> None: + """Approval response sent via messages (function_approvals) should satisfy the pending request.""" + + class ApprovalExecutor(Executor): + def __init__(self) -> None: + super().__init__(id="approval_executor") + + @handler + async def start(self, message: Any, ctx: WorkflowContext) -> None: + del message + function_call = Content.from_function_call( + call_id="refund-call", + name="submit_refund", + arguments={"order_id": "12345", "amount": "$89.99"}, + ) + approval_request = Content.from_function_approval_request(id="approval-1", function_call=function_call) + await ctx.request_info(approval_request, Content, request_id="approval-1") + + @response_handler + async def handle_approval(self, original_request: Content, response: Content, ctx: WorkflowContext) -> None: + del original_request + status = "approved" if bool(response.approved) else "rejected" + await ctx.yield_output(f"Refund {status}.") + + workflow = WorkflowBuilder(start_executor=ApprovalExecutor()).build() + first_events = [ + event async for event in run_workflow_stream({"messages": [{"role": "user", "content": "go"}]}, workflow) + ] + first_finished = [event for event in first_events if event.type == "RUN_FINISHED"][0].model_dump() + interrupt_payload = cast(list[dict[str, Any]], first_finished.get("interrupt")) + assert isinstance(interrupt_payload, list) and len(interrupt_payload) == 1 + + # Second turn: send approval via function_approvals on a message (not resume.interrupts) + resumed_events = [ + event + async for event in run_workflow_stream( + { + "messages": [ + { + "role": "user", + "content": "", + "function_approvals": [ + { + "approved": True, + "id": "approval-1", + "call_id": "refund-call", + "name": "submit_refund", + "arguments": {"order_id": "12345", "amount": "$89.99"}, + } + ], + } + ], + }, + workflow, + ) + ] + + resumed_types = [event.type for event in resumed_events] + assert "RUN_STARTED" in resumed_types + assert "RUN_FINISHED" in resumed_types + assert "RUN_ERROR" not in resumed_types + assert "TEXT_MESSAGE_CONTENT" in resumed_types + text_deltas = [event.delta for event in resumed_events if event.type == "TEXT_MESSAGE_CONTENT"] + assert any("approved" in delta for delta in text_deltas) + resumed_finished = [event for event in resumed_events if event.type == "RUN_FINISHED"][0].model_dump() + assert "interrupt" not in resumed_finished + + +async def test_workflow_run_approval_via_messages_denied() -> None: + """Denied approval response sent via messages (function_approvals) should satisfy the pending request.""" + + class ApprovalExecutor(Executor): + def __init__(self) -> None: + super().__init__(id="approval_executor") + + @handler + async def start(self, message: Any, ctx: WorkflowContext) -> None: + del message + function_call = Content.from_function_call( + call_id="delete-call", + name="delete_record", + arguments={"record_id": "abc"}, + ) + approval_request = Content.from_function_approval_request(id="deny-1", function_call=function_call) + await ctx.request_info(approval_request, Content, request_id="deny-1") + + @response_handler + async def handle_approval(self, original_request: Content, response: Content, ctx: WorkflowContext) -> None: + del original_request + status = "approved" if bool(response.approved) else "rejected" + await ctx.yield_output(f"Delete {status}.") + + workflow = WorkflowBuilder(start_executor=ApprovalExecutor()).build() + first_events = [ + event async for event in run_workflow_stream({"messages": [{"role": "user", "content": "go"}]}, workflow) + ] + first_finished = [event for event in first_events if event.type == "RUN_FINISHED"][0].model_dump() + interrupt_payload = cast(list[dict[str, Any]], first_finished.get("interrupt")) + assert isinstance(interrupt_payload, list) and len(interrupt_payload) == 1 + + # Second turn: send denial via function_approvals on a message (not resume.interrupts) + resumed_events = [ + event + async for event in run_workflow_stream( + { + "messages": [ + { + "role": "user", + "content": "", + "function_approvals": [ + { + "approved": False, + "id": "deny-1", + "call_id": "delete-call", + "name": "delete_record", + "arguments": {"record_id": "abc"}, + } + ], + } + ], + }, + workflow, + ) + ] + + resumed_types = [event.type for event in resumed_events] + assert "RUN_STARTED" in resumed_types + assert "RUN_FINISHED" in resumed_types + assert "RUN_ERROR" not in resumed_types + assert "TEXT_MESSAGE_CONTENT" in resumed_types + text_deltas = [event.delta for event in resumed_events if event.type == "TEXT_MESSAGE_CONTENT"] + assert any("rejected" in delta for delta in text_deltas) + resumed_finished = [event for event in resumed_events if event.type == "RUN_FINISHED"][0].model_dump() + assert "interrupt" not in resumed_finished + + async def test_workflow_run_available_interrupts_logged(): """available_interrupts in input data should be logged without errors.""" From 879caa97f4b5a578d82bf96308d8a885f798f028 Mon Sep 17 00:00:00 2001 From: Evan Mattson Date: Mon, 9 Mar 2026 12:49:26 +0900 Subject: [PATCH 4/4] Address PR review feedback for #4546 - Use safer 'not .get("interrupt")' assertion instead of 'not in' to handle Pydantic v2 model_dump() including keys with None values - Add unit test for mixed function_result and function_approval_response in the same message to TestExtractResponsesFromMessages Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../ag-ui/tests/ag_ui/test_workflow_run.py | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py b/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py index 3d154680e0..26b44b03ba 100644 --- a/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py +++ b/python/packages/ag-ui/tests/ag_ui/test_workflow_run.py @@ -1249,6 +1249,26 @@ def test_mixed_result_and_approval(self): assert "approval-1" in responses assert responses["approval-1"]["approved"] is True + def test_mixed_result_and_approval_same_message(self): + """Both function_result and function_approval_response in the same message are extracted.""" + result = Content.from_function_result(call_id="call-1", result="done") + func_call = Content.from_function_call( + call_id="call-2", + name="submit", + arguments={}, + ) + approval = Content.from_function_approval_response( + approved=True, + id="approval-1", + function_call=func_call, + ) + messages = [Message(role="tool", contents=[result, approval])] + responses = _extract_responses_from_messages(messages) + assert "call-1" in responses + assert responses["call-1"] == "done" + assert "approval-1" in responses + assert responses["approval-1"]["approved"] is True + def test_text_content_skipped(self): """Non-result, non-approval content is ignored.""" text = Content.from_text(text="hello") @@ -1329,7 +1349,7 @@ async def handle_approval(self, original_request: Content, response: Content, ct text_deltas = [event.delta for event in resumed_events if event.type == "TEXT_MESSAGE_CONTENT"] assert any("approved" in delta for delta in text_deltas) resumed_finished = [event for event in resumed_events if event.type == "RUN_FINISHED"][0].model_dump() - assert "interrupt" not in resumed_finished + assert not resumed_finished.get("interrupt") async def test_workflow_run_approval_via_messages_denied() -> None: @@ -1397,7 +1417,7 @@ async def handle_approval(self, original_request: Content, response: Content, ct text_deltas = [event.delta for event in resumed_events if event.type == "TEXT_MESSAGE_CONTENT"] assert any("rejected" in delta for delta in text_deltas) resumed_finished = [event for event in resumed_events if event.type == "RUN_FINISHED"][0].model_dump() - assert "interrupt" not in resumed_finished + assert not resumed_finished.get("interrupt") async def test_workflow_run_available_interrupts_logged():