From 46d4c9c0cc77038c08c4a6292313c3f237a636c8 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Mon, 4 May 2026 17:31:08 +0200 Subject: [PATCH 1/2] feat(hosting): add agent-framework-hosting core package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New ``agent-framework-hosting`` package implementing ADR 0026 / SPEC-002: the channel-neutral host that lets a single ``Agent`` (or ``Workflow``) fan out across multiple wire protocols ("channels") behind one Starlette ASGI app. Surface (re-exported from ``agent_framework_hosting``): - ``AgentFrameworkHost`` — wraps a hostable target, mounts channels onto an ASGI app, owns per-isolation-key ``AgentSession`` reuse, threads request context (``response_id`` / ``previous_response_id``) into context providers via an ``ExitStack`` of ``bind_request_context`` calls, and exposes an opt-in Hypercorn ``serve()`` helper (extra ``[serve]``). - ``Channel`` protocol + ``ChannelContribution`` — the surface a channel package implements (routes, lifespans, identity hooks, …). - ``ChannelRequest`` / ``ChannelSession`` / ``ChannelIdentity`` / ``ChannelPush`` / ``ChannelCommand[Context]`` / ``ChannelRunHook`` / ``ChannelStreamTransformHook`` / ``DeliveryReport`` / ``HostedRunResult`` / ``ResponseTarget`` / ``ResponseTargetKind`` / ``apply_run_hook`` — channel-side dataclasses + helpers. - ``IsolationKeys`` + ``ISOLATION_HEADER_USER`` / ``..._CHAT`` + ``get/set/reset_current_isolation_keys`` — the host's ASGI middleware reads the ``x-agent-{user,chat}-isolation-key`` headers off each inbound request and exposes them to the agent stack via a ``ContextVar`` so storage-side providers (e.g. ``FoundryHostedAgentHistoryProvider``) can apply per-tenant partitioning without channels having to forward anything. Includes 45 unit tests covering the host, channel contributions, isolation contextvar, and shared types. 
Registers the package in ``python/pyproject.toml`` ``[tool.uv.sources]`` and adds the matching pyright ``executionEnvironments`` entry for tests. Hypercorn is an optional dependency (``[serve]`` extra); the soft import in ``serve()`` is annotated for pyright since it isn't on the default install. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/hosting/LICENSE | 21 + python/packages/hosting/README.md | 59 ++ .../agent_framework_hosting/__init__.py | 74 ++ .../hosting/agent_framework_hosting/_host.py | 946 ++++++++++++++++++ .../agent_framework_hosting/_isolation.py | 76 ++ .../hosting/agent_framework_hosting/_types.py | 376 +++++++ python/packages/hosting/pyproject.toml | 107 ++ python/packages/hosting/tests/__init__.py | 0 .../hosting/tests/_workflow_fixtures.py | 43 + python/packages/hosting/tests/test_host.py | 714 +++++++++++++ python/packages/hosting/tests/test_types.py | 105 ++ python/pyproject.toml | 2 + python/uv.lock | 39 +- 13 files changed, 2558 insertions(+), 4 deletions(-) create mode 100644 python/packages/hosting/LICENSE create mode 100644 python/packages/hosting/README.md create mode 100644 python/packages/hosting/agent_framework_hosting/__init__.py create mode 100644 python/packages/hosting/agent_framework_hosting/_host.py create mode 100644 python/packages/hosting/agent_framework_hosting/_isolation.py create mode 100644 python/packages/hosting/agent_framework_hosting/_types.py create mode 100644 python/packages/hosting/pyproject.toml create mode 100644 python/packages/hosting/tests/__init__.py create mode 100644 python/packages/hosting/tests/_workflow_fixtures.py create mode 100644 python/packages/hosting/tests/test_host.py create mode 100644 python/packages/hosting/tests/test_types.py diff --git a/python/packages/hosting/LICENSE b/python/packages/hosting/LICENSE new file mode 100644 index 0000000000..9e841e7a26 --- /dev/null +++ b/python/packages/hosting/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright 
(c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/python/packages/hosting/README.md b/python/packages/hosting/README.md new file mode 100644 index 0000000000..a2690b5705 --- /dev/null +++ b/python/packages/hosting/README.md @@ -0,0 +1,59 @@ +# agent-framework-hosting + +Multi-channel hosting for Microsoft Agent Framework agents. + +`agent-framework-hosting` lets you serve a single agent (or workflow) +target through one or more **channels** — pluggable adapters that +expose the target over different transports. The result is a single +Starlette ASGI application you can host anywhere (local Hypercorn, +Azure Container Apps, Foundry Hosted Agents, …). 
+ +The base package contains only the channel-neutral plumbing: + +- `AgentFrameworkHost` — the Starlette host +- `Channel` / `ChannelPush` — the channel protocols +- `ChannelRequest` / `ChannelSession` / `ChannelIdentity` / `ResponseTarget` + — the request envelope and routing primitives +- `ChannelContext` / `ChannelContribution` / `ChannelCommand` — the + channel-side hooks for invoking the target and contributing routes, + commands, and lifecycle callbacks +- `ChannelRunHook` / `ChannelStreamTransformHook` — the per-request + customization seams + +Concrete channels live in their own packages so you only install what +you use: + +| Package | Transport | +|---|---| +| `agent-framework-hosting-responses` | OpenAI Responses API | +| `agent-framework-hosting-invocations` | Foundry-native invocation envelope | +| `agent-framework-hosting-telegram` | Telegram Bot API | +| `agent-framework-hosting-activity-protocol` | Bot Framework Activity Protocol (Teams, Direct Line, Web Chat, …) | +| `agent-framework-hosting-teams` | Microsoft Teams (Teams SDK) | +| `agent-framework-hosting-entra` | Entra (OAuth) identity-link sidecar | + +## Install + +```bash +pip install agent-framework-hosting agent-framework-hosting-responses +# or with hypercorn pre-installed for the demo `host.serve(...)` helper +pip install "agent-framework-hosting[serve]" agent-framework-hosting-responses +``` + +## Quickstart + +```python +from agent_framework import ChatAgent +from agent_framework.openai import OpenAIChatClient +from agent_framework_hosting import AgentFrameworkHost +from agent_framework_hosting_responses import ResponsesChannel + +agent = ChatAgent(name="Assistant", chat_client=OpenAIChatClient()) + +host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel()]) +host.serve(port=8000) +``` + +See the [hosting samples](https://github.com/microsoft/agent-framework/tree/main/python/samples/04-hosting/af-hosting) +for richer multi-channel apps (Telegram + Teams + Responses fan-out,
+identity linking, `ResponseTarget` routing, etc.). diff --git a/python/packages/hosting/agent_framework_hosting/__init__.py b/python/packages/hosting/agent_framework_hosting/__init__.py new file mode 100644 index 0000000000..9a7cbcadad --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/__init__.py @@ -0,0 +1,74 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Multi-channel hosting for Microsoft Agent Framework agents. + +Serve a single agent target through one or more **channels** — pluggable +adapters that expose the target over different transports such as the +OpenAI Responses API, Microsoft Teams, Telegram, and others. The base +package contains only the channel-neutral plumbing; concrete channels +ship in their own packages (``agent-framework-hosting-responses``, +``agent-framework-hosting-telegram``, …) so users install only what +they need. +""" + +import importlib.metadata + +from ._host import AgentFrameworkHost, ChannelContext, logger +from ._isolation import ( + ISOLATION_HEADER_CHAT, + ISOLATION_HEADER_USER, + IsolationKeys, + get_current_isolation_keys, + reset_current_isolation_keys, + set_current_isolation_keys, +) +from ._types import ( + Channel, + ChannelCommand, + ChannelCommandContext, + ChannelContribution, + ChannelIdentity, + ChannelPush, + ChannelRequest, + ChannelRunHook, + ChannelSession, + ChannelStreamTransformHook, + DeliveryReport, + HostedRunResult, + ResponseTarget, + ResponseTargetKind, + apply_run_hook, +) + +try: + __version__ = importlib.metadata.version(__name__) +except importlib.metadata.PackageNotFoundError: + __version__ = "0.0.0" + +__all__ = [ + "ISOLATION_HEADER_CHAT", + "ISOLATION_HEADER_USER", + "AgentFrameworkHost", + "Channel", + "ChannelCommand", + "ChannelCommandContext", + "ChannelContext", + "ChannelContribution", + "ChannelIdentity", + "ChannelPush", + "ChannelRequest", + "ChannelRunHook", + "ChannelSession", + "ChannelStreamTransformHook", + "DeliveryReport", + "HostedRunResult", + 
"IsolationKeys", + "ResponseTarget", + "ResponseTargetKind", + "__version__", + "apply_run_hook", + "get_current_isolation_keys", + "logger", + "reset_current_isolation_keys", + "set_current_isolation_keys", +] diff --git a/python/packages/hosting/agent_framework_hosting/_host.py b/python/packages/hosting/agent_framework_hosting/_host.py new file mode 100644 index 0000000000..92a0b1c45c --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/_host.py @@ -0,0 +1,946 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""The :class:`AgentFrameworkHost` and its :class:`ChannelContext` bridge. + +The host is a tiny Starlette wrapper: + +- ``__init__`` accepts a hostable target (``SupportsAgentRun`` agent or + ``Workflow``) and a sequence of channels. +- :meth:`AgentFrameworkHost.app` lazily builds a Starlette app by calling + every channel's ``contribute`` and mounting the returned routes under + the channel's ``path`` (empty path → mount at the app root). +- :class:`ChannelContext` exposes ``run`` / ``run_stream`` / + ``deliver_response`` for channels to invoke; the host handles + per-``isolation_key`` session caching, identity tracking, and + :class:`ResponseTarget` fan-out. + +Per SPEC-002 (and ADR-0026), the host is intentionally thin so the bulk +of channel-specific behaviour stays in the channel package. Identity +linking, link policies, response targets, background runs, and the like +are pluggable extensions that the future identity/foundry packages will +contribute on top of this surface. 
+""" + +from __future__ import annotations + +import logging +import os +import uuid +from collections.abc import Awaitable, Callable, Sequence +from contextlib import AbstractContextManager, ExitStack, asynccontextmanager +from pathlib import Path +from typing import TYPE_CHECKING, Any, AsyncIterator, cast + +from agent_framework import ( + AgentResponse, + AgentResponseUpdate, + CheckpointStorage, + Content, + FileCheckpointStorage, + Message, + ResponseStream, + SupportsAgentRun, + Workflow, + WorkflowEvent, +) +from starlette.applications import Starlette +from starlette.middleware import Middleware +from starlette.requests import Request +from starlette.responses import PlainTextResponse +from starlette.routing import BaseRoute, Mount, Route +from starlette.types import ASGIApp, Receive, Scope, Send + +from ._isolation import ( + ISOLATION_HEADER_CHAT, + ISOLATION_HEADER_USER, + IsolationKeys, + reset_current_isolation_keys, + set_current_isolation_keys, +) +from ._types import ( + Channel, + ChannelIdentity, + ChannelPush, + ChannelRequest, + DeliveryReport, + HostedRunResult, + ResponseTargetKind, +) + +if TYPE_CHECKING: + pass + +logger = logging.getLogger("agent_framework.hosting") + + +def _workflow_output_to_text(value: Any) -> str: + """Render a single workflow ``output`` payload as plain text. + + ``AgentResponse`` and ``AgentResponseUpdate`` carry text natively; + everything else is best-effort ``str()``. + """ + text = getattr(value, "text", None) + if isinstance(text, str): + return text + return str(value) + + +def _workflow_event_to_update(event: WorkflowEvent[Any]) -> AgentResponseUpdate | None: + """Map a :class:`WorkflowEvent` to a channel-friendly :class:`AgentResponseUpdate`. + + Returns ``None`` for events the host should drop (anything that is not + user-visible output). The original event is preserved on the update's + ``raw_representation`` so consumers can recover full workflow context. 
+ """ + if event.type != "output": + return None + payload: Any = event.data + if isinstance(payload, AgentResponseUpdate): + # Already a streaming update — pass through but tag the source so + # downstream hooks can tell it came from a workflow executor. + if payload.raw_representation is None: + payload.raw_representation = event + return payload + text = _workflow_output_to_text(payload) + return AgentResponseUpdate( + contents=[Content.from_text(text=text)], + role="assistant", + author_name=event.executor_id, + raw_representation=event, + ) + + +@asynccontextmanager +async def _suppress_already_consumed() -> AsyncIterator[None]: # noqa: RUF029 + """Yield, swallowing the ``RuntimeError`` ``ResponseStream`` raises on double-consume. + + The bridge stream calls ``get_final_response()`` after iterating the + workflow stream so the workflow's cleanup hooks run; on some paths the + stream considers itself already finalized and raises, which we treat + as benign — we're only after the side effect. + """ + try: + yield + except RuntimeError as exc: + logger.debug("workflow stream finalize skipped: %s", exc) + except Exception: # pragma: no cover - defensive: never let cleanup hide the real result + logger.exception("workflow stream finalize failed") + + +class _BoundResponseStream: + """Adapter that keeps an :class:`ExitStack` open across stream iteration. + + Streaming runs return a :class:`ResponseStream` synchronously, but + consumption happens later (the channel iterates). For host-bound + request context (e.g. Foundry response-id binding) to survive that + gap, we hold the stack open until the underlying stream is exhausted + or :meth:`close` is called. We forward awaitable + async-iterator + + ``get_final_response`` semantics so the channel sees a normal + ``ResponseStream``-shaped object. 
+ """ + + def __init__(self, inner: Any, stack: ExitStack) -> None: + self._inner = inner + self._stack = stack + self._closed = False + + def _close(self) -> None: + if self._closed: + return + self._closed = True + self._stack.close() + + def __await__(self) -> Any: + # ``__await__`` returns a generator; closing here would be too + # eager — we close in ``__aiter__`` finally instead. Awaitable + # consumers (rare for streams) call ``aclose()`` separately. + return self._inner.__await__() + + def __aiter__(self) -> AsyncIterator[Any]: + return self._wrap() + + async def _wrap(self) -> AsyncIterator[Any]: + try: + async for item in self._inner: + yield item + finally: + self._close() + + async def get_final_response(self) -> Any: + try: + return await self._inner.get_final_response() + finally: + self._close() + + def __getattr__(self, name: str) -> Any: + return getattr(self._inner, name) + + +class ChannelContext: + """Host-owned bridge that channels call to invoke the target.""" + + def __init__(self, host: "AgentFrameworkHost") -> None: + """Bind the context to its owning :class:`AgentFrameworkHost`. + + The host instance is the source of truth for the target, registered + channels, identity stores, sessions, and lifecycle state. Channels + only ever receive a context; they never see the host directly. + """ + self._host = host + + @property + def target(self) -> SupportsAgentRun | Workflow: + """The hostable target the channel should invoke.""" + return self._host.target + + async def run(self, request: ChannelRequest) -> HostedRunResult: + """Invoke the target for ``request`` and return a channel-neutral result.""" + return await self._host._invoke(request) # pyright: ignore[reportPrivateUsage] + + def run_stream(self, request: ChannelRequest) -> ResponseStream[AgentResponseUpdate, AgentResponse]: + """Invoke the target with ``stream=True`` and return the agent's ResponseStream. 
+ + Channels iterate the stream directly (it acts like an AsyncGenerator) + and are responsible for delivering updates to their wire protocol. + Apply per-channel ``transform_hook`` callables during iteration to + rewrite or drop individual updates before they hit the wire. + """ + return self._host._invoke_stream(request) # pyright: ignore[reportPrivateUsage] + + async def deliver_response(self, request: ChannelRequest, payload: HostedRunResult) -> DeliveryReport: + """Resolve ``request.response_target`` and push ``payload`` to each destination. + + Returns a :class:`DeliveryReport` so the originating channel knows + whether to render the agent reply on its own wire (``originating`` + included in or implied by the target) or just acknowledge dispatch. + """ + return await self._host._deliver_response(request, payload) # pyright: ignore[reportPrivateUsage] + + +class _FoundryIsolationASGIMiddleware: + """Lift the two well-known Foundry isolation headers into a contextvar. + + The Foundry Hosted Agents runtime injects + ``x-agent-{user,chat}-isolation-key`` on every inbound HTTP request. + Storage providers that need partition-aware writes (notably + :class:`FoundryHostedAgentHistoryProvider`) read those keys via + :func:`get_current_isolation_keys` to avoid every channel having to + parse Foundry-specific headers itself. We intentionally inspect + only HTTP scopes; lifespan/websocket scopes are forwarded + untouched. When neither header is present the contextvar stays at + its default ``None``, so local-dev requests behave as before. 
+ """ + + def __init__(self, app: ASGIApp) -> None: + self.app = app + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] != "http": + await self.app(scope, receive, send) + return + user_key: str | None = None + chat_key: str | None = None + for raw_name, raw_value in scope.get("headers") or (): + name = raw_name.decode("latin-1").lower() + if name == ISOLATION_HEADER_USER: + user_key = raw_value.decode("latin-1") or None + elif name == ISOLATION_HEADER_CHAT: + chat_key = raw_value.decode("latin-1") or None + if user_key is None and chat_key is None: + await self.app(scope, receive, send) + return + token = set_current_isolation_keys(IsolationKeys(user_key=user_key, chat_key=chat_key)) + try: + await self.app(scope, receive, send) + finally: + reset_current_isolation_keys(token) + + +class AgentFrameworkHost: + """Owns one Starlette app, one hostable target, and a sequence of channels.""" + + def __init__( + self, + target: SupportsAgentRun | Workflow, + *, + channels: Sequence[Channel], + debug: bool = False, + checkpoint_location: str | os.PathLike[str] | CheckpointStorage | None = None, + ) -> None: + """Create a host for ``target`` and its channels. + + Args: + target: The hostable target to invoke from channels — either a + ``SupportsAgentRun``-compatible agent or a ``Workflow``. The + host detects the kind and dispatches to the appropriate + execution seam (``agent.run(...)`` vs ``workflow.run(message=...)``). + For workflow targets, channels (or their ``run_hook``) are + responsible for shaping ``ChannelRequest.input`` into the + workflow start executor's typed input. + channels: The channels to expose. Each channel contributes routes + and commands that are mounted under ``channel.path`` (defaulting + to the channel name). + debug: Whether to enable Starlette's debug mode (stack traces in + responses, etc.) and per-channel debug logging. 
+ checkpoint_location: When ``target`` is a :class:`Workflow`, the + location used to persist workflow checkpoints across requests. + Either a filesystem path (``str`` / ``PathLike``) — the host + creates a per-conversation + :class:`~agent_framework.FileCheckpointStorage` rooted at + ``checkpoint_location / `` — or a + :class:`~agent_framework.CheckpointStorage` instance the host + uses as-is (caller owns scoping). Per-request behaviour: + requests without ``ChannelRequest.session.isolation_key`` + are run without checkpointing. When set on a workflow that + already has its own checkpoint storage configured + (``WorkflowBuilder(checkpoint_storage=...)``), the host + refuses to start so ownership of checkpointing is + unambiguous. Ignored for ``SupportsAgentRun`` targets (a + warning is emitted). + """ + self.target: SupportsAgentRun | Workflow = target + self._is_workflow = isinstance(target, Workflow) + self.channels = list(channels) + self._debug = debug + self._app: Starlette | None = None + self._checkpoint_location: Path | CheckpointStorage | None = None + if checkpoint_location is not None: + if not self._is_workflow: + logger.warning("checkpoint_location is set but target is not a Workflow; ignoring.") + else: + workflow: Workflow = target # type: ignore[assignment] + if workflow._runner_context.has_checkpointing(): # type: ignore[reportPrivateUsage] + raise RuntimeError( + "Workflow already has checkpoint storage configured " + "(WorkflowBuilder(checkpoint_storage=...)). The host " + "manages checkpoints when checkpoint_location is set; " + "remove one of the two configurations." + ) + if isinstance(checkpoint_location, (str, os.PathLike)): + self._checkpoint_location = Path(os.fspath(checkpoint_location)) + else: + # Anything else is treated as a CheckpointStorage instance. + # ``CheckpointStorage`` is a non-runtime-checkable Protocol, + # so we cannot ``isinstance``-check it directly. 
+ self._checkpoint_location = checkpoint_location + # Per-isolation_key session cache. The real spec backs this with a + # pluggable session store; this base host keeps it in-process. + self._sessions: dict[str, Any] = {} + # ``isolation_key -> active session_id``. Normally identical to the + # isolation_key, but ``reset_session`` rotates this to a fresh id so + # the next turn starts a new ``AgentSession`` while the old history + # remains on disk under its original session_id. + self._session_aliases: dict[str, str] = {} + # Per-isolation_key identity registry: which channels we've seen this + # user on, and which native_id they used on each. Powers + # ResponseTarget.active / .channel(name) / .channels([...]) / + # .all_linked. + # Shape: { isolation_key: { channel_name: ChannelIdentity } }. + self._identities: dict[str, dict[str, ChannelIdentity]] = {} + # (isolation_key -> last-seen channel name) for ResponseTarget.active. + self._active: dict[str, str] = {} + + @property + def app(self) -> Starlette: + """Lazily build (and cache) the Starlette application.""" + if self._app is None: + self._app = self._build_app() + return self._app + + def serve( + self, + *, + host: str = "127.0.0.1", + port: int = 8000, + workers: int = 1, + **config_kwargs: Any, + ) -> None: + """Start the host on ``host:port`` using Hypercorn. + + Hypercorn is the same ASGI server the Foundry Hosted Agents + runtime uses for production deployments, so running locally with + the same server keeps dev/prod parity (Trio fallbacks, lifespan + semantics, HTTP/2 support, …). Install with the ``serve`` extra + (``pip install agent-framework-hosting[serve]``). + + Args: + host: Interface to bind. Defaults to ``127.0.0.1``. + port: TCP port to bind. Defaults to ``8000``. + workers: Number of worker processes. Defaults to ``1``; + Hypercorn's process model only kicks in for ``>1``. 
+ **config_kwargs: Forwarded to :class:`hypercorn.config.Config` + via attribute assignment, so any documented Hypercorn + config field (e.g. ``keep_alive_timeout=...``, + ``access_log_format=...``) can be set directly. + """ + try: + import asyncio + from typing import cast as _cast + + from hypercorn.asyncio import ( # pyright: ignore[reportMissingImports] + serve as _hypercorn_serve, # pyright: ignore[reportUnknownVariableType] + ) + from hypercorn.config import Config # pyright: ignore[reportMissingImports, reportUnknownVariableType] + except ImportError as exc: # pragma: no cover - exercised at runtime + raise RuntimeError( + "AgentFrameworkHost.serve() requires hypercorn. " + "Install with `pip install agent-framework-hosting[serve]` or `pip install hypercorn`." + ) from exc + + config = Config() # pyright: ignore[reportUnknownVariableType] + config.bind = [f"{host}:{port}"] # pyright: ignore[reportUnknownMemberType] + config.workers = workers # pyright: ignore[reportUnknownMemberType] + for key, value in config_kwargs.items(): + setattr(config, key, value) # pyright: ignore[reportUnknownArgumentType] + + # Touch ``self.app`` so the lifespan startup log fires once before + # we hand off to hypercorn — gives a single, readable banner of + # what the host is exposing without requiring channels to log + # individually. + app = self.app + self._log_startup(host=host, port=port, workers=workers) + + # ``hypercorn.asyncio.serve`` has a complex partially-typed signature + # (multiple ASGI/WSGI app overloads) and its ``Scope`` definition + # diverges from Starlette's; cast both sides to ``Any`` to keep the + # call site readable without sprinkling per-error suppressions. + serve_callable = _cast(Any, _hypercorn_serve) + asyncio.run(serve_callable(app, config)) + + def reset_session(self, isolation_key: str) -> None: + """Rotate ``isolation_key`` to a fresh session id without deleting history. 
+ + Old turns are preserved on disk under their original session id and + remain accessible by passing that id explicitly (e.g. as + ``previous_response_id``). Future requests using ``isolation_key`` + get a new, empty ``AgentSession``. + """ + new_id = f"{isolation_key}#{uuid.uuid4().hex[:8]}" + self._session_aliases[isolation_key] = new_id + self._sessions.pop(isolation_key, None) + + # -- internals --------------------------------------------------------- # + + def _log_startup(self, *, host: str, port: int, workers: int) -> None: + """Emit a single human-friendly startup banner. + + Mirrors the ``AgentServerHost`` convention from + ``azure.ai.agentserver.core``: one INFO line that captures the + target type, every channel + its mount path, the bind address, + whether we're running inside a Foundry Hosted Agents container, + and the worker count. Keeps log noise low while still giving an + operator a single grep-able anchor when triaging. + """ + target_kind = "Workflow" if isinstance(self.target, Workflow) else type(self.target).__name__ + target_name = getattr(self.target, "name", None) or target_kind + channels_repr = ", ".join( + f"{ch.name}@{ch.path or '/'}" # blank path means "mounted at root" + for ch in self.channels + ) + is_hosted = bool(os.environ.get("FOUNDRY_HOSTING_ENVIRONMENT")) + logger.info( + "AgentFrameworkHost starting: target=%s (%s) bind=%s:%d workers=%d hosted=%s channels=[%s]", + target_name, + target_kind, + host, + port, + workers, + is_hosted, + channels_repr or "", + ) + + def _build_app(self) -> Starlette: + context = ChannelContext(self) + routes: list[BaseRoute] = [] + on_startup: list[Callable[[], Awaitable[None]]] = [] + on_shutdown: list[Callable[[], Awaitable[None]]] = [] + + # ``/readiness`` is the standard probe path the Foundry Hosted Agents + # runtime hits to gate traffic. 
We expose it unconditionally — once the + # ASGI app is up the host considers itself ready (channels register + # their own startup hooks and may run before the first request, but + # readiness is intentionally cheap so the platform's probe never times + # out on transient channel work). Mounted first so a channel cannot + # accidentally shadow it. + async def _readiness(_request: Request) -> PlainTextResponse: # noqa: RUF029 + """Liveness/readiness probe handler used by Foundry Hosted Agents.""" + return PlainTextResponse("ok") + + routes.append(Route("/readiness", _readiness, methods=["GET"])) + + for channel in self.channels: + contribution = channel.contribute(context) + # Channels publish routes relative to their root; mount under channel.path. + # An empty path means "mount at the app root" — useful for single-channel hosts + # that don't want a prefix (e.g. ResponsesChannel exposing POST /responses directly). + if contribution.routes: + if channel.path: + routes.append(Mount(channel.path, routes=list(contribution.routes))) + else: + routes.extend(contribution.routes) + on_startup.extend(contribution.on_startup) + on_shutdown.extend(contribution.on_shutdown) + + @asynccontextmanager + async def lifespan(_app: Starlette) -> AsyncIterator[None]: + for cb in on_startup: + await cb() + try: + yield + finally: + for cb in on_shutdown: + await cb() + + return Starlette( + debug=self._debug, + routes=routes, + lifespan=lifespan, + middleware=[Middleware(_FoundryIsolationASGIMiddleware)], + ) + + def _build_run_kwargs(self, request: ChannelRequest) -> dict[str, Any]: + # The full spec resolves a ChannelSession into an AgentSession here, + # honors session_mode, and consults LinkPolicy / ResponseTarget. This + # base host keys a per-isolation_key AgentSession off the channel's + # session hint so context providers (FileHistoryProvider, …) on the + # target see one session per end user. 
+ session = None + if request.session_mode != "disabled" and request.session is not None: + isolation_key = request.session.isolation_key + if isolation_key is not None and hasattr(self.target, "create_session"): + session_id = self._session_aliases.get(isolation_key, isolation_key) + session = self._sessions.get(isolation_key) + if session is None: + # ``create_session`` lives on agent-typed targets but not on + # ``Workflow``; the ``hasattr`` above guards the call site. + session = self.target.create_session( # pyright: ignore[reportAttributeAccessIssue, reportUnknownVariableType, reportUnknownMemberType] + session_id=session_id + ) + self._sessions[isolation_key] = session # pyright: ignore[reportUnknownArgumentType] + + run_kwargs: dict[str, Any] = {} + if session is not None: + run_kwargs["session"] = session + if request.options: + run_kwargs["options"] = request.options + return run_kwargs + + def _log_incoming(self, request: ChannelRequest, *, stream: bool) -> None: + """Emit a one-line INFO summary for every incoming target invocation. + + When ``debug=True`` is set on the host, also dump the channel-native + settings the channel attached to the ``ChannelRequest`` — ``options`` + (the ChatOptions-shaped fields the channel parsed from its protocol + payload, e.g. temperature/tools/tool_choice for Responses), plus + ``attributes`` / ``metadata`` (the channel's protocol-specific bag, + e.g. ``chat_id`` / ``callback_query_id`` for Telegram). 
+ """ + isolation_key = request.session.isolation_key if request.session is not None else None + logger.info( + "channel=%s op=%s stream=%s session=%s session_mode=%s", + request.channel, + request.operation, + stream, + isolation_key, + request.session_mode, + ) + logger.debug( + " ↳ options=%s attributes=%s metadata=%s", + dict(request.options) if request.options else {}, + dict(request.attributes) if request.attributes else {}, + dict(request.metadata) if request.metadata else {}, + ) + + def _flat_context_providers(self) -> list[Any]: + """Flatten ``target.context_providers`` one level for duck-typed hooks. + + ``ContextProviderBase`` aggregates child providers under a + ``providers`` attribute when wrapped (e.g. by ``ChatClientAgent``). + We descend one level so the host catches both styles without + forcing a particular wiring on the agent. + """ + providers = getattr(self.target, "context_providers", None) or () + flat: list[Any] = [] + for entry in providers: + children = getattr(entry, "providers", None) + if children: + flat.extend(children) + else: + flat.append(entry) + return flat + + def _bind_request_context(self, request: ChannelRequest) -> ExitStack: + """Bind any per-request anchors a target's context-providers expose. + + Channels announce per-request anchors (currently ``response_id`` + and ``previous_response_id``) via ``ChannelRequest.attributes``. + Some history providers — notably the Foundry hosted-agent history + provider — need to write storage under the same ``response_id`` + the channel surfaces on its envelope so the next turn's + ``previous_response_id`` walks the chain. Rather than the host + knowing about specific provider classes, we duck-type: any + context provider on the target that exposes a + ``bind_request_context(response_id=..., previous_response_id=..., + **_)`` context-manager gets it called with the request's + attribute values. 
Per-request platform isolation keys are handled + separately by :class:`_FoundryIsolationASGIMiddleware` (lifted + off the inbound headers into a contextvar) so providers don't + depend on channels to forward them. Bindings are scoped to the + returned :class:`ExitStack` which the caller must enter before + invoking the target and leave after the run completes. + """ + stack = ExitStack() + attrs = request.attributes or {} + response_id = attrs.get("response_id") + if not isinstance(response_id, str) or not response_id: + return stack + previous_response_id = attrs.get("previous_response_id") + if previous_response_id is not None and not isinstance(previous_response_id, str): + previous_response_id = None + + flat: list[Any] = self._flat_context_providers() + + for provider in flat: + bind = getattr(provider, "bind_request_context", None) + if not callable(bind): + continue + stack.enter_context( + cast( + "AbstractContextManager[Any]", + bind( + response_id=response_id, + previous_response_id=previous_response_id, + ), + ) + ) + return stack + + async def _invoke(self, request: ChannelRequest) -> HostedRunResult: + self._log_incoming(request, stream=False) + self._record_identity(request) + if self._is_workflow: + return await self._invoke_workflow(request) + run_kwargs = self._build_run_kwargs(request) + with self._bind_request_context(request): + # ``_is_workflow`` is False here so ``self.target`` is an + # ``Agent``-shaped target whose ``.run`` returns + # :class:`AgentResponse`. Narrow back to keep ``result.text`` + # well-typed without conditional imports of ``Agent``. 
+ agent_target = cast("SupportsAgentRun", self.target) + result = await agent_target.run(self._wrap_input(request), **run_kwargs) + return HostedRunResult(text=result.text) + + def _invoke_stream(self, request: ChannelRequest) -> ResponseStream[AgentResponseUpdate, AgentResponse]: + self._log_incoming(request, stream=True) + self._record_identity(request) + if self._is_workflow: + return self._invoke_workflow_stream(request) + run_kwargs = self._build_run_kwargs(request) + # ``run(stream=True)`` returns a ResponseStream synchronously (it is + # itself awaitable / async-iterable). We hand it back to the channel + # so the channel can drive iteration and apply its transform hook. + # Streaming flows iterate after this method returns, which is + # *outside* a sync ``with`` block — so we wrap the underlying + # stream in an adapter that holds the binding open across the + # iteration lifecycle. + binder = self._bind_request_context(request) + return _BoundResponseStream( # type: ignore[return-value] + self.target.run(self._wrap_input(request), stream=True, **run_kwargs), + binder, + ) + + def _resolve_checkpoint_storage(self, request: ChannelRequest) -> CheckpointStorage | None: + """Build (or return) the per-request checkpoint storage, or ``None``. + + Returns ``None`` when no ``checkpoint_location`` is configured or + when the request lacks a stable session key — without a key we + cannot scope checkpoints per conversation, and we'd rather skip + checkpointing than pollute a single shared store. + """ + if self._checkpoint_location is None: + return None + if request.session is None or not request.session.isolation_key: + return None + if isinstance(self._checkpoint_location, Path): + return FileCheckpointStorage(str(self._checkpoint_location / request.session.isolation_key)) + # Caller-supplied storage — used as-is; caller owns scoping. 
+ return self._checkpoint_location + + async def _invoke_workflow(self, request: ChannelRequest) -> HostedRunResult: + """Dispatch to ``Workflow.run`` and collapse outputs into a ``HostedRunResult``. + + The channel's ``run_hook`` is the canonical adapter for shaping + ``request.input`` into the workflow start executor's typed input + (free-form text from a Telegram message, structured ``Responses`` + ``input`` items, …). When no hook is wired, ``request.input`` is + forwarded verbatim — appropriate for workflows whose start executor + accepts the channel's native input type (commonly ``str``). + + When ``checkpoint_location`` is configured on the host, a + per-conversation checkpoint storage is resolved, the workflow is + restored from its latest checkpoint (if any) and then re-run with + the new input — mirroring the resume semantics of the Foundry + Responses host. + """ + # Workflows do not own session state in the agent sense and do not + # accept ``session=`` / ``options=`` kwargs. The channel's run_hook is + # the seam for any per-run customization; nothing flows through here. + workflow: Workflow = self.target # type: ignore[assignment] + storage = self._resolve_checkpoint_storage(request) + if storage is not None: + latest = await storage.get_latest(workflow_name=workflow.name) + if latest is not None: + # Restore in-memory state from the most recent checkpoint + # before applying the new input. + await workflow.run(checkpoint_id=latest.checkpoint_id, checkpoint_storage=storage) + result = await workflow.run(request.input, checkpoint_storage=storage) + else: + result = await workflow.run(request.input) + outputs = result.get_outputs() + text = "\n".join(_workflow_output_to_text(o) for o in outputs) if outputs else "" + return HostedRunResult(text=text) + + def _invoke_workflow_stream(self, request: ChannelRequest) -> ResponseStream[AgentResponseUpdate, AgentResponse]: + """Bridge ``Workflow.run(stream=True)`` to a channel-facing ``ResponseStream``. 
+ + Wraps the workflow's ``ResponseStream[WorkflowEvent, WorkflowRunResult]`` + in a new ``ResponseStream[AgentResponseUpdate, AgentResponse]`` so + channels can iterate it identically to an agent stream and apply + their ``stream_transform_hook`` callables. + + Mapping rules: + + - ``output`` events whose ``data`` is already an + :class:`AgentResponseUpdate` (the common case for workflows + containing :class:`AgentExecutor`) pass through unchanged. + - ``output`` events with any other ``data`` are wrapped into a + single-text-content :class:`AgentResponseUpdate`. + - All other event types (``status``, ``executor_invoked``, + ``superstep_*``, lifecycle, …) are filtered out — channels only + care about user-visible text. Hooks can opt back in by inspecting + ``raw_representation`` on the produced updates. + + The original :class:`WorkflowEvent` is stashed on + ``AgentResponseUpdate.raw_representation`` so advanced consumers + (telemetry, debug UIs) can recover the full workflow timeline. + + Checkpoint restoration (when ``checkpoint_location`` is set) runs + before the input stream is opened so the new turn observes the + restored state. + """ + workflow: Workflow = self.target # type: ignore[assignment] + storage = self._resolve_checkpoint_storage(request) + + async def _maybe_restore() -> None: + if storage is None: + return + latest = await storage.get_latest(workflow_name=workflow.name) + if latest is None: + return + # Drain the restoration stream so the no-op invocation actually + # rehydrates state before the real run starts. 
+ async for _ in workflow.run( + stream=True, + checkpoint_id=latest.checkpoint_id, + checkpoint_storage=storage, + ): + pass + + async def _bridge() -> AsyncIterator[AgentResponseUpdate]: + await _maybe_restore() + workflow_stream = workflow.run(request.input, stream=True, checkpoint_storage=storage) + try: + async for event in workflow_stream: + update = _workflow_event_to_update(event) + if update is not None: + yield update + finally: + async with _suppress_already_consumed(): + await workflow_stream.get_final_response() + + async def _finalize(updates: Sequence[AgentResponseUpdate]) -> AgentResponse: # noqa: RUF029 + return AgentResponse.from_updates(updates) + + return ResponseStream[AgentResponseUpdate, AgentResponse](_bridge(), finalizer=_finalize) + + def _wrap_input(self, request: ChannelRequest) -> Message | list[Message]: + """Promote ``request.input`` to ``Message``(s) carrying channel metadata. + + Channels deliver inputs as plain text, a single ``Message``, or a list + of ``Message`` (e.g. a Responses-API request that includes a ``system`` + instruction plus the user turn). To preserve channel provenance + + identity + ``response_target`` on the persisted history record (and + make it visible to context providers, evals, audits), we attach a + ``hosting`` block under ``additional_properties``. AF's + ``Message.to_dict`` round-trips ``additional_properties`` through any + ``HistoryProvider`` that serializes via ``to_dict`` (e.g. + ``FileHistoryProvider``) and the framework explicitly does *not* + forward these fields to model providers, so they are safe to attach. + + For a list of messages we attach the metadata to the LAST message that + will be persisted (typically the user turn) — this keeps a single, + searchable record of where the inbound message came from. 
+ """ + hosting_meta: dict[str, Any] = {"channel": request.channel} + if request.identity is not None: + hosting_meta["identity"] = { + "channel": request.identity.channel, + "native_id": request.identity.native_id, + "attributes": dict(request.identity.attributes) if request.identity.attributes else {}, + } + target = request.response_target + hosting_meta["response_target"] = { + "kind": target.kind.value, + "targets": list(target.targets), + } + + raw = request.input + if isinstance(raw, Message): + raw.additional_properties = {**(raw.additional_properties or {}), "hosting": hosting_meta} + return raw + if isinstance(raw, list) and raw and all(isinstance(m, Message) for m in raw): + messages: list[Message] = [m for m in raw if isinstance(m, Message)] + last = messages[-1] + last.additional_properties = {**(last.additional_properties or {}), "hosting": hosting_meta} + return messages + # ``raw`` is typed as ``AgentRunInputs`` (str | Content | Message | Sequence[…]). + # The remaining cases are str / Content / Mapping — wrap as a single user message. + return Message( + role="user", + contents=[raw], # type: ignore[list-item] + additional_properties={"hosting": hosting_meta}, + ) + + def _record_identity(self, request: ChannelRequest) -> None: + """Update the per-``isolation_key`` identity registry + active-channel hint. + + Called on every successful resolve. ``ResponseTarget.active`` + consumes ``self._active``; ``ResponseTarget.channel(name)`` / + ``.channels([...])`` / ``.all_linked`` consume ``self._identities``. + """ + if request.identity is None or request.session is None: + return + key = request.session.isolation_key + if not key: + return + self._identities.setdefault(key, {})[request.identity.channel] = request.identity + self._active[key] = request.identity.channel + + async def _deliver_response(self, request: ChannelRequest, payload: HostedRunResult) -> DeliveryReport: + """Resolve ``request.response_target`` and call ``ChannelPush.push`` on each. 
+ + Per SPEC-002 §"ResponseTarget": for any non-``originating`` target, + the originating channel returns an acknowledgment and the actual + agent reply lands on the destination channel(s). When a destination + cannot be resolved (no known native id) or doesn't implement + ``ChannelPush``, it is dropped and surfaced in + :class:`DeliveryReport.skipped`. If every destination drops, we + fall back to delivering on the originating channel (matching the + spec's policy default). + """ + target = request.response_target + kind = target.kind + + # Fast paths for the trivial variants. + if kind == ResponseTargetKind.ORIGINATING: + return DeliveryReport(include_originating=True) + if kind == ResponseTargetKind.NONE: + # Background-only — drop the reply on the floor for now (no + # ContinuationToken in the prototype). + return DeliveryReport(include_originating=False) + + # Build the destination set. + include_originating = False + # Each entry is (channel_name, identity_override_or_None_to_lookup). + destinations: list[tuple[str, ChannelIdentity | None]] = [] + isolation_key = request.session.isolation_key if request.session is not None else None + known = self._identities.get(isolation_key or "", {}) + + if kind == ResponseTargetKind.ACTIVE: + active = self._active.get(isolation_key or "") + if active is None or active == request.channel: + # Fall back to originating when there's no other active + # channel known (matches the "first message" case). + return DeliveryReport(include_originating=True) + destinations.append((active, known.get(active))) + + elif kind == ResponseTargetKind.ALL_LINKED: + for channel_name, identity in known.items(): + if channel_name == request.channel: + include_originating = True + continue + destinations.append((channel_name, identity)) + if not destinations and not include_originating: + # No links recorded yet — fall back. 
+ return DeliveryReport(include_originating=True) + + elif kind == ResponseTargetKind.CHANNELS: + for entry in target.targets: + if entry == "originating": + include_originating = True + continue + if ":" in entry: + channel_name, _, native_id = entry.partition(":") + if channel_name == request.channel: + # Pointing the originating channel at itself with a + # specific native id — treat as "include + # originating" since the channel will reply on its + # own wire to that user anyway. + include_originating = True + continue + destinations.append((channel_name, ChannelIdentity(channel=channel_name, native_id=native_id))) + else: + if entry == request.channel: + include_originating = True + continue + destinations.append((entry, known.get(entry))) + + # Dispatch. + by_name = {ch.name: ch for ch in self.channels} + pushed: list[str] = [] + skipped: list[str] = [] + for channel_name, dest_identity in destinations: + channel = by_name.get(channel_name) + token = f"{channel_name}:{dest_identity.native_id}" if dest_identity is not None else channel_name + if channel is None: + logger.warning("deliver_response: no channel named %r (target=%s)", channel_name, token) + skipped.append(token) + continue + if not isinstance(channel, ChannelPush): + logger.warning( + "deliver_response: channel %r does not implement ChannelPush (target=%s)", + channel_name, + token, + ) + skipped.append(token) + continue + if dest_identity is None: + logger.warning( + "deliver_response: no known identity for isolation_key=%s on channel=%s", + isolation_key, + channel_name, + ) + skipped.append(token) + continue + try: + await channel.push(dest_identity, payload) + except Exception: + logger.exception("deliver_response: push failed for target=%s", token) + skipped.append(token) + continue + pushed.append(token) + logger.info("deliver_response: pushed to %s (%d chars)", token, len(payload.text)) + + if not pushed and not include_originating: + # Spec policy: if every destination drops, deliver to 
originating. + logger.warning("deliver_response: every destination dropped — falling back to originating") + include_originating = True + + return DeliveryReport( + include_originating=include_originating, + pushed=tuple(pushed), + skipped=tuple(skipped), + ) + + +__all__ = ["AgentFrameworkHost", "ChannelContext", "logger"] diff --git a/python/packages/hosting/agent_framework_hosting/_isolation.py b/python/packages/hosting/agent_framework_hosting/_isolation.py new file mode 100644 index 0000000000..53fb2f1e54 --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/_isolation.py @@ -0,0 +1,76 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Per-request isolation keys read from inbound HTTP headers. + +The Foundry Hosted Agents runtime injects two well-known headers on every +request it forwards to the user's container: + +* ``x-agent-user-isolation-key`` — opaque per-user partition key +* ``x-agent-chat-isolation-key`` — opaque per-conversation partition key + +When the headers are present we are running inside (or being driven by) the +Foundry runtime; when they are absent we are running in plain local dev. The +host installs an ASGI middleware in :meth:`AgentFrameworkHost._build_app` +that reads both headers off every inbound HTTP request and pushes them into +the :data:`current_isolation_keys` contextvar for the duration of the +request, then resets it. Providers that need partition-aware storage (most +notably ``FoundryHostedAgentHistoryProvider``) read the contextvar via +:func:`get_current_isolation_keys` and apply the keys to their backend +calls — so app authors don't have to wire any middleware themselves and +channels stay free of Foundry-specific header knowledge. + +The contextvar holds a plain :class:`IsolationKeys` mapping; conversion to +provider-specific types (e.g. Foundry's ``IsolationContext``) happens at +the consuming provider so this module has no provider dependencies. 
"""Per-request isolation keys read from inbound HTTP headers.

