diff --git a/python/packages/hosting-responses/README.md b/python/packages/hosting-responses/README.md index ae03d364af3..47246f02252 100644 --- a/python/packages/hosting-responses/README.md +++ b/python/packages/hosting-responses/README.md @@ -1,21 +1,51 @@ # agent-framework-hosting-responses -OpenAI Responses-shaped channel for `agent-framework-hosting`. +OpenAI Responses-shaped helpers for app-owned Agent Framework hosting. -Exposes a single `POST /responses` endpoint that accepts the OpenAI -Responses API request body and returns either a Responses-shaped JSON -body or a Server-Sent-Events stream when `stream=True`. +This package provides the Responses-specific conversion layer: + +- `responses_to_run(...)` — convert a Responses request body into Agent + Framework run values. +- `responses_session_id(...)` — extract a prior `resp_*` response id or + `conv_*` conversation id from the request body when present. +- `create_response_id(...)` — mint a Responses-shaped response id. +- `responses_from_run(...)` — convert an `AgentResponse` into a + Responses-compatible JSON payload. +- `responses_stream_from_run(...)` — convert an Agent Framework + `ResponseStream` into Responses-compatible SSE events. + +FastAPI/Starlette/Django/Azure Functions code owns route registration, +authentication, status codes, response construction, and background work. ```python -from agent_framework.openai import OpenAIChatClient -from agent_framework_hosting import AgentFrameworkHost -from agent_framework_hosting_responses import ResponsesChannel +from agent_framework_hosting import AgentState +from agent_framework_hosting_responses import ( + create_response_id, + responses_from_run, + responses_session_id, + responses_to_run, +) +from fastapi import Body, FastAPI +from fastapi.responses import JSONResponse + +app = FastAPI() +state = AgentState(agent) -agent = OpenAIChatClient().as_agent(name="Assistant") -host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel()]) -host.serve(port=8000) +@app.post("/responses") +async def responses(body: dict = Body(...)) -> JSONResponse: + run = responses_to_run(body) + session_id = responses_session_id(body) + response_id = create_response_id() + session = await state.get_or_create_session(session_id or response_id) + result = await (await state.get_target()).run( + run["messages"], + session=session, + options=run["options"], + ) + await state.set_session(response_id, session) + return JSONResponse(responses_from_run(result, response_id=response_id, session_id=session_id)) ``` -The base host plumbing lives in +The base execution-state helpers live in [`agent-framework-hosting`](https://pypi.org/project/agent-framework-hosting/). diff --git a/python/packages/hosting-responses/agent_framework_hosting_responses/__init__.py b/python/packages/hosting-responses/agent_framework_hosting_responses/__init__.py index cd221b56824..26ee8da69e7 100644 --- a/python/packages/hosting-responses/agent_framework_hosting_responses/__init__.py +++ b/python/packages/hosting-responses/agent_framework_hosting_responses/__init__.py @@ -6,9 +6,14 @@ from ._channel import ResponsesChannel from ._parsing import ( + create_response_id, messages_from_responses_input, parse_responses_identity, parse_responses_request, + responses_from_run, + responses_session_id, + responses_stream_from_run, + responses_to_run, ) try: @@ -19,7 +24,12 @@ __all__ = [ "ResponsesChannel", "__version__", + "create_response_id", "messages_from_responses_input", "parse_responses_identity", "parse_responses_request", + "responses_from_run", + "responses_session_id", + "responses_stream_from_run", + "responses_to_run", ] diff --git a/python/packages/hosting-responses/agent_framework_hosting_responses/_parsing.py b/python/packages/hosting-responses/agent_framework_hosting_responses/_parsing.py index fa742c55563..9af4a1c9fb4 100644 --- a/python/packages/hosting-responses/agent_framework_hosting_responses/_parsing.py +++ b/python/packages/hosting-responses/agent_framework_hosting_responses/_parsing.py @@ -15,11 +15,30 @@ from __future__ import annotations -from collections.abc import Mapping +import json +import time +import uuid +from collections.abc import AsyncIterator, Mapping, Sequence from typing import Any, cast -from agent_framework import Content, Message -from agent_framework_hosting import ChannelIdentity, ChannelSession +from agent_framework import AgentResponse, AgentResponseUpdate, ChatOptions, Content, Message, ResponseStream +from agent_framework_hosting import AgentRunArgs, ChannelIdentity, ChannelSession +from openai.types.responses import ( + Response as OpenAIResponse, +) +from openai.types.responses import ( + ResponseFunctionToolCall, + ResponseFunctionToolCallOutputItem, + ResponseInputFile, + ResponseInputImage, + ResponseInputText, + ResponseOutputItem, + ResponseOutputMessage, + ResponseOutputText, +) +from pydantic import TypeAdapter, ValidationError + +_RESPONSE_OUTPUT_ITEM_ADAPTER: TypeAdapter[Any] = TypeAdapter(ResponseOutputItem) # OpenAI Responses field name → Agent Framework ChatOptions field name. _RESPONSES_OPTION_REMAP = { @@ -29,6 +48,7 @@ # Fields the Responses transport owns; they are consumed separately and must # not also appear in options. _RESPONSES_TRANSPORT_KEYS = frozenset({"input", "stream", "previous_response_id"}) +_RESPONSES_RUN_TRANSPORT_KEYS = frozenset({"input", "stream", "previous_response_id", "conversation_id"}) def parse_responses_identity(body: Mapping[str, Any], channel_name: str) -> ChannelIdentity | None: @@ -149,8 +169,791 @@ def parse_responses_request( return messages, options, session +def create_response_id() -> str: + """Create a Responses-shaped response id.""" + return f"resp_{uuid.uuid4().hex}" + + +def responses_session_id(body: Mapping[str, Any]) -> str | None: + """Return the Responses session id from request body, if present. + + The returned value can be a ``resp_*`` previous response id or a ``conv_*`` + conversation id. Callers choose whether this request-derived value is + trusted for their route and deployment. + + Args: + body: OpenAI Responses-shaped request body. + + Returns: + Previous response id, conversation id, or ``None``. + """ + previous_response_id = body.get("previous_response_id") + if isinstance(previous_response_id, str) and previous_response_id: + return previous_response_id + conversation_id = body.get("conversation_id") + if isinstance(conversation_id, str) and conversation_id: + return conversation_id + return None + + +def responses_to_run(body: Mapping[str, Any]) -> AgentRunArgs: + """Convert a Responses request body into Agent Framework run values. + + Args: + body: OpenAI Responses-shaped request body. + + Returns: + Arguments corresponding to ``Agent.run``. + + Raises: + ValueError: If the request body has invalid ``input``. + """ + messages = messages_from_responses_input(body.get("input")) + options: dict[str, Any] = {} + for key, value in body.items(): + if key in _RESPONSES_RUN_TRANSPORT_KEYS or value is None: + continue + options[_RESPONSES_OPTION_REMAP.get(key, key)] = value + return AgentRunArgs( + messages=messages, + options=cast("ChatOptions[Any]", options), + stream=bool(body.get("stream", False)), + ) + + +def responses_from_run( + result: AgentResponse[Any], + *, + response_id: str, + session_id: str | None = None, +) -> dict[str, Any]: + """Convert an Agent Framework response into a Responses payload. + + Args: + result: Agent response returned by a run. + + Keyword Args: + response_id: Id for the response being created. + session_id: Optional prior ``resp_*`` or ``conv_*`` session id. When it + is a conversation id, the helper renders it in the Responses + conversation field. + + Returns: + Responses-compatible JSON payload. + """ + output_items = _result_to_output_items(result, status="completed") + response_kwargs: dict[str, Any] = { + "id": response_id, + "object": "response", + "created_at": int(time.time()), + "status": "completed", + "model": _model_from_result(result), + "output": output_items, + "parallel_tool_calls": False, + "tool_choice": "auto", + "tools": [], + "metadata": {}, + } + if session_id is not None and session_id.startswith("conv_"): + response_kwargs["conversation"] = {"id": session_id} + return _response_payload(OpenAIResponse(**response_kwargs)) + + +def _model_from_update(update: AgentResponseUpdate) -> str | None: + """Best-effort model id from one streamed update's raw representation. + + ``AgentResponse.from_updates`` does not carry a chunk's raw representation + forward onto the finalized response (see ``_finalize_response`` in core), + so ``_model_from_result`` can never find a model for a streamed result. + Each ``AgentResponseUpdate`` still has its own raw chat chunk, which + usually reports the model, so the streaming SSE helper captures it here + instead. + """ + raw = update.raw_representation + model = getattr(raw, "model", None) + return model if isinstance(model, str) and model else None + + +def _model_from_result(result: Any) -> str: + model = getattr(result, "model", None) + if isinstance(model, str) and model: + return model + raw = getattr(result, "raw_representation", None) + raw_model = getattr(raw, "model", None) + if isinstance(raw_model, str) and raw_model: + return raw_model + additional_properties = getattr(result, "additional_properties", None) + if isinstance(additional_properties, Mapping): + additional_model = cast(Mapping[str, Any], additional_properties).get("model") + if isinstance(additional_model, str) and additional_model: + return additional_model + return "agent" + + +def _result_to_output_items(result: Any, *, status: str) -> list[ResponseOutputItem]: + """Render an agent or workflow result as Responses output items.""" + messages = getattr(result, "messages", None) + if isinstance(messages, Sequence) and not isinstance(messages, (str, bytes, bytearray)): + return _messages_to_output_items(cast("Sequence[Any]", messages), status=status) + + if isinstance(result, Message): + return _messages_to_output_items([result], status=status) + if isinstance(result, Content): + return _contents_to_output_items([result], status=status) + + get_outputs = getattr(result, "get_outputs", None) + if callable(get_outputs): + output_items: list[ResponseOutputItem] = [] + for output in cast("Sequence[Any]", get_outputs()): + output_items.extend(_output_to_output_items(output, status=status)) + return output_items + + text = getattr(result, "text", None) + if isinstance(text, str): + return _text_output_items(text, status=status) + return _text_output_items(_result_to_text(result), status=status) + + +def _output_to_output_items(output: Any, *, status: str) -> list[ResponseOutputItem]: + if isinstance(output, Message): + return _messages_to_output_items([output], status=status) + if isinstance(output, Content): + return _contents_to_output_items([output], status=status) + messages = getattr(output, "messages", None) + if isinstance(messages, Sequence) and not isinstance(messages, (str, bytes, bytearray)): + return _messages_to_output_items(cast("Sequence[Any]", messages), status=status) + text = getattr(output, "text", None) + if isinstance(text, str): + return _text_output_items(text, status=status) + return _text_output_items(str(output), status=status) + + +def _messages_to_output_items(messages: Sequence[Any], *, status: str) -> list[ResponseOutputItem]: + output_items: list[ResponseOutputItem] = [] + message_contents: list[Content] = [] + + for message in messages: + if not isinstance(message, Message): + if message_contents: + output_items.extend(_contents_to_output_items(message_contents, status=status)) + message_contents.clear() + output_items.extend(_output_to_output_items(message, status=status)) + continue + message_contents.extend(message.contents) + + if message_contents: + output_items.extend(_contents_to_output_items(message_contents, status=status)) + + return output_items + + +def _contents_to_output_items( + contents: Sequence[Content], + *, + status: str, + seen_raw_items: dict[tuple[str, str], int] | None = None, +) -> list[ResponseOutputItem]: + output_items: list[ResponseOutputItem] = [] + message_content: list[Any] = [] + seen: dict[tuple[str, str], int] = seen_raw_items if seen_raw_items is not None else {} + + def flush_message() -> None: + if not message_content: + return + output_items.append(_message_output_item(message_content, status=status)) + message_content.clear() + + content_list = list(contents) + index = 0 + while index < len(content_list): + content = content_list[index] + raw_item = _raw_response_output_item(content.raw_representation) + if raw_item is not None: + raw_key = _response_output_item_key(raw_item) + if raw_key in seen: + output_items[seen[raw_key]] = raw_item + else: + flush_message() + seen[raw_key] = len(output_items) + output_items.append(raw_item) + index += 1 + continue + + next_content = content_list[index + 1] if index + 1 < len(content_list) else None + if _is_matching_code_interpreter_result(content, next_content): + flush_message() + output_items.append(_code_interpreter_output_item(content, status=status, result_content=next_content)) + index += 2 + continue + if _is_matching_image_generation_result(content, next_content): + flush_message() + output_items.append(_image_generation_output_item(content, status=status, result_content=next_content)) + index += 2 + continue + if _is_matching_mcp_result(content, next_content): + flush_message() + output_items.append(_mcp_call_output_item(content, status=status, result_content=next_content)) + index += 2 + continue + + match content.type: + case "text": + message_content.append(_message_text_content(content)) + case "text_reasoning": + flush_message() + output_items.append(_reasoning_output_item(content, status=status)) + case "function_call": + flush_message() + output_items.append(_function_call_output_item(content, status=status)) + case "function_result": + flush_message() + output_items.append(_function_result_output_item(content, status=status)) + case "code_interpreter_tool_call" | "code_interpreter_tool_result": + flush_message() + output_items.append(_code_interpreter_output_item(content, status=status)) + case "image_generation_tool_call" | "image_generation_tool_result": + flush_message() + output_items.append(_image_generation_output_item(content, status=status)) + case "mcp_server_tool_call": + flush_message() + output_items.append(_mcp_call_output_item(content, status=status)) + case "mcp_server_tool_result": + flush_message() + output_items.append(_mcp_result_output_item(content, status=status)) + case "shell_tool_call": + flush_message() + output_items.append(_shell_call_output_item(content, status=status)) + case "shell_tool_result": + flush_message() + output_items.append(_shell_result_output_item(content, status=status)) + case "function_approval_request": + flush_message() + output_items.append(_function_approval_request_output_item(content)) + case "function_approval_response": + flush_message() + output_items.append(_function_approval_response_output_item(content)) + case "data" | "uri" | "hosted_file": + flush_message() + output_items.append(_media_content_output_item(content, status=status)) + case "error": + message_content.append(ResponseOutputText(type="output_text", text=str(content), annotations=[])) + case _: + flush_message() + output_items.extend(_text_output_items(json.dumps(content.to_dict(), default=str), status=status)) + index += 1 + + flush_message() + return output_items + + +def _is_matching_code_interpreter_result(content: Content, next_content: Content | None) -> bool: + return ( + content.type == "code_interpreter_tool_call" + and next_content is not None + and next_content.type == "code_interpreter_tool_result" + and content.call_id == next_content.call_id + ) + + +def _is_matching_image_generation_result(content: Content, next_content: Content | None) -> bool: + return ( + content.type == "image_generation_tool_call" + and next_content is not None + and next_content.type == "image_generation_tool_result" + and content.image_id == next_content.image_id + ) + + +def _is_matching_mcp_result(content: Content, next_content: Content | None) -> bool: + return ( + content.type == "mcp_server_tool_call" + and next_content is not None + and next_content.type == "mcp_server_tool_result" + and content.call_id == next_content.call_id + ) + + +def _message_status(status: str) -> str: + return status if status in ("in_progress", "completed", "incomplete") else "incomplete" + + +def _text_output_items(text: str, *, status: str, message_id: str | None = None) -> list[ResponseOutputItem]: + return [ + _message_output_item( + [ResponseOutputText(type="output_text", text=text, annotations=[])], + status=status, + message_id=message_id, + ) + ] + + +def _message_output_item(content: Sequence[Any], *, status: str, message_id: str | None = None) -> ResponseOutputItem: + return cast( + ResponseOutputItem, + ResponseOutputMessage( + id=message_id or f"msg_{uuid.uuid4().hex}", + type="message", + role="assistant", + status=_message_status(status), # type: ignore[arg-type] + content=list(content), + ), + ) + + +def _message_text_content(content: Content) -> Any: + raw_type = _raw_type(content.raw_representation) + if raw_type in ("output_text", "refusal"): + return content.raw_representation + return ResponseOutputText(type="output_text", text=content.text or "", annotations=[]) + + +def _reasoning_output_item(content: Content, *, status: str) -> ResponseOutputItem: + item_data: dict[str, Any] = { + "id": content.id or f"rs_{uuid.uuid4().hex}", + "type": "reasoning", + "summary": [], + "status": _message_status(status), + } + if content.text: + item_data["content"] = [{"type": "reasoning_text", "text": content.text}] + if content.protected_data: + item_data["encrypted_content"] = content.protected_data + return _response_output_item(item_data) + + +def _function_call_output_item(content: Content, *, status: str) -> ResponseOutputItem: + return cast( + ResponseOutputItem, + ResponseFunctionToolCall( + id=content.additional_properties.get("fc_id") if content.additional_properties else None, + type="function_call", + call_id=content.call_id or f"call_{uuid.uuid4().hex}", + name=content.name or "tool", + arguments=_arguments_to_str(content.arguments), + status=_message_status(status), # type: ignore[arg-type] + ), + ) + + +def _function_result_output_item(content: Content, *, status: str) -> ResponseOutputItem: + if content.exception: + output: str | list[Any] = content.exception + elif output_parts := _content_parts_to_input_items(content.items): + output = output_parts + elif isinstance(content.result, str): + output = content.result + elif content.result is None: + output = "" + else: + output = json.dumps(content.result, default=str) + return cast( + ResponseOutputItem, + ResponseFunctionToolCallOutputItem( + id=f"fcout_{uuid.uuid4().hex}", + type="function_call_output", + call_id=content.call_id or f"call_{uuid.uuid4().hex}", + output=output, + status=_message_status(status), # type: ignore[arg-type] + ), + ) + + +def _code_interpreter_output_item( + content: Content, + *, + status: str, + result_content: Content | None = None, +) -> ResponseOutputItem: + output_parts: list[dict[str, Any]] = [] + outputs_value: Any = result_content.outputs if result_content is not None else content.outputs + if isinstance(outputs_value, Sequence) and not isinstance(outputs_value, (str, bytes, bytearray)): + for item in cast(Sequence[Any], outputs_value): + if isinstance(item, Content) and item.type == "text": + output_parts.append({"type": "logs", "logs": item.text or ""}) + elif isinstance(item, Content) and item.type in ("data", "uri") and item.uri: + output_parts.append({"type": "image", "url": item.uri}) + + return _response_output_item({ + "id": _content_item_id(content, result_content) or f"ci_{uuid.uuid4().hex}", + "type": "code_interpreter_call", + "code": _content_sequence_text(content.inputs), + "container_id": str(_content_property(content, result_content, "container_id") or "agent_framework"), + "outputs": output_parts or None, + "status": _code_interpreter_status(status), + }) + + +def _image_generation_output_item( + content: Content, + *, + status: str, + result_content: Content | None = None, +) -> ResponseOutputItem: + result_source = result_content.outputs if result_content is not None else content.outputs + image_id = content.image_id or (result_content.image_id if result_content is not None else None) + return _response_output_item({ + "id": image_id or f"ig_{uuid.uuid4().hex}", + "type": "image_generation_call", + "result": _image_generation_result(result_source), + "status": _image_generation_status(status), + }) + + +def _mcp_call_output_item( + content: Content, + *, + status: str, + result_content: Content | None = None, +) -> ResponseOutputItem: + return _response_output_item({ + "id": content.call_id or f"mcp_{uuid.uuid4().hex}", + "type": "mcp_call", + "server_label": content.server_name or "default", + "name": content.tool_name or "tool", + "arguments": _arguments_to_str(content.arguments), + "output": _stringify_output(result_content.output) if result_content is not None else None, + "status": _mcp_status(status), + }) + + +def _mcp_result_output_item(content: Content, *, status: str) -> ResponseOutputItem: + return _response_output_item({ + "id": content.call_id or f"mcp_{uuid.uuid4().hex}", + "type": "mcp_call", + "server_label": content.server_name or "default", + "name": content.tool_name or "tool", + "arguments": "", + "output": _stringify_output(content.output), + "status": _mcp_status(status), + }) + + +def _shell_call_output_item(content: Content, *, status: str) -> ResponseOutputItem: + return _response_output_item({ + "id": content.additional_properties.get("item_id") or f"shell_{uuid.uuid4().hex}", + "type": "shell_call", + "call_id": content.call_id or f"call_{uuid.uuid4().hex}", + "action": { + "commands": content.commands or [], + "timeout_ms": content.timeout_ms, + "max_output_length": content.max_output_length, + }, + "environment": {"type": "local"}, + "status": _message_status(status), + }) + + +def _shell_result_output_item(content: Content, *, status: str) -> ResponseOutputItem: + outputs: list[dict[str, Any]] = [] + outputs_value: Any = content.outputs + if isinstance(outputs_value, Sequence) and not isinstance(outputs_value, (str, bytes, bytearray)): + for item in cast(Sequence[Any], outputs_value): + if not isinstance(item, Content): + continue + outcome = {"type": "timeout"} if item.timed_out else {"type": "exit", "exit_code": item.exit_code or 0} + outputs.append({"stdout": item.stdout or "", "stderr": item.stderr or "", "outcome": outcome}) + + return _response_output_item({ + "id": content.additional_properties.get("item_id") or f"shellout_{uuid.uuid4().hex}", + "type": "shell_call_output", + "call_id": content.call_id or f"call_{uuid.uuid4().hex}", + "output": outputs, + "max_output_length": content.max_output_length, + "status": _message_status(status), + }) + + +def _function_approval_request_output_item(content: Content) -> ResponseOutputItem: + function_call = content.function_call + return _response_output_item({ + "id": content.id or f"approval_{uuid.uuid4().hex}", + "type": "mcp_approval_request", + "server_label": ( + function_call.additional_properties.get("server_label", "agent_framework") + if function_call is not None + else "agent_framework" + ), + "name": function_call.name if function_call is not None and function_call.name else "tool", + "arguments": _arguments_to_str(function_call.arguments if function_call is not None else None), + }) + + +def _function_approval_response_output_item(content: Content) -> ResponseOutputItem: + return _response_output_item({ + "id": content.id or f"approval_{uuid.uuid4().hex}", + "type": "mcp_approval_response", + "approval_request_id": content.id or "", + "approve": bool(content.approved), + }) + + +def _media_content_output_item(content: Content, *, status: str) -> ResponseOutputItem: + parts = _content_parts_to_input_items([content]) + if parts: + return cast( + ResponseOutputItem, + ResponseFunctionToolCallOutputItem( + id=f"content_{uuid.uuid4().hex}", + type="function_call_output", + call_id=f"content_{uuid.uuid4().hex}", + output=parts, + status=_message_status(status), # type: ignore[arg-type] + ), + ) + return _text_output_items(json.dumps(content.to_dict(), default=str), status=status)[0] + + +def _content_parts_to_input_items(contents: Sequence[Content] | None) -> list[Any]: + if not contents: + return [] + + parts: list[Any] = [] + for content in contents: + match content.type: + case "text": + parts.append(ResponseInputText(type="input_text", text=content.text or "")) + case "data" | "uri": + if not content.uri: + continue + if _is_image_content(content): + parts.append(ResponseInputImage(type="input_image", image_url=content.uri, detail="auto")) + else: + parts.append(ResponseInputFile(type="input_file", file_url=content.uri)) + case "hosted_file": + if content.file_id: + parts.append(ResponseInputFile(type="input_file", file_id=content.file_id)) + case _: + parts.append(ResponseInputText(type="input_text", text=json.dumps(content.to_dict(), default=str))) + return parts + + +def _content_sequence_text(contents: Sequence[Content] | None) -> str | None: + if not contents: + return None + text = "".join(content.text or "" for content in contents if content.type == "text") + return text or None + + +def _is_image_content(content: Content) -> bool: + media_type = content.media_type or "" + if media_type.startswith("image/"): + return True + return (content.uri or "").startswith("data:image/") + + +def _image_generation_result(outputs: Any) -> str | None: + if isinstance(outputs, Content): + return _image_generation_content_result(outputs) + if isinstance(outputs, Sequence) and not isinstance(outputs, (str, bytes, bytearray)): + for output in cast(Sequence[Any], outputs): + if isinstance(output, Content) and (result := _image_generation_content_result(output)): + return result + if isinstance(outputs, str): + return outputs + return None + + +def _image_generation_content_result(content: Content) -> str | None: + uri = content.uri + if not uri: + return None + if ";base64," in uri: + return uri.split(";base64,", 1)[1] + return uri + + +def _content_item_id(content: Content, result_content: Content | None = None) -> str | None: + item_id = content.additional_properties.get("item_id") + if isinstance(item_id, str) and item_id: + return item_id + if result_content is not None: + result_item_id = result_content.additional_properties.get("item_id") + if isinstance(result_item_id, str) and result_item_id: + return result_item_id + return content.call_id or (result_content.call_id if result_content is not None else None) + + +def _content_property(content: Content, result_content: Content | None, key: str) -> Any: + if key in content.additional_properties: + return content.additional_properties[key] + if result_content is not None and key in result_content.additional_properties: + return result_content.additional_properties[key] + return None + + +def _code_interpreter_status(status: str) -> str: + if status in ("in_progress", "completed", "incomplete", "failed"): + return status + return "incomplete" + + +def _image_generation_status(status: str) -> str: + if status in ("in_progress", "completed", "failed"): + return status + return "failed" + + +def _mcp_status(status: str) -> str: + if status in ("in_progress", "completed", "incomplete", "failed"): + return status + return "incomplete" + + +def _arguments_to_str(arguments: Any | None) -> str: + if arguments is None: + return "" + if isinstance(arguments, str): + return arguments + return json.dumps(arguments, default=str) + + +def _stringify_output(output: Any) -> str: + if output is None: + return "" + if isinstance(output, str): + return output + if isinstance(output, Sequence) and not isinstance(output, (str, bytes, bytearray)): + return "".join(_stringify_output(item) for item in cast(Sequence[Any], output)) + return json.dumps(output, default=str) + + +def _raw_response_output_item(raw: Any) -> ResponseOutputItem | None: + if _raw_type(raw) is None: + return None + try: + return cast(ResponseOutputItem, _RESPONSE_OUTPUT_ITEM_ADAPTER.validate_python(raw)) + except ValidationError: + return None + + +def _response_output_item(value: Mapping[str, Any]) -> ResponseOutputItem: + return cast(ResponseOutputItem, _RESPONSE_OUTPUT_ITEM_ADAPTER.validate_python(value)) + + +def _response_output_item_key(item: ResponseOutputItem) -> tuple[str, str]: + item_type = _raw_type(item) or "unknown" + item_id = getattr(item, "id", None) or getattr(item, "call_id", None) + if isinstance(item_id, str) and item_id: + return item_type, item_id + return item_type, str(id(item)) + + +def _raw_type(raw: Any) -> str | None: + raw_type = getattr(raw, "type", None) + if isinstance(raw_type, str): + return raw_type + if isinstance(raw, Mapping): + mapping_type = cast(Mapping[str, Any], raw).get("type") + if isinstance(mapping_type, str): + return mapping_type + return None + + +def _result_to_text(result: Any) -> str: + text = getattr(result, "text", None) + if isinstance(text, str): + return text + get_outputs = getattr(result, "get_outputs", None) + if callable(get_outputs): + return "".join(_output_to_text(output) for output in cast(Sequence[Any], get_outputs())) + return str(result) + + +def _output_to_text(output: Any) -> str: + text = getattr(output, "text", None) + if isinstance(text, str): + return text + return str(output) + + +def _response_payload(response: OpenAIResponse) -> dict[str, Any]: + payload = response.model_dump(mode="json", exclude_none=True) + created_at = payload.get("created_at") + if isinstance(created_at, float): + payload["created_at"] = int(created_at) + return payload + + +def _sse_event(event_type: str, payload: Mapping[str, Any]) -> str: + """Format one Server-Sent Event.""" + return f"event: {event_type}\ndata: {_json_dumps(payload)}\n\n" + + +def _json_dumps(payload: Mapping[str, Any]) -> str: + """Serialize a Responses SSE payload.""" + return json.dumps(payload, separators=(",", ":")) + + +async def responses_stream_from_run( + stream: ResponseStream[AgentResponseUpdate, AgentResponse[Any]], + *, + response_id: str, + session_id: str | None = None, +) -> AsyncIterator[str]: + """Convert an Agent Framework response stream into Responses SSE events. + + Args: + stream: Agent Framework response stream returned by ``agent.run(..., + stream=True)``. + + Keyword Args: + response_id: Id for the response being created. + session_id: Optional prior ``resp_*`` or ``conv_*`` session id. + + Yields: + Server-Sent Event strings. + """ + yield _sse_event( + "response.created", + { + "type": "response.created", + "response": { + "id": response_id, + "object": "response", + "created_at": int(time.time()), + "status": "in_progress", + "model": "agent", + "output": [], + }, + }, + ) + + model: str | None = None + async for update in stream: + if model is None: + model = _model_from_update(update) + if update.text: + yield _sse_event( + "response.output_text.delta", + { + "type": "response.output_text.delta", + "delta": update.text, + }, + ) + + final = await stream.get_final_response() + payload = responses_from_run(final, response_id=response_id, session_id=session_id) + if model is not None: + # The finalized `AgentResponse` never carries a raw representation + # (see `_model_from_update`), so prefer the model observed on the + # stream's own chunks over `responses_from_run`'s "agent" fallback. + payload["model"] = model + yield _sse_event( + "response.completed", + { + "type": "response.completed", + "response": payload, + }, + ) + + __all__ = [ + "create_response_id", "messages_from_responses_input", "parse_responses_identity", "parse_responses_request", + "responses_from_run", + "responses_session_id", + "responses_stream_from_run", + "responses_to_run", ] diff --git a/python/packages/hosting-responses/pyproject.toml b/python/packages/hosting-responses/pyproject.toml index a8b96654d21..82806d9ebaa 100644 --- a/python/packages/hosting-responses/pyproject.toml +++ b/python/packages/hosting-responses/pyproject.toml @@ -28,6 +28,12 @@ dependencies = [ "openai>=1.99.0,<3", ] +[dependency-groups] +dev = [ + "fastapi>=0.115.0,<0.138.1", + "httpx>=0.28.1", +] + [tool.uv] prerelease = "if-necessary-or-explicit" environments = [ diff --git a/python/packages/hosting-responses/tests/hosting_responses/test_http_round_trip.py b/python/packages/hosting-responses/tests/hosting_responses/test_http_round_trip.py new file mode 100644 index 00000000000..8c0039228cb --- /dev/null +++ b/python/packages/hosting-responses/tests/hosting_responses/test_http_round_trip.py @@ -0,0 +1,271 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""HTTP round-trip tests: POST -> FastAPI route -> JSON/SSE response. + +These exercise the same wiring as the `local_responses` sample: helpers from +`agent_framework_hosting_responses` convert between the Responses protocol and +Agent Framework run values, `agent_framework_hosting`'s `AgentState` / +`SessionStore` hold shared execution state, and a small FastAPI route owns +everything else (parsing, policy, response construction). Requests go through +`httpx.AsyncClient` with `ASGITransport` -- no real server process or live +model is involved. +""" + +from __future__ import annotations + +import json +from collections.abc import AsyncIterator, Awaitable, Mapping +from typing import Any, Literal, overload + +import httpx +from agent_framework import ( + AgentResponse, + AgentResponseUpdate, + AgentRunInputs, + AgentSession, + Content, + Message, + ResponseStream, +) +from agent_framework_hosting import AgentState +from fastapi import Body, FastAPI, HTTPException +from fastapi.responses import JSONResponse, StreamingResponse + +from agent_framework_hosting_responses import ( + create_response_id, + responses_from_run, + responses_session_id, + responses_stream_from_run, + responses_to_run, +) + + +class _StubAgent: + """Deterministic ``SupportsAgentRun`` stub that tracks session continuity. + + Each call records the ``session_id`` of the ``AgentSession`` it was + invoked with and a per-session turn counter, so tests can assert that a + chain of requests reused one session instead of silently starting fresh + ones. + """ + + id = "stub-agent" + name: str | None = "stub-agent" + description: str | None = "stub agent for HTTP round-trip tests" + + def __init__(self) -> None: + self.session_ids_seen: list[str | None] = [] + self.turn_counts: dict[str | None, int] = {} + + def create_session(self, *, session_id: str | None = None) -> AgentSession: + return AgentSession(session_id=session_id) + + def get_session(self, service_session_id: Any, *, session_id: str | None = None) -> AgentSession: + return AgentSession(session_id=session_id, service_session_id=service_session_id) + + @overload + def run( + self, + messages: AgentRunInputs | None = None, + *, + stream: Literal[False] = ..., + session: AgentSession | None = None, + function_invocation_kwargs: Mapping[str, Any] | None = None, + client_kwargs: Mapping[str, Any] | None = None, + ) -> Awaitable[AgentResponse[Any]]: ... + + @overload + def run( + self, + messages: AgentRunInputs | None = None, + *, + stream: Literal[True], + session: AgentSession | None = None, + function_invocation_kwargs: Mapping[str, Any] | None = None, + client_kwargs: Mapping[str, Any] | None = None, + ) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ... + + def run( + self, + messages: AgentRunInputs | None = None, + *, + stream: bool = False, + session: AgentSession | None = None, + function_invocation_kwargs: Mapping[str, Any] | None = None, + client_kwargs: Mapping[str, Any] | None = None, + ) -> Awaitable[AgentResponse[Any]] | ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: + session_id = session.session_id if session is not None else None + self.session_ids_seen.append(session_id) + self.turn_counts[session_id] = self.turn_counts.get(session_id, 0) + 1 + text = f"turn {self.turn_counts[session_id]} for session {session_id}" + + if stream: + + async def _stream() -> AsyncIterator[AgentResponseUpdate]: + yield AgentResponseUpdate(contents=[Content.from_text(text=text)], role="assistant") + + return ResponseStream(_stream(), finalizer=lambda updates: AgentResponse.from_updates(updates)) + + async def _get_response() -> AgentResponse[Any]: + return AgentResponse(messages=Message(role="assistant", contents=[Content.from_text(text=text)])) + + return _get_response() + + +def _build_app(agent: _StubAgent) -> FastAPI: + """Build a minimal FastAPI app mirroring the `local_responses` sample's route.""" + app = FastAPI() + state = AgentState(agent) + + @app.post("/responses", response_model=None) + async def responses(body: dict[str, Any] = Body(...)) -> JSONResponse | StreamingResponse: # noqa: B008 + try: + run = responses_to_run(body) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + session_id = responses_session_id(body) + response_id = create_response_id() + + target = await state.get_target() + lookup_id = session_id or response_id + session = await state.get_or_create_session(lookup_id) + + if run["stream"]: + stream = target.run(run["messages"], stream=True, session=session) + if not isinstance(stream, ResponseStream): + raise HTTPException(status_code=500, detail="agent did not return a response stream") + + async def stream_events() -> AsyncIterator[str]: + async for event in responses_stream_from_run( + stream, + response_id=response_id, + session_id=session_id, + ): + yield event + await state.set_session(response_id, session) + + return StreamingResponse( + stream_events(), + media_type="text/event-stream", + ) + + result = await target.run(run["messages"], session=session) + await state.set_session(response_id, session) + return JSONResponse(responses_from_run(result, response_id=response_id, session_id=session_id)) + + return app + + +async def _post(app: FastAPI, payload: dict[str, Any]) -> httpx.Response: + """Send a POST /responses request through the ASGI app, no real socket involved.""" + transport = httpx.ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + return await client.post("/responses", json=payload, timeout=30) + + +def _parse_sse_events(body: str) -> list[dict[str, Any]]: + """Parse SSE text into a list of `{"event": ..., "data": ...}` dicts.""" + events: list[dict[str, Any]] = [] + for block in body.split("\n\n"): + if not block.strip(): + continue + event_type: str | None = None + data: str | None = None + for line in block.split("\n"): + if line.startswith("event: "): + event_type = line[len("event: ") :] + elif line.startswith("data: "): + data = line[len("data: ") :] + if event_type is not None and data is not None: + events.append({"event": event_type, "data": json.loads(data)}) + return events + + +class TestNonStreamingRoundTrip: + async def test_returns_responses_shaped_payload(self) -> None: + app = _build_app(_StubAgent()) + response = await _post(app, {"input": "hello"}) + + assert response.status_code == 200 + payload = response.json() + assert payload["object"] == "response" + assert payload["status"] == "completed" + assert payload["id"].startswith("resp_") + assert any(item["type"] == "message" for item in payload["output"]) + + async def test_invalid_input_returns_400_not_500(self) -> None: + app = _build_app(_StubAgent()) + response = await _post(app, {}) + + assert response.status_code == 400 + assert "input" in response.json()["detail"] + + +class TestStreamingRoundTrip: + async def test_stream_emits_created_delta_and_completed_events(self) -> None: + app = _build_app(_StubAgent()) + response = await _post(app, {"input": "hello", "stream": True}) + + assert response.status_code == 200 + assert "text/event-stream" in response.headers["content-type"] + + events = _parse_sse_events(response.text) + event_types = [e["event"] for e in events] + assert event_types[0] == "response.created" + assert event_types[-1] == "response.completed" + assert "response.output_text.delta" in event_types + + completed = events[-1]["data"]["response"] + assert completed["status"] == "completed" + assert completed["id"].startswith("resp_") + + +class TestSessionContinuity: + """Regression coverage for the `previous_response_id` aliasing fix. + + `previous_response_id` rotates every turn. Without aliasing the newly + minted response id to the same session, turn 3 would silently resolve to + a brand-new, empty session instead of the one from turns 1-2. + """ + + async def test_previous_response_id_chain_preserves_session_across_three_turns(self) -> None: + agent = _StubAgent() + app = _build_app(agent) + + turn1 = await _post(app, {"input": "hi"}) + assert turn1.status_code == 200 + turn2 = await _post(app, {"input": "still there?", "previous_response_id": turn1.json()["id"]}) + assert turn2.status_code == 200 + turn3 = await _post(app, {"input": "still there?", "previous_response_id": turn2.json()["id"]}) + assert turn3.status_code == 200 + + assert len(agent.session_ids_seen) == 3 + # All three turns must have run against the same underlying session, + # not three independent ones. + first_session_id = agent.session_ids_seen[0] + assert first_session_id is not None + assert agent.session_ids_seen == [first_session_id] * 3 + assert agent.turn_counts[first_session_id] == 3 + + async def test_conversation_id_preserves_session_across_turns(self) -> None: + agent = _StubAgent() + app = _build_app(agent) + + turn1 = await _post(app, {"input": "hi", "conversation_id": "conv_stable"}) + assert turn1.status_code == 200 + turn2 = await _post(app, {"input": "still there?", "conversation_id": "conv_stable"}) + assert turn2.status_code == 200 + + assert agent.session_ids_seen == ["conv_stable", "conv_stable"] + assert agent.turn_counts["conv_stable"] == 2 + + async def test_unrelated_requests_get_independent_sessions(self) -> None: + agent = _StubAgent() + app = _build_app(agent) + + first = await _post(app, {"input": "hi"}) + second = await _post(app, {"input": "unrelated"}) + + assert first.status_code == 200 + assert second.status_code == 200 + assert agent.session_ids_seen[0] != agent.session_ids_seen[1] diff --git a/python/packages/hosting-responses/tests/hosting_responses/test_parsing.py b/python/packages/hosting-responses/tests/hosting_responses/test_parsing.py index bdc65058066..6fa100415c1 100644 --- a/python/packages/hosting-responses/tests/hosting_responses/test_parsing.py +++ b/python/packages/hosting-responses/tests/hosting_responses/test_parsing.py @@ -4,12 +4,21 @@ from __future__ import annotations +from collections.abc import AsyncIterator, Sequence +from typing import cast + import pytest +from agent_framework import AgentResponse, AgentResponseUpdate, Content, Message, ResponseStream from agent_framework_hosting_responses import ( + create_response_id, messages_from_responses_input, parse_responses_identity, parse_responses_request, + responses_from_run, + responses_session_id, + responses_stream_from_run, + responses_to_run, ) @@ -167,3 +176,127 @@ def test_returns_none_when_absent(self) -> None: def test_returns_none_for_non_string(self) -> None: assert parse_responses_identity({"safety_identifier": 42}, "responses") is None + + +class TestResponsesRunHelpers: + def test_create_response_id_shape(self) -> None: + response_id = create_response_id() + + assert response_id.startswith("resp_") + + def test_responses_session_id_prefers_previous_response(self) -> None: + assert responses_session_id({"previous_response_id": "resp_1", "conversation_id": "conv_1"}) == "resp_1" + + def test_responses_session_id_uses_conversation_id(self) -> None: + assert responses_session_id({"conversation_id": "conv_1"}) == "conv_1" + + def test_responses_session_id_returns_none_when_absent(self) -> None: + assert responses_session_id({"input": "hi"}) is None + + def test_responses_to_run_returns_messages_options_and_stream(self) -> None: + run = responses_to_run({ + "input": "hi", + "stream": True, + "previous_response_id": "resp_1", + "conversation_id": "conv_1", + "max_output_tokens": 32, + "model": "gpt-x", + }) + + # `responses_to_run` always produces a `list[Message]`; the TypedDict + # field is typed as the wider `Agent.run` input shape, so narrow here. + messages = cast("list[Message]", run["messages"]) + assert messages[0].text == "hi" + assert run["stream"] is True + assert run["options"] == {"max_tokens": 32, "model": "gpt-x"} + + def test_responses_from_run_returns_response_payload(self) -> None: + result = AgentResponse( + messages=Message(role="assistant", contents=[Content.from_text("hello")]), + additional_properties={"model": "test-model"}, + ) + + payload = responses_from_run(result, response_id="resp_new") + + assert payload["id"] == "resp_new" + assert payload["model"] == "test-model" + assert payload["output"][0]["content"][0]["text"] == "hello" + + def test_responses_from_run_preserves_multimodal_output_items(self) -> None: + result = AgentResponse( + messages=Message( + role="assistant", + contents=[ + Content.from_text_reasoning(id="rs_1", text="checking"), + Content.from_function_call("call_1", "collect_media", arguments={"city": "Seattle"}), + Content.from_function_result( + "call_1", + result=[ + Content.from_text("caption"), + Content.from_uri("https://example.com/cat.png", media_type="image/png"), + Content.from_hosted_file("file_pdf", media_type="application/pdf"), + ], + ), + Content.from_text("done"), + ], + ) + ) + + payload = responses_from_run(result, response_id="resp_new") + + output = payload["output"] + assert [item["type"] for item in output] == [ + "reasoning", + "function_call", + "function_call_output", + "message", + ] + assert output[0]["content"][0]["text"] == "checking" + assert output[1]["name"] == "collect_media" + assert output[1]["arguments"] == '{"city": "Seattle"}' + assert output[2]["output"] == [ + {"text": "caption", "type": "input_text"}, + {"detail": "auto", "type": "input_image", "image_url": "https://example.com/cat.png"}, + {"type": "input_file", "file_id": "file_pdf"}, + ] + assert output[3]["content"][0]["text"] == "done" + + def test_responses_from_run_maps_conversation_session(self) -> None: + result = AgentResponse(messages=Message(role="assistant", contents=[Content.from_text("hello")])) + + payload = responses_from_run(result, response_id="resp_new", session_id="conv_1") + + assert payload["conversation"] == {"id": "conv_1"} + + def test_responses_from_run_omits_previous_response_session(self) -> None: + result = AgentResponse(messages=Message(role="assistant", contents=[Content.from_text("hello")])) + + payload = responses_from_run(result, response_id="resp_new", session_id="resp_1") + + assert "conversation" not in payload + + async def test_responses_stream_from_run(self) -> None: + async def updates() -> AsyncIterator[AgentResponseUpdate]: + yield AgentResponseUpdate(contents=[Content.from_text("hel")], role="assistant") + yield AgentResponseUpdate(contents=[Content.from_text("lo")], role="assistant") + + def finalizer(items: Sequence[AgentResponseUpdate]) -> AgentResponse: + return AgentResponse.from_updates(items) + + stream = ResponseStream(updates(), finalizer=finalizer) + + events = [ + event + async for event in responses_stream_from_run( + stream, + response_id="resp_new", + session_id="conv_1", + ) + ] + + assert events[0].startswith("event: response.created") + assert "response.output_text.delta" in events[1] + assert "hel" in events[1] + assert "lo" in events[2] + assert events[-1].startswith("event: response.completed") + assert '"conversation":{"id":"conv_1"}' in events[-1] diff --git a/python/packages/hosting/README.md b/python/packages/hosting/README.md index d08be242d1f..096762fa0c4 100644 --- a/python/packages/hosting/README.md +++ b/python/packages/hosting/README.md @@ -1,122 +1,92 @@ # agent-framework-hosting -Multi-channel hosting for Microsoft Agent Framework agents. +Shared execution-state helpers for app-owned Agent Framework hosting. -`agent-framework-hosting` lets you serve a single agent or workflow target -through one or more **channels**. The host owns one Starlette ASGI app, -route/lifecycle composition, and per-`isolation_key` session resolution. -Each channel owns its protocol parsing and response rendering. +This package keeps Agent Framework state separate from web-framework concerns: -The base package contains only channel-neutral plumbing: +- `AgentState` — pairs an agent target with a `SessionStore` + (`session_id -> AgentSession`). +- `WorkflowState` — pairs a workflow target with a `CheckpointStore` + (`session_id -> CheckpointStorage`). -- `AgentFrameworkHost` — the Starlette host. -- `Channel` — the channel protocol. -- `ChannelRequest` / `ChannelSession` / `ChannelIdentity` — the request - envelope and optional channel metadata. -- `ChannelContext` / `ChannelContribution` / `ChannelCommand` — channel-side - hooks for invoking the target and contributing routes, commands, and - lifecycle callbacks. -- `ChannelRunHook` / `ChannelResponseHook` / `ChannelStreamUpdateHook` — - host-invoked customization seams. +Both stores are plain storage: `get`/`set`/`delete` by an app-selected id, +nothing more. Neither one knows how to create a new value for an id it +hasn't seen before — use `AgentState.get_or_create_session(...)` / +`WorkflowState.get_or_create_checkpoint_storage(...)` for that, since only +the state object has both the store and the resolved target. -`ChannelStreamUpdateHook` applies to streamed updates only. It is not a -substitute for final-response redaction. +- Existing experimental channel-hosting types remain available while the package + is unreleased, but the v1 direction is protocol helpers plus app-owned routes. -Concrete channels live in their own packages so you only install what you use: +Use FastAPI, Starlette, Azure Functions, Django, or another framework for route +registration, auth, middleware, response construction, and background work. -| Package | Transport | -|---|---| -| `agent-framework-hosting-responses` | OpenAI Responses API | - -Additional channel packages can build on the same host contract without adding -their protocol dependencies to the base package. - -## Install - -```bash -pip install agent-framework-hosting agent-framework-hosting-responses -# or with Hypercorn pre-installed for the demo `host.serve(...)` helper -pip install "agent-framework-hosting[serve]" agent-framework-hosting-responses -# add the [disk] extra to persist reset-session aliases -pip install "agent-framework-hosting[disk]" -``` +> The built-in `SessionStore` / `CheckpointStore` are in-memory `dict`s with +> no eviction — every id ever stored stays resolvable for the life of the +> process. That is intentional: protocols such as OpenAI Responses' +> `previous_response_id` are designed to let a caller continue from *any* +> earlier point in a conversation, not just the latest turn, so every id +> handed out needs to stay independently resolvable. If you back either +> store with real storage (Redis, a database, ...), you are responsible for +> that store's own TTL/eviction policy; these in-memory reference +> implementations do not model that concern. ## Quickstart ```python from agent_framework.openai import OpenAIChatClient -from agent_framework_hosting import AgentFrameworkHost, Channel +from agent_framework_hosting import AgentState agent = OpenAIChatClient().as_agent(name="Assistant") +state = AgentState(agent) -# Add channels from sibling packages, e.g. `agent-framework-hosting-responses` -# exposes a `ResponsesChannel` that serves the OpenAI Responses API. -channels: list[Channel] = [] - -host = AgentFrameworkHost(target=agent, channels=channels) -host.serve(port=8000) +session = await state.get_or_create_session("conversation-1") +result = await (await state.get_target()).run("Hello", session=session) ``` -## Session state and workflow checkpoints - -By default the host keeps live `AgentSession` objects and reset-session aliases -in memory. Channels opt into continuity by setting -`ChannelRequest.session = ChannelSession(isolation_key=...)`; requests with the -same isolation key reuse the same host-created session. +If a protocol mints a new continuation id on every response, store the session +explicitly after `run(...)` returns. `run(...)` may update the session, so store +the post-run object: -The host treats `isolation_key` as an opaque partition key. Each channel or -hosting environment decides where that key comes from: - -- protocol headers supplied by a trusted platform, -- request body fields such as a previous response or conversation ID, -- route/path parameters, -- channel-native metadata such as chat/user IDs, or -- environment-provided context in an ephemeral host. +```python +session = await state.get_or_create_session(previous_response_id) +result = await (await state.get_target()).run("Hello", session=session) +await state.set_session(response_id, session) +``` -The host should be able to carry any of those sources as long as the channel or -platform has already authenticated and authorized the caller before passing the -key to `ChannelSession`. +Targets can be direct instances, synchronous factories, asynchronous factories, +or awaitables: -The built-in request-context helper recognizes the `x-agent-user-isolation-key` -and `x-agent-chat-isolation-key` header names because some hosting -environments, including Foundry Hosted Agents, already use them. Reusing those -header names does **not** mean `agent-framework-hosting` is the supported way to -run on Foundry Hosted Agents; use `agent-framework-foundry-hosting` for that -hosting surface. +```python +state = AgentState(create_agent) # cached by default +state = AgentState(create_agent, cache_target=False) +``` -For long-running deployments that need `reset_session(...)` aliases to survive -restart, pass `state_dir`: +`WorkflowState` mirrors this shape for workflow targets: ```python -host = AgentFrameworkHost( - target=agent, - channels=channels, - state_dir="./.host-state", -) -``` +from agent_framework_hosting import WorkflowState -This creates `./.host-state/sessions/` and stores only lightweight alias -bookkeeping. Live `AgentSession` objects are still rehydrated lazily by the -configured history provider on the next turn. +state = WorkflowState(create_workflow) +storage = await state.get_or_create_checkpoint_storage("conversation-1") +result = await (await state.get_target()).run("Hello", checkpoint_storage=storage) +``` -For workflow targets, `checkpoint_location=...` is the clearest way to enable -checkpoint persistence. As a convenience, `state_dir="./.host-state"` also -derives `./.host-state/checkpoints/` for workflow targets. Use the mapping form -when you want only one component: +`WorkflowState` also accepts an unbuilt workflow builder directly: ```python -from agent_framework_hosting import HostStatePaths - -host = AgentFrameworkHost( - target=workflow, - channels=channels, - state_dir=HostStatePaths( - sessions="/var/lib/myapp/sessions", - checkpoints="/var/lib/myapp/checkpoints", - ), -) +from agent_framework import WorkflowBuilder +from agent_framework_hosting import WorkflowState + +builder = WorkflowBuilder(start_executor=executor) +state = WorkflowState(builder) # calls builder.build() when the target is resolved ``` +This is structural: orchestration builders from `agent_framework_orchestrations` +(`SequentialBuilder`, `ConcurrentBuilder`, `HandoffBuilder`, `GroupChatBuilder`, +and `MagenticBuilder`) also work because they expose the same zero-argument +`build() -> Workflow` method. + Cross-channel identity linking, multicast delivery, background runs, continuation tokens, and durable delivery runners are follow-up enhancements, -not part of this v1 host contract. +not part of this v1 state surface. diff --git a/python/packages/hosting/agent_framework_hosting/__init__.py b/python/packages/hosting/agent_framework_hosting/__init__.py index ab78ccd4b9e..99d98367573 100644 --- a/python/packages/hosting/agent_framework_hosting/__init__.py +++ b/python/packages/hosting/agent_framework_hosting/__init__.py @@ -20,6 +20,15 @@ reset_current_isolation_keys, set_current_isolation_keys, ) +from ._state import ( + AgentRunArgs, + AgentState, + CheckpointStore, + SessionStore, + SupportsBuild, + WorkflowRunArgs, + WorkflowState, +) from ._types import ( Channel, ChannelCommand, @@ -44,6 +53,8 @@ "ISOLATION_HEADER_CHAT", "ISOLATION_HEADER_USER", "AgentFrameworkHost", + "AgentRunArgs", + "AgentState", "Channel", "ChannelCommand", "ChannelCommandContext", @@ -55,9 +66,14 @@ "ChannelRunHook", "ChannelSession", "ChannelStreamUpdateHook", + "CheckpointStore", "HostStatePaths", "HostedRunResult", "IsolationKeys", + "SessionStore", + "SupportsBuild", + "WorkflowRunArgs", + "WorkflowState", "__version__", "get_current_isolation_keys", "logger", diff --git a/python/packages/hosting/agent_framework_hosting/_state.py b/python/packages/hosting/agent_framework_hosting/_state.py new file mode 100644 index 00000000000..bbe07d965c6 --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/_state.py @@ -0,0 +1,422 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Shared execution state for app-owned hosting routes. + +Two independent state holders, one per target kind, since agents and +workflows keep different continuation state: + +- ``AgentState`` pairs an agent target with a ``SessionStore`` + (``session_id -> AgentSession``). +- ``WorkflowState`` pairs a workflow target with a ``CheckpointStore`` + (``session_id -> CheckpointStorage``). + +Both stores are plain storage: they only get/set/delete what they are given. +Neither one knows how to create a new value for a ``session_id`` it hasn't +seen before -- that is the corresponding state object's job (see +``AgentState.get_or_create_session`` / ``WorkflowState.get_or_create_checkpoint_storage``), +since only the state object has both the store and the resolved target. +""" + +from __future__ import annotations + +import inspect +from collections.abc import Awaitable, Callable, Mapping +from typing import Any, Generic, Protocol, TypedDict, TypeVar, cast, runtime_checkable + +from agent_framework import ( + AgentRunInputs, + AgentSession, + ChatOptions, + CheckpointStorage, + InMemoryCheckpointStorage, + SupportsAgentRun, + Workflow, +) + + +class SessionStore: + """Plain in-memory ``session_id -> AgentSession`` lookup. + + This store only stores and retrieves; it does not create sessions. Use + :meth:`AgentState.get_or_create_session` for that -- it resolves the + agent target and calls ``target.create_session(...)`` the first time a + given ``session_id`` is seen, then stores the result here. + + No eviction: every id ever stored stays resolvable for the life of the + process. That is intentional -- protocols such as OpenAI Responses' + ``previous_response_id`` are designed to let a caller continue from *any* + earlier point in a conversation, not just the latest turn, so every id + that has been handed out needs to stay independently resolvable. If you + back a ``SessionStore``-shaped store with real storage (Redis, a + database, ...), you are responsible for that store's own TTL/eviction + policy; this in-memory reference implementation does not model that + concern. + """ + + def __init__(self) -> None: + """Create an empty session store.""" + self._sessions: dict[str, AgentSession] = {} + + async def get(self, session_id: str) -> AgentSession | None: + """Return the stored session for ``session_id``, or ``None`` if absent. + + Args: + session_id: Opaque app-selected session id. + + Raises: + ValueError: If ``session_id`` is empty. + """ + if not session_id: + raise ValueError("session_id must be a non-empty string") + return self._sessions.get(session_id) + + async def set(self, session_id: str, session: AgentSession) -> None: + """Store ``session`` under ``session_id``, replacing any existing entry. + + Args: + session_id: Opaque app-selected session id. + session: The session to store. + + Raises: + ValueError: If ``session_id`` is empty. + """ + if not session_id: + raise ValueError("session_id must be a non-empty string") + self._sessions[session_id] = session + + async def delete(self, session_id: str) -> None: + """Forget the stored session for ``session_id``, if any. + + Args: + session_id: Opaque app-selected session id. + + Raises: + ValueError: If ``session_id`` is empty. + """ + if not session_id: + raise ValueError("session_id must be a non-empty string") + self._sessions.pop(session_id, None) + + +class CheckpointStore: + """Plain in-memory ``session_id -> CheckpointStorage`` lookup. + + Maps an app-selected session id to a :class:`CheckpointStorage` scoped to + that conversation. This store only stores and retrieves; it does not + decide which checkpoint within that storage to resume from. Use + :meth:`WorkflowState.get_or_create_checkpoint_storage` to create a fresh + ``CheckpointStorage`` the first time a given ``session_id`` is seen. + + Resuming a prior run is a separate, run-time decision the route makes: + call ``storage.get_latest(workflow_name=...)`` yourself and pass its + ``checkpoint_id`` into ``workflow.run(checkpoint_id=..., checkpoint_storage=storage)``. + + No eviction, for the same reason as :class:`SessionStore` -- see that + class's docstring. + """ + + def __init__(self) -> None: + """Create an empty checkpoint store.""" + self._storages: dict[str, CheckpointStorage] = {} + + async def get(self, session_id: str) -> CheckpointStorage | None: + """Return the stored checkpoint storage for ``session_id``, or ``None`` if absent. + + Args: + session_id: Opaque app-selected session id. + + Raises: + ValueError: If ``session_id`` is empty. + """ + if not session_id: + raise ValueError("session_id must be a non-empty string") + return self._storages.get(session_id) + + async def set(self, session_id: str, storage: CheckpointStorage) -> None: + """Store ``storage`` under ``session_id``, replacing any existing entry. + + Args: + session_id: Opaque app-selected session id. + storage: The checkpoint storage to store. + + Raises: + ValueError: If ``session_id`` is empty. + """ + if not session_id: + raise ValueError("session_id must be a non-empty string") + self._storages[session_id] = storage + + async def delete(self, session_id: str) -> None: + """Forget the stored checkpoint storage for ``session_id``, if any. + + Args: + session_id: Opaque app-selected session id. + + Raises: + ValueError: If ``session_id`` is empty. + """ + if not session_id: + raise ValueError("session_id must be a non-empty string") + self._storages.pop(session_id, None) + + +AgentT = TypeVar("AgentT", bound=SupportsAgentRun) +WorkflowT = TypeVar("WorkflowT", bound=Workflow) + + +@runtime_checkable +class SupportsBuild(Protocol): + """A builder that produces a ``Workflow`` via a zero-argument ``build()``. + + Matches ``agent_framework.WorkflowBuilder`` and the orchestration + builders in ``agent_framework_orchestrations`` (``ConcurrentBuilder``, + ``GroupChatBuilder``, ``HandoffBuilder``, ``MagenticBuilder``, + ``SequentialBuilder``) structurally, without ``agent-framework-hosting`` + depending on either package. + """ + + def build(self) -> Workflow: ... + + +class AgentRunArgs(TypedDict): + """Arguments prepared for ``Agent.run``.""" + + messages: AgentRunInputs + options: ChatOptions[Any] + stream: bool + + +class WorkflowRunArgs(TypedDict): + """Arguments prepared for ``Workflow.run``.""" + + message: Any | None + responses: Mapping[str, Any] | None + stream: bool + + +class AgentState(Generic[AgentT]): + """Shared execution state for app-owned agent hosting routes. + + Holds the Agent Framework agent target and a :class:`SessionStore` that + route code may share. Does not own routes, middleware, protocol + dispatch, or native SDK calls -- web frameworks keep those concerns. + """ + + def __init__( + self, + target: AgentT | Awaitable[AgentT] | Callable[[], AgentT | Awaitable[AgentT]], + *, + session_store: SessionStore | None = None, + cache_target: bool = True, + ) -> None: + """Create shared state for ``target``. + + Args: + target: Agent target used by route code. May be a target + instance, a synchronous factory, an asynchronous factory, or + an awaitable target. + + Keyword Args: + session_store: Existing store to use. Defaults to a fresh + in-memory :class:`SessionStore`. + cache_target: Whether to cache a resolved callable/awaitable + target. Defaults to ``True`` so expensive target setup + happens once. + + Raises: + ValueError: If ``cache_target=False`` is used with a one-shot + awaitable target. + """ + if not cache_target and inspect.isawaitable(target): + raise ValueError("cache_target=False requires a target instance or callable target factory") + self._target_source = target + self._cache_target = cache_target + self._cached_target: AgentT | None = None + if not callable(target) and not inspect.isawaitable(target): + self._cached_target = target + self._session_store: SessionStore = session_store if session_store is not None else SessionStore() + + async def get_target(self) -> AgentT: + """Return the resolved target. + + Returns: + The target instance. Callable and awaitable targets are resolved + first and cached by default. + """ + if self._cache_target and self._cached_target is not None: + return self._cached_target + + target = self._target_source() if callable(self._target_source) else self._target_source + if inspect.isawaitable(target): + target = await target + if self._cache_target: + self._cached_target = target + return target + + @property + def target(self) -> AgentT: + """Return a synchronously available target. + + Raises: + RuntimeError: If the target is a callable or awaitable that has not + been resolved with :meth:`get_target`. + """ + if self._cached_target is not None: + return self._cached_target + if not callable(self._target_source) and not inspect.isawaitable(self._target_source): + return self._target_source + raise RuntimeError("target is resolved asynchronously; use `await state.get_target()`") + + @property + def session_store(self) -> SessionStore: + """Return the session store for this state.""" + return self._session_store + + async def get_or_create_session(self, session_id: str) -> AgentSession: + """Return the session for ``session_id``, creating and storing one if missing. + + Args: + session_id: Opaque app-selected session id. + + Returns: + The stored or newly created ``AgentSession``. + """ + session = await self._session_store.get(session_id) + if session is None: + target = await self.get_target() + session = target.create_session(session_id=session_id) + await self._session_store.set(session_id, session) + return session + + async def set_session(self, session_id: str, session: AgentSession) -> None: + """Store ``session`` under ``session_id`` in this state's session store. + + Args: + session_id: Opaque app-selected session id. + session: Session to store. + """ + await self._session_store.set(session_id, session) + + +class WorkflowState(Generic[WorkflowT]): + """Shared execution state for app-owned workflow hosting routes. + + Holds the Agent Framework workflow target and a :class:`CheckpointStore` + that route code may share. Does not own routes, middleware, protocol + dispatch, or native SDK calls -- web frameworks keep those concerns. + """ + + def __init__( + self, + target: WorkflowT | SupportsBuild | Awaitable[WorkflowT] | Callable[[], WorkflowT | Awaitable[WorkflowT]], + *, + checkpoint_store: CheckpointStore | None = None, + cache_target: bool = True, + ) -> None: + """Create shared state for ``target``. + + Args: + target: Workflow target used by route code. May be a target + instance, a ``WorkflowBuilder``-shaped builder (see + :class:`SupportsBuild`; the state calls ``build()`` for you), + a synchronous factory, an asynchronous factory, or an + awaitable target. + + Keyword Args: + checkpoint_store: Existing store to use. Defaults to a fresh + in-memory :class:`CheckpointStore`. + cache_target: Whether to cache a resolved callable/awaitable/built + target. Defaults to ``True`` so expensive target setup + happens once. + + Raises: + ValueError: If ``cache_target=False`` is used with a one-shot + awaitable target. + """ + if isinstance(target, SupportsBuild): + # WorkflowBuilder (and the orchestration builders) are not + # themselves callable or awaitable, so normalize to the bound + # `build` method -- the resolution logic below already knows how + # to treat a zero-arg factory. `build()` is typed to return the + # `Workflow` base class rather than this instance's narrower + # `WorkflowT`, but it is the same object the caller asked for. + target = cast("Callable[[], WorkflowT]", target.build) + if not cache_target and inspect.isawaitable(target): + raise ValueError("cache_target=False requires a target instance or callable target factory") + self._target_source = target + self._cache_target = cache_target + self._cached_target: WorkflowT | None = None + if not callable(target) and not inspect.isawaitable(target): + self._cached_target = target + self._checkpoint_store: CheckpointStore = ( + checkpoint_store if checkpoint_store is not None else CheckpointStore() + ) + + async def get_target(self) -> WorkflowT: + """Return the resolved target. + + Returns: + The target instance. Callable and awaitable targets are resolved + first and cached by default. + """ + if self._cache_target and self._cached_target is not None: + return self._cached_target + + target = self._target_source() if callable(self._target_source) else self._target_source + if inspect.isawaitable(target): + target = await target + if self._cache_target: + self._cached_target = target + return target + + @property + def target(self) -> WorkflowT: + """Return a synchronously available target. + + Raises: + RuntimeError: If the target is a callable or awaitable that has not + been resolved with :meth:`get_target`. + """ + if self._cached_target is not None: + return self._cached_target + if not callable(self._target_source) and not inspect.isawaitable(self._target_source): + return self._target_source + raise RuntimeError("target is resolved asynchronously; use `await state.get_target()`") + + @property + def checkpoint_store(self) -> CheckpointStore: + """Return the checkpoint store for this state.""" + return self._checkpoint_store + + async def get_or_create_checkpoint_storage(self, session_id: str) -> CheckpointStorage: + """Return the checkpoint storage for ``session_id``, creating and storing one if missing. + + Unlike an agent, a ``Workflow`` has no ``create_session``-style + factory method, so "creating" one for a new ``session_id`` means + allocating a fresh, empty :class:`InMemoryCheckpointStorage` -- there + is nothing to restore yet. Pass the returned storage into + ``workflow.run(checkpoint_storage=...)``. To resume a prior run for + this ``session_id`` instead of starting fresh, call + ``storage.get_latest(workflow_name=...)`` yourself first and pass its + ``checkpoint_id`` into ``workflow.run(checkpoint_id=..., checkpoint_storage=...)``. + + Args: + session_id: Opaque app-selected session id. + + Returns: + The stored or newly created ``CheckpointStorage``. + """ + storage = await self._checkpoint_store.get(session_id) + if storage is None: + storage = InMemoryCheckpointStorage() + await self._checkpoint_store.set(session_id, storage) + return storage + + async def set_checkpoint_storage(self, session_id: str, storage: CheckpointStorage) -> None: + """Store ``storage`` under ``session_id`` in this state's checkpoint store. + + Args: + session_id: Opaque app-selected session id. + storage: Checkpoint storage to store. + """ + await self._checkpoint_store.set(session_id, storage) diff --git a/python/packages/hosting/tests/hosting/_workflow_fixtures.py b/python/packages/hosting/tests/hosting/_workflow_fixtures.py index d797e743d92..72addb7dff8 100644 --- a/python/packages/hosting/tests/hosting/_workflow_fixtures.py +++ b/python/packages/hosting/tests/hosting/_workflow_fixtures.py @@ -32,6 +32,11 @@ def build_echo_workflow() -> Workflow: return WorkflowBuilder(start_executor=_EchoExecutor(id="echo")).build() +def echo_workflow_builder() -> WorkflowBuilder: + """Return an *unbuilt* echo ``WorkflowBuilder``, for testing builder-shaped targets.""" + return WorkflowBuilder(start_executor=_EchoExecutor(id="echo")) + + class _MultiChunkExecutor(Executor): """Yields three separate ``output`` events so streaming has something to chew on.""" diff --git a/python/packages/hosting/tests/hosting/test_state.py b/python/packages/hosting/tests/hosting/test_state.py new file mode 100644 index 00000000000..6f34d214134 --- /dev/null +++ b/python/packages/hosting/tests/hosting/test_state.py @@ -0,0 +1,359 @@ +# Copyright (c) Microsoft. All rights reserved. + +from __future__ import annotations + +import importlib +from collections.abc import AsyncIterator, Awaitable, Mapping +from typing import Any, Literal, overload + +import pytest +from agent_framework import ( + AgentResponse, + AgentResponseUpdate, + AgentRunInputs, + AgentSession, + Content, + InMemoryCheckpointStorage, + Message, + ResponseStream, + Workflow, +) + +from agent_framework_hosting import AgentState, CheckpointStore, SessionStore, WorkflowState + + +def _workflow_fixture(name: str) -> Any: + """Load a fixture from ``_workflow_fixtures.py`` via the ``conftest``-registered alias. + + Mirrors ``test_host.py``'s helper: the local ``conftest.py`` registers + ``_workflow_fixtures.py`` under the collision-proof name + ``hosting_workflow_fixtures`` so it stays importable in both + package-local and aggregate pytest runs. + """ + return getattr(importlib.import_module("hosting_workflow_fixtures"), name) + + +class _FakeAgent: + """Minimal agent target for state tests. + + Declares ``run`` with the same two overloads as ``SupportsAgentRun`` (one + per ``stream`` value) so it satisfies the protocol under static type + checking, not just at runtime. + """ + + id: str = "fake-agent" + name: str | None = "Fake Agent" + description: str | None = "Fake agent for tests" + + def __init__(self) -> None: + self.created_sessions: list[AgentSession] = [] + + def create_session(self, *, session_id: str | None = None) -> AgentSession: + session = AgentSession(session_id=session_id) + self.created_sessions.append(session) + return session + + def get_session(self, service_session_id: Any, *, session_id: str | None = None) -> AgentSession: + return AgentSession(session_id=session_id, service_session_id=service_session_id) + + @overload + def run( + self, + messages: AgentRunInputs | None = None, + *, + stream: Literal[False] = ..., + session: AgentSession | None = None, + function_invocation_kwargs: Mapping[str, Any] | None = None, + client_kwargs: Mapping[str, Any] | None = None, + ) -> Awaitable[AgentResponse[Any]]: ... + + @overload + def run( + self, + messages: AgentRunInputs | None = None, + *, + stream: Literal[True], + session: AgentSession | None = None, + function_invocation_kwargs: Mapping[str, Any] | None = None, + client_kwargs: Mapping[str, Any] | None = None, + ) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ... + + def run( + self, + messages: AgentRunInputs | None = None, + *, + stream: bool = False, + session: AgentSession | None = None, + function_invocation_kwargs: Mapping[str, Any] | None = None, + client_kwargs: Mapping[str, Any] | None = None, + ) -> Awaitable[AgentResponse[Any]] | ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: + if stream: + + async def _stream() -> AsyncIterator[AgentResponseUpdate]: + yield AgentResponseUpdate(contents=[Content.from_text(text="ok")], role="assistant") + + return ResponseStream(_stream(), finalizer=lambda updates: AgentResponse.from_updates(updates)) + + async def _get_response() -> AgentResponse[Any]: + return AgentResponse(messages=Message(role="assistant", contents=[Content.from_text(text="ok")])) + + return _get_response() + + +class TestSessionStore: + async def test_get_returns_none_for_missing_id(self) -> None: + store = SessionStore() + + assert await store.get("session-1") is None + + async def test_set_then_get_returns_stored_session(self) -> None: + store = SessionStore() + session = AgentSession(session_id="session-1") + + await store.set("session-1", session) + + assert await store.get("session-1") is session + + async def test_set_can_store_same_session_under_additional_id(self) -> None: + store = SessionStore() + session = AgentSession(session_id="resp_1") + + await store.set("resp_1", session) + await store.set("resp_2", session) + + assert await store.get("resp_1") is session + assert await store.get("resp_2") is session + + async def test_set_replaces_existing_entry(self) -> None: + store = SessionStore() + first = AgentSession(session_id="session-1") + second = AgentSession(session_id="session-1") + + await store.set("session-1", first) + await store.set("session-1", second) + + assert await store.get("session-1") is second + + async def test_delete_forgets_session(self) -> None: + store = SessionStore() + await store.set("session-1", AgentSession(session_id="session-1")) + + await store.delete("session-1") + + assert await store.get("session-1") is None + + async def test_delete_missing_id_is_a_no_op(self) -> None: + store = SessionStore() + + await store.delete("never-stored") + + async def test_empty_session_id_raises(self) -> None: + store = SessionStore() + session = AgentSession(session_id="session-1") + + with pytest.raises(ValueError, match="session_id"): + await store.get("") + with pytest.raises(ValueError, match="session_id"): + await store.set("", session) + with pytest.raises(ValueError, match="session_id"): + await store.delete("") + + +class TestCheckpointStore: + async def test_get_returns_none_for_missing_id(self) -> None: + store = CheckpointStore() + + assert await store.get("session-1") is None + + async def test_set_then_get_returns_stored_storage(self) -> None: + store = CheckpointStore() + storage = InMemoryCheckpointStorage() + + await store.set("session-1", storage) + + assert await store.get("session-1") is storage + + async def test_delete_forgets_storage(self) -> None: + store = CheckpointStore() + await store.set("session-1", InMemoryCheckpointStorage()) + + await store.delete("session-1") + + assert await store.get("session-1") is None + + async def test_empty_session_id_raises(self) -> None: + store = CheckpointStore() + storage = InMemoryCheckpointStorage() + + with pytest.raises(ValueError, match="session_id"): + await store.get("") + with pytest.raises(ValueError, match="session_id"): + await store.set("", storage) + with pytest.raises(ValueError, match="session_id"): + await store.delete("") + + +class TestAgentState: + def test_default_session_store_is_fresh_in_memory_store(self) -> None: + agent = _FakeAgent() + state = AgentState(agent) + + assert state.target is agent + assert isinstance(state.session_store, SessionStore) + + def test_accepts_session_store_instance(self) -> None: + store = SessionStore() + state = AgentState(_FakeAgent(), session_store=store) + + assert state.session_store is store + + async def test_callable_target_cached_by_default(self) -> None: + calls = 0 + + def create_agent() -> _FakeAgent: + nonlocal calls + calls += 1 + return _FakeAgent() + + state = AgentState(create_agent) + + first = await state.get_target() + second = await state.get_target() + + assert first is second + assert calls == 1 + + async def test_callable_target_cache_can_be_disabled(self) -> None: + calls = 0 + + def create_agent() -> _FakeAgent: + nonlocal calls + calls += 1 + return _FakeAgent() + + state = AgentState(create_agent, cache_target=False) + + first = await state.get_target() + second = await state.get_target() + + assert first is not second + assert calls == 2 + + async def test_async_callable_target(self) -> None: + async def create_agent() -> _FakeAgent: + return _FakeAgent() + + state = AgentState(create_agent) + + assert isinstance(await state.get_target(), _FakeAgent) + + def test_cache_target_false_rejects_bare_awaitable(self) -> None: + async def create_agent() -> _FakeAgent: + return _FakeAgent() + + coro = create_agent() + try: + with pytest.raises(ValueError, match="cache_target=False"): + AgentState(coro, cache_target=False) + finally: + coro.close() + + async def test_get_or_create_session_creates_and_stores_once(self) -> None: + agent = _FakeAgent() + state = AgentState(agent) + + first = await state.get_or_create_session("session-1") + second = await state.get_or_create_session("session-1") + + assert first is second + assert first.session_id == "session-1" + assert len(agent.created_sessions) == 1 + + async def test_get_or_create_session_reuses_a_session_set_on_the_state(self) -> None: + agent = _FakeAgent() + state = AgentState(agent) + pre_existing = AgentSession(session_id="session-1") + await state.set_session("session-1", pre_existing) + + session = await state.get_or_create_session("session-1") + + assert session is pre_existing + assert len(agent.created_sessions) == 0 + + +class TestWorkflowState: + def test_default_checkpoint_store_is_fresh_in_memory_store(self) -> None: + workflow = _workflow_fixture("build_echo_workflow")() + state: WorkflowState[Workflow] = WorkflowState(workflow) + + assert state.target is workflow + assert isinstance(state.checkpoint_store, CheckpointStore) + + def test_accepts_checkpoint_store_instance(self) -> None: + workflow = _workflow_fixture("build_echo_workflow")() + store = CheckpointStore() + state: WorkflowState[Workflow] = WorkflowState(workflow, checkpoint_store=store) + + assert state.checkpoint_store is store + + async def test_workflow_target_resolved_from_factory(self) -> None: + build_echo_workflow = _workflow_fixture("build_echo_workflow") + + state: WorkflowState[Workflow] = WorkflowState(build_echo_workflow) + + target = await state.get_target() + assert isinstance(target, Workflow) + + async def test_accepts_workflow_builder_instance_directly(self) -> None: + """A ``WorkflowBuilder`` is not itself callable or awaitable; the state must + recognize its `build()` method and call it, not cache the raw builder.""" + builder = _workflow_fixture("echo_workflow_builder")() + + state: WorkflowState[Workflow] = WorkflowState(builder) + + target = await state.get_target() + assert isinstance(target, Workflow) + assert state.target is target + + async def test_workflow_builder_is_built_once_and_cached_by_default(self) -> None: + builder = _workflow_fixture("echo_workflow_builder")() + state: WorkflowState[Workflow] = WorkflowState(builder) + + first = await state.get_target() + second = await state.get_target() + + assert first is second + + async def test_accepts_orchestration_style_builder_without_importing_orchestrations(self) -> None: + """``SupportsBuild`` is structural: any object with a zero-arg ``build() -> Workflow`` + is accepted, matching ``agent_framework_orchestrations``' builders without this + package depending on that one.""" + workflow = _workflow_fixture("build_echo_workflow")() + + class _FakeOrchestrationBuilder: + def build(self) -> Workflow: + return workflow + + state: WorkflowState[Workflow] = WorkflowState(_FakeOrchestrationBuilder()) + + assert await state.get_target() is workflow + + async def test_get_or_create_checkpoint_storage_creates_and_stores_once(self) -> None: + workflow = _workflow_fixture("build_echo_workflow")() + state: WorkflowState[Workflow] = WorkflowState(workflow) + + first = await state.get_or_create_checkpoint_storage("session-1") + second = await state.get_or_create_checkpoint_storage("session-1") + + assert first is second + assert isinstance(first, InMemoryCheckpointStorage) + + async def test_get_or_create_checkpoint_storage_reuses_storage_set_on_the_state(self) -> None: + workflow = _workflow_fixture("build_echo_workflow")() + state: WorkflowState[Workflow] = WorkflowState(workflow) + pre_existing = InMemoryCheckpointStorage() + await state.set_checkpoint_storage("session-1", pre_existing) + + storage = await state.get_or_create_checkpoint_storage("session-1") + + assert storage is pre_existing diff --git a/python/samples/04-hosting/af-hosting/README.md b/python/samples/04-hosting/af-hosting/README.md index c21ebbecd5e..367dfcabc71 100644 --- a/python/samples/04-hosting/af-hosting/README.md +++ b/python/samples/04-hosting/af-hosting/README.md @@ -10,7 +10,7 @@ its own package. This first sample set includes | Sample | What it shows | Packaging | |---|---|---| -| [`local_responses/`](./local_responses) | The minimal shape: one agent + one `@tool` + `ResponsesChannel` + a single `run_hook` that strips caller-supplied options and forces a `reasoning` preset. | **Local only.** Start here to learn the run-hook seam. | +| [`local_responses/`](./local_responses) | The minimal shape: one agent + one `@tool` + native FastAPI route + Responses helper functions + `SessionStore`. | **Local only.** Start here to learn the helper seam. | | [`local_responses_workflow/`](./local_responses_workflow) | A 4-step `Workflow` (typed `SloganBrief` intake → writer → legal → formatter) hosted behind the Responses channel via a `run_hook` that parses inbound text/JSON into the workflow's typed input. The host writes per-conversation checkpoints via `checkpoint_location=…`. Demonstrates workflow targets + structured input adaptation + resume-across-turns. Includes a `call_server.rest` file with REST examples. | **Local only.** | | [`local_telegram/`](./local_telegram) | Telegram bot with `@tool`, `FileHistoryProvider`, `run_hook`, and slash commands (`/new`, `/whoami`, `/weather`). Pure Telegram — no HTTP endpoint. | **Local only.** Start here to learn the Telegram channel. | | [`local_multi_channel/`](./local_multi_channel) | Same agent behind two channels at once: `ResponsesChannel` + `TelegramChannel`. Shared `FileHistoryProvider` enables cross-channel session resumption (resume a Telegram chat from the Responses endpoint by passing the Telegram isolation key as `previous_response_id`). | **Local only.** | diff --git a/python/samples/04-hosting/af-hosting/local_responses/README.md b/python/samples/04-hosting/af-hosting/local_responses/README.md index e4b08f408c9..9c1c2e8d163 100644 --- a/python/samples/04-hosting/af-hosting/local_responses/README.md +++ b/python/samples/04-hosting/af-hosting/local_responses/README.md @@ -1,19 +1,36 @@ -# local_responses — Responses-only with a settings-altering hook - -The smallest end-to-end `agent-framework-hosting` shape: one Foundry -agent with a `@tool`, one `ResponsesChannel`, one `run_hook`. Useful as -the entry-point sample for understanding the **channel run-hook** seam -without any multi-channel or identity-link concerns. - -What the run hook demonstrates: - -- **Strips** caller-supplied `model` / `temperature` / `store` so the - host owns the backing deployment and persistence settings. -- **Forces** a `reasoning` preset (`effort=medium`, `summary=auto`) on - every turn — caller-side overrides are ignored. - -`app:app` is a module-level Starlette ASGI app; recommended local launch -is Hypercorn. +# local_responses — Responses helpers with native FastAPI routes + +The smallest end-to-end Responses hosting shape: one Foundry agent with a +`@tool`, one native FastAPI route, a small `SessionStore`, and the Responses +helper functions: + +- `responses_to_run(...)` +- `responses_session_id(...)` +- `create_response_id(...)` +- `responses_from_run(...)` + +The sample demonstrates the lighter hosting direction. Agent Framework provides +the run conversion and session-state pieces; FastAPI owns route registration, +request bodies, response objects, and server startup. + +What the route demonstrates: + +- **Strips** caller-supplied `model` / `temperature` / `store` so the app owns + deployment and persistence settings. +- **Forces** a `reasoning` preset (`effort=medium`, `summary=auto`) on every + turn. +- Produces the AF messages, options, and session id that the route passes to + `agent.run(...)`. +- **Stores** each newly minted response id for the session it was just + resolved from, via `state.set_session(response_id, session)` after + `agent.run(...)` has updated the session. + OpenAI's `previous_response_id` rotates every turn *by design* — it lets a + caller continue from any earlier response, not just the latest one — so + every response id needs to stay independently resolvable, not just the + most recent. + +`app:app` is a module-level FastAPI ASGI app; recommended local launch is +Hypercorn. ## Run @@ -40,14 +57,16 @@ uv sync --group dev # Plain OpenAI SDK call: uv run python call_server.py -# The client intentionally omits `model`; the host chooses the backing -# deployment from FOUNDRY_MODEL. +# The client intentionally omits `model`; the app chooses the backing deployment +# from FOUNDRY_MODEL. -# The script then sends a second turn, "And what about Amsterdam?", -# using the first `response.id` as `previous_response_id`. +# The script then sends two more turns, each continuing from the previous +# turn's `response.id` as `previous_response_id`. The third turn asks about +# the first turn's city, so it only succeeds if the server still remembers +# that far back in the chain. -# Same two-turn interaction through an Agent Framework Agent backed by -# OpenAIChatClient, with streaming enabled: +# Same three-turn interaction through an Agent Framework Agent backed by +# OpenAIChatClient: uv run python call_server_af.py ``` diff --git a/python/samples/04-hosting/af-hosting/local_responses/app.py b/python/samples/04-hosting/af-hosting/local_responses/app.py index 73313614748..c55c8c84079 100644 --- a/python/samples/04-hosting/af-hosting/local_responses/app.py +++ b/python/samples/04-hosting/af-hosting/local_responses/app.py @@ -1,28 +1,19 @@ # Copyright (c) Microsoft. All rights reserved. -"""Minimal Responses-only hosting sample. +"""Minimal Responses-only hosting sample with native FastAPI routes. -Single agent with one ``@tool`` (``lookup_weather``), single channel -(``ResponsesChannel``), one ``run_hook`` that demonstrates the -settings-mutation seam over caller-supplied options. +This sample demonstrates the helper-first hosting shape: -What the hook does ------------------- -On every Responses request the hook receives the ``ChannelRequest`` that -the channel built from the inbound HTTP body. It: - -- strips ``model`` (the host owns the backing deployment), ``store`` - (this agent owns persistence), and ``temperature`` (the configured - model may not honor it), -- forces a ``reasoning`` effort + summary preset so the deployed surface - is consistent regardless of what the caller sent. - -The hook is the documented escape hatch over the uniform -``ChannelRequest`` envelope. +1. ``agent-framework-hosting-responses`` converts Responses request/response + payloads to and from Agent Framework run values. +2. ``agent-framework-hosting`` owns shared execution state via + ``AgentState`` and ``SessionStore``. +3. FastAPI owns the route, request parsing, policy decisions, and response + object. Run --- -``app`` is a module-level Starlette ASGI app. Recommended local launch:: +``app`` is a module-level FastAPI ASGI app. Recommended local launch:: uv sync az login @@ -42,16 +33,27 @@ from __future__ import annotations +import asyncio import os -from dataclasses import replace +from collections.abc import AsyncIterator from pathlib import Path -from typing import Annotated +from typing import Annotated, Any, cast -from agent_framework import Agent, FileHistoryProvider, tool +from agent_framework import Agent, FileHistoryProvider, ResponseStream, tool from agent_framework_foundry import FoundryChatClient -from agent_framework_hosting import AgentFrameworkHost, ChannelRequest -from agent_framework_hosting_responses import ResponsesChannel +from agent_framework_hosting import AgentState +from agent_framework_hosting_responses import ( + create_response_id, + responses_from_run, + responses_session_id, + responses_stream_from_run, + responses_to_run, +) from azure.identity.aio import DefaultAzureCredential +from fastapi import Body, FastAPI, HTTPException +from fastapi.responses import JSONResponse, StreamingResponse +from hypercorn.asyncio import serve +from hypercorn.config import Config SESSIONS_DIR = Path(__file__).resolve().parent / "storage" / "sessions" SESSIONS_DIR.mkdir(parents=True, exist_ok=True) @@ -71,37 +73,9 @@ def lookup_weather( return reports.get(location, f"{location} is sunny with a high of {high_temp}°C.") -# the run hook defines what you want to allow the user to passthrough when they call your host -# since the responses clients can call with all of the responses options, -# you can decide with this run_hook which of those: are rejected -# which are passed through, which are altered, which are added. -# In this sample below, we are removing, model, temperature and store if set -# and we add reasoning, but note that this could also be set on the Agent itself -# the difference is that this option is specific to the Responses channel -# so if you want to differentiate between options over channels -# you would set the option in the run_hook, if it needs to be the same (like store) -# you would set it in the agent. -def run_hook(request: ChannelRequest, **_: object) -> ChannelRequest: - """Strip caller-supplied options the host should own and force a - reasoning preset.""" - options = dict(request.options or {}) - - # The host owns the backing deployment; the agent's default_options - # own ``store``; the model may not honor ``temperature``. Strip them - # so the caller can't override. - options.pop("model", None) - options.pop("temperature", None) - options.pop("store", None) - - # Force a consistent reasoning preset on every turn. - options["reasoning"] = {"effort": "medium", "summary": "auto"} - - return replace(request, options=options or None) - - -def build_host() -> AgentFrameworkHost: - # Here we define how our agent should run, with tools, options, etc: - agent = Agent( +def create_agent() -> Agent: + """Create the sample weather agent.""" + return Agent( client=FoundryChatClient(credential=DefaultAzureCredential()), name="WeatherAgent", instructions=( @@ -112,15 +86,91 @@ def build_host() -> AgentFrameworkHost: context_providers=[FileHistoryProvider(SESSIONS_DIR)], default_options={"store": False}, ) - return AgentFrameworkHost( - target=agent, - channels=[ResponsesChannel(run_hook=run_hook)], - debug=True, + + +app = FastAPI() +state = AgentState(create_agent) + + +@app.post("/responses", response_model=None) +async def responses(body: dict[str, Any] = Body(...)) -> JSONResponse | StreamingResponse: # noqa: B008 + """Handle one OpenAI Responses-shaped request.""" + try: + run = responses_to_run(body) + except ValueError as exc: + raise HTTPException(status_code=400, detail=str(exc)) from exc + session_id = responses_session_id(body) + response_id = create_response_id() + + options = dict(run["options"]) + # App-specific policy: caller cannot pick deployment/persistence settings, + # and this sample forces a consistent reasoning preset. + options.pop("model", None) + options.pop("temperature", None) + options.pop("store", None) + options["reasoning"] = {"effort": "medium", "summary": "auto"} + options_for_run = cast(Any, options) + + target = await state.get_target() + lookup_id = session_id or response_id + session = await state.get_or_create_session(lookup_id) + if run["stream"]: + stream = target.run( + run["messages"], + stream=True, + session=session, + options=options_for_run, + ) + if not isinstance(stream, ResponseStream): + raise HTTPException(status_code=500, detail="agent did not return a response stream") + + async def stream_events() -> AsyncIterator[str]: + async for event in responses_stream_from_run( + stream, + response_id=response_id, + session_id=session_id, + ): + yield event + # `agent.run(..., stream=True)` updates the session while the stream + # is consumed/finalized. Store it under the newly minted response id + # after finalization so a later `previous_response_id` can restore + # this exact continuation point. + await state.set_session(response_id, session) + + return StreamingResponse( + stream_events(), + media_type="text/event-stream", + ) + + result = await target.run( + run["messages"], + session=session, + options=options_for_run, + ) + # `agent.run(...)` updates the session. Store it under the newly minted + # response id after the run so `previous_response_id=response_id` continues + # from this exact point. + await state.set_session(response_id, session) + return JSONResponse( + responses_from_run( + result, + response_id=response_id, + session_id=session_id, + ) ) -app = build_host().app +async def main() -> None: + """Run the sample with Hypercorn for local development.""" + config = Config() + config.bind = [f"0.0.0.0:{int(os.environ.get('PORT', '8000'))}"] + await serve(cast(Any, app), config) if __name__ == "__main__": - build_host().serve(host="0.0.0.0", port=int(os.environ.get("PORT", "8000"))) + asyncio.run(main()) + +# Sample output: +# User: What is the weather in Tokyo? +# Agent: Tokyo is clear with a high of 18°C. +# Response ID: resp_... diff --git a/python/samples/04-hosting/af-hosting/local_responses/call_server.py b/python/samples/04-hosting/af-hosting/local_responses/call_server.py index 7066e9e7ac5..648e7fc9868 100644 --- a/python/samples/04-hosting/af-hosting/local_responses/call_server.py +++ b/python/samples/04-hosting/af-hosting/local_responses/call_server.py @@ -15,8 +15,10 @@ uv run python call_server.py -The script sends a follow-up turn ("And what about Amsterdam?") using the -first response's ``response.id`` as ``previous_response_id``. +The script sends two follow-up turns, each continuing from the previous +turn's ``response.id`` as ``previous_response_id``. The third turn asks about +information from the *first* turn only, so it also exercises session +continuity across a rotating response id chain, not just a single hop. """ from __future__ import annotations @@ -26,6 +28,7 @@ BASE_URL = "http://127.0.0.1:8000" PROMPT = "What is the weather in Tokyo?" FOLLOW_UP_PROMPT = "And what about Amsterdam?" +THIRD_PROMPT = "Which of the two cities we just discussed is warmer?" def main() -> None: @@ -46,6 +49,15 @@ def main() -> None: print(f"Agent: {follow_up.output_text}") print(f"Response ID: {follow_up.id}") + third = client.responses.create( + input=THIRD_PROMPT, + previous_response_id=follow_up.id, + ) + print() + print(f"User: {THIRD_PROMPT}") + print(f"Agent: {third.output_text}") + print(f"Response ID: {third.id}") + if __name__ == "__main__": main() diff --git a/python/samples/04-hosting/af-hosting/local_responses/call_server_af.py b/python/samples/04-hosting/af-hosting/local_responses/call_server_af.py index 91e8d3aa6fd..078087c0912 100644 --- a/python/samples/04-hosting/af-hosting/local_responses/call_server_af.py +++ b/python/samples/04-hosting/af-hosting/local_responses/call_server_af.py @@ -3,15 +3,19 @@ """Agent Framework agent client for the local_responses sample. Creates a local :class:`agent_framework.Agent` backed by -:class:`agent_framework.openai.OpenAIChatClient`, points that client at the -hosted ``/responses`` endpoint, and streams both turns: +:class:`agent_framework.openai.OpenAIChatClient` and points that client at the +hosted ``/responses`` endpoint for all turns: 1. ``What is the weather in Tokyo?`` 2. ``And what about Amsterdam?`` +3. ``Which of the two cities we just discussed is warmer?`` -Both turns use the same :class:`agent_framework.AgentSession`; the first -turn binds the hosted response id to the session, and the second turn -continues through that session. +All turns use the same :class:`agent_framework.AgentSession`; the first turn +binds the hosted response id to the session, and later turns continue through +that session via a chain of rotating ``previous_response_id`` values. The +third turn only makes sense if the server still remembers the first turn, so +it also exercises session continuity across that whole chain, not just a +single hop. Start the server first (in another shell):: @@ -33,6 +37,7 @@ PROMPTS = [ "What is the weather in Tokyo?", "And what about Amsterdam?", + "Which of the two cities we just discussed is warmer?", ] @@ -45,14 +50,8 @@ async def main() -> None: for prompt in PROMPTS: print(f"User: {prompt}") - stream = agent.run(prompt, stream=True, session=session) - print("Agent: ", end="", flush=True) - async for update in stream: - if update.text: - print(update.text, end="", flush=True) - - response = await stream.get_final_response() - print("\n") + response = await agent.run(prompt, session=session) + print(f"Agent: {response.text}\n") print(f"Response ID: {response.response_id}\n") diff --git a/python/samples/04-hosting/af-hosting/local_responses/pyproject.toml b/python/samples/04-hosting/af-hosting/local_responses/pyproject.toml index f301dd6bfc4..adb6550ac2a 100644 --- a/python/samples/04-hosting/af-hosting/local_responses/pyproject.toml +++ b/python/samples/04-hosting/af-hosting/local_responses/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "agent-framework-hosting-sample-local-responses" version = "0.0.1" -description = "Minimal Responses-only local hosting sample with a settings-altering run hook." +description = "Minimal Responses-only local hosting sample with native FastAPI routes." requires-python = ">=3.10" dependencies = [ "agent-framework-foundry", @@ -9,6 +9,7 @@ dependencies = [ "agent-framework-hosting-responses", "azure-identity", "aiohttp>=3.13.5", + "fastapi>=0.115.0,<0.138.1", "hypercorn>=0.17", ] diff --git a/python/uv.lock b/python/uv.lock index 0fe3298f2a3..af07359e37d 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -677,6 +677,12 @@ dependencies = [ { name = "openai", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] +[package.dev-dependencies] +dev = [ + { name = "fastapi", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + [package.metadata] requires-dist = [ { name = "agent-framework-core", editable = "packages/core" }, @@ -684,6 +690,12 @@ requires-dist = [ { name = "openai", specifier = ">=1.99.0,<3" }, ] +[package.metadata.requires-dev] +dev = [ + { name = "fastapi", specifier = ">=0.115.0,<0.138.1" }, + { name = "httpx", specifier = ">=0.28.1" }, +] + [[package]] name = "agent-framework-hosting-telegram" version = "1.0.0a260625"