The Foundry Hosted Agents runtime injects two well-known headers on every
request it forwards to the user's container:

* ``x-agent-user-isolation-key`` — opaque per-user partition key
* ``x-agent-chat-isolation-key`` — opaque per-conversation partition key

Present headers mean we are running inside (or driven by) the Foundry
runtime; absent headers mean plain local dev. The host's ASGI middleware
reads both headers off each inbound request and pushes them into the
:data:`current_isolation_keys` contextvar for the request's duration, then
resets it — so partition-aware providers (most notably
``FoundryHostedAgentHistoryProvider``) read the keys via
:func:`get_current_isolation_keys` without app authors wiring middleware and
without channels knowing Foundry header names. The contextvar holds a plain
:class:`IsolationKeys`; conversion to provider-specific types happens at the
consumer, keeping this module free of provider dependencies.
"""

from __future__ import annotations

from contextvars import ContextVar, Token

__all__ = [
    "ISOLATION_HEADER_CHAT",
    "ISOLATION_HEADER_USER",
    "IsolationKeys",
    "current_isolation_keys",
    "get_current_isolation_keys",
    "reset_current_isolation_keys",
    "set_current_isolation_keys",
]


# Well-known header names injected by the Foundry Hosted Agents runtime.
ISOLATION_HEADER_USER = "x-agent-user-isolation-key"
ISOLATION_HEADER_CHAT = "x-agent-chat-isolation-key"


class IsolationKeys:
    """Per-request Foundry isolation keys lifted off the inbound headers.

    A small immutable-by-convention value object; two instances with equal
    keys compare (and hash) equal so tests and caches can use them directly.

    Args:
        user_key: Opaque per-user partition key, or ``None`` when absent.
        chat_key: Opaque per-conversation partition key, or ``None``.
    """

    __slots__ = ("chat_key", "user_key")

    def __init__(self, user_key: str | None = None, chat_key: str | None = None) -> None:
        self.user_key = user_key
        self.chat_key = chat_key

    @property
    def is_empty(self) -> bool:
        """``True`` when neither header was present (plain local dev)."""
        return self.user_key is None and self.chat_key is None

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, IsolationKeys):
            return NotImplemented
        return self.user_key == other.user_key and self.chat_key == other.chat_key

    def __hash__(self) -> int:
        return hash((self.user_key, self.chat_key))

    def __repr__(self) -> str:
        return f"IsolationKeys(user_key={self.user_key!r}, chat_key={self.chat_key!r})"


# Holds the keys for the current request; ``None`` outside a request scope.
current_isolation_keys: ContextVar[IsolationKeys | None] = ContextVar(
    "agent_framework_hosting_isolation_keys",
    default=None,
)


def get_current_isolation_keys() -> IsolationKeys | None:
    """Return the isolation keys bound to the current request, if any."""
    return current_isolation_keys.get()


def set_current_isolation_keys(keys: IsolationKeys | None) -> Token[IsolationKeys | None]:
    """Bind ``keys`` to the current async context and return a reset token."""
    return current_isolation_keys.set(keys)


def reset_current_isolation_keys(token: Token[IsolationKeys | None]) -> None:
    """Restore the isolation contextvar to its prior value."""
    current_isolation_keys.reset(token)


# Copyright (c) Microsoft. All rights reserved.

# ``ChannelRequest`` is the only intentional dataclass here (callers use
# ``dataclasses.replace`` on it in run hooks).
# The other types are plain Python classes by preference, so the "could be a
# dataclass" lint is muted at the file level.
# ruff: noqa: B903

"""Channel-neutral request envelope and channel protocol types.

These types form the host/channel boundary: a channel parses its native
payload into a :class:`ChannelRequest` and hands it to
:class:`ChannelContext.run` (or ``run_stream``); the host normalizes it into
a single agent invocation and either answers on the originating channel or
fans out via :class:`ResponseTarget` to channels implementing
:class:`ChannelPush`.

See ``docs/specs/002-python-hosting-channels.md`` for the full design.
"""

from __future__ import annotations

from collections.abc import Awaitable, Callable, Mapping, Sequence
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentRunInputs,
    ResponseStream,
    SupportsAgentRun,
    Workflow,
)
from starlette.routing import BaseRoute

if TYPE_CHECKING:
    from ._host import ChannelContext


# --------------------------------------------------------------------------- #
# Channel-neutral request envelope
# --------------------------------------------------------------------------- #


# Shared read-only default for optional attribute bags.
_EMPTY_MAPPING: Mapping[str, Any] = {}


class ChannelSession:
    """Channel-supplied session hint.

    The host keys an ``AgentSession`` on ``isolation_key`` so each distinct
    end user gets isolated context-provider state (e.g. one
    ``FileHistoryProvider`` JSONL file per user).
    """

    def __init__(self, isolation_key: str | None = None) -> None:
        self.isolation_key = isolation_key


class ChannelIdentity:
    """Channel-native identity observed on each request.

    Feeds the host's identity registry for two purposes: recording the
    active channel per ``isolation_key`` (so ``ResponseTarget.active``
    resolves correctly) and telling :class:`ChannelPush` recipients where in
    their native namespace to deliver — Telegram uses ``native_id`` as the
    chat id, Teams as the conversation/AAD id, and so on.
    """

    def __init__(
        self,
        channel: str,
        native_id: str,
        attributes: Mapping[str, Any] | None = None,
    ) -> None:
        self.channel = channel
        self.native_id = native_id
        self.attributes: Mapping[str, Any] = _EMPTY_MAPPING if attributes is None else attributes


class ResponseTargetKind(str, Enum):
    """Discriminator for :class:`ResponseTarget` variants."""

    ORIGINATING = "originating"
    ACTIVE = "active"
    CHANNELS = "channels"
    ALL_LINKED = "all_linked"
    NONE = "none"


class ResponseTarget:
    """Per-request directive controlling where the host delivers the reply.

    Independent of ``session_mode``. Construct via the classmethod builders
    or the module-level singletons rather than touching ``kind`` directly:

    - ``ResponseTarget.originating`` (default) — synchronous reply on the
      originating channel only.
    - ``ResponseTarget.active`` — push to the channel most recently seen for
      the resolved ``isolation_key``.
    - ``ResponseTarget.channel("teams")`` / ``.channels([...])`` — push to
      named destinations; entries are bare channel names (native id resolved
      from the identity registry) or ``"channel:native_id"`` tokens (used
      verbatim); the pseudo-name ``"originating"`` adds the originating
      channel to the fan-out.
    - ``ResponseTarget.all_linked`` — push to every channel where the
      resolved ``isolation_key`` has been observed.
    - ``ResponseTarget.none`` — background-only; the prototype just
      suppresses the originating reply (no ``ContinuationToken`` yet).

    Instances are treated as immutable; the singletons are process-wide.
    """

    def __init__(
        self,
        kind: ResponseTargetKind = ResponseTargetKind.ORIGINATING,
        targets: tuple[str, ...] = (),
    ) -> None:
        self.kind = kind
        self.targets = targets

    # -- builders ---------------------------------------------------------- #

    @classmethod
    def channel(cls, name: str) -> "ResponseTarget":
        """Target a single named destination channel."""
        return cls(kind=ResponseTargetKind.CHANNELS, targets=(name,))

    @classmethod
    def channels(cls, names: Sequence[str]) -> "ResponseTarget":
        """Target an explicit list of destination channels."""
        return cls(kind=ResponseTargetKind.CHANNELS, targets=tuple(names))

    # -- value semantics --------------------------------------------------- #
    # Two instances with the same ``kind`` + ``targets`` are interchangeable;
    # tests and channel parsers compare with ``==`` and use them as dict keys.

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ResponseTarget):
            return NotImplemented
        return self.kind is other.kind and self.targets == other.targets

    def __hash__(self) -> int:
        return hash((self.kind, self.targets))

    def __repr__(self) -> str:
        if self.kind is ResponseTargetKind.CHANNELS:
            return f"ResponseTarget.channels({list(self.targets)!r})"
        return f"ResponseTarget.{self.kind.value}"


# Module-level singletons so callers can write ``ResponseTarget.originating``
# (matching the spec's classmethod-style notation) without juggling Python's
# no-zero-arg-classmethod-property limitation.
ResponseTarget.originating = ResponseTarget(kind=ResponseTargetKind.ORIGINATING)  # type: ignore[attr-defined]
ResponseTarget.active = ResponseTarget(kind=ResponseTargetKind.ACTIVE)  # type: ignore[attr-defined]
ResponseTarget.all_linked = ResponseTarget(kind=ResponseTargetKind.ALL_LINKED)  # type: ignore[attr-defined]
ResponseTarget.none = ResponseTarget(kind=ResponseTargetKind.NONE)  # type: ignore[attr-defined]


@dataclass
class ChannelRequest:
    """Uniform invocation envelope every channel produces from its native payload.

    Kept as a dataclass so app authors can use ``dataclasses.replace(...)`` in
    run hooks to produce a modified envelope without re-listing every field.
    """

    channel: str
    operation: str  # e.g. "message.create", "command.invoke"
    input: AgentRunInputs
    session: ChannelSession | None = None
    options: Mapping[str, Any] | None = None
    session_mode: str = "auto"  # "auto" | "required" | "disabled"
    # ``dict`` is the idiomatic default_factory for a fresh empty mapping —
    # equivalent to (and cheaper than) ``lambda: {}``.
    metadata: Mapping[str, Any] = field(default_factory=dict)
    attributes: Mapping[str, Any] = field(default_factory=dict)
    stream: bool = False
    identity: ChannelIdentity | None = None
    response_target: ResponseTarget = field(default_factory=lambda: ResponseTarget.originating)  # type: ignore[attr-defined]


class ChannelCommand:
    """A discoverable command a channel exposes to its users (e.g. ``/reset``)."""

    def __init__(
        self,
        name: str,
        description: str,
        handle: Callable[["ChannelCommandContext"], Awaitable[None]],
    ) -> None:
        self.name = name
        self.description = description
        self.handle = handle


class ChannelCommandContext:
    """Context passed to a :class:`ChannelCommand` handler."""

    def __init__(
        self,
        request: ChannelRequest,
        reply: Callable[[str], Awaitable[None]],
    ) -> None:
        self.request = request
        self.reply = reply


# Shared empty defaults for :class:`ChannelContribution`.
_EMPTY_ROUTES: tuple[BaseRoute, ...] = ()
_EMPTY_COMMANDS: tuple[ChannelCommand, ...] = ()
_EMPTY_LIFECYCLE: tuple[Callable[[], Awaitable[None]], ...]
= () + + +class ChannelContribution: + """Routes, commands, and lifecycle hooks a channel contributes to the host.""" + + def __init__( + self, + routes: Sequence[BaseRoute] = _EMPTY_ROUTES, + commands: Sequence[ChannelCommand] = _EMPTY_COMMANDS, + on_startup: Sequence[Callable[[], Awaitable[None]]] = _EMPTY_LIFECYCLE, + on_shutdown: Sequence[Callable[[], Awaitable[None]]] = _EMPTY_LIFECYCLE, + ) -> None: + self.routes = routes + self.commands = commands + self.on_startup = on_startup + self.on_shutdown = on_shutdown + + +class HostedRunResult: + """Channel-neutral result of an agent invocation routed through the host.""" + + def __init__(self, text: str) -> None: + self.text = text + + +class DeliveryReport: + """What :meth:`ChannelContext.deliver_response` did with a payload. + + The originating channel uses ``include_originating`` to decide whether + to render the agent reply on its own wire (``True`` — default for the + ``originating`` target, or when ``"originating"`` is one of the listed + destinations) or to return only an acknowledgement (``False`` — when + the target lists only out-of-band destinations). + """ + + def __init__( + self, + include_originating: bool, + pushed: tuple[str, ...] = (), + skipped: tuple[str, ...] = (), + ) -> None: + self.include_originating = include_originating + self.pushed = pushed # destination tokens delivered to (e.g. "telegram:123") + self.skipped = skipped # destinations resolved but skipped (no push, failed, …) + + +# A transform hook runs over each AgentResponseUpdate as the channel consumes +# the stream. It can return a replacement update, ``None`` to drop the update, +# or be async. Channels apply it during iteration so that channel-specific +# concerns (e.g. masking, redaction, formatting for the wire) live close to +# the channel rather than on the agent. 
ChannelStreamTransformHook = Callable[
    [AgentResponseUpdate],
    "AgentResponseUpdate | Awaitable[AgentResponseUpdate | None] | None",
]


# --------------------------------------------------------------------------- #
# Channel run hook
# --------------------------------------------------------------------------- #


# Run hooks receive the channel-built ``ChannelRequest`` and return a
# (possibly modified) replacement. Channels call the hook with both the
# request and the channel-side context as keywords — the convention is
# ``await hook(request, target=..., protocol_request=...)`` — so the minimal
# hook accepts ``request`` positionally plus ``**kwargs``. ``target`` is the
# hosted agent/workflow; ``protocol_request`` is the raw channel-native
# payload (the Responses JSON body, the Telegram ``Update`` dict, the Bot
# Framework ``Activity`` for Teams) for fields the channel did not lift onto
# ``ChannelRequest`` (e.g. OpenAI's ``safety_identifier``, Teams'
# ``from.aadObjectId``, ...).
ChannelRunHook = Callable[..., "Awaitable[ChannelRequest] | ChannelRequest"]


async def apply_run_hook(
    hook: ChannelRunHook,
    request: ChannelRequest,
    *,
    target: SupportsAgentRun | Workflow,
    protocol_request: Any | None,
) -> ChannelRequest:
    """Invoke a :data:`ChannelRunHook` with the standard keyword convention.

    Channels route through this helper rather than calling the hook directly
    so the calling convention (``request`` positional, ``target`` /
    ``protocol_request`` keyword) is enforced in exactly one place. Both
    sync and async hooks are supported.
    """
    outcome = hook(request, target=target, protocol_request=protocol_request)
    if isinstance(outcome, Awaitable):
        return await outcome
    return outcome


# --------------------------------------------------------------------------- #
# Channel protocols
# --------------------------------------------------------------------------- #


@runtime_checkable
class Channel(Protocol):
    """A pluggable adapter exposing one transport on the host.

    A channel publishes routes, commands, and lifecycle callbacks via
    :meth:`contribute`. The host mounts them under the channel's ``path``
    (or at the app root when ``path == ""``) and hands the channel a
    :class:`ChannelContext` for calling back into the host to invoke the
    target and deliver responses.
    """

    name: str
    path: str  # default mount path (e.g. "/responses"); use "" to mount routes at the app root

    def contribute(self, context: "ChannelContext") -> ChannelContribution: ...


@runtime_checkable
class ChannelPush(Protocol):
    """Optional capability: outbound delivery without a prior request.

    Per SPEC-002 (req #13), channels capable of proactive delivery (Telegram
    proactive bot messages, Teams proactive bot messages, webhook callbacks,
    SSE broadcasts) implement ``push`` on top of the base :class:`Channel`
    protocol; channels without it can only be addressed as the
    ``originating`` :class:`ResponseTarget`.
    """

    name: str

    async def push(self, identity: ChannelIdentity, payload: HostedRunResult) -> None: ...
+ + +__all__ = [ + "AgentResponse", + "AgentResponseUpdate", + "Channel", + "ChannelCommand", + "ChannelCommandContext", + "ChannelContribution", + "ChannelIdentity", + "ChannelPush", + "ChannelRequest", + "ChannelRunHook", + "ChannelSession", + "ChannelStreamTransformHook", + "DeliveryReport", + "HostedRunResult", + "ResponseStream", + "ResponseTarget", + "ResponseTargetKind", + "apply_run_hook", +] diff --git a/python/packages/hosting/pyproject.toml b/python/packages/hosting/pyproject.toml new file mode 100644 index 0000000000..57c79854bb --- /dev/null +++ b/python/packages/hosting/pyproject.toml @@ -0,0 +1,107 @@ +[project] +name = "agent-framework-hosting" +description = "Multi-channel hosting for Microsoft Agent Framework agents." +authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0a260424" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.2.0,<2", + "starlette>=0.37", +] + +[project.optional-dependencies] +serve = [ + "hypercorn>=0.17", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'", + "sys_platform == 'win32'" +] + +[tool.uv-dynamic-versioning] 
+fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +filterwarnings = [] +timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = "../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +include = ["agent_framework_hosting"] +exclude = ['tests'] + +[tool.mypy] +plugins = ['pydantic.mypy'] +strict = true +python_version = "3.10" +ignore_missing_imports = true +disallow_untyped_defs = true +no_implicit_optional = true +check_untyped_defs = true +warn_return_any = true +show_error_codes = true +warn_unused_ignores = false +disallow_incomplete_defs = true +disallow_untyped_decorators = true + +[tool.bandit] +targets = ["agent_framework_hosting"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks.mypy] +help = "Run MyPy for this package." +cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hosting" + +[tool.poe.tasks.test] +help = "Run the default unit test suite for this package." +cmd = 'pytest -m "not integration" --cov=agent_framework_hosting --cov-report=term-missing:skip-covered tests' + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" + +[dependency-groups] +dev = [ + "httpx>=0.28.1", +] diff --git a/python/packages/hosting/tests/__init__.py b/python/packages/hosting/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/packages/hosting/tests/_workflow_fixtures.py b/python/packages/hosting/tests/_workflow_fixtures.py new file mode 100644 index 0000000000..f59bb8cab8 --- /dev/null +++ b/python/packages/hosting/tests/_workflow_fixtures.py @@ -0,0 +1,43 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Workflow fixtures for hosting tests. + +Defined in a module that does not use ``from __future__ import annotations`` +because the workflow handler validation reflects on real annotation objects +rather than stringified forms. +""" + +from agent_framework import Executor, Workflow, WorkflowBuilder, WorkflowContext, handler + + +class _UpperExecutor(Executor): + @handler + async def handle(self, text: str, ctx: WorkflowContext[str]) -> None: + await ctx.yield_output(text.upper()) + + +class _EchoExecutor(Executor): + @handler + async def handle(self, text: str, ctx: WorkflowContext[str]) -> None: + await ctx.yield_output(text) + + +def build_upper_workflow() -> Workflow: + return WorkflowBuilder(start_executor=_UpperExecutor(id="upper")).build() + + +def build_echo_workflow() -> Workflow: + return WorkflowBuilder(start_executor=_EchoExecutor(id="echo")).build() + + +class _MultiChunkExecutor(Executor): + """Yields three separate ``output`` events so streaming has something to chew on.""" + + @handler + async def handle(self, text: str, ctx: WorkflowContext[str]) -> None: + for chunk in (f"{text}-1", f"{text}-2", f"{text}-3"): + await ctx.yield_output(chunk) + + +def build_multi_chunk_workflow() -> Workflow: + return WorkflowBuilder(start_executor=_MultiChunkExecutor(id="multi")).build() diff --git a/python/packages/hosting/tests/test_host.py b/python/packages/hosting/tests/test_host.py new file mode 100644 index 0000000000..bb6d7d5963 --- /dev/null +++ b/python/packages/hosting/tests/test_host.py @@ -0,0 +1,714 @@ +# Copyright (c) Microsoft. All rights reserved. 
# Copyright (c) Microsoft. All rights reserved.

"""Tests for :class:`AgentFrameworkHost` invocation, session, and delivery routing."""

from __future__ import annotations

from collections.abc import AsyncIterator, Sequence
from dataclasses import dataclass, field
from typing import Any

import pytest
from agent_framework import AgentResponseUpdate
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import BaseRoute, Route
from starlette.testclient import TestClient

from agent_framework_hosting import (
    AgentFrameworkHost,
    Channel,
    ChannelContext,
    ChannelContribution,
    ChannelIdentity,
    ChannelPush,
    ChannelRequest,
    ChannelSession,
    HostedRunResult,
    ResponseTarget,
)


async def _ping(_request: Request) -> JSONResponse:
    """Trivial handler used only to prove that a channel's routes were mounted."""
    return JSONResponse({"ok": True})


# --------------------------------------------------------------------------- #
# Fakes                                                                       #
# --------------------------------------------------------------------------- #


@dataclass
class _FakeAgentSession:
    # Mirrors the two identifiers the host reads off a real agent session.
    session_id: str | None = None
    service_session_id: str | None = None


@dataclass
class _FakeAgentResponse:
    # The only field the hosting layer consumes from a response.
    text: str


class _FakeAgent:
    """Minimal :class:`SupportsAgentRun` implementation that records invocations."""

    def __init__(self, reply: str = "ok") -> None:
        self._reply = reply
        # Every call to run() appends its arguments here for later inspection.
        self.calls: list[dict[str, Any]] = []
        # Every session handed out by create_session(), in creation order.
        self.created_sessions: list[_FakeAgentSession] = []

    def create_session(self, *, session_id: str | None = None) -> _FakeAgentSession:
        session = _FakeAgentSession(session_id=session_id)
        self.created_sessions.append(session)
        return session

    async def run(self, messages: Any = None, *, stream: bool = False, session: Any = None, **kwargs: Any) -> Any:
        self.calls.append({"messages": messages, "stream": stream, "session": session, "kwargs": kwargs})
        if stream:  # pragma: no cover - not used by these tests

            async def _gen() -> AsyncIterator[Any]:
                yield self._reply

            return _gen()
        return _FakeAgentResponse(text=self._reply)


class _RecordingChannel:
    """Minimal :class:`Channel` + :class:`ChannelPush` for routing tests."""

    def __init__(self, name: str = "fake", path: str = "/fake", supports_push: bool = True) -> None:
        self.name = name
        self.path = path
        self.context: ChannelContext | None = None
        # (identity, payload) pairs recorded by push(); inspected by the tests.
        self.pushes: list[tuple[ChannelIdentity, HostedRunResult]] = []
        # When set, push() raises this instead of recording the delivery.
        self._push_raises: Exception | None = None
        self._supports_push = supports_push
        # Provide a single trivial route so contribute() exercises the mount path.
        self._routes: Sequence[BaseRoute] = (Route("/ping", _ping),)

    def contribute(self, context: ChannelContext) -> ChannelContribution:
        self.context = context
        return ChannelContribution(routes=self._routes)

    async def push(self, identity: ChannelIdentity, payload: HostedRunResult) -> None:
        if self._push_raises is not None:
            raise self._push_raises
        self.pushes.append((identity, payload))


class _NoPushChannel:
    """A channel that does NOT implement :class:`ChannelPush`."""

    def __init__(self, name: str = "nopush", path: str = "/nopush") -> None:
        self.name = name
        self.path = path

    def contribute(self, context: ChannelContext) -> ChannelContribution:
        return ChannelContribution()


@dataclass
class _LifecycleChannel:
    """Channel whose only contribution is startup/shutdown callbacks, recorded here."""

    name: str = "lifecycle"
    path: str = ""
    started: list[str] = field(default_factory=list)
    stopped: list[str] = field(default_factory=list)

    def contribute(self, context: ChannelContext) -> ChannelContribution:
        async def on_start() -> None:
            self.started.append("up")

        async def on_stop() -> None:
            self.stopped.append("down")

        return ChannelContribution(on_startup=[on_start], on_shutdown=[on_stop])


# --------------------------------------------------------------------------- #
# Host wiring                                                                 #
# --------------------------------------------------------------------------- #
class TestHostWiring:
    def test_channel_is_recognized(self) -> None:
        channel = _RecordingChannel()
        assert isinstance(channel, Channel)
        assert isinstance(channel, ChannelPush)

    def test_app_mounts_channel_routes_under_path(self) -> None:
        host = AgentFrameworkHost(target=_FakeAgent(), channels=[_RecordingChannel(path="/fake")])
        with TestClient(host.app) as client:
            resp = client.get("/fake/ping")
            assert resp.status_code == 200
            assert resp.json() == {"ok": True}

    def test_app_mounts_at_root_when_path_is_empty(self) -> None:
        host = AgentFrameworkHost(target=_FakeAgent(), channels=[_RecordingChannel(path="")])
        with TestClient(host.app) as client:
            resp = client.get("/ping")
            assert resp.status_code == 200

    def test_app_is_cached(self) -> None:
        host = AgentFrameworkHost(target=_FakeAgent(), channels=[_RecordingChannel()])
        assert host.app is host.app

    def test_lifespan_invokes_startup_and_shutdown(self) -> None:
        channel = _LifecycleChannel()
        host = AgentFrameworkHost(target=_FakeAgent(), channels=[channel])
        with TestClient(host.app):
            # Startup callbacks fire when the lifespan context is entered...
            assert channel.started == ["up"]
        # ...and shutdown callbacks fire when it is exited.
        assert channel.stopped == ["down"]

    def test_app_exposes_readiness_probe(self) -> None:
        host = AgentFrameworkHost(target=_FakeAgent(), channels=[_RecordingChannel()])
        with TestClient(host.app) as client:
            resp = client.get("/readiness")
            assert resp.status_code == 200
            assert resp.text == "ok"


# --------------------------------------------------------------------------- #
# Invoke + sessions                                                           #
# --------------------------------------------------------------------------- #


class TestHostInvoke:
    @pytest.mark.asyncio
    async def test_invoke_wraps_input_with_hosting_metadata(self) -> None:
        agent = _FakeAgent(reply="hello")
        channel = _RecordingChannel(name="responses")
        host = AgentFrameworkHost(target=agent, channels=[channel])
        # Force ``app`` build to trigger ``contribute``.
        _ = host.app
        assert channel.context is not None

        request = ChannelRequest(
            channel="responses",
            operation="message.create",
            input="hi",
            session=ChannelSession(isolation_key="user:1"),
            identity=ChannelIdentity(channel="responses", native_id="user:1"),
        )
        result = await channel.context.run(request)

        assert result.text == "hello"
        assert len(agent.calls) == 1
        message = agent.calls[0]["messages"]
        assert message.role == "user"
        assert message.additional_properties["hosting"]["channel"] == "responses"
        assert message.additional_properties["hosting"]["identity"] == {
            "channel": "responses",
            "native_id": "user:1",
            "attributes": {},
        }
        assert message.additional_properties["hosting"]["response_target"] == {
            "kind": "originating",
            "targets": [],
        }

    @pytest.mark.asyncio
    async def test_invoke_caches_session_per_isolation_key(self) -> None:
        agent = _FakeAgent()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=agent, channels=[channel])
        _ = host.app
        assert channel.context is not None

        request_a = ChannelRequest(
            channel=channel.name, operation="op", input="1", session=ChannelSession(isolation_key="alice")
        )
        request_b = ChannelRequest(
            channel=channel.name, operation="op", input="2", session=ChannelSession(isolation_key="alice")
        )
        request_c = ChannelRequest(
            channel=channel.name, operation="op", input="3", session=ChannelSession(isolation_key="bob")
        )

        await channel.context.run(request_a)
        await channel.context.run(request_b)
        await channel.context.run(request_c)

        # Exactly two sessions were created (alice, bob) — never re-created.
        assert len(agent.created_sessions) == 2
        assert agent.calls[0]["session"] is agent.calls[1]["session"]
        assert agent.calls[0]["session"] is not agent.calls[2]["session"]

    @pytest.mark.asyncio
    async def test_session_disabled_does_not_create_session(self) -> None:
        agent = _FakeAgent()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=agent, channels=[channel])
        _ = host.app
        assert channel.context is not None

        request = ChannelRequest(
            channel=channel.name,
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            session_mode="disabled",
        )
        await channel.context.run(request)
        assert agent.created_sessions == []
        assert agent.calls[0]["session"] is None

    @pytest.mark.asyncio
    async def test_reset_session_rotates_id_and_drops_cache(self) -> None:
        agent = _FakeAgent()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=agent, channels=[channel])
        _ = host.app
        assert channel.context is not None

        request = ChannelRequest(
            channel=channel.name, operation="op", input="x", session=ChannelSession(isolation_key="alice")
        )
        await channel.context.run(request)
        first_session = agent.calls[-1]["session"]
        assert first_session.session_id == "alice"

        host.reset_session("alice")
        await channel.context.run(request)
        second_session = agent.calls[-1]["session"]
        # After the reset: a distinct session object with a rotated alias id.
        assert second_session is not first_session
        assert second_session.session_id != "alice"
        assert second_session.session_id.startswith("alice#")

    @pytest.mark.asyncio
    async def test_options_propagates_to_target_run(self) -> None:
        agent = _FakeAgent()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=agent, channels=[channel])
        _ = host.app
        assert channel.context is not None

        request = ChannelRequest(
            channel=channel.name,
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            options={"temperature": 0.4},
        )
        await channel.context.run(request)
        assert agent.calls[0]["kwargs"]["options"] == {"temperature": 0.4}


# --------------------------------------------------------------------------- #
# Workflow target                                                             #
# --------------------------------------------------------------------------- #


class TestHostWorkflowTarget:
    """The host accepts a ``Workflow`` and dispatches to ``workflow.run(...)``."""

    @pytest.mark.asyncio
    async def test_invoke_workflow_collapses_outputs_to_hosted_run_result(self) -> None:
        from tests._workflow_fixtures import build_upper_workflow

        workflow = build_upper_workflow()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=workflow, channels=[channel])
        _ = host.app
        assert channel.context is not None

        # The channel's run_hook is the canonical adapter from a free-form input
        # to a workflow's typed input; here the start executor accepts ``str``
        # already so the channel forwards ``input`` verbatim.
        request = ChannelRequest(channel="fake", operation="message.create", input="hello")
        result = await channel.context.run(request)

        assert result.text == "HELLO"
        # No session caching for workflow targets — Workflow has no
        # ``create_session`` and the host must not invent one.
        assert host._sessions == {}

    @pytest.mark.asyncio
    async def test_stream_workflow_yields_updates_and_finalizes(self) -> None:
        from tests._workflow_fixtures import build_echo_workflow

        workflow = build_echo_workflow()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=workflow, channels=[channel])
        _ = host.app
        assert channel.context is not None

        request = ChannelRequest(channel="fake", operation="message.create", input="hi")
        stream = channel.context.run_stream(request)

        updates: list[AgentResponseUpdate] = []
        async for update in stream:
            updates.append(update)

        # The echo workflow yields a single ``output`` event whose payload is
        # the original string; the host wraps non-update payloads into a
        # one-shot ``AgentResponseUpdate`` carrying the text.
        assert [u.text for u in updates] == ["hi"]
        # ``raw_representation`` preserves the source ``WorkflowEvent`` so
        # advanced consumers (telemetry, debug UIs) can recover the full
        # workflow timeline.
        assert all(u.raw_representation is not None for u in updates)

        final = await stream.get_final_response()
        assert final.text == "hi"

    @pytest.mark.asyncio
    async def test_stream_workflow_yields_one_update_per_output_event(self) -> None:
        from tests._workflow_fixtures import build_multi_chunk_workflow

        workflow = build_multi_chunk_workflow()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=workflow, channels=[channel])
        _ = host.app
        assert channel.context is not None

        request = ChannelRequest(channel="fake", operation="message.create", input="x")
        stream = channel.context.run_stream(request)

        chunks: list[str] = []
        async for update in stream:
            chunks.append(update.text)
            # The originating ``executor_id`` is propagated via author_name so
            # multi-agent workflows can route per-author rendering downstream.
            assert update.author_name == "multi"

        assert chunks == ["x-1", "x-2", "x-3"]
        final = await stream.get_final_response()
        assert final.text == "x-1x-2x-3"
class TestHostWorkflowCheckpointing:
    """The host scopes per-conversation checkpoints when ``checkpoint_location`` is set."""

    def test_rejects_workflow_with_existing_checkpoint_storage(self, tmp_path: Any) -> None:
        from agent_framework import InMemoryCheckpointStorage, WorkflowBuilder

        from tests._workflow_fixtures import _UpperExecutor

        workflow = WorkflowBuilder(
            start_executor=_UpperExecutor(id="upper"),
            checkpoint_storage=InMemoryCheckpointStorage(),
        ).build()
        # A workflow that already owns storage conflicts with host-managed
        # checkpointing — the host must refuse rather than silently override.
        with pytest.raises(RuntimeError, match="already has checkpoint storage"):
            AgentFrameworkHost(
                target=workflow,
                channels=[_RecordingChannel()],
                checkpoint_location=tmp_path,
            )

    def test_warns_when_target_is_agent(self, tmp_path: Any, caplog: Any) -> None:
        import logging

        agent = _FakeAgent()
        with caplog.at_level(logging.WARNING, logger="agent_framework.hosting"):
            host = AgentFrameworkHost(target=agent, channels=[_RecordingChannel()], checkpoint_location=tmp_path)
        # checkpoint_location only makes sense for workflow targets: it is
        # dropped for agents, and a warning tells the caller why.
        assert host._checkpoint_location is None
        assert any("checkpoint_location" in rec.message for rec in caplog.records)

    @pytest.mark.asyncio
    async def test_invoke_skips_checkpointing_when_no_isolation_key(self, tmp_path: Any) -> None:
        from tests._workflow_fixtures import build_upper_workflow

        workflow = build_upper_workflow()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=workflow, channels=[channel], checkpoint_location=tmp_path)
        _ = host.app
        assert channel.context is not None

        # No session -> no scoping key -> no checkpoint storage written.
        request = ChannelRequest(channel="fake", operation="message.create", input="hi")
        result = await channel.context.run(request)

        assert result.text == "HI"
        assert list(tmp_path.iterdir()) == []

    @pytest.mark.asyncio
    async def test_invoke_writes_checkpoint_under_isolation_key(self, tmp_path: Any) -> None:
        from tests._workflow_fixtures import build_upper_workflow

        workflow = build_upper_workflow()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=workflow, channels=[channel], checkpoint_location=tmp_path)
        _ = host.app
        assert channel.context is not None

        request = ChannelRequest(
            channel="fake",
            operation="message.create",
            input="hi",
            session=ChannelSession(isolation_key="alice"),
        )
        result = await channel.context.run(request)
        assert result.text == "HI"

        # FileCheckpointStorage rooted at ``<checkpoint_location>/<isolation_key>``
        # should have produced at least one checkpoint file scoped to that user.
        scoped = tmp_path / "alice"
        assert scoped.exists()
        assert any(scoped.iterdir()), "expected at least one checkpoint to be written under the per-user dir"

    @pytest.mark.asyncio
    async def test_stream_writes_checkpoint_under_isolation_key(self, tmp_path: Any) -> None:
        from tests._workflow_fixtures import build_echo_workflow

        workflow = build_echo_workflow()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=workflow, channels=[channel], checkpoint_location=tmp_path)
        _ = host.app
        assert channel.context is not None

        request = ChannelRequest(
            channel="fake",
            operation="message.create",
            input="hi",
            session=ChannelSession(isolation_key="bob"),
        )
        stream = channel.context.run_stream(request)
        async for _ in stream:
            pass
        await stream.get_final_response()

        scoped = tmp_path / "bob"
        assert scoped.exists()
        assert any(scoped.iterdir())

    @pytest.mark.asyncio
    async def test_caller_supplied_checkpoint_storage_used_as_is(self, tmp_path: Any) -> None:
        from agent_framework import InMemoryCheckpointStorage

        from tests._workflow_fixtures import build_upper_workflow

        storage = InMemoryCheckpointStorage()
        workflow = build_upper_workflow()
        channel = _RecordingChannel()
        host = AgentFrameworkHost(target=workflow, channels=[channel], checkpoint_location=storage)
        _ = host.app
        assert channel.context is not None
        assert host._checkpoint_location is storage

        request = ChannelRequest(
            channel="fake",
            operation="message.create",
            input="hi",
            session=ChannelSession(isolation_key="carol"),
        )
        await channel.context.run(request)

        # The caller-owned storage is used directly (no per-user scoping
        # applied by the host); a checkpoint should appear in it.
        checkpoints = await storage.list_checkpoints(workflow_name=workflow.name)
        assert checkpoints, "expected the caller-supplied storage to receive a checkpoint"
        # And nothing should have been written into the tmp_path tree.
        assert list(tmp_path.iterdir()) == []


# --------------------------------------------------------------------------- #
# Delivery routing                                                            #
# --------------------------------------------------------------------------- #


def _make_host_with_two_channels() -> tuple[AgentFrameworkHost, _RecordingChannel, _RecordingChannel, ChannelContext]:
    """Build a host with a 'responses' and a 'telegram' channel; return (host, a, b, ctx)."""
    responses_channel = _RecordingChannel(name="responses", path="/r")
    telegram_channel = _RecordingChannel(name="telegram", path="/t")
    host = AgentFrameworkHost(target=_FakeAgent(), channels=[responses_channel, telegram_channel])
    _ = host.app
    assert responses_channel.context is not None
    return host, responses_channel, telegram_channel, responses_channel.context


def _record_identity_on(host: AgentFrameworkHost, isolation_key: str, channel: str, native_id: str) -> None:
    """Pre-seed the host's identity registry by running a request."""
    host._identities.setdefault(isolation_key, {})[channel] = ChannelIdentity(channel=channel, native_id=native_id)
    host._active[isolation_key] = channel
class TestDeliverResponse:
    @pytest.mark.asyncio
    async def test_originating_returns_include_originating(self) -> None:
        _, _, _, ctx = _make_host_with_two_channels()
        request = ChannelRequest(channel="responses", operation="op", input="x")
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.include_originating is True
        assert report.pushed == ()
        assert report.skipped == ()

    @pytest.mark.asyncio
    async def test_none_suppresses_everything(self) -> None:
        _, _, _, ctx = _make_host_with_two_channels()
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            response_target=ResponseTarget.none,  # type: ignore[attr-defined]
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.include_originating is False
        assert report.pushed == ()
        assert report.skipped == ()

    @pytest.mark.asyncio
    async def test_active_pushes_to_other_channel(self) -> None:
        host, _origin, other, ctx = _make_host_with_two_channels()
        # Alice was last seen on telegram.
        _record_identity_on(host, "alice", "telegram", "42")
        # Now she sends a message via responses; ResponseTarget.active should
        # push to telegram, not back to responses.
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            response_target=ResponseTarget.active,  # type: ignore[attr-defined]
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.include_originating is False
        assert report.pushed == ("telegram:42",)
        assert other.pushes and other.pushes[0][0].native_id == "42"

    @pytest.mark.asyncio
    async def test_active_falls_back_to_originating_when_self(self) -> None:
        host, _origin, _other, ctx = _make_host_with_two_channels()
        _record_identity_on(host, "alice", "responses", "user:1")
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            response_target=ResponseTarget.active,  # type: ignore[attr-defined]
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.include_originating is True

    @pytest.mark.asyncio
    async def test_channels_with_unknown_identity_skipped(self) -> None:
        _, _, _, ctx = _make_host_with_two_channels()
        # No prior identity seeded for telegram on alice.
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            response_target=ResponseTarget.channel("telegram"),
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        # Skipped → fallback to originating.
        assert report.include_originating is True
        assert report.skipped == ("telegram",)
        assert report.pushed == ()

    @pytest.mark.asyncio
    async def test_channels_with_explicit_native_id_token(self) -> None:
        _, _, other, ctx = _make_host_with_two_channels()
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            response_target=ResponseTarget.channel("telegram:99"),
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.pushed == ("telegram:99",)
        assert report.include_originating is False
        assert other.pushes[0][0].native_id == "99"

    @pytest.mark.asyncio
    async def test_channels_originating_pseudo_includes_origin(self) -> None:
        host, _origin, _other, ctx = _make_host_with_two_channels()
        _record_identity_on(host, "alice", "telegram", "42")
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            response_target=ResponseTarget.channels(["originating", "telegram"]),
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.include_originating is True
        assert report.pushed == ("telegram:42",)

    @pytest.mark.asyncio
    async def test_channels_unknown_channel_name_skipped(self) -> None:
        _, _, _, ctx = _make_host_with_two_channels()
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            response_target=ResponseTarget.channel("nope"),
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.include_originating is True  # fallback
        assert report.skipped == ("nope",)

    @pytest.mark.asyncio
    async def test_no_push_capability_skipped(self) -> None:
        origin = _RecordingChannel(name="responses", path="/r")
        nopush = _NoPushChannel(name="nopush", path="/n")
        host = AgentFrameworkHost(target=_FakeAgent(), channels=[origin, nopush])
        _ = host.app
        assert origin.context is not None
        # Pre-seed identity on the no-push channel so we get past the
        # identity check and hit the ChannelPush check.
        host._identities.setdefault("alice", {})["nopush"] = ChannelIdentity(channel="nopush", native_id="42")
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            response_target=ResponseTarget.channel("nopush"),
        )
        report = await origin.context.deliver_response(request, HostedRunResult(text="reply"))
        assert report.skipped == ("nopush:42",)
        assert report.include_originating is True  # fallback

    @pytest.mark.asyncio
    async def test_all_linked_pushes_to_every_other_channel(self) -> None:
        host, _origin, other, ctx = _make_host_with_two_channels()
        # Alice on responses (originating) and telegram.
        host._identities.setdefault("alice", {})
        host._identities["alice"]["responses"] = ChannelIdentity(channel="responses", native_id="user:1")
        host._identities["alice"]["telegram"] = ChannelIdentity(channel="telegram", native_id="42")
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            response_target=ResponseTarget.all_linked,  # type: ignore[attr-defined]
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.include_originating is True
        assert report.pushed == ("telegram:42",)
        assert other.pushes and other.pushes[0][1].text == "reply"

    @pytest.mark.asyncio
    async def test_all_linked_no_other_channels_falls_back(self) -> None:
        host, _origin, _other, ctx = _make_host_with_two_channels()
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            response_target=ResponseTarget.all_linked,  # type: ignore[attr-defined]
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.include_originating is True
        assert report.pushed == ()

    @pytest.mark.asyncio
    async def test_push_exception_marks_skipped(self) -> None:
        host, _origin, other, ctx = _make_host_with_two_channels()
        other._push_raises = RuntimeError("boom")  # type: ignore[attr-defined]
        host._identities.setdefault("alice", {})["telegram"] = ChannelIdentity(channel="telegram", native_id="42")
        request = ChannelRequest(
            channel="responses",
            operation="op",
            input="x",
            session=ChannelSession(isolation_key="alice"),
            response_target=ResponseTarget.channel("telegram"),
        )
        report = await ctx.deliver_response(request, HostedRunResult(text="reply"))
        assert report.skipped == ("telegram:42",)
        assert report.include_originating is True  # fallback
# Copyright (c) Microsoft. All rights reserved.

"""Tests for the channel-neutral envelope types in :mod:`agent_framework_hosting._types`."""

from __future__ import annotations

from agent_framework_hosting import (
    ChannelIdentity,
    ChannelRequest,
    ChannelSession,
    ResponseTarget,
    ResponseTargetKind,
)


class TestResponseTarget:
    def test_originating_default_singleton(self) -> None:
        target = ResponseTarget.originating  # type: ignore[attr-defined]
        assert target.kind is ResponseTargetKind.ORIGINATING
        assert target.targets == ()

    def test_active_singleton(self) -> None:
        target = ResponseTarget.active  # type: ignore[attr-defined]
        assert target.kind is ResponseTargetKind.ACTIVE
        assert target.targets == ()

    def test_all_linked_singleton(self) -> None:
        target = ResponseTarget.all_linked  # type: ignore[attr-defined]
        assert target.kind is ResponseTargetKind.ALL_LINKED

    def test_none_singleton(self) -> None:
        target = ResponseTarget.none  # type: ignore[attr-defined]
        assert target.kind is ResponseTargetKind.NONE

    def test_channel_builder_single(self) -> None:
        target = ResponseTarget.channel("teams")
        assert target.kind is ResponseTargetKind.CHANNELS
        assert target.targets == ("teams",)

    def test_channels_builder_list(self) -> None:
        target = ResponseTarget.channels(["teams", "telegram", "originating"])
        assert target.kind is ResponseTargetKind.CHANNELS
        assert target.targets == ("teams", "telegram", "originating")

    def test_channels_builder_accepts_tuple(self) -> None:
        target = ResponseTarget.channels(("a", "b"))
        assert target.targets == ("a", "b")

    def test_target_is_hashable(self) -> None:
        # Plain class — hashing falls back to identity, which is fine here:
        # the two keys below are different instances (singleton vs builder).
        mapping = {ResponseTarget.originating: 1, ResponseTarget.channel("t"): 2}  # type: ignore[attr-defined]
        assert len(mapping) == 2


class TestChannelRequest:
    def test_required_fields_only(self) -> None:
        request = ChannelRequest(channel="responses", operation="message.create", input="hi")
        assert request.channel == "responses"
        assert request.operation == "message.create"
        assert request.input == "hi"
        assert request.session is None
        assert request.options is None
        assert request.session_mode == "auto"
        assert request.metadata == {}
        assert request.attributes == {}
        assert request.stream is False
        assert request.identity is None
        # Default response target is the originating singleton.
        assert request.response_target.kind is ResponseTargetKind.ORIGINATING

    def test_default_response_target_is_originating_singleton(self) -> None:
        # Every new request shares the module-level ``originating`` singleton
        # by default — instances are intended to be treated as immutable, so
        # sharing is safe and avoids per-request allocation.
        first = ChannelRequest(channel="a", operation="op", input="x")
        second = ChannelRequest(channel="b", operation="op", input="y")
        assert first.response_target is ResponseTarget.originating  # type: ignore[attr-defined]
        assert first.response_target is second.response_target

    def test_with_session_and_identity(self) -> None:
        request = ChannelRequest(
            channel="telegram",
            operation="message.create",
            input="hi",
            session=ChannelSession(isolation_key="user:42"),
            identity=ChannelIdentity(channel="telegram", native_id="42"),
            response_target=ResponseTarget.active,  # type: ignore[attr-defined]
        )
        assert request.session is not None
        assert request.session.isolation_key == "user:42"
        assert request.identity is not None
        assert request.identity.channel == "telegram"
        assert request.identity.native_id == "42"
        assert request.response_target.kind is ResponseTargetKind.ACTIVE


class TestChannelIdentity:
    def test_attributes_default_empty_mapping(self) -> None:
        identity = ChannelIdentity(channel="teams", native_id="abc")
        assert dict(identity.attributes) == {}

    def test_attributes_passthrough(self) -> None:
        identity = ChannelIdentity(channel="teams", native_id="abc", attributes={"role": "user"})
        assert dict(identity.attributes) == {"role": "user"}
native_id="abc") + assert dict(ident.attributes) == {} + + def test_attributes_passthrough(self) -> None: + ident = ChannelIdentity(channel="teams", native_id="abc", attributes={"role": "user"}) + assert dict(ident.attributes) == {"role": "user"} diff --git a/python/pyproject.toml b/python/pyproject.toml index b788f48e71..406aaf6efc 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -85,6 +85,7 @@ agent-framework-foundry-hosting = { workspace = true } agent-framework-foundry-local = { workspace = true } agent-framework-gemini = { workspace = true } agent-framework-github-copilot = { workspace = true } +agent-framework-hosting = { workspace = true } agent-framework-hyperlight = { workspace = true } agent-framework-lab = { workspace = true } agent-framework-mem0 = { workspace = true } @@ -205,6 +206,7 @@ executionEnvironments = [ { root = "packages/foundry/tests", reportPrivateUsage = "none" }, { root = "packages/foundry_local/tests", reportPrivateUsage = "none" }, { root = "packages/github_copilot/tests", reportPrivateUsage = "none" }, + { root = "packages/hosting/tests", reportPrivateUsage = "none" }, { root = "packages/lab/gaia/tests", reportPrivateUsage = "none" }, { root = "packages/lab/lightning/tests", reportPrivateUsage = "none" }, { root = "packages/lab/tau2/tests", reportPrivateUsage = "none" }, diff --git a/python/uv.lock b/python/uv.lock index 85a968174a..a4b58d2270 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -47,6 +47,7 @@ members = [ "agent-framework-foundry-local", "agent-framework-gemini", "agent-framework-github-copilot", + "agent-framework-hosting", "agent-framework-hyperlight", "agent-framework-lab", "agent-framework-mem0", @@ -599,6 +600,36 @@ requires-dist = [ { name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = ">=0.2.1,<=0.2.1" }, ] +[[package]] +name = "agent-framework-hosting" +version = "1.0.0a260424" +source = { editable = "packages/hosting" } +dependencies = [ + { name = 
"agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "starlette", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.optional-dependencies] +serve = [ + { name = "hypercorn", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.dev-dependencies] +dev = [ + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.metadata] +requires-dist = [ + { name = "agent-framework-core", editable = "packages/core" }, + { name = "hypercorn", marker = "extra == 'serve'", specifier = ">=0.17" }, + { name = "starlette", specifier = ">=0.37" }, +] +provides-extras = ["serve"] + +[package.metadata.requires-dev] +dev = [{ name = "httpx", specifier = ">=0.28.1" }] + [[package]] name = "agent-framework-hyperlight" version = "1.0.0a260429" @@ -1636,7 +1667,7 @@ name = "clr-loader" version = "0.2.10" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cffi", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "cffi", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/18/24/c12faf3f61614b3131b5c98d3bf0d376b49c7feaa73edca559aeb2aee080/clr_loader-0.2.10.tar.gz", hash = "sha256:81f114afbc5005bafc5efe5af1341d400e22137e275b042a8979f3feb9fc9446", size = 83605, upload-time = "2026-01-03T23:13:06.984Z" } wheels = [ @@ -5132,8 +5163,8 @@ name = "powerfx" version = "0.0.34" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cffi", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, - { name = 
"pythonnet", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "cffi", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, + { name = "pythonnet", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/9f/fb/6c4bf87e0c74ca1c563921ce89ca1c5785b7576bca932f7255cdf81082a7/powerfx-0.0.34.tar.gz", hash = "sha256:956992e7afd272657ed16d80f4cad24ec95d9e4a79fb9dfa4a068a09e136af32", size = 3237555, upload-time = "2025-12-22T15:50:59.682Z" } wheels = [ @@ -5806,7 +5837,7 @@ name = "pythonnet" version = "3.0.5" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "clr-loader", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "clr-loader", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/9a/d6/1afd75edd932306ae9bd2c2d961d603dc2b52fcec51b04afea464f1f6646/pythonnet-3.0.5.tar.gz", hash = "sha256:48e43ca463941b3608b32b4e236db92d8d40db4c58a75ace902985f76dac21cf", size = 239212, upload-time = "2024-12-13T08:30:44.393Z" } wheels = [ From ee183ae507701665915988bc249053ed839b742e Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Mon, 4 May 2026 17:39:03 +0200 Subject: [PATCH 2/2] feat(hosting-responses): add OpenAI Responses-shaped channel package New ``agent-framework-hosting-responses`` package implementing the OpenAI Responses-shaped HTTP channel for the Hosting framework. 
Mounts ``POST /responses`` (and a ``/responses/{response_id}`` GET) onto an ``AgentFrameworkHost`` and translates the OpenAI Responses wire shape to/from the channel-neutral ``ChannelRequest`` / ``HostedRunResult`` plumbing. Surface (re-exported from ``agent_framework_hosting_responses``): - ``ResponsesChannel`` -- concrete ``Channel`` implementation. Owns the Starlette route(s), parses inbound JSON into ``ChannelRequest``, runs the optional ``ChannelRunHook``, calls back into the ``ChannelContext`` to invoke the agent target, builds Responses envelopes (sync JSON or SSE), and respects ``DeliveryReport.include_originating`` so cross-channel push routes only ack to the originating Responses caller. - The minted ``response_id`` is propagated via the host's ContextVar machinery so storage-side history providers (e.g. ``FoundryHostedAgentHistoryProvider``) persist envelopes against the same id the channel returns. - 48 unit tests covering route wiring, parsing of each Responses input shape, hook composition, sync vs streaming paths, and originating vs non-originating delivery branches. Registers the package in ``python/pyproject.toml`` ``[tool.uv.sources]`` and adds the matching pyright ``executionEnvironments`` entry. 
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/hosting-responses/LICENSE | 21 ++ python/packages/hosting-responses/README.md | 22 ++ .../__init__.py | 27 ++ .../_channel.py | 339 ++++++++++++++++++ .../_parsing.py | 238 ++++++++++++ .../packages/hosting-responses/pyproject.toml | 98 +++++ .../hosting-responses/tests/__init__.py | 0 .../hosting-responses/tests/test_channel.py | 252 +++++++++++++ .../hosting-responses/tests/test_parsing.py | 205 +++++++++++ 9 files changed, 1202 insertions(+) create mode 100644 python/packages/hosting-responses/LICENSE create mode 100644 python/packages/hosting-responses/README.md create mode 100644 python/packages/hosting-responses/agent_framework_hosting_responses/__init__.py create mode 100644 python/packages/hosting-responses/agent_framework_hosting_responses/_channel.py create mode 100644 python/packages/hosting-responses/agent_framework_hosting_responses/_parsing.py create mode 100644 python/packages/hosting-responses/pyproject.toml create mode 100644 python/packages/hosting-responses/tests/__init__.py create mode 100644 python/packages/hosting-responses/tests/test_channel.py create mode 100644 python/packages/hosting-responses/tests/test_parsing.py diff --git a/python/packages/hosting-responses/LICENSE b/python/packages/hosting-responses/LICENSE new file mode 100644 index 0000000000..9e841e7a26 --- /dev/null +++ b/python/packages/hosting-responses/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. 
+ + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/python/packages/hosting-responses/README.md b/python/packages/hosting-responses/README.md new file mode 100644 index 0000000000..b7e82ed508 --- /dev/null +++ b/python/packages/hosting-responses/README.md @@ -0,0 +1,22 @@ +# agent-framework-hosting-responses + +OpenAI Responses-shaped channel for `agent-framework-hosting`. + +Exposes a single `POST /responses` endpoint that accepts the OpenAI +Responses API request body and returns either a Responses-shaped JSON +body or a Server-Sent-Events stream when `stream=True`. 
+ +```python +from agent_framework import ChatAgent +from agent_framework.openai import OpenAIChatClient +from agent_framework_hosting import AgentFrameworkHost +from agent_framework_hosting_responses import ResponsesChannel + +agent = ChatAgent(name="Assistant", chat_client=OpenAIChatClient()) + +host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel()]) +host.serve(port=8000) +``` + +The base host plumbing lives in +[`agent-framework-hosting`](https://pypi.org/project/agent-framework-hosting/). diff --git a/python/packages/hosting-responses/agent_framework_hosting_responses/__init__.py b/python/packages/hosting-responses/agent_framework_hosting_responses/__init__.py new file mode 100644 index 0000000000..72b2272aec --- /dev/null +++ b/python/packages/hosting-responses/agent_framework_hosting_responses/__init__.py @@ -0,0 +1,27 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""OpenAI Responses-shaped channel for ``agent-framework-hosting``.""" + +import importlib.metadata + +from ._channel import ResponsesChannel +from ._parsing import ( + messages_from_responses_input, + parse_response_target, + parse_responses_identity, + parse_responses_request, +) + +try: + __version__ = importlib.metadata.version(__name__) +except importlib.metadata.PackageNotFoundError: + __version__ = "0.0.0" + +__all__ = [ + "ResponsesChannel", + "__version__", + "messages_from_responses_input", + "parse_response_target", + "parse_responses_identity", + "parse_responses_request", +] diff --git a/python/packages/hosting-responses/agent_framework_hosting_responses/_channel.py b/python/packages/hosting-responses/agent_framework_hosting_responses/_channel.py new file mode 100644 index 0000000000..d7cc3290a7 --- /dev/null +++ b/python/packages/hosting-responses/agent_framework_hosting_responses/_channel.py @@ -0,0 +1,339 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""``ResponsesChannel`` — OpenAI Responses-shaped HTTP surface. 
+ +Exposes a single ``POST /responses`` endpoint that accepts +``{"input": "...", "stream": false}`` (and the rest of the Responses API +request body) and returns either a Responses-shaped JSON body +(``stream=False``, default) or a Server-Sent-Events stream +(``stream=True``). + +Payload construction reuses the ``openai.types.responses`` Pydantic +models so the OpenAI Python SDK ``stream=True`` consumer parses every +required field without surprises. +""" + +from __future__ import annotations + +import time +import uuid +from collections.abc import AsyncIterator, Callable, Mapping +from typing import Any + +from agent_framework_hosting import ( + ChannelContext, + ChannelContribution, + ChannelRequest, + ChannelRunHook, + ChannelSession, + DeliveryReport, + HostedRunResult, + apply_run_hook, + logger, +) +from openai.types.responses import ( + Response as OpenAIResponse, +) +from openai.types.responses import ( + ResponseCompletedEvent, + ResponseCreatedEvent, + ResponseError, + ResponseFailedEvent, + ResponseOutputMessage, + ResponseOutputText, + ResponseTextDeltaEvent, +) +from starlette.requests import Request +from starlette.responses import JSONResponse, Response, StreamingResponse +from starlette.routing import Route + +from ._parsing import ( + parse_response_target, + parse_responses_identity, + parse_responses_request, +) + + +def _ack_text(report: DeliveryReport) -> str: + """Tiny acknowledgement string for the originating wire. + + Used when the agent reply is delivered out-of-band via :class:`ChannelPush`. + """ + pushed = ", ".join(report.pushed) if report.pushed else "(none)" + return f"[delivered out-of-band → {pushed}]" + + +class ResponsesChannel: + """Minimal OpenAI-Responses-shaped surface. + + Mounts ``POST /responses`` (default path ``/responses`` so the + full route is ``/responses/responses`` when the channel is prefixed, + or just ``/responses`` when ``path=""``). 
+ """ + + name = "responses" + + def __init__( + self, + *, + path: str = "", + run_hook: ChannelRunHook | None = None, + response_id_factory: Callable[..., str] | None = None, + ) -> None: + """Create a Responses channel. + + Args: + path: Mount prefix on the host. Default ``""`` mounts the + ``POST /responses`` route at the app root, matching the + upstream OpenAI surface. + run_hook: Optional :data:`ChannelRunHook` invoked with the + parsed :class:`ChannelRequest` before the agent target + runs. May return a replacement request. + response_id_factory: Optional callable that mints the + per-request response id. Default produces + ``resp_`` which matches the OpenAI Responses + wire shape. Override when the host backing storage + requires a different id format (e.g. Foundry storage, + whose partition keys are encoded in the id and which + rejects free-form ``resp_*`` ids with a server error). + The same id is used for the channel envelope and for + the host-side anchoring (``ChannelRequest.attributes``) + so storage and replay agree. + """ + self.path = path + self._hook = run_hook + self._ctx: ChannelContext | None = None + self._response_id_factory: Callable[..., str] = ( + response_id_factory if response_id_factory is not None else (lambda *_a, **_kw: f"resp_{uuid.uuid4().hex}") + ) + + def contribute(self, context: ChannelContext) -> ChannelContribution: + """Capture the host-supplied context and register ``POST /responses``.""" + self._ctx = context + return ChannelContribution(routes=[Route("/responses", self._handle, methods=["POST"])]) + + async def _handle(self, request: Request) -> Response: + """Handle a single ``POST /responses`` call. + + Parses the OpenAI Responses-shaped body into ``ChatMessage`` / + ``options`` / ``ChannelSession`` triples via :mod:`._parsing`, + applies the optional ``run_hook``, and either streams an SSE + response stream or returns a one-shot OpenAI ``Response`` envelope. 
+ Non-originating ``response_target`` values resolve to a delivery + acknowledgement instead of echoing the agent text on this wire. + """ + if self._ctx is None: # pragma: no cover - guarded by Channel lifecycle + return JSONResponse({"error": "channel not initialized"}, status_code=500) + try: + body = await request.json() + except Exception: + return JSONResponse({"error": "invalid json"}, status_code=400) + + try: + messages, options, session = parse_responses_request(body) + except ValueError as exc: + return JSONResponse({"error": str(exc)}, status_code=422) + + # When no ``previous_response_id`` chain anchor is on the body, + # surface the platform-injected ``x-agent-chat-isolation-key`` as + # the channel session so callers without an explicit anchor still + # get a stable per-conversation session id (used by non-Foundry + # history providers, routing/idempotency, etc.). The chat-iso + # value is *not* a valid storage anchor; the Foundry history + # provider deliberately ignores it — multi-turn storage chaining + # goes through the ``previous_response_id`` / bound + # ``response_id`` pair on ``ChannelRequest.attributes``. The + # user-iso companion header is consumed at the host level by + # ``_FoundryIsolationASGIMiddleware`` so the channel never has + # to import Foundry-specific types. + chat_iso = request.headers.get("x-agent-chat-isolation-key") + if session is None and chat_iso: + session = ChannelSession(isolation_key=chat_iso) + + # Mint the response id once per request so the channel envelope + # (one-shot or streamed) and any host-side anchoring (e.g. the + # Foundry history provider's ``bind_request_context``) agree on + # the same handle. The next turn arrives with this value as + # ``previous_response_id`` and the storage chain walks. We pass + # both anchors via ``ChannelRequest.attributes`` so the host + # can pick them up without a channel-specific contract. 
+ previous_response_id: str | None = None + prev_raw = body.get("previous_response_id") + if isinstance(prev_raw, str) and prev_raw: + previous_response_id = prev_raw + # Pass the previous id (if any) as a hint to the factory so id + # backends that embed partition keys (e.g. Foundry storage) can + # co-locate the new record with the chain's existing partition. + # No-arg factories continue to work via ``Callable[..., str]``. + response_id = self._response_id_factory(previous_response_id) + + attributes: dict[str, Any] = {"response_id": response_id} + if previous_response_id is not None: + attributes["previous_response_id"] = previous_response_id + + # Honor the OpenAI-Responses ``stream`` flag — non-streaming by + # default, SSE when the caller opts in. Run hooks may still flip + # this per-request (e.g. force non-streaming for a particular user). + channel_request = ChannelRequest( + channel=self.name, + operation="message.create", + input=messages, + session=session, + options=options or None, + stream=bool(body.get("stream", False)), + identity=parse_responses_identity(body, self.name), + response_target=parse_response_target(body), + attributes=attributes, + ) + + if self._hook is not None: + channel_request = await apply_run_hook( + self._hook, + channel_request, + target=self._ctx.target, + protocol_request=body, + ) + + if channel_request.stream: + return StreamingResponse( + self._stream_events(channel_request, body, response_id=response_id), + media_type="text/event-stream", + headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, + ) + + result = await self._ctx.run(channel_request) + report = await self._ctx.deliver_response(channel_request, result) + text = result.text if report.include_originating else _ack_text(report) + envelope = self._build_response(body, text, status="completed", response_id=response_id) + return JSONResponse(envelope.model_dump(mode="json", exclude_none=True)) + + def _build_response( + self, + body: Mapping[str, 
Any], + text: str, + *, + status: str, + response_id: str | None = None, + ) -> OpenAIResponse: + """Construct an OpenAI ``Response`` for a finished (non-streaming) run. + + ``status`` mirrors the top-level Response status set values + (``in_progress`` / ``completed`` / ``failed`` / ``incomplete`` / + ``cancelled``). The nested ``ResponseOutputMessage.status`` field + only accepts ``in_progress`` / ``completed`` / ``incomplete``, so + terminal-but-non-success states collapse to ``incomplete`` there + — the failure detail still travels via the top-level ``status`` + and (for streamed errors) the ``error`` field. + + ``response_id``: the per-request id minted in :meth:`_handle`. + Passed in so envelope and storage agree on a single handle per + turn (see :meth:`_handle` notes). Falls back to a fresh uuid + when callers (e.g. :meth:`_stream_events`'s skeleton path + before this argument was introduced) don't supply one. + """ + message_status = status if status in ("in_progress", "completed", "incomplete") else "incomplete" + return OpenAIResponse( + id=response_id or self._response_id_factory(None), + object="response", + created_at=time.time(), + status=status, # type: ignore[arg-type] + model=body.get("model", "agent"), + output=[ + ResponseOutputMessage( + id=f"msg_{uuid.uuid4().hex}", + type="message", + role="assistant", + status=message_status, # type: ignore[arg-type] + content=[ResponseOutputText(type="output_text", text=text, annotations=[])], + ) + ], + parallel_tool_calls=False, + tool_choice="auto", + tools=[], + metadata={}, + ) + + async def _stream_events( + self, + request: ChannelRequest, + body: Mapping[str, Any], + *, + response_id: str, + ) -> AsyncIterator[str]: + """Yield SSE events shaped like the OpenAI Responses streaming protocol. + + Emits ``response.created`` → many ``response.output_text.delta`` + → ``response.completed`` (or ``response.failed`` on error). 
+ """ + if self._ctx is None: # pragma: no cover - guarded by Channel lifecycle + return + + msg_id = f"msg_{uuid.uuid4().hex}" + seq = 0 + + def next_seq() -> int: + nonlocal seq + seq += 1 + return seq + + def sse(event: Any) -> str: + return f"event: {event.type}\ndata: {event.model_dump_json(exclude_none=True)}\n\n" + + skeleton = self._build_response(body, "", status="in_progress", response_id=response_id) + yield sse(ResponseCreatedEvent(type="response.created", response=skeleton, sequence_number=next_seq())) + + accumulated = "" + try: + stream = self._ctx.run_stream(request) + async for update in stream: + chunk = getattr(update, "text", None) + if chunk: + accumulated += chunk + yield sse( + ResponseTextDeltaEvent( + type="response.output_text.delta", + item_id=msg_id, + output_index=0, + content_index=0, + delta=chunk, + logprobs=[], + sequence_number=next_seq(), + ) + ) + try: + # Finalize so context-provider / history hooks on the agent + # still run even though we are emitting our own SSE. + await stream.get_final_response() + except Exception: # pragma: no cover - finalize is best-effort + logger.exception("Responses stream finalize failed") + except Exception as exc: + logger.exception("Responses stream consumption failed") + failed = self._build_response(body, accumulated, status="failed", response_id=response_id) + failed.error = ResponseError(code="server_error", message=str(exc)) + yield sse( + ResponseFailedEvent( + type="response.failed", + response=failed, + sequence_number=next_seq(), + ) + ) + return + + completed_text = accumulated + report = await self._ctx.deliver_response(request, HostedRunResult(text=accumulated)) + if not report.include_originating: + completed_text = _ack_text(report) + completed = self._build_response(body, completed_text, status="completed", response_id=response_id) + # Reuse the same message id we emitted deltas under. 
+ if completed.output and isinstance(completed.output[0], ResponseOutputMessage): + completed.output[0].id = msg_id + yield sse( + ResponseCompletedEvent( + type="response.completed", + response=completed, + sequence_number=next_seq(), + ) + ) + + +__all__ = ["ResponsesChannel"] diff --git a/python/packages/hosting-responses/agent_framework_hosting_responses/_parsing.py b/python/packages/hosting-responses/agent_framework_hosting_responses/_parsing.py new file mode 100644 index 0000000000..914da0cb6d --- /dev/null +++ b/python/packages/hosting-responses/agent_framework_hosting_responses/_parsing.py @@ -0,0 +1,238 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Parsing helpers for the OpenAI Responses-API request body. + +The Responses API accepts ``input`` as either a string or a list of "input +items". An item is either a content part (``input_text`` / ``input_image`` +/ ``input_file``) or a message envelope ``{type: "message", role, +content: [...]}``. We translate that into an Agent Framework ``Message`` +list and split out the ChatOptions-shaped fields the API also carries. +""" + +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any, cast + +from agent_framework import Content, Message +from agent_framework_hosting import ChannelIdentity, ChannelSession, ResponseTarget, logger + +# OpenAI Responses field name → Agent Framework ChatOptions field name. +_RESPONSES_OPTION_REMAP = { + "max_output_tokens": "max_tokens", + "parallel_tool_calls": "allow_multiple_tool_calls", +} +# Fields we forward to ChatOptions verbatim. +_RESPONSES_OPTION_PASSTHROUGH = { + "temperature", + "top_p", + "metadata", + "user", + "safety_identifier", + "tool_choice", + "tools", + "store", + "response_format", + "stop", + "seed", + "frequency_penalty", + "presence_penalty", + "logit_bias", + "instructions", +} +# Fields the Responses transport owns; they must not be forwarded as options. 
+_RESPONSES_TRANSPORT_KEYS = {"input", "model", "stream", "previous_response_id", "response_target"} + + +def parse_response_target(body: Mapping[str, Any]) -> ResponseTarget: + """Translate the OpenAI Responses ``response_target`` field into a :class:`ResponseTarget`. + + Accepted shapes: + + - ``"originating"`` / ``"active"`` / ``"all_linked"`` / ``"none"`` — bare strings. + - ``"telegram"`` / ``"telegram:"`` — single channel destination. + - ``["telegram:", "originating"]`` — list of destinations; the + pseudo-name ``"originating"`` includes the originating channel. + - ``{"channels": [...]}`` — same list semantics with the explicit key. + - ``{"kind": "active"}`` / ``{"kind": "all_linked"}`` — explicit kind. + + Anything malformed is logged at WARNING and falls back to ``originating``. + """ + raw = body.get("response_target") + if raw is None: + return ResponseTarget.originating # type: ignore[attr-defined,no-any-return] + if isinstance(raw, str): + keyword = raw.strip() + if keyword == "originating": + return ResponseTarget.originating # type: ignore[attr-defined,no-any-return] + if keyword == "active": + return ResponseTarget.active # type: ignore[attr-defined,no-any-return] + if keyword == "all_linked": + return ResponseTarget.all_linked # type: ignore[attr-defined,no-any-return] + if keyword == "none": + return ResponseTarget.none # type: ignore[attr-defined,no-any-return] + # Treat any other bare string as a single channel destination. 
+ return ResponseTarget.channel(keyword) + if isinstance(raw, list): + return _parse_channels_list(cast("list[Any]", raw)) # type: ignore[redundant-cast] + if isinstance(raw, Mapping): + raw_map = cast("Mapping[str, Any]", raw) + channels = raw_map.get("channels") + if isinstance(channels, list): + return _parse_channels_list(cast("list[Any]", channels)) # type: ignore[redundant-cast] + kind = raw_map.get("kind") + if kind == "active": + return ResponseTarget.active # type: ignore[attr-defined,no-any-return] + if kind == "all_linked": + return ResponseTarget.all_linked # type: ignore[attr-defined,no-any-return] + if kind == "none": + return ResponseTarget.none # type: ignore[attr-defined,no-any-return] + if kind == "originating": + return ResponseTarget.originating # type: ignore[attr-defined,no-any-return] + logger.warning("responses: ignoring malformed response_target=%r", cast("Any", raw)) + return ResponseTarget.originating # type: ignore[attr-defined,no-any-return] + + +def _parse_channels_list(raw: list[Any]) -> ResponseTarget: + """Build a ``ResponseTarget.channels`` from a raw list, dropping non-string entries. + + An empty list (or one with no usable strings) collapses back to + ``originating`` so we never silently produce a target that nobody + will deliver to. + """ + tokens = [t for t in raw if isinstance(t, str) and t] + if len(tokens) != len(raw): + logger.warning("responses: dropping non-string entries from response_target=%r", raw) + if not tokens: + return ResponseTarget.originating # type: ignore[attr-defined,no-any-return] + return ResponseTarget.channels(tokens) + + +def parse_responses_identity(body: Mapping[str, Any], channel_name: str) -> ChannelIdentity | None: + """Surface the caller as a :class:`ChannelIdentity` so the host can record it. + + OpenAI Responses replaced ``user`` with ``safety_identifier`` — we use + that as the native id, falling back to the legacy ``user`` field. 
+ """ + native = body.get("safety_identifier") or body.get("user") + if not isinstance(native, str) or not native: + return None + return ChannelIdentity(channel=channel_name, native_id=native) + + +def _content_from_input_item(item: Mapping[str, Any]) -> Content: + """Convert a single OpenAI Responses ``input`` item into a :class:`Content` part. + + Handles the ``input_text``/``output_text``/``text`` text variants, + ``input_image`` URL references, and ``input_file`` references via either + a public URL or a hosted ``file_id``. Raises ``ValueError`` for any + unsupported item type so the surrounding parser can return a 422. + """ + item_type = item.get("type") + if item_type in ("input_text", "output_text", "text"): + return Content.from_text(text=str(item.get("text", ""))) + if item_type == "input_image": + image_url: Any = item.get("image_url") + if isinstance(image_url, Mapping): + image_url = cast("Mapping[str, Any]", image_url).get("url") + if not isinstance(image_url, str): + raise ValueError("input_image requires `image_url`") + return Content.from_uri(uri=image_url, media_type="image/*") + if item_type == "input_file": + if (uri := item.get("file_url")) and isinstance(uri, str): + return Content.from_uri(uri=uri, media_type=item.get("mime_type")) + if file_id := item.get("file_id"): + return Content(type="hosted_file", file_id=str(file_id)) + raise ValueError("input_file requires `file_url` or `file_id`") + raise ValueError(f"Unsupported Responses input content type: {item_type!r}") + + +def messages_from_responses_input(value: Any) -> list[Message]: + """Translate ``input`` (string or list of items) into :class:`Message` objects.""" + if isinstance(value, str): + return [Message("user", [Content.from_text(text=value)])] + if not isinstance(value, list) or not value: + raise ValueError("`input` must be a non-empty string or list") + + messages: list[Message] = [] + pending_user_parts: list[Content] = [] + + def flush() -> None: + """Emit any buffered loose 
user content as a single user message.""" + if pending_user_parts: + messages.append(Message("user", list(pending_user_parts))) + pending_user_parts.clear() + + for item in cast("list[Any]", value): # type: ignore[redundant-cast] + if not isinstance(item, Mapping): + raise ValueError("each `input` item must be an object") + item_map = cast("Mapping[str, Any]", item) + if item_map.get("type") == "message": + flush() + role = str(item_map.get("role") or "user") + content: Any = item_map.get("content") or [] + parts: list[Content] + if isinstance(content, str): + parts = [Content.from_text(text=content)] + elif isinstance(content, list): + parts = [ + _content_from_input_item(cast("Mapping[str, Any]", c)) + for c in cast("list[Any]", content) # type: ignore[redundant-cast] + if isinstance(c, Mapping) + ] + else: + parts = [] + messages.append(Message(role, parts)) + else: + pending_user_parts.append(_content_from_input_item(item_map)) + + flush() + if not messages: + raise ValueError("`input` produced no messages") + return messages + + +def parse_responses_request( + body: Mapping[str, Any], +) -> tuple[list[Message], dict[str, Any], ChannelSession | None]: + """Translate a Responses-API request body into Agent Framework constructs. + + Returns a triple ``(messages, options, session)`` where: + + - ``messages`` is the parsed conversation (``instructions`` is prepended + as a system message when present). + - ``options`` is a ``ChatOptions``-shaped dict with the model-tunable + fields the channel lifted off the body. + - ``session`` is a :class:`ChannelSession` keyed by + ``previous_response_id`` when one was supplied, else ``None``. 
+ """ + messages = messages_from_responses_input(body.get("input")) + + if (instructions := body.get("instructions")) and isinstance(instructions, str): + messages.insert(0, Message("system", [Content.from_text(text=instructions)])) + + options: dict[str, Any] = {} + for key, value in body.items(): + if key in _RESPONSES_TRANSPORT_KEYS or value is None: + continue + if key == "instructions": + continue + if (mapped := _RESPONSES_OPTION_REMAP.get(key)) is not None: + options[mapped] = value + elif key in _RESPONSES_OPTION_PASSTHROUGH: + options[key] = value + # silently drop everything else (truncation, reasoning, include, ...) + + session: ChannelSession | None = None + if (prev := body.get("previous_response_id")) and isinstance(prev, str): + session = ChannelSession(isolation_key=prev) + + return messages, options, session + + +__all__ = [ + "messages_from_responses_input", + "parse_response_target", + "parse_responses_identity", + "parse_responses_request", +] diff --git a/python/packages/hosting-responses/pyproject.toml b/python/packages/hosting-responses/pyproject.toml new file mode 100644 index 0000000000..6606c94455 --- /dev/null +++ b/python/packages/hosting-responses/pyproject.toml @@ -0,0 +1,98 @@ +[project] +name = "agent-framework-hosting-responses" +description = "OpenAI Responses-shaped channel for agent-framework-hosting." 
+authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0a260424" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.2.0,<2", + "agent-framework-hosting==1.0.0a260424", + "openai>=1.99.0,<3", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'", + "sys_platform == 'win32'" +] + +[tool.uv-dynamic-versioning] +fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +filterwarnings = [] +timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = "../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +include = ["agent_framework_hosting_responses"] +exclude = ['tests'] + +[tool.mypy] +plugins = ['pydantic.mypy'] +strict = true +python_version = "3.10" +ignore_missing_imports = true +disallow_untyped_defs = true +no_implicit_optional = true +check_untyped_defs = true +warn_return_any = true +show_error_codes 
= true +warn_unused_ignores = false +disallow_incomplete_defs = true +disallow_untyped_decorators = true + +[tool.bandit] +targets = ["agent_framework_hosting_responses"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks.mypy] +help = "Run MyPy for this package." +cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hosting_responses" + +[tool.poe.tasks.test] +help = "Run the default unit test suite for this package." +cmd = 'pytest -m "not integration" --cov=agent_framework_hosting_responses --cov-report=term-missing:skip-covered tests' + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" diff --git a/python/packages/hosting-responses/tests/__init__.py b/python/packages/hosting-responses/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/packages/hosting-responses/tests/test_channel.py b/python/packages/hosting-responses/tests/test_channel.py new file mode 100644 index 0000000000..c5db78b1b8 --- /dev/null +++ b/python/packages/hosting-responses/tests/test_channel.py @@ -0,0 +1,252 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""End-to-end tests for :class:`ResponsesChannel` via Starlette's ``TestClient``.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator +from dataclasses import dataclass +from typing import Any + +from agent_framework_hosting import ( + AgentFrameworkHost, + ChannelIdentity, + HostedRunResult, +) +from starlette.testclient import TestClient + +from agent_framework_hosting_responses import ResponsesChannel + +# --------------------------------------------------------------------------- # +# Fakes # +# --------------------------------------------------------------------------- # + + +@dataclass +class _FakeAgentResponse: + text: str + + +@dataclass +class _FakeUpdate: + text: str + + +class _FakeStream: + """Minimal stand-in for AF's ``ResponseStream`` returned by ``run(stream=True)``.""" + + def __init__(self, chunks: list[str]) -> None: + self._chunks = chunks + self._final = _FakeAgentResponse(text="".join(chunks)) + + def __aiter__(self) -> AsyncIterator[_FakeUpdate]: + async def _gen() -> AsyncIterator[_FakeUpdate]: + for c in self._chunks: + yield _FakeUpdate(c) + + return _gen() + + async def get_final_response(self) -> _FakeAgentResponse: + return self._final + + +class _FakeAgent: + def __init__(self, reply: str = "hello", chunks: list[str] | None = None) -> None: + self._reply = reply + self._chunks = chunks or [reply] + self.calls: list[dict[str, Any]] = [] + + def create_session(self, *, session_id: str | None = None) -> Any: + return {"session_id": session_id} + + def run(self, messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any: + self.calls.append({"messages": messages, "stream": stream, "kwargs": kwargs}) + if stream: + return _FakeStream(self._chunks) + + async def _coro() -> _FakeAgentResponse: + return _FakeAgentResponse(text=self._reply) + + return _coro() + + +class _RecordingPushChannel: + name = "telegram" + path = "/telegram" + + def __init__(self) -> None: + self.pushes: 
list[tuple[ChannelIdentity, HostedRunResult]] = [] + + def contribute(self, _ctx: Any) -> Any: + from agent_framework_hosting import ChannelContribution + + return ChannelContribution() + + async def push(self, identity: ChannelIdentity, payload: HostedRunResult) -> None: + self.pushes.append((identity, payload)) + + +# --------------------------------------------------------------------------- # +# Tests # +# --------------------------------------------------------------------------- # + + +def _make_client(agent: _FakeAgent | None = None) -> tuple[TestClient, AgentFrameworkHost, _FakeAgent]: + agent = agent or _FakeAgent() + host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel()]) + return TestClient(host.app), host, agent + + +class TestResponsesChannelNonStreaming: + def test_post_responses_returns_completed_envelope(self) -> None: + client, _host, agent = _make_client(_FakeAgent(reply="hi back")) + with client: + r = client.post("/responses", json={"input": "hi"}) + assert r.status_code == 200 + body = r.json() + assert body["status"] == "completed" + assert body["object"] == "response" + assert body["id"].startswith("resp_") + assert body["output"][0]["content"][0]["text"] == "hi back" + assert len(agent.calls) == 1 + + def test_invalid_json_returns_400(self) -> None: + client, *_ = _make_client() + with client: + r = client.post("/responses", content=b"{not json", headers={"content-type": "application/json"}) + assert r.status_code == 400 + + def test_invalid_input_returns_422(self) -> None: + client, *_ = _make_client() + with client: + r = client.post("/responses", json={"input": 42}) + assert r.status_code == 422 + + def test_options_propagate_to_target_run(self) -> None: + client, _host, agent = _make_client() + with client: + r = client.post("/responses", json={"input": "x", "temperature": 0.5, "max_output_tokens": 64}) + assert r.status_code == 200 + opts = agent.calls[0]["kwargs"]["options"] + assert opts == {"temperature": 0.5, 
"max_tokens": 64} + + def test_previous_response_id_creates_session(self) -> None: + client, _host, agent = _make_client() + with client: + client.post("/responses", json={"input": "x", "previous_response_id": "resp_42"}) + # AgentFrameworkHost converts the channel session into an AgentSession. + sess = agent.calls[0]["kwargs"].get("session") + assert sess is not None + # _FakeAgent.create_session stashes the session_id on the dict it returns. + assert sess["session_id"] == "resp_42" + + def test_chat_isolation_header_creates_session_when_no_prev_id(self) -> None: + """Foundry-style ``x-agent-chat-isolation-key`` falls back to a session anchor. + + First-turn requests have no ``previous_response_id`` (the client + doesn't have one yet), but Foundry Hosted Agents always inject + the isolation headers. The channel must derive a session from the + chat key so the host can build a stable per-conversation session + that history providers persist under. + """ + client, _host, agent = _make_client() + with client: + client.post( + "/responses", + json={"input": "x"}, + headers={"x-agent-chat-isolation-key": "chat-abc"}, + ) + sess = agent.calls[0]["kwargs"].get("session") + assert sess is not None + assert sess["session_id"] == "chat-abc" + + def test_prev_response_id_wins_over_chat_isolation_header(self) -> None: + """When both anchors are present, ``previous_response_id`` wins. + + ``previous_response_id`` is the protocol-native chain anchor; the + header fallback is only meant to bootstrap when no protocol + anchor exists. 
+ """ + client, _host, agent = _make_client() + with client: + client.post( + "/responses", + json={"input": "x", "previous_response_id": "resp_99"}, + headers={"x-agent-chat-isolation-key": "chat-abc"}, + ) + sess = agent.calls[0]["kwargs"].get("session") + assert sess is not None + assert sess["session_id"] == "resp_99" + + def test_response_target_channel_returns_ack_text_when_pushed(self) -> None: + agent = _FakeAgent(reply="real reply") + push_ch = _RecordingPushChannel() + host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel(), push_ch]) + + with TestClient(host.app) as client: + r = client.post( + "/responses", + json={ + "input": "hi", + "response_target": "telegram:42", + }, + ) + assert r.status_code == 200 + body = r.json() + text = body["output"][0]["content"][0]["text"] + assert "delivered out-of-band" in text + assert "telegram:42" in text + assert push_ch.pushes and push_ch.pushes[0][1].text == "real reply" + assert push_ch.pushes[0][0].native_id == "42" + + +class TestResponsesChannelStreaming: + def test_sse_emits_created_delta_completed(self) -> None: + agent = _FakeAgent(reply="hello world", chunks=["hello", " ", "world"]) + host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel()]) + with TestClient(host.app) as client: + r = client.post("/responses", json={"input": "hi", "stream": True}) + assert r.status_code == 200 + body = r.text + + # SSE event lines look like "event: \ndata: \n\n". + events = [line[len("event: ") :] for line in body.splitlines() if line.startswith("event: ")] + assert events[0] == "response.created" + assert events[-1] == "response.completed" + assert events.count("response.output_text.delta") == 3 + + def test_sse_emits_failed_when_stream_raises(self) -> None: + # Regression: ResponseOutputMessage.status only accepts in_progress/ + # completed/incomplete, so building an OpenAIResponse with status="failed" + # used to crash with a pydantic ValidationError. 
The channel must map the + # nested message status to "incomplete" while keeping the top-level + # Response.status="failed". + class _BoomStream: + def __aiter__(self) -> AsyncIterator[_FakeUpdate]: + async def _gen() -> AsyncIterator[_FakeUpdate]: + yield _FakeUpdate("partial") + raise RuntimeError("upstream blew up") + + return _gen() + + async def get_final_response(self) -> _FakeAgentResponse: # pragma: no cover + return _FakeAgentResponse(text="") + + class _BoomAgent(_FakeAgent): + def run(self, messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any: + self.calls.append({"messages": messages, "stream": stream, "kwargs": kwargs}) + if stream: + return _BoomStream() + raise AssertionError("non-streaming path not exercised here") + + host = AgentFrameworkHost(target=_BoomAgent(), channels=[ResponsesChannel()]) + with TestClient(host.app) as client: + r = client.post("/responses", json={"input": "hi", "stream": True}) + assert r.status_code == 200 + body = r.text + + events = [line[len("event: ") :] for line in body.splitlines() if line.startswith("event: ")] + assert events[0] == "response.created" + assert events[-1] == "response.failed" + # The failed envelope must serialize cleanly — i.e. no ValidationError raised. + assert "upstream blew up" in body diff --git a/python/packages/hosting-responses/tests/test_parsing.py b/python/packages/hosting-responses/tests/test_parsing.py new file mode 100644 index 0000000000..566c321d11 --- /dev/null +++ b/python/packages/hosting-responses/tests/test_parsing.py @@ -0,0 +1,205 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Tests for the OpenAI Responses request-body parser.""" + +from __future__ import annotations + +import pytest +from agent_framework_hosting import ResponseTarget, ResponseTargetKind + +from agent_framework_hosting_responses import ( + messages_from_responses_input, + parse_response_target, + parse_responses_identity, + parse_responses_request, +) + + +class TestMessagesFromResponsesInput: + def test_string_input_becomes_single_user_message(self) -> None: + msgs = messages_from_responses_input("hello") + assert len(msgs) == 1 + assert msgs[0].role == "user" + assert msgs[0].text == "hello" + + def test_input_text_items_collapse_into_one_user_message(self) -> None: + msgs = messages_from_responses_input([{"type": "input_text", "text": "a"}, {"type": "input_text", "text": "b"}]) + assert len(msgs) == 1 + assert msgs[0].role == "user" + assert msgs[0].text == "a b" + + def test_message_envelope_with_string_content(self) -> None: + msgs = messages_from_responses_input([ + {"type": "message", "role": "system", "content": "be brief"}, + {"type": "message", "role": "user", "content": "hi"}, + ]) + assert [m.role for m in msgs] == ["system", "user"] + assert msgs[0].text == "be brief" + + def test_message_envelope_with_content_parts(self) -> None: + msgs = messages_from_responses_input([ + { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": "describe this"}], + } + ]) + assert msgs[0].text == "describe this" + + def test_pending_text_flushes_before_message_envelope(self) -> None: + msgs = messages_from_responses_input([ + {"type": "input_text", "text": "first"}, + {"type": "message", "role": "user", "content": "second"}, + ]) + assert len(msgs) == 2 + assert msgs[0].text == "first" + assert msgs[1].text == "second" + + def test_image_url_via_string(self) -> None: + msgs = messages_from_responses_input([{"type": "input_image", "image_url": "https://example.com/cat.png"}]) + assert len(msgs) == 1 + # Image content present. 
+ assert any(getattr(c, "uri", None) == "https://example.com/cat.png" for c in msgs[0].contents) + + def test_image_url_via_object(self) -> None: + msgs = messages_from_responses_input([ + {"type": "input_image", "image_url": {"url": "https://example.com/cat.png"}} + ]) + assert any(getattr(c, "uri", None) == "https://example.com/cat.png" for c in msgs[0].contents) + + def test_unknown_input_type_raises(self) -> None: + with pytest.raises(ValueError, match="Unsupported"): + messages_from_responses_input([{"type": "weird"}]) + + def test_empty_list_raises(self) -> None: + with pytest.raises(ValueError, match="non-empty"): + messages_from_responses_input([]) + + def test_non_string_non_list_raises(self) -> None: + with pytest.raises(ValueError): + messages_from_responses_input(42) # type: ignore[arg-type] + + def test_image_url_missing_raises(self) -> None: + with pytest.raises(ValueError, match="image_url"): + messages_from_responses_input([{"type": "input_image"}]) + + +class TestParseResponsesRequest: + def test_instructions_become_system_message_prefix(self) -> None: + msgs, opts, sess = parse_responses_request({"input": "hi", "instructions": "be brief"}) + assert msgs[0].role == "system" + assert msgs[0].text == "be brief" + assert msgs[1].role == "user" + # 'instructions' is consumed, not forwarded as an option. 
+ assert "instructions" not in opts + assert sess is None + + def test_options_passthrough(self) -> None: + _, opts, _ = parse_responses_request({"input": "x", "temperature": 0.4, "top_p": 0.9, "tool_choice": "auto"}) + assert opts["temperature"] == 0.4 + assert opts["top_p"] == 0.9 + assert opts["tool_choice"] == "auto" + + def test_options_remap(self) -> None: + _, opts, _ = parse_responses_request({"input": "x", "max_output_tokens": 256, "parallel_tool_calls": False}) + assert opts == {"max_tokens": 256, "allow_multiple_tool_calls": False} + + def test_transport_keys_not_forwarded(self) -> None: + _, opts, _ = parse_responses_request({ + "input": "x", + "model": "gpt-x", + "stream": True, + "previous_response_id": "r", + }) + for key in ("input", "model", "stream", "previous_response_id"): + assert key not in opts + + def test_unknown_keys_silently_dropped(self) -> None: + _, opts, _ = parse_responses_request({"input": "x", "truncation": "auto", "reasoning": {"effort": "low"}}) + assert opts == {} + + def test_none_values_dropped(self) -> None: + _, opts, _ = parse_responses_request({"input": "x", "temperature": None}) + assert "temperature" not in opts + + def test_previous_response_id_becomes_session(self) -> None: + _, _, sess = parse_responses_request({"input": "x", "previous_response_id": "resp_42"}) + assert sess is not None + assert sess.isolation_key == "resp_42" + + +class TestParseResponseTarget: + def test_default_originating_when_missing(self) -> None: + assert parse_response_target({}).kind is ResponseTargetKind.ORIGINATING + + @pytest.mark.parametrize( + "value,expected_kind", + [ + ("originating", ResponseTargetKind.ORIGINATING), + ("active", ResponseTargetKind.ACTIVE), + ("all_linked", ResponseTargetKind.ALL_LINKED), + ("none", ResponseTargetKind.NONE), + ], + ) + def test_bare_string_kinds(self, value: str, expected_kind: ResponseTargetKind) -> None: + assert parse_response_target({"response_target": value}).kind is expected_kind + + def 
test_bare_string_other_becomes_channel(self) -> None: + target = parse_response_target({"response_target": "telegram"}) + assert target == ResponseTarget.channel("telegram") + + def test_bare_string_with_native_id_becomes_channel(self) -> None: + target = parse_response_target({"response_target": "telegram:42"}) + assert target.kind is ResponseTargetKind.CHANNELS + assert target.targets == ("telegram:42",) + + def test_list_form(self) -> None: + target = parse_response_target({"response_target": ["telegram:42", "originating"]}) + assert target == ResponseTarget.channels(["telegram:42", "originating"]) + + def test_list_drops_non_strings(self) -> None: + target = parse_response_target({"response_target": ["telegram", 42, ""]}) + assert target.targets == ("telegram",) + + def test_empty_list_falls_back_to_originating(self) -> None: + target = parse_response_target({"response_target": []}) + assert target.kind is ResponseTargetKind.ORIGINATING + + def test_dict_with_channels(self) -> None: + target = parse_response_target({"response_target": {"channels": ["a", "b"]}}) + assert target == ResponseTarget.channels(["a", "b"]) + + @pytest.mark.parametrize( + "kind,expected", + [ + ("active", ResponseTargetKind.ACTIVE), + ("all_linked", ResponseTargetKind.ALL_LINKED), + ("none", ResponseTargetKind.NONE), + ("originating", ResponseTargetKind.ORIGINATING), + ], + ) + def test_dict_kind(self, kind: str, expected: ResponseTargetKind) -> None: + assert parse_response_target({"response_target": {"kind": kind}}).kind is expected + + def test_malformed_falls_back_to_originating(self) -> None: + target = parse_response_target({"response_target": 42}) + assert target.kind is ResponseTargetKind.ORIGINATING + + +class TestParseResponsesIdentity: + def test_safety_identifier_preferred(self) -> None: + ident = parse_responses_identity({"safety_identifier": "abc", "user": "legacy"}, "responses") + assert ident is not None + assert ident.native_id == "abc" + assert ident.channel == 
"responses" + + def test_fallback_to_user(self) -> None: + ident = parse_responses_identity({"user": "legacy"}, "responses") + assert ident is not None + assert ident.native_id == "legacy" + + def test_returns_none_when_absent(self) -> None: + assert parse_responses_identity({}, "responses") is None + + def test_returns_none_for_non_string(self) -> None: + assert parse_responses_identity({"safety_identifier": 42}, "responses") is None