From 46d4c9c0cc77038c08c4a6292313c3f237a636c8 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Mon, 4 May 2026 17:31:08 +0200 Subject: [PATCH 1/3] feat(hosting): add agent-framework-hosting core package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New ``agent-framework-hosting`` package implementing ADR 0026 / SPEC-002: the channel-neutral host that lets a single ``Agent`` (or ``Workflow``) fan out across multiple wire protocols ("channels") behind one Starlette ASGI app. Surface (re-exported from ``agent_framework_hosting``): - ``AgentFrameworkHost`` — wraps a hostable target, mounts channels onto an ASGI app, owns per-isolation-key ``AgentSession`` reuse, threads request context (``response_id`` / ``previous_response_id``) into context providers via an ``ExitStack`` of ``bind_request_context`` calls, and exposes an opt-in Hypercorn ``serve()`` helper (extra ``[serve]``). - ``Channel`` protocol + ``ChannelContribution`` — the surface a channel package implements (routes, lifespans, identity hooks, …). - ``ChannelRequest`` / ``ChannelSession`` / ``ChannelIdentity`` / ``ChannelPush`` / ``ChannelCommand[Context]`` / ``ChannelRunHook`` / ``ChannelStreamTransformHook`` / ``DeliveryReport`` / ``HostedRunResult`` / ``ResponseTarget`` / ``ResponseTargetKind`` / ``apply_run_hook`` — channel-side dataclasses + helpers. - ``IsolationKeys`` + ``ISOLATION_HEADER_USER`` / ``..._CHAT`` + ``get/set/reset_current_isolation_keys`` — the host's ASGI middleware reads the ``x-agent-{user,chat}-isolation-key`` headers off each inbound request and exposes them to the agent stack via a ``ContextVar`` so storage-side providers (e.g. ``FoundryHostedAgentHistoryProvider``) can apply per-tenant partitioning without channels having to forward anything. Includes 45 unit tests covering the host, channel contributions, isolation contextvar, and shared types. 
Registers the package in ``python/pyproject.toml`` ``[tool.uv.sources]`` and adds the matching pyright ``executionEnvironments`` entry for tests. Hypercorn is an optional dependency (``[serve]`` extra); the soft import in ``serve()`` is annotated for pyright since it isn't on the default install. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/hosting/LICENSE | 21 + python/packages/hosting/README.md | 59 ++ .../agent_framework_hosting/__init__.py | 74 ++ .../hosting/agent_framework_hosting/_host.py | 946 ++++++++++++++++++ .../agent_framework_hosting/_isolation.py | 76 ++ .../hosting/agent_framework_hosting/_types.py | 376 +++++++ python/packages/hosting/pyproject.toml | 107 ++ python/packages/hosting/tests/__init__.py | 0 .../hosting/tests/_workflow_fixtures.py | 43 + python/packages/hosting/tests/test_host.py | 714 +++++++++++++ python/packages/hosting/tests/test_types.py | 105 ++ python/pyproject.toml | 2 + python/uv.lock | 39 +- 13 files changed, 2558 insertions(+), 4 deletions(-) create mode 100644 python/packages/hosting/LICENSE create mode 100644 python/packages/hosting/README.md create mode 100644 python/packages/hosting/agent_framework_hosting/__init__.py create mode 100644 python/packages/hosting/agent_framework_hosting/_host.py create mode 100644 python/packages/hosting/agent_framework_hosting/_isolation.py create mode 100644 python/packages/hosting/agent_framework_hosting/_types.py create mode 100644 python/packages/hosting/pyproject.toml create mode 100644 python/packages/hosting/tests/__init__.py create mode 100644 python/packages/hosting/tests/_workflow_fixtures.py create mode 100644 python/packages/hosting/tests/test_host.py create mode 100644 python/packages/hosting/tests/test_types.py diff --git a/python/packages/hosting/LICENSE b/python/packages/hosting/LICENSE new file mode 100644 index 0000000000..9e841e7a26 --- /dev/null +++ b/python/packages/hosting/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright 
(c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/python/packages/hosting/README.md b/python/packages/hosting/README.md new file mode 100644 index 0000000000..a2690b5705 --- /dev/null +++ b/python/packages/hosting/README.md @@ -0,0 +1,59 @@ +# agent-framework-hosting + +Multi-channel hosting for Microsoft Agent Framework agents. + +`agent-framework-hosting` lets you serve a single agent (or workflow) +target through one or more **channels** — pluggable adapters that +expose the target over different transports. The result is a single +Starlette ASGI application you can host anywhere (local Hypercorn, +Azure Container Apps, Foundry Hosted Agents, …). 
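In rough terms, a channel is an object with a `name`, a mount `path`, and a `contribute(context)` method that returns routes plus startup/shutdown callbacks. A toy, dependency-free stand-in to illustrate the shape (the real `Channel` / `ChannelContribution` types live in `agent_framework_hosting`; the names here are placeholders):

```python
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class ToyContribution:
    # Mirrors the fields the host reads off a channel contribution:
    # routes to mount under `channel.path`, plus lifecycle callbacks.
    routes: list[Any] = field(default_factory=list)
    on_startup: list[Callable[[], Awaitable[None]]] = field(default_factory=list)
    on_shutdown: list[Callable[[], Awaitable[None]]] = field(default_factory=list)


class EchoChannel:
    """A do-nothing channel: no routes, one startup hook."""

    name = "echo"
    path = "/echo"  # an empty path would mean "mount at the app root"

    def contribute(self, context: Any) -> ToyContribution:
        async def _startup() -> None:
            # e.g. open an HTTP client, register a webhook, ...
            pass

        return ToyContribution(on_startup=[_startup])


contribution = EchoChannel().contribute(context=None)
assert contribution.on_startup and not contribution.routes
```

The actual protocol carries more than this (identity hooks, commands, run hooks), so treat the sketch purely as orientation.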
+ +The base package contains only the channel-neutral plumbing: + +- `AgentFrameworkHost` — the Starlette host +- `Channel` / `ChannelPush` — the channel protocols +- `ChannelRequest` / `ChannelSession` / `ChannelIdentity` / `ResponseTarget` + — the request envelope and routing primitives +- `ChannelContext` / `ChannelContribution` / `ChannelCommand` — the + channel-side hooks for invoking the target and contributing routes, + commands, and lifecycle callbacks +- `ChannelRunHook` / `ChannelStreamTransformHook` — the per-request + customization seams + +Concrete channels live in their own packages so you only install what +you use: + +| Package | Transport | +|---|---| +| `agent-framework-hosting-responses` | OpenAI Responses API | +| `agent-framework-hosting-invocations` | Foundry-native invocation envelope | +| `agent-framework-hosting-telegram` | Telegram Bot API | +| `agent-framework-hosting-activity-protocol` | Bot Framework Activity Protocol (Teams, Direct Line, Web Chat, …) | +| `agent-framework-hosting-teams` | Microsoft Teams (Teams SDK) | +| `agent-framework-hosting-entra` | Entra (OAuth) identity-link sidecar | + +## Install + +```bash +pip install agent-framework-hosting agent-framework-hosting-responses +# or with hypercorn pre-installed for the demo `host.serve(...)` helper +pip install "agent-framework-hosting[serve]" agent-framework-hosting-responses +``` + +## Quickstart + +```python +from agent_framework import ChatAgent +from agent_framework.openai import OpenAIChatClient +from agent_framework_hosting import AgentFrameworkHost +from agent_framework_hosting_responses import ResponsesChannel + +agent = ChatAgent(name="Assistant", chat_client=OpenAIChatClient()) + +host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel()]) +host.serve(port=8000) +``` + +See the [hosting samples](https://github.com/microsoft/agent-framework/tree/main/python/samples/04-hosting/af-hosting) +for richer multi-channel apps (Telegram + Teams + Responses fan-out,
+identity linking, `ResponseTarget` routing, etc.). diff --git a/python/packages/hosting/agent_framework_hosting/__init__.py b/python/packages/hosting/agent_framework_hosting/__init__.py new file mode 100644 index 0000000000..9a7cbcadad --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/__init__.py @@ -0,0 +1,74 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Multi-channel hosting for Microsoft Agent Framework agents. + +Serve a single agent target through one or more **channels** — pluggable +adapters that expose the target over different transports such as the +OpenAI Responses API, Microsoft Teams, Telegram, and others. The base +package contains only the channel-neutral plumbing; concrete channels +ship in their own packages (``agent-framework-hosting-responses``, +``agent-framework-hosting-telegram``, …) so users install only what +they need. +""" + +import importlib.metadata + +from ._host import AgentFrameworkHost, ChannelContext, logger +from ._isolation import ( + ISOLATION_HEADER_CHAT, + ISOLATION_HEADER_USER, + IsolationKeys, + get_current_isolation_keys, + reset_current_isolation_keys, + set_current_isolation_keys, +) +from ._types import ( + Channel, + ChannelCommand, + ChannelCommandContext, + ChannelContribution, + ChannelIdentity, + ChannelPush, + ChannelRequest, + ChannelRunHook, + ChannelSession, + ChannelStreamTransformHook, + DeliveryReport, + HostedRunResult, + ResponseTarget, + ResponseTargetKind, + apply_run_hook, +) + +try: + __version__ = importlib.metadata.version(__name__) +except importlib.metadata.PackageNotFoundError: + __version__ = "0.0.0" + +__all__ = [ + "ISOLATION_HEADER_CHAT", + "ISOLATION_HEADER_USER", + "AgentFrameworkHost", + "Channel", + "ChannelCommand", + "ChannelCommandContext", + "ChannelContext", + "ChannelContribution", + "ChannelIdentity", + "ChannelPush", + "ChannelRequest", + "ChannelRunHook", + "ChannelSession", + "ChannelStreamTransformHook", + "DeliveryReport", + "HostedRunResult", + 
"IsolationKeys", + "ResponseTarget", + "ResponseTargetKind", + "__version__", + "apply_run_hook", + "get_current_isolation_keys", + "logger", + "reset_current_isolation_keys", + "set_current_isolation_keys", +] diff --git a/python/packages/hosting/agent_framework_hosting/_host.py b/python/packages/hosting/agent_framework_hosting/_host.py new file mode 100644 index 0000000000..92a0b1c45c --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/_host.py @@ -0,0 +1,946 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""The :class:`AgentFrameworkHost` and its :class:`ChannelContext` bridge. + +The host is a tiny Starlette wrapper: + +- ``__init__`` accepts a hostable target (``SupportsAgentRun`` agent or + ``Workflow``) and a sequence of channels. +- :meth:`AgentFrameworkHost.app` lazily builds a Starlette app by calling + every channel's ``contribute`` and mounting the returned routes under + the channel's ``path`` (empty path → mount at the app root). +- :class:`ChannelContext` exposes ``run`` / ``run_stream`` / + ``deliver_response`` for channels to invoke; the host handles + per-``isolation_key`` session caching, identity tracking, and + :class:`ResponseTarget` fan-out. + +Per SPEC-002 (and ADR-0026), the host is intentionally thin so the bulk +of channel-specific behaviour stays in the channel package. Identity +linking, link policies, response targets, background runs, and the like +are pluggable extensions that the future identity/foundry packages will +contribute on top of this surface. 
+""" + +from __future__ import annotations + +import logging +import os +import uuid +from collections.abc import Awaitable, Callable, Sequence +from contextlib import AbstractContextManager, ExitStack, asynccontextmanager +from pathlib import Path +from typing import TYPE_CHECKING, Any, AsyncIterator, cast + +from agent_framework import ( + AgentResponse, + AgentResponseUpdate, + CheckpointStorage, + Content, + FileCheckpointStorage, + Message, + ResponseStream, + SupportsAgentRun, + Workflow, + WorkflowEvent, +) +from starlette.applications import Starlette +from starlette.middleware import Middleware +from starlette.requests import Request +from starlette.responses import PlainTextResponse +from starlette.routing import BaseRoute, Mount, Route +from starlette.types import ASGIApp, Receive, Scope, Send + +from ._isolation import ( + ISOLATION_HEADER_CHAT, + ISOLATION_HEADER_USER, + IsolationKeys, + reset_current_isolation_keys, + set_current_isolation_keys, +) +from ._types import ( + Channel, + ChannelIdentity, + ChannelPush, + ChannelRequest, + DeliveryReport, + HostedRunResult, + ResponseTargetKind, +) + +if TYPE_CHECKING: + pass + +logger = logging.getLogger("agent_framework.hosting") + + +def _workflow_output_to_text(value: Any) -> str: + """Render a single workflow ``output`` payload as plain text. + + ``AgentResponse`` and ``AgentResponseUpdate`` carry text natively; + everything else is best-effort ``str()``. + """ + text = getattr(value, "text", None) + if isinstance(text, str): + return text + return str(value) + + +def _workflow_event_to_update(event: WorkflowEvent[Any]) -> AgentResponseUpdate | None: + """Map a :class:`WorkflowEvent` to a channel-friendly :class:`AgentResponseUpdate`. + + Returns ``None`` for events the host should drop (anything that is not + user-visible output). The original event is preserved on the update's + ``raw_representation`` so consumers can recover full workflow context. 
+ """ + if event.type != "output": + return None + payload: Any = event.data + if isinstance(payload, AgentResponseUpdate): + # Already a streaming update — pass through but tag the source so + # downstream hooks can tell it came from a workflow executor. + if payload.raw_representation is None: + payload.raw_representation = event + return payload + text = _workflow_output_to_text(payload) + return AgentResponseUpdate( + contents=[Content.from_text(text=text)], + role="assistant", + author_name=event.executor_id, + raw_representation=event, + ) + + +@asynccontextmanager +async def _suppress_already_consumed() -> AsyncIterator[None]: # noqa: RUF029 + """Yield, swallowing the ``RuntimeError`` ``ResponseStream`` raises on double-consume. + + The bridge stream calls ``get_final_response()`` after iterating the + workflow stream so the workflow's cleanup hooks run; on some paths the + stream considers itself already finalized and raises, which we treat + as benign — we're only after the side effect. + """ + try: + yield + except RuntimeError as exc: + logger.debug("workflow stream finalize skipped: %s", exc) + except Exception: # pragma: no cover - defensive: never let cleanup hide the real result + logger.exception("workflow stream finalize failed") + + +class _BoundResponseStream: + """Adapter that keeps an :class:`ExitStack` open across stream iteration. + + Streaming runs return a :class:`ResponseStream` synchronously, but + consumption happens later (the channel iterates). For host-bound + request context (e.g. Foundry response-id binding) to survive that + gap, we hold the stack open until the underlying stream is exhausted + or :meth:`close` is called. We forward awaitable + async-iterator + + ``get_final_response`` semantics so the channel sees a normal + ``ResponseStream``-shaped object. 
+ """ + + def __init__(self, inner: Any, stack: ExitStack) -> None: + self._inner = inner + self._stack = stack + self._closed = False + + def _close(self) -> None: + if self._closed: + return + self._closed = True + self._stack.close() + + def __await__(self) -> Any: + # ``__await__`` returns a generator; closing here would be too + # eager — we close in ``__aiter__`` finally instead. Awaitable + # consumers (rare for streams) call ``aclose()`` separately. + return self._inner.__await__() + + def __aiter__(self) -> AsyncIterator[Any]: + return self._wrap() + + async def _wrap(self) -> AsyncIterator[Any]: + try: + async for item in self._inner: + yield item + finally: + self._close() + + async def get_final_response(self) -> Any: + try: + return await self._inner.get_final_response() + finally: + self._close() + + def __getattr__(self, name: str) -> Any: + return getattr(self._inner, name) + + +class ChannelContext: + """Host-owned bridge that channels call to invoke the target.""" + + def __init__(self, host: "AgentFrameworkHost") -> None: + """Bind the context to its owning :class:`AgentFrameworkHost`. + + The host instance is the source of truth for the target, registered + channels, identity stores, sessions, and lifecycle state. Channels + only ever receive a context; they never see the host directly. + """ + self._host = host + + @property + def target(self) -> SupportsAgentRun | Workflow: + """The hostable target the channel should invoke.""" + return self._host.target + + async def run(self, request: ChannelRequest) -> HostedRunResult: + """Invoke the target for ``request`` and return a channel-neutral result.""" + return await self._host._invoke(request) # pyright: ignore[reportPrivateUsage] + + def run_stream(self, request: ChannelRequest) -> ResponseStream[AgentResponseUpdate, AgentResponse]: + """Invoke the target with ``stream=True`` and return the agent's ResponseStream. 
+ + Channels iterate the stream directly (it acts like an AsyncGenerator) + and are responsible for delivering updates to their wire protocol. + Apply per-channel ``transform_hook`` callables during iteration to + rewrite or drop individual updates before they hit the wire. + """ + return self._host._invoke_stream(request) # pyright: ignore[reportPrivateUsage] + + async def deliver_response(self, request: ChannelRequest, payload: HostedRunResult) -> DeliveryReport: + """Resolve ``request.response_target`` and push ``payload`` to each destination. + + Returns a :class:`DeliveryReport` so the originating channel knows + whether to render the agent reply on its own wire (``originating`` + included in or implied by the target) or just acknowledge dispatch. + """ + return await self._host._deliver_response(request, payload) # pyright: ignore[reportPrivateUsage] + + +class _FoundryIsolationASGIMiddleware: + """Lift the two well-known Foundry isolation headers into a contextvar. + + The Foundry Hosted Agents runtime injects + ``x-agent-{user,chat}-isolation-key`` on every inbound HTTP request. + Storage providers that need partition-aware writes (notably + :class:`FoundryHostedAgentHistoryProvider`) read those keys via + :func:`get_current_isolation_keys` to avoid every channel having to + parse Foundry-specific headers itself. We intentionally inspect + only HTTP scopes; lifespan/websocket scopes are forwarded + untouched. When neither header is present the contextvar stays at + its default ``None``, so local-dev requests behave as before. 
+ """ + + def __init__(self, app: ASGIApp) -> None: + self.app = app + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] != "http": + await self.app(scope, receive, send) + return + user_key: str | None = None + chat_key: str | None = None + for raw_name, raw_value in scope.get("headers") or (): + name = raw_name.decode("latin-1").lower() + if name == ISOLATION_HEADER_USER: + user_key = raw_value.decode("latin-1") or None + elif name == ISOLATION_HEADER_CHAT: + chat_key = raw_value.decode("latin-1") or None + if user_key is None and chat_key is None: + await self.app(scope, receive, send) + return + token = set_current_isolation_keys(IsolationKeys(user_key=user_key, chat_key=chat_key)) + try: + await self.app(scope, receive, send) + finally: + reset_current_isolation_keys(token) + + +class AgentFrameworkHost: + """Owns one Starlette app, one hostable target, and a sequence of channels.""" + + def __init__( + self, + target: SupportsAgentRun | Workflow, + *, + channels: Sequence[Channel], + debug: bool = False, + checkpoint_location: str | os.PathLike[str] | CheckpointStorage | None = None, + ) -> None: + """Create a host for ``target`` and its channels. + + Args: + target: The hostable target to invoke from channels — either a + ``SupportsAgentRun``-compatible agent or a ``Workflow``. The + host detects the kind and dispatches to the appropriate + execution seam (``agent.run(...)`` vs ``workflow.run(message=...)``). + For workflow targets, channels (or their ``run_hook``) are + responsible for shaping ``ChannelRequest.input`` into the + workflow start executor's typed input. + channels: The channels to expose. Each channel contributes routes + and commands that are mounted under ``channel.path`` (defaulting + to the channel name). + debug: Whether to enable Starlette's debug mode (stack traces in + responses, etc.) and per-channel debug logging. 
checkpoint_location: When ``target`` is a :class:`Workflow`, the + location used to persist workflow checkpoints across requests. + Either a filesystem path (``str`` / ``PathLike``) — the host + creates a per-conversation + :class:`~agent_framework.FileCheckpointStorage` rooted at + ``checkpoint_location / <isolation_key>`` — or a + :class:`~agent_framework.CheckpointStorage` instance the host + uses as-is (caller owns scoping). Per-request behaviour: + requests without ``ChannelRequest.session.isolation_key`` + are run without checkpointing. When set on a workflow that + already has its own checkpoint storage configured + (``WorkflowBuilder(checkpoint_storage=...)``), the host + refuses to start so ownership of checkpointing is + unambiguous. Ignored for ``SupportsAgentRun`` targets (a + warning is emitted). + """ + self.target: SupportsAgentRun | Workflow = target + self._is_workflow = isinstance(target, Workflow) + self.channels = list(channels) + self._debug = debug + self._app: Starlette | None = None + self._checkpoint_location: Path | CheckpointStorage | None = None + if checkpoint_location is not None: + if not self._is_workflow: + logger.warning("checkpoint_location is set but target is not a Workflow; ignoring.") + else: + workflow: Workflow = target # type: ignore[assignment] + if workflow._runner_context.has_checkpointing(): # type: ignore[reportPrivateUsage] + raise RuntimeError( + "Workflow already has checkpoint storage configured " + "(WorkflowBuilder(checkpoint_storage=...)). The host " + "manages checkpoints when checkpoint_location is set; " + "remove one of the two configurations." + ) + if isinstance(checkpoint_location, (str, os.PathLike)): + self._checkpoint_location = Path(os.fspath(checkpoint_location)) + else: + # Anything else is treated as a CheckpointStorage instance. + # ``CheckpointStorage`` is a non-runtime-checkable Protocol, + # so we cannot ``isinstance``-check it directly.
+ self._checkpoint_location = checkpoint_location + # Per-isolation_key session cache. The real spec backs this with a + # pluggable session store; this base host keeps it in-process. + self._sessions: dict[str, Any] = {} + # ``isolation_key -> active session_id``. Normally identical to the + # isolation_key, but ``reset_session`` rotates this to a fresh id so + # the next turn starts a new ``AgentSession`` while the old history + # remains on disk under its original session_id. + self._session_aliases: dict[str, str] = {} + # Per-isolation_key identity registry: which channels we've seen this + # user on, and which native_id they used on each. Powers + # ResponseTarget.active / .channel(name) / .channels([...]) / + # .all_linked. + # Shape: { isolation_key: { channel_name: ChannelIdentity } }. + self._identities: dict[str, dict[str, ChannelIdentity]] = {} + # (isolation_key -> last-seen channel name) for ResponseTarget.active. + self._active: dict[str, str] = {} + + @property + def app(self) -> Starlette: + """Lazily build (and cache) the Starlette application.""" + if self._app is None: + self._app = self._build_app() + return self._app + + def serve( + self, + *, + host: str = "127.0.0.1", + port: int = 8000, + workers: int = 1, + **config_kwargs: Any, + ) -> None: + """Start the host on ``host:port`` using Hypercorn. + + Hypercorn is the same ASGI server the Foundry Hosted Agents + runtime uses for production deployments, so running locally with + the same server keeps dev/prod parity (Trio fallbacks, lifespan + semantics, HTTP/2 support, …). Install with the ``serve`` extra + (``pip install agent-framework-hosting[serve]``). + + Args: + host: Interface to bind. Defaults to ``127.0.0.1``. + port: TCP port to bind. Defaults to ``8000``. + workers: Number of worker processes. Defaults to ``1``; + Hypercorn's process model only kicks in for ``>1``. 
+ **config_kwargs: Forwarded to :class:`hypercorn.config.Config` + via attribute assignment, so any documented Hypercorn + config field (e.g. ``keep_alive_timeout=...``, + ``access_log_format=...``) can be set directly. + """ + try: + import asyncio + from typing import cast as _cast + + from hypercorn.asyncio import ( # pyright: ignore[reportMissingImports] + serve as _hypercorn_serve, # pyright: ignore[reportUnknownVariableType] + ) + from hypercorn.config import Config # pyright: ignore[reportMissingImports, reportUnknownVariableType] + except ImportError as exc: # pragma: no cover - exercised at runtime + raise RuntimeError( + "AgentFrameworkHost.serve() requires hypercorn. " + "Install with `pip install agent-framework-hosting[serve]` or `pip install hypercorn`." + ) from exc + + config = Config() # pyright: ignore[reportUnknownVariableType] + config.bind = [f"{host}:{port}"] # pyright: ignore[reportUnknownMemberType] + config.workers = workers # pyright: ignore[reportUnknownMemberType] + for key, value in config_kwargs.items(): + setattr(config, key, value) # pyright: ignore[reportUnknownArgumentType] + + # Touch ``self.app`` so the lifespan startup log fires once before + # we hand off to hypercorn — gives a single, readable banner of + # what the host is exposing without requiring channels to log + # individually. + app = self.app + self._log_startup(host=host, port=port, workers=workers) + + # ``hypercorn.asyncio.serve`` has a complex partially-typed signature + # (multiple ASGI/WSGI app overloads) and its ``Scope`` definition + # diverges from Starlette's; cast both sides to ``Any`` to keep the + # call site readable without sprinkling per-error suppressions. + serve_callable = _cast(Any, _hypercorn_serve) + asyncio.run(serve_callable(app, config)) + + def reset_session(self, isolation_key: str) -> None: + """Rotate ``isolation_key`` to a fresh session id without deleting history. 
+ + Old turns are preserved on disk under their original session id and + remain accessible by passing that id explicitly (e.g. as + ``previous_response_id``). Future requests using ``isolation_key`` + get a new, empty ``AgentSession``. + """ + new_id = f"{isolation_key}#{uuid.uuid4().hex[:8]}" + self._session_aliases[isolation_key] = new_id + self._sessions.pop(isolation_key, None) + + # -- internals --------------------------------------------------------- # + + def _log_startup(self, *, host: str, port: int, workers: int) -> None: + """Emit a single human-friendly startup banner. + + Mirrors the ``AgentServerHost`` convention from + ``azure.ai.agentserver.core``: one INFO line that captures the + target type, every channel + its mount path, the bind address, + whether we're running inside a Foundry Hosted Agents container, + and the worker count. Keeps log noise low while still giving an + operator a single grep-able anchor when triaging. + """ + target_kind = "Workflow" if isinstance(self.target, Workflow) else type(self.target).__name__ + target_name = getattr(self.target, "name", None) or target_kind + channels_repr = ", ".join( + f"{ch.name}@{ch.path or '/'}" # blank path means "mounted at root" + for ch in self.channels + ) + is_hosted = bool(os.environ.get("FOUNDRY_HOSTING_ENVIRONMENT")) + logger.info( + "AgentFrameworkHost starting: target=%s (%s) bind=%s:%d workers=%d hosted=%s channels=[%s]", + target_name, + target_kind, + host, + port, + workers, + is_hosted, + channels_repr or "", + ) + + def _build_app(self) -> Starlette: + context = ChannelContext(self) + routes: list[BaseRoute] = [] + on_startup: list[Callable[[], Awaitable[None]]] = [] + on_shutdown: list[Callable[[], Awaitable[None]]] = [] + + # ``/readiness`` is the standard probe path the Foundry Hosted Agents + # runtime hits to gate traffic. 
We expose it unconditionally — once the + # ASGI app is up the host considers itself ready (channels register + # their own startup hooks and may run before the first request, but + # readiness is intentionally cheap so the platform's probe never times + # out on transient channel work). Mounted first so a channel cannot + # accidentally shadow it. + async def _readiness(_request: Request) -> PlainTextResponse: # noqa: RUF029 + """Liveness/readiness probe handler used by Foundry Hosted Agents.""" + return PlainTextResponse("ok") + + routes.append(Route("/readiness", _readiness, methods=["GET"])) + + for channel in self.channels: + contribution = channel.contribute(context) + # Channels publish routes relative to their root; mount under channel.path. + # An empty path means "mount at the app root" — useful for single-channel hosts + # that don't want a prefix (e.g. ResponsesChannel exposing POST /responses directly). + if contribution.routes: + if channel.path: + routes.append(Mount(channel.path, routes=list(contribution.routes))) + else: + routes.extend(contribution.routes) + on_startup.extend(contribution.on_startup) + on_shutdown.extend(contribution.on_shutdown) + + @asynccontextmanager + async def lifespan(_app: Starlette) -> AsyncIterator[None]: + for cb in on_startup: + await cb() + try: + yield + finally: + for cb in on_shutdown: + await cb() + + return Starlette( + debug=self._debug, + routes=routes, + lifespan=lifespan, + middleware=[Middleware(_FoundryIsolationASGIMiddleware)], + ) + + def _build_run_kwargs(self, request: ChannelRequest) -> dict[str, Any]: + # The full spec resolves a ChannelSession into an AgentSession here, + # honors session_mode, and consults LinkPolicy / ResponseTarget. This + # base host keys a per-isolation_key AgentSession off the channel's + # session hint so context providers (FileHistoryProvider, …) on the + # target see one session per end user. 
+ session = None + if request.session_mode != "disabled" and request.session is not None: + isolation_key = request.session.isolation_key + if isolation_key is not None and hasattr(self.target, "create_session"): + session_id = self._session_aliases.get(isolation_key, isolation_key) + session = self._sessions.get(isolation_key) + if session is None: + # ``create_session`` lives on agent-typed targets but not on + # ``Workflow``; the ``hasattr`` above guards the call site. + session = self.target.create_session( # pyright: ignore[reportAttributeAccessIssue, reportUnknownVariableType, reportUnknownMemberType] + session_id=session_id + ) + self._sessions[isolation_key] = session # pyright: ignore[reportUnknownArgumentType] + + run_kwargs: dict[str, Any] = {} + if session is not None: + run_kwargs["session"] = session + if request.options: + run_kwargs["options"] = request.options + return run_kwargs + + def _log_incoming(self, request: ChannelRequest, *, stream: bool) -> None: + """Emit a one-line INFO summary for every incoming target invocation. + + When ``debug=True`` is set on the host, also dump the channel-native + settings the channel attached to the ``ChannelRequest`` — ``options`` + (the ChatOptions-shaped fields the channel parsed from its protocol + payload, e.g. temperature/tools/tool_choice for Responses), plus + ``attributes`` / ``metadata`` (the channel's protocol-specific bag, + e.g. ``chat_id`` / ``callback_query_id`` for Telegram). 
+ """ + isolation_key = request.session.isolation_key if request.session is not None else None + logger.info( + "channel=%s op=%s stream=%s session=%s session_mode=%s", + request.channel, + request.operation, + stream, + isolation_key, + request.session_mode, + ) + logger.debug( + " ↳ options=%s attributes=%s metadata=%s", + dict(request.options) if request.options else {}, + dict(request.attributes) if request.attributes else {}, + dict(request.metadata) if request.metadata else {}, + ) + + def _flat_context_providers(self) -> list[Any]: + """Flatten ``target.context_providers`` one level for duck-typed hooks. + + ``ContextProviderBase`` aggregates child providers under a + ``providers`` attribute when wrapped (e.g. by ``ChatClientAgent``). + We descend one level so the host catches both styles without + forcing a particular wiring on the agent. + """ + providers = getattr(self.target, "context_providers", None) or () + flat: list[Any] = [] + for entry in providers: + children = getattr(entry, "providers", None) + if children: + flat.extend(children) + else: + flat.append(entry) + return flat + + def _bind_request_context(self, request: ChannelRequest) -> ExitStack: + """Bind any per-request anchors a target's context-providers expose. + + Channels announce per-request anchors (currently ``response_id`` + and ``previous_response_id``) via ``ChannelRequest.attributes``. + Some history providers — notably the Foundry hosted-agent history + provider — need to write storage under the same ``response_id`` + the channel surfaces on its envelope so the next turn's + ``previous_response_id`` walks the chain. Rather than the host + knowing about specific provider classes, we duck-type: any + context provider on the target that exposes a + ``bind_request_context(response_id=..., previous_response_id=..., + **_)`` context-manager gets it called with the request's + attribute values. 
Per-request platform isolation keys are handled + separately by :class:`_FoundryIsolationASGIMiddleware` (lifted + off the inbound headers into a contextvar) so providers don't + depend on channels to forward them. Bindings are scoped to the + returned :class:`ExitStack` which the caller must enter before + invoking the target and leave after the run completes. + """ + stack = ExitStack() + attrs = request.attributes or {} + response_id = attrs.get("response_id") + if not isinstance(response_id, str) or not response_id: + return stack + previous_response_id = attrs.get("previous_response_id") + if previous_response_id is not None and not isinstance(previous_response_id, str): + previous_response_id = None + + flat: list[Any] = self._flat_context_providers() + + for provider in flat: + bind = getattr(provider, "bind_request_context", None) + if not callable(bind): + continue + stack.enter_context( + cast( + "AbstractContextManager[Any]", + bind( + response_id=response_id, + previous_response_id=previous_response_id, + ), + ) + ) + return stack + + async def _invoke(self, request: ChannelRequest) -> HostedRunResult: + self._log_incoming(request, stream=False) + self._record_identity(request) + if self._is_workflow: + return await self._invoke_workflow(request) + run_kwargs = self._build_run_kwargs(request) + with self._bind_request_context(request): + # ``_is_workflow`` is False here so ``self.target`` is an + # ``Agent``-shaped target whose ``.run`` returns + # :class:`AgentResponse`. Narrow back to keep ``result.text`` + # well-typed without conditional imports of ``Agent``. 
+ agent_target = cast("SupportsAgentRun", self.target) + result = await agent_target.run(self._wrap_input(request), **run_kwargs) + return HostedRunResult(text=result.text) + + def _invoke_stream(self, request: ChannelRequest) -> ResponseStream[AgentResponseUpdate, AgentResponse]: + self._log_incoming(request, stream=True) + self._record_identity(request) + if self._is_workflow: + return self._invoke_workflow_stream(request) + run_kwargs = self._build_run_kwargs(request) + # ``run(stream=True)`` returns a ResponseStream synchronously (it is + # itself awaitable / async-iterable). We hand it back to the channel + # so the channel can drive iteration and apply its transform hook. + # Streaming flows iterate after this method returns, which is + # *outside* a sync ``with`` block — so we wrap the underlying + # stream in an adapter that holds the binding open across the + # iteration lifecycle. + binder = self._bind_request_context(request) + return _BoundResponseStream( # type: ignore[return-value] + self.target.run(self._wrap_input(request), stream=True, **run_kwargs), + binder, + ) + + def _resolve_checkpoint_storage(self, request: ChannelRequest) -> CheckpointStorage | None: + """Build (or return) the per-request checkpoint storage, or ``None``. + + Returns ``None`` when no ``checkpoint_location`` is configured or + when the request lacks a stable session key — without a key we + cannot scope checkpoints per conversation, and we'd rather skip + checkpointing than pollute a single shared store. + """ + if self._checkpoint_location is None: + return None + if request.session is None or not request.session.isolation_key: + return None + if isinstance(self._checkpoint_location, Path): + return FileCheckpointStorage(str(self._checkpoint_location / request.session.isolation_key)) + # Caller-supplied storage — used as-is; caller owns scoping. 
+ return self._checkpoint_location + + async def _invoke_workflow(self, request: ChannelRequest) -> HostedRunResult: + """Dispatch to ``Workflow.run`` and collapse outputs into a ``HostedRunResult``. + + The channel's ``run_hook`` is the canonical adapter for shaping + ``request.input`` into the workflow start executor's typed input + (free-form text from a Telegram message, structured ``Responses`` + ``input`` items, …). When no hook is wired, ``request.input`` is + forwarded verbatim — appropriate for workflows whose start executor + accepts the channel's native input type (commonly ``str``). + + When ``checkpoint_location`` is configured on the host, a + per-conversation checkpoint storage is resolved, the workflow is + restored from its latest checkpoint (if any) and then re-run with + the new input — mirroring the resume semantics of the Foundry + Responses host. + """ + # Workflows do not own session state in the agent sense and do not + # accept ``session=`` / ``options=`` kwargs. The channel's run_hook is + # the seam for any per-run customization; nothing flows through here. + workflow: Workflow = self.target # type: ignore[assignment] + storage = self._resolve_checkpoint_storage(request) + if storage is not None: + latest = await storage.get_latest(workflow_name=workflow.name) + if latest is not None: + # Restore in-memory state from the most recent checkpoint + # before applying the new input. + await workflow.run(checkpoint_id=latest.checkpoint_id, checkpoint_storage=storage) + result = await workflow.run(request.input, checkpoint_storage=storage) + else: + result = await workflow.run(request.input) + outputs = result.get_outputs() + text = "\n".join(_workflow_output_to_text(o) for o in outputs) if outputs else "" + return HostedRunResult(text=text) + + def _invoke_workflow_stream(self, request: ChannelRequest) -> ResponseStream[AgentResponseUpdate, AgentResponse]: + """Bridge ``Workflow.run(stream=True)`` to a channel-facing ``ResponseStream``. 
+ + Wraps the workflow's ``ResponseStream[WorkflowEvent, WorkflowRunResult]`` + in a new ``ResponseStream[AgentResponseUpdate, AgentResponse]`` so + channels can iterate it identically to an agent stream and apply + their ``stream_transform_hook`` callables. + + Mapping rules: + + - ``output`` events whose ``data`` is already an + :class:`AgentResponseUpdate` (the common case for workflows + containing :class:`AgentExecutor`) pass through unchanged. + - ``output`` events with any other ``data`` are wrapped into a + single-text-content :class:`AgentResponseUpdate`. + - All other event types (``status``, ``executor_invoked``, + ``superstep_*``, lifecycle, …) are filtered out — channels only + care about user-visible text. Hooks can opt back in by inspecting + ``raw_representation`` on the produced updates. + + The original :class:`WorkflowEvent` is stashed on + ``AgentResponseUpdate.raw_representation`` so advanced consumers + (telemetry, debug UIs) can recover the full workflow timeline. + + Checkpoint restoration (when ``checkpoint_location`` is set) runs + before the input stream is opened so the new turn observes the + restored state. + """ + workflow: Workflow = self.target # type: ignore[assignment] + storage = self._resolve_checkpoint_storage(request) + + async def _maybe_restore() -> None: + if storage is None: + return + latest = await storage.get_latest(workflow_name=workflow.name) + if latest is None: + return + # Drain the restoration stream so the no-op invocation actually + # rehydrates state before the real run starts. 
+ async for _ in workflow.run( + stream=True, + checkpoint_id=latest.checkpoint_id, + checkpoint_storage=storage, + ): + pass + + async def _bridge() -> AsyncIterator[AgentResponseUpdate]: + await _maybe_restore() + workflow_stream = workflow.run(request.input, stream=True, checkpoint_storage=storage) + try: + async for event in workflow_stream: + update = _workflow_event_to_update(event) + if update is not None: + yield update + finally: + async with _suppress_already_consumed(): + await workflow_stream.get_final_response() + + async def _finalize(updates: Sequence[AgentResponseUpdate]) -> AgentResponse: # noqa: RUF029 + return AgentResponse.from_updates(updates) + + return ResponseStream[AgentResponseUpdate, AgentResponse](_bridge(), finalizer=_finalize) + + def _wrap_input(self, request: ChannelRequest) -> Message | list[Message]: + """Promote ``request.input`` to ``Message``(s) carrying channel metadata. + + Channels deliver inputs as plain text, a single ``Message``, or a list + of ``Message`` (e.g. a Responses-API request that includes a ``system`` + instruction plus the user turn). To preserve channel provenance + + identity + ``response_target`` on the persisted history record (and + make it visible to context providers, evals, audits), we attach a + ``hosting`` block under ``additional_properties``. AF's + ``Message.to_dict`` round-trips ``additional_properties`` through any + ``HistoryProvider`` that serializes via ``to_dict`` (e.g. + ``FileHistoryProvider``) and the framework explicitly does *not* + forward these fields to model providers, so they are safe to attach. + + For a list of messages we attach the metadata to the LAST message that + will be persisted (typically the user turn) — this keeps a single, + searchable record of where the inbound message came from. 
+ """ + hosting_meta: dict[str, Any] = {"channel": request.channel} + if request.identity is not None: + hosting_meta["identity"] = { + "channel": request.identity.channel, + "native_id": request.identity.native_id, + "attributes": dict(request.identity.attributes) if request.identity.attributes else {}, + } + target = request.response_target + hosting_meta["response_target"] = { + "kind": target.kind.value, + "targets": list(target.targets), + } + + raw = request.input + if isinstance(raw, Message): + raw.additional_properties = {**(raw.additional_properties or {}), "hosting": hosting_meta} + return raw + if isinstance(raw, list) and raw and all(isinstance(m, Message) for m in raw): + messages: list[Message] = [m for m in raw if isinstance(m, Message)] + last = messages[-1] + last.additional_properties = {**(last.additional_properties or {}), "hosting": hosting_meta} + return messages + # ``raw`` is typed as ``AgentRunInputs`` (str | Content | Message | Sequence[…]). + # The remaining cases are str / Content / Mapping — wrap as a single user message. + return Message( + role="user", + contents=[raw], # type: ignore[list-item] + additional_properties={"hosting": hosting_meta}, + ) + + def _record_identity(self, request: ChannelRequest) -> None: + """Update the per-``isolation_key`` identity registry + active-channel hint. + + Called on every successful resolve. ``ResponseTarget.active`` + consumes ``self._active``; ``ResponseTarget.channel(name)`` / + ``.channels([...])`` / ``.all_linked`` consume ``self._identities``. + """ + if request.identity is None or request.session is None: + return + key = request.session.isolation_key + if not key: + return + self._identities.setdefault(key, {})[request.identity.channel] = request.identity + self._active[key] = request.identity.channel + + async def _deliver_response(self, request: ChannelRequest, payload: HostedRunResult) -> DeliveryReport: + """Resolve ``request.response_target`` and call ``ChannelPush.push`` on each. 
+ + Per SPEC-002 §"ResponseTarget": for any non-``originating`` target, + the originating channel returns an acknowledgment and the actual + agent reply lands on the destination channel(s). When a destination + cannot be resolved (no known native id) or doesn't implement + ``ChannelPush``, it is dropped and surfaced in + :class:`DeliveryReport.skipped`. If every destination drops, we + fall back to delivering on the originating channel (matching the + spec's policy default). + """ + target = request.response_target + kind = target.kind + + # Fast paths for the trivial variants. + if kind == ResponseTargetKind.ORIGINATING: + return DeliveryReport(include_originating=True) + if kind == ResponseTargetKind.NONE: + # Background-only — drop the reply on the floor for now (no + # ContinuationToken in the prototype). + return DeliveryReport(include_originating=False) + + # Build the destination set. + include_originating = False + # Each entry is (channel_name, identity_override_or_None_to_lookup). + destinations: list[tuple[str, ChannelIdentity | None]] = [] + isolation_key = request.session.isolation_key if request.session is not None else None + known = self._identities.get(isolation_key or "", {}) + + if kind == ResponseTargetKind.ACTIVE: + active = self._active.get(isolation_key or "") + if active is None or active == request.channel: + # Fall back to originating when there's no other active + # channel known (matches the "first message" case). + return DeliveryReport(include_originating=True) + destinations.append((active, known.get(active))) + + elif kind == ResponseTargetKind.ALL_LINKED: + for channel_name, identity in known.items(): + if channel_name == request.channel: + include_originating = True + continue + destinations.append((channel_name, identity)) + if not destinations and not include_originating: + # No links recorded yet — fall back. 
+ return DeliveryReport(include_originating=True) + + elif kind == ResponseTargetKind.CHANNELS: + for entry in target.targets: + if entry == "originating": + include_originating = True + continue + if ":" in entry: + channel_name, _, native_id = entry.partition(":") + if channel_name == request.channel: + # Pointing the originating channel at itself with a + # specific native id — treat as "include + # originating" since the channel will reply on its + # own wire to that user anyway. + include_originating = True + continue + destinations.append((channel_name, ChannelIdentity(channel=channel_name, native_id=native_id))) + else: + if entry == request.channel: + include_originating = True + continue + destinations.append((entry, known.get(entry))) + + # Dispatch. + by_name = {ch.name: ch for ch in self.channels} + pushed: list[str] = [] + skipped: list[str] = [] + for channel_name, dest_identity in destinations: + channel = by_name.get(channel_name) + token = f"{channel_name}:{dest_identity.native_id}" if dest_identity is not None else channel_name + if channel is None: + logger.warning("deliver_response: no channel named %r (target=%s)", channel_name, token) + skipped.append(token) + continue + if not isinstance(channel, ChannelPush): + logger.warning( + "deliver_response: channel %r does not implement ChannelPush (target=%s)", + channel_name, + token, + ) + skipped.append(token) + continue + if dest_identity is None: + logger.warning( + "deliver_response: no known identity for isolation_key=%s on channel=%s", + isolation_key, + channel_name, + ) + skipped.append(token) + continue + try: + await channel.push(dest_identity, payload) + except Exception: + logger.exception("deliver_response: push failed for target=%s", token) + skipped.append(token) + continue + pushed.append(token) + logger.info("deliver_response: pushed to %s (%d chars)", token, len(payload.text)) + + if not pushed and not include_originating: + # Spec policy: if every destination drops, deliver to 
originating. + logger.warning("deliver_response: every destination dropped — falling back to originating") + include_originating = True + + return DeliveryReport( + include_originating=include_originating, + pushed=tuple(pushed), + skipped=tuple(skipped), + ) + + +__all__ = ["AgentFrameworkHost", "ChannelContext", "logger"] diff --git a/python/packages/hosting/agent_framework_hosting/_isolation.py b/python/packages/hosting/agent_framework_hosting/_isolation.py new file mode 100644 index 0000000000..53fb2f1e54 --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/_isolation.py @@ -0,0 +1,76 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Per-request isolation keys read from inbound HTTP headers. + +The Foundry Hosted Agents runtime injects two well-known headers on every +request it forwards to the user's container: + +* ``x-agent-user-isolation-key`` — opaque per-user partition key +* ``x-agent-chat-isolation-key`` — opaque per-conversation partition key + +When the headers are present we are running inside (or being driven by) the +Foundry runtime; when they are absent we are running in plain local dev. The +host installs an ASGI middleware in :meth:`AgentFrameworkHost._build_app` +that reads both headers off every inbound HTTP request and pushes them into +the :data:`current_isolation_keys` contextvar for the duration of the +request, then resets it. Providers that need partition-aware storage (most +notably ``FoundryHostedAgentHistoryProvider``) read the contextvar via +:func:`get_current_isolation_keys` and apply the keys to their backend +calls — so app authors don't have to wire any middleware themselves and +channels stay free of Foundry-specific header knowledge. + +The contextvar holds a plain :class:`IsolationKeys` mapping; conversion to +provider-specific types (e.g. Foundry's ``IsolationContext``) happens at +the consuming provider so this module has no provider dependencies. 
+""" + +from __future__ import annotations + +from contextvars import ContextVar, Token + +__all__ = [ + "ISOLATION_HEADER_CHAT", + "ISOLATION_HEADER_USER", + "IsolationKeys", + "current_isolation_keys", + "get_current_isolation_keys", + "reset_current_isolation_keys", + "set_current_isolation_keys", +] + + +ISOLATION_HEADER_USER = "x-agent-user-isolation-key" +ISOLATION_HEADER_CHAT = "x-agent-chat-isolation-key" + + +class IsolationKeys: + """Per-request Foundry isolation keys lifted off the inbound headers.""" + + def __init__(self, user_key: str | None = None, chat_key: str | None = None) -> None: + self.user_key = user_key + self.chat_key = chat_key + + @property + def is_empty(self) -> bool: + return self.user_key is None and self.chat_key is None + + +current_isolation_keys: ContextVar[IsolationKeys | None] = ContextVar( + "agent_framework_hosting_isolation_keys", + default=None, +) + + +def get_current_isolation_keys() -> IsolationKeys | None: + """Return the isolation keys bound to the current request, if any.""" + return current_isolation_keys.get() + + +def set_current_isolation_keys(keys: IsolationKeys | None) -> Token[IsolationKeys | None]: + """Bind ``keys`` to the current async context and return a reset token.""" + return current_isolation_keys.set(keys) + + +def reset_current_isolation_keys(token: Token[IsolationKeys | None]) -> None: + """Restore the isolation contextvar to its prior value.""" + current_isolation_keys.reset(token) diff --git a/python/packages/hosting/agent_framework_hosting/_types.py b/python/packages/hosting/agent_framework_hosting/_types.py new file mode 100644 index 0000000000..7d200a3266 --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/_types.py @@ -0,0 +1,376 @@ +# Copyright (c) Microsoft. All rights reserved. + +# ``ChannelRequest`` is the only intentional dataclass here (callers use +# ``dataclasses.replace`` on it in run hooks). 
The other types are plain +# Python classes by preference, so the "could be a dataclass" lint is muted +# at the file level. +# ruff: noqa: B903 + +"""Channel-neutral request envelope and channel protocol types. + +These types form the boundary between the host and individual channels. +A channel parses its native payload, builds a :class:`ChannelRequest`, and +hands it to :class:`ChannelContext.run` (or ``run_stream``) on the host. +The host normalizes the request into a single agent invocation and either +returns the result to the originating channel or fans out via +:class:`ResponseTarget` to other channels that implement +:class:`ChannelPush`. + +See ``docs/specs/002-python-hosting-channels.md`` for the full design. +""" + +from __future__ import annotations + +from collections.abc import Awaitable, Callable, Mapping, Sequence +from dataclasses import dataclass, field +from enum import Enum +from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable + +from agent_framework import ( + AgentResponse, + AgentResponseUpdate, + AgentRunInputs, + ResponseStream, + SupportsAgentRun, + Workflow, +) +from starlette.routing import BaseRoute + +if TYPE_CHECKING: + from ._host import ChannelContext + + +# --------------------------------------------------------------------------- # +# Channel-neutral request envelope +# --------------------------------------------------------------------------- # + + +_EMPTY_MAPPING: Mapping[str, Any] = {} + + +class ChannelSession: + """Channel-supplied session hint. + + The host turns this into an ``AgentSession`` keyed by ``isolation_key`` so + every distinct end user gets their own context-provider state (e.g. one + ``FileHistoryProvider`` JSONL file per user). + """ + + def __init__(self, isolation_key: str | None = None) -> None: + self.isolation_key = isolation_key + + +class ChannelIdentity: + """Channel-native identity the host sees on each request. + + Consumed by the host's identity registry. 
The host uses it for two things: + + 1. Recording the active channel for an ``isolation_key`` so + ``ResponseTarget.active`` resolves correctly. + 2. Telling :class:`ChannelPush` ``push`` recipients **where** in their + native namespace to deliver — Telegram uses ``native_id`` as the + chat id, Teams as the conversation/AAD id, etc. + """ + + def __init__( + self, + channel: str, + native_id: str, + attributes: Mapping[str, Any] | None = None, + ) -> None: + self.channel = channel + self.native_id = native_id + self.attributes: Mapping[str, Any] = attributes if attributes is not None else _EMPTY_MAPPING + + +class ResponseTargetKind(str, Enum): + """Discriminator for :class:`ResponseTarget` variants.""" + + ORIGINATING = "originating" + ACTIVE = "active" + CHANNELS = "channels" + ALL_LINKED = "all_linked" + NONE = "none" + + +class ResponseTarget: + """Per-request directive controlling **where** the host delivers the agent reply. + + Independent of ``session_mode``. Construct via the classmethod helpers or + use the module-level singletons rather than touching ``kind`` directly. + Variants: + + - ``ResponseTarget.originating`` (default) — synchronous response on the + originating channel only. + - ``ResponseTarget.active`` — push to the channel most recently observed + for the resolved ``isolation_key``. + - ``ResponseTarget.channel("teams")`` / ``.channels([...])`` — push to + one or more named destinations. Each entry is either a bare channel + name (host resolves the native id from its identity registry) or a + ``"channel:native_id"`` token (used verbatim). The pseudo-name + ``"originating"`` includes the originating channel in the fan-out. + - ``ResponseTarget.all_linked`` — push to every channel where the + resolved ``isolation_key`` has been observed. + - ``ResponseTarget.none`` — background-only; in the prototype this just + suppresses the originating reply (no ``ContinuationToken`` yet). 
+ + Instances are intended to be treated as immutable; the singletons are + shared across the process. + """ + + def __init__( + self, + kind: ResponseTargetKind = ResponseTargetKind.ORIGINATING, + targets: tuple[str, ...] = (), + ) -> None: + self.kind = kind + self.targets = targets + + # -- builders ---------------------------------------------------------- # + + @classmethod + def channel(cls, name: str) -> "ResponseTarget": + """Target a single named destination channel.""" + return cls(kind=ResponseTargetKind.CHANNELS, targets=(name,)) + + @classmethod + def channels(cls, names: Sequence[str]) -> "ResponseTarget": + """Target an explicit list of destination channels.""" + return cls(kind=ResponseTargetKind.CHANNELS, targets=tuple(names)) + + # -- value semantics --------------------------------------------------- # + # ``ResponseTarget`` is treated as immutable, so two instances with the + # same ``kind`` + ``targets`` are interchangeable. Tests and channel + # parsers compare instances with ``==`` and use them as dict keys. + + def __eq__(self, other: object) -> bool: + if not isinstance(other, ResponseTarget): + return NotImplemented + return self.kind is other.kind and self.targets == other.targets + + def __hash__(self) -> int: + return hash((self.kind, self.targets)) + + def __repr__(self) -> str: + if self.kind is ResponseTargetKind.CHANNELS: + return f"ResponseTarget.channels({list(self.targets)!r})" + return f"ResponseTarget.{self.kind.value}" + + +# Module-level singletons so callers can write ``ResponseTarget.originating`` +# (matching the spec's classmethod-style notation) without juggling Python's +# no-zero-arg-classmethod-property limitation. 
+ResponseTarget.originating = ResponseTarget(kind=ResponseTargetKind.ORIGINATING) # type: ignore[attr-defined] +ResponseTarget.active = ResponseTarget(kind=ResponseTargetKind.ACTIVE) # type: ignore[attr-defined] +ResponseTarget.all_linked = ResponseTarget(kind=ResponseTargetKind.ALL_LINKED) # type: ignore[attr-defined] +ResponseTarget.none = ResponseTarget(kind=ResponseTargetKind.NONE) # type: ignore[attr-defined] + + +@dataclass +class ChannelRequest: + """Uniform invocation envelope every channel produces from its native payload. + + Kept as a dataclass so app authors can use ``dataclasses.replace(...)`` in + run hooks to produce a modified envelope without re-listing every field. + """ + + channel: str + operation: str # e.g. "message.create", "command.invoke" + input: AgentRunInputs + session: ChannelSession | None = None + options: Mapping[str, Any] | None = None + session_mode: str = "auto" # "auto" | "required" | "disabled" + metadata: Mapping[str, Any] = field(default_factory=lambda: {}) + attributes: Mapping[str, Any] = field(default_factory=lambda: {}) + stream: bool = False + identity: ChannelIdentity | None = None + response_target: ResponseTarget = field(default_factory=lambda: ResponseTarget.originating) # type: ignore[attr-defined] + + +class ChannelCommand: + """A discoverable command a channel exposes to its users (e.g. ``/reset``).""" + + def __init__( + self, + name: str, + description: str, + handle: Callable[["ChannelCommandContext"], Awaitable[None]], + ) -> None: + self.name = name + self.description = description + self.handle = handle + + +class ChannelCommandContext: + """Context passed to a :class:`ChannelCommand` handler.""" + + def __init__( + self, + request: ChannelRequest, + reply: Callable[[str], Awaitable[None]], + ) -> None: + self.request = request + self.reply = reply + + +_EMPTY_ROUTES: tuple[BaseRoute, ...] = () +_EMPTY_COMMANDS: tuple[ChannelCommand, ...] = () +_EMPTY_LIFECYCLE: tuple[Callable[[], Awaitable[None]], ...] 
= () + + +class ChannelContribution: + """Routes, commands, and lifecycle hooks a channel contributes to the host.""" + + def __init__( + self, + routes: Sequence[BaseRoute] = _EMPTY_ROUTES, + commands: Sequence[ChannelCommand] = _EMPTY_COMMANDS, + on_startup: Sequence[Callable[[], Awaitable[None]]] = _EMPTY_LIFECYCLE, + on_shutdown: Sequence[Callable[[], Awaitable[None]]] = _EMPTY_LIFECYCLE, + ) -> None: + self.routes = routes + self.commands = commands + self.on_startup = on_startup + self.on_shutdown = on_shutdown + + +class HostedRunResult: + """Channel-neutral result of an agent invocation routed through the host.""" + + def __init__(self, text: str) -> None: + self.text = text + + +class DeliveryReport: + """What :meth:`ChannelContext.deliver_response` did with a payload. + + The originating channel uses ``include_originating`` to decide whether + to render the agent reply on its own wire (``True`` — default for the + ``originating`` target, or when ``"originating"`` is one of the listed + destinations) or to return only an acknowledgement (``False`` — when + the target lists only out-of-band destinations). + """ + + def __init__( + self, + include_originating: bool, + pushed: tuple[str, ...] = (), + skipped: tuple[str, ...] = (), + ) -> None: + self.include_originating = include_originating + self.pushed = pushed # destination tokens delivered to (e.g. "telegram:123") + self.skipped = skipped # destinations resolved but skipped (no push, failed, …) + + +# A transform hook runs over each AgentResponseUpdate as the channel consumes +# the stream. It can return a replacement update, ``None`` to drop the update, +# or be async. Channels apply it during iteration so that channel-specific +# concerns (e.g. masking, redaction, formatting for the wire) live close to +# the channel rather than on the agent. 
+ChannelStreamTransformHook = Callable[ + [AgentResponseUpdate], + "AgentResponseUpdate | Awaitable[AgentResponseUpdate | None] | None", +] + + +# --------------------------------------------------------------------------- # +# Channel run hook +# --------------------------------------------------------------------------- # + + +# Run hooks accept the channel-built ``ChannelRequest`` and return a +# (possibly modified) replacement. Channels invoke the hook with both the +# request and the channel-side context as keyword arguments — the call +# convention is ``await hook(request, target=..., protocol_request=...)``. +# +# The ergonomic minimum for a hook implementation is therefore a function +# accepting ``request`` positionally plus ``**kwargs`` and returning a +# (possibly mutated) :class:`ChannelRequest`. Hooks that need the agent +# target or the raw channel-native payload pull them off the keyword +# arguments by name (``target`` / ``protocol_request``). +# +# ``protocol_request`` is the raw, channel-native payload the channel +# parsed (the JSON body for Responses, the Telegram ``Update`` dict, the +# Bot Framework ``Activity`` for Teams). Use it when the hook needs a +# field the channel did not lift onto ``ChannelRequest`` (e.g. OpenAI's +# ``safety_identifier``, Teams' ``from.aadObjectId``, …). +ChannelRunHook = Callable[..., "Awaitable[ChannelRequest] | ChannelRequest"] + + +async def apply_run_hook( + hook: ChannelRunHook, + request: ChannelRequest, + *, + target: SupportsAgentRun | Workflow, + protocol_request: Any | None, +) -> ChannelRequest: + """Channel-side helper to invoke a :data:`ChannelRunHook` with the standard kwargs. + + Channels call this rather than calling the hook directly so the + invocation convention (``request`` positional, ``target`` / + ``protocol_request`` keyword) is enforced in one place. 
+ """ + result = hook(request, target=target, protocol_request=protocol_request) + if isinstance(result, Awaitable): + return await result + return result + + +# --------------------------------------------------------------------------- # +# Channel protocols +# --------------------------------------------------------------------------- # + + +@runtime_checkable +class Channel(Protocol): + """A pluggable adapter that exposes one transport on the host. + + Channels publish their routes, commands, and lifecycle callbacks via + :meth:`contribute`. The host mounts them under the channel's ``path`` + (or at the app root when ``path == ""``) and gives the channel a + :class:`ChannelContext` so it can call back into the host to invoke + the agent target and deliver responses. + """ + + name: str + path: str # default mount path (e.g. "/responses"); use "" to mount routes at the app root + + def contribute(self, context: "ChannelContext") -> ChannelContribution: ... + + +@runtime_checkable +class ChannelPush(Protocol): + """Optional capability: a channel that can deliver outbound messages without a prior request. + + Per SPEC-002 (req #13), channels that can do proactive delivery + (Telegram bot proactive message, Teams proactive bot message, + webhook callbacks, SSE broadcasts) implement ``push`` on top of the + base :class:`Channel` protocol. Channels without push can only be + addressed as the ``originating`` :class:`ResponseTarget`. + """ + + name: str + + async def push(self, identity: ChannelIdentity, payload: HostedRunResult) -> None: ... 
+ + +__all__ = [ + "AgentResponse", + "AgentResponseUpdate", + "Channel", + "ChannelCommand", + "ChannelCommandContext", + "ChannelContribution", + "ChannelIdentity", + "ChannelPush", + "ChannelRequest", + "ChannelRunHook", + "ChannelSession", + "ChannelStreamTransformHook", + "DeliveryReport", + "HostedRunResult", + "ResponseStream", + "ResponseTarget", + "ResponseTargetKind", + "apply_run_hook", +] diff --git a/python/packages/hosting/pyproject.toml b/python/packages/hosting/pyproject.toml new file mode 100644 index 0000000000..57c79854bb --- /dev/null +++ b/python/packages/hosting/pyproject.toml @@ -0,0 +1,107 @@ +[project] +name = "agent-framework-hosting" +description = "Multi-channel hosting for Microsoft Agent Framework agents." +authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0a260424" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.2.0,<2", + "starlette>=0.37", +] + +[project.optional-dependencies] +serve = [ + "hypercorn>=0.17", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'", + "sys_platform == 'win32'" +] + +[tool.uv-dynamic-versioning] 
+fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +filterwarnings = [] +timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = "../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +include = ["agent_framework_hosting"] +exclude = ['tests'] + +[tool.mypy] +plugins = ['pydantic.mypy'] +strict = true +python_version = "3.10" +ignore_missing_imports = true +disallow_untyped_defs = true +no_implicit_optional = true +check_untyped_defs = true +warn_return_any = true +show_error_codes = true +warn_unused_ignores = false +disallow_incomplete_defs = true +disallow_untyped_decorators = true + +[tool.bandit] +targets = ["agent_framework_hosting"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks.mypy] +help = "Run MyPy for this package." +cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hosting" + +[tool.poe.tasks.test] +help = "Run the default unit test suite for this package." +cmd = 'pytest -m "not integration" --cov=agent_framework_hosting --cov-report=term-missing:skip-covered tests' + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" + +[dependency-groups] +dev = [ + "httpx>=0.28.1", +] diff --git a/python/packages/hosting/tests/__init__.py b/python/packages/hosting/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/packages/hosting/tests/_workflow_fixtures.py b/python/packages/hosting/tests/_workflow_fixtures.py new file mode 100644 index 0000000000..f59bb8cab8 --- /dev/null +++ b/python/packages/hosting/tests/_workflow_fixtures.py @@ -0,0 +1,43 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Workflow fixtures for hosting tests. + +Defined in a module that does not use ``from __future__ import annotations`` +because the workflow handler validation reflects on real annotation objects +rather than stringified forms. +""" + +from agent_framework import Executor, Workflow, WorkflowBuilder, WorkflowContext, handler + + +class _UpperExecutor(Executor): + @handler + async def handle(self, text: str, ctx: WorkflowContext[str]) -> None: + await ctx.yield_output(text.upper()) + + +class _EchoExecutor(Executor): + @handler + async def handle(self, text: str, ctx: WorkflowContext[str]) -> None: + await ctx.yield_output(text) + + +def build_upper_workflow() -> Workflow: + return WorkflowBuilder(start_executor=_UpperExecutor(id="upper")).build() + + +def build_echo_workflow() -> Workflow: + return WorkflowBuilder(start_executor=_EchoExecutor(id="echo")).build() + + +class _MultiChunkExecutor(Executor): + """Yields three separate ``output`` events so streaming has something to chew on.""" + + @handler + async def handle(self, text: str, ctx: WorkflowContext[str]) -> None: + for chunk in (f"{text}-1", f"{text}-2", f"{text}-3"): + await ctx.yield_output(chunk) + + +def build_multi_chunk_workflow() -> Workflow: + return WorkflowBuilder(start_executor=_MultiChunkExecutor(id="multi")).build() diff --git a/python/packages/hosting/tests/test_host.py b/python/packages/hosting/tests/test_host.py new file mode 100644 index 0000000000..bb6d7d5963 --- /dev/null +++ b/python/packages/hosting/tests/test_host.py @@ -0,0 +1,714 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Tests for :class:`AgentFrameworkHost` invocation, session, and delivery routing.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator, Sequence +from dataclasses import dataclass, field +from typing import Any + +import pytest +from agent_framework import AgentResponseUpdate +from starlette.requests import Request +from starlette.responses import JSONResponse +from starlette.routing import BaseRoute, Route +from starlette.testclient import TestClient + +from agent_framework_hosting import ( + AgentFrameworkHost, + Channel, + ChannelContext, + ChannelContribution, + ChannelIdentity, + ChannelPush, + ChannelRequest, + ChannelSession, + HostedRunResult, + ResponseTarget, +) + + +async def _ping(_request: Request) -> JSONResponse: + return JSONResponse({"ok": True}) + + +# --------------------------------------------------------------------------- # +# Fakes # +# --------------------------------------------------------------------------- # + + +@dataclass +class _FakeAgentSession: + session_id: str | None = None + service_session_id: str | None = None + + +@dataclass +class _FakeAgentResponse: + text: str + + +class _FakeAgent: + """Minimal :class:`SupportsAgentRun` implementation that records invocations.""" + + def __init__(self, reply: str = "ok") -> None: + self._reply = reply + self.calls: list[dict[str, Any]] = [] + self.created_sessions: list[_FakeAgentSession] = [] + + def create_session(self, *, session_id: str | None = None) -> _FakeAgentSession: + s = _FakeAgentSession(session_id=session_id) + self.created_sessions.append(s) + return s + + async def run(self, messages: Any = None, *, stream: bool = False, session: Any = None, **kwargs: Any) -> Any: + self.calls.append({"messages": messages, "stream": stream, "session": session, "kwargs": kwargs}) + if stream: # pragma: no cover - not used by these tests + + async def _gen() -> AsyncIterator[Any]: + yield self._reply + + return _gen() + return 
_FakeAgentResponse(text=self._reply) + + +class _RecordingChannel: + """Minimal :class:`Channel` + :class:`ChannelPush` for routing tests.""" + + def __init__(self, name: str = "fake", path: str = "/fake", supports_push: bool = True) -> None: + self.name = name + self.path = path + self.context: ChannelContext | None = None + self.pushes: list[tuple[ChannelIdentity, HostedRunResult]] = [] + self._push_raises: Exception | None = None + self._supports_push = supports_push + # Provide a single trivial route so contribute() exercises the mount path. + self._routes: Sequence[BaseRoute] = (Route("/ping", _ping),) + + def contribute(self, context: ChannelContext) -> ChannelContribution: + self.context = context + return ChannelContribution(routes=self._routes) + + async def push(self, identity: ChannelIdentity, payload: HostedRunResult) -> None: + if self._push_raises is not None: + raise self._push_raises + self.pushes.append((identity, payload)) + + +class _NoPushChannel: + """A channel that does NOT implement :class:`ChannelPush`.""" + + def __init__(self, name: str = "nopush", path: str = "/nopush") -> None: + self.name = name + self.path = path + + def contribute(self, context: ChannelContext) -> ChannelContribution: + return ChannelContribution() + + +@dataclass +class _LifecycleChannel: + name: str = "lifecycle" + path: str = "" + started: list[str] = field(default_factory=list) + stopped: list[str] = field(default_factory=list) + + def contribute(self, context: ChannelContext) -> ChannelContribution: + async def on_start() -> None: + self.started.append("up") + + async def on_stop() -> None: + self.stopped.append("down") + + return ChannelContribution(on_startup=[on_start], on_shutdown=[on_stop]) + + +# --------------------------------------------------------------------------- # +# Host wiring # +# --------------------------------------------------------------------------- # + + +class TestHostWiring: + def test_channel_is_recognized(self) -> None: + ch = 
_RecordingChannel() + assert isinstance(ch, Channel) + assert isinstance(ch, ChannelPush) + + def test_app_mounts_channel_routes_under_path(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel(path="/fake") + host = AgentFrameworkHost(target=agent, channels=[ch]) + + with TestClient(host.app) as client: + r = client.get("/fake/ping") + assert r.status_code == 200 + assert r.json() == {"ok": True} + + def test_app_mounts_at_root_when_path_is_empty(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel(path="") + host = AgentFrameworkHost(target=agent, channels=[ch]) + + with TestClient(host.app) as client: + r = client.get("/ping") + assert r.status_code == 200 + + def test_app_is_cached(self) -> None: + host = AgentFrameworkHost(target=_FakeAgent(), channels=[_RecordingChannel()]) + assert host.app is host.app + + def test_lifespan_invokes_startup_and_shutdown(self) -> None: + agent = _FakeAgent() + ch = _LifecycleChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app): + assert ch.started == ["up"] + assert ch.stopped == ["down"] + + def test_app_exposes_readiness_probe(self) -> None: + host = AgentFrameworkHost(target=_FakeAgent(), channels=[_RecordingChannel()]) + with TestClient(host.app) as client: + r = client.get("/readiness") + assert r.status_code == 200 + assert r.text == "ok" + + +# --------------------------------------------------------------------------- # +# Invoke + sessions # +# --------------------------------------------------------------------------- # + + +class TestHostInvoke: + @pytest.mark.asyncio + async def test_invoke_wraps_input_with_hosting_metadata(self) -> None: + agent = _FakeAgent(reply="hello") + ch = _RecordingChannel(name="responses") + host = AgentFrameworkHost(target=agent, channels=[ch]) + # Force ``app`` build to trigger ``contribute``. 
+ _ = host.app + assert ch.context is not None + + req = ChannelRequest( + channel="responses", + operation="message.create", + input="hi", + session=ChannelSession(isolation_key="user:1"), + identity=ChannelIdentity(channel="responses", native_id="user:1"), + ) + result = await ch.context.run(req) + + assert result.text == "hello" + assert len(agent.calls) == 1 + msg = agent.calls[0]["messages"] + assert msg.role == "user" + assert msg.additional_properties["hosting"]["channel"] == "responses" + assert msg.additional_properties["hosting"]["identity"] == { + "channel": "responses", + "native_id": "user:1", + "attributes": {}, + } + assert msg.additional_properties["hosting"]["response_target"] == { + "kind": "originating", + "targets": [], + } + + @pytest.mark.asyncio + async def test_invoke_caches_session_per_isolation_key(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + _ = host.app + assert ch.context is not None + + req_a = ChannelRequest( + channel=ch.name, operation="op", input="1", session=ChannelSession(isolation_key="alice") + ) + req_b = ChannelRequest( + channel=ch.name, operation="op", input="2", session=ChannelSession(isolation_key="alice") + ) + req_c = ChannelRequest(channel=ch.name, operation="op", input="3", session=ChannelSession(isolation_key="bob")) + + await ch.context.run(req_a) + await ch.context.run(req_b) + await ch.context.run(req_c) + + # Two distinct sessions created (alice, bob) — never re-created. 
+ assert len(agent.created_sessions) == 2 + assert agent.calls[0]["session"] is agent.calls[1]["session"] + assert agent.calls[0]["session"] is not agent.calls[2]["session"] + + @pytest.mark.asyncio + async def test_session_disabled_does_not_create_session(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest( + channel=ch.name, + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + session_mode="disabled", + ) + await ch.context.run(req) + assert agent.created_sessions == [] + assert agent.calls[0]["session"] is None + + @pytest.mark.asyncio + async def test_reset_session_rotates_id_and_drops_cache(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest(channel=ch.name, operation="op", input="x", session=ChannelSession(isolation_key="alice")) + await ch.context.run(req) + first_session = agent.calls[-1]["session"] + assert first_session.session_id == "alice" + + host.reset_session("alice") + await ch.context.run(req) + second_session = agent.calls[-1]["session"] + # New session, new id (alias rotation), distinct object. 
+ assert second_session is not first_session + assert second_session.session_id != "alice" + assert second_session.session_id.startswith("alice#") + + @pytest.mark.asyncio + async def test_options_propagates_to_target_run(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest( + channel=ch.name, + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + options={"temperature": 0.4}, + ) + await ch.context.run(req) + assert agent.calls[0]["kwargs"]["options"] == {"temperature": 0.4} + + +# --------------------------------------------------------------------------- # +# Workflow target # +# --------------------------------------------------------------------------- # + + +class TestHostWorkflowTarget: + """The host accepts a ``Workflow`` and dispatches to ``workflow.run(...)``.""" + + @pytest.mark.asyncio + async def test_invoke_workflow_collapses_outputs_to_hosted_run_result(self) -> None: + from tests._workflow_fixtures import build_upper_workflow + + workflow = build_upper_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch]) + _ = host.app + assert ch.context is not None + + # The channel's run_hook is the canonical adapter from a free-form input + # to a workflow's typed input; here the start executor accepts ``str`` + # already so the channel forwards ``input`` verbatim. + req = ChannelRequest(channel="fake", operation="message.create", input="hello") + result = await ch.context.run(req) + + assert result.text == "HELLO" + # No session caching for workflow targets — Workflow has no + # ``create_session`` and the host must not invent one. 
+ assert host._sessions == {} + + @pytest.mark.asyncio + async def test_stream_workflow_yields_updates_and_finalizes(self) -> None: + from tests._workflow_fixtures import build_echo_workflow + + workflow = build_echo_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest(channel="fake", operation="message.create", input="hi") + stream = ch.context.run_stream(req) + + updates: list[AgentResponseUpdate] = [] + async for update in stream: + updates.append(update) + + # The echo workflow yields a single ``output`` event whose payload is + # the original string; the host wraps non-update payloads into a + # one-shot ``AgentResponseUpdate`` carrying the text. + assert [u.text for u in updates] == ["hi"] + # ``raw_representation`` preserves the source ``WorkflowEvent`` so + # advanced consumers (telemetry, debug UIs) can recover the full + # workflow timeline. + assert all(u.raw_representation is not None for u in updates) + + final = await stream.get_final_response() + assert final.text == "hi" + + @pytest.mark.asyncio + async def test_stream_workflow_yields_one_update_per_output_event(self) -> None: + from tests._workflow_fixtures import build_multi_chunk_workflow + + workflow = build_multi_chunk_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest(channel="fake", operation="message.create", input="x") + stream = ch.context.run_stream(req) + + chunks: list[str] = [] + async for update in stream: + chunks.append(update.text) + # The originating ``executor_id`` is propagated via author_name so + # multi-agent workflows can route per-author rendering downstream. 
+ assert update.author_name == "multi" + + assert chunks == ["x-1", "x-2", "x-3"] + final = await stream.get_final_response() + assert final.text == "x-1x-2x-3" + + +class TestHostWorkflowCheckpointing: + """The host scopes per-conversation checkpoints when ``checkpoint_location`` is set.""" + + def test_rejects_workflow_with_existing_checkpoint_storage(self, tmp_path: Any) -> None: + from agent_framework import InMemoryCheckpointStorage, WorkflowBuilder + + from tests._workflow_fixtures import _UpperExecutor + + workflow = WorkflowBuilder( + start_executor=_UpperExecutor(id="upper"), + checkpoint_storage=InMemoryCheckpointStorage(), + ).build() + with pytest.raises(RuntimeError, match="already has checkpoint storage"): + AgentFrameworkHost( + target=workflow, + channels=[_RecordingChannel()], + checkpoint_location=tmp_path, + ) + + def test_warns_when_target_is_agent(self, tmp_path: Any, caplog: Any) -> None: + import logging as _logging + + agent = _FakeAgent() + with caplog.at_level(_logging.WARNING, logger="agent_framework.hosting"): + host = AgentFrameworkHost(target=agent, channels=[_RecordingChannel()], checkpoint_location=tmp_path) + assert host._checkpoint_location is None + assert any("checkpoint_location" in rec.message for rec in caplog.records) + + @pytest.mark.asyncio + async def test_invoke_skips_checkpointing_when_no_isolation_key(self, tmp_path: Any) -> None: + from tests._workflow_fixtures import build_upper_workflow + + workflow = build_upper_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch], checkpoint_location=tmp_path) + _ = host.app + assert ch.context is not None + + # No session -> no scoping key -> no checkpoint storage written. 
+ req = ChannelRequest(channel="fake", operation="message.create", input="hi") + result = await ch.context.run(req) + + assert result.text == "HI" + assert list(tmp_path.iterdir()) == [] + + @pytest.mark.asyncio + async def test_invoke_writes_checkpoint_under_isolation_key(self, tmp_path: Any) -> None: + from tests._workflow_fixtures import build_upper_workflow + + workflow = build_upper_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch], checkpoint_location=tmp_path) + _ = host.app + assert ch.context is not None + + req = ChannelRequest( + channel="fake", + operation="message.create", + input="hi", + session=ChannelSession(isolation_key="alice"), + ) + result = await ch.context.run(req) + assert result.text == "HI" + + # FileCheckpointStorage rooted at ``checkpoint_location/<isolation_key>`` should + have produced at least one checkpoint file scoped to that user. + scoped = tmp_path / "alice" + assert scoped.exists() + assert any(scoped.iterdir()), "expected at least one checkpoint to be written under the per-user dir" + + @pytest.mark.asyncio + async def test_stream_writes_checkpoint_under_isolation_key(self, tmp_path: Any) -> None: + from tests._workflow_fixtures import build_echo_workflow + + workflow = build_echo_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch], checkpoint_location=tmp_path) + _ = host.app + assert ch.context is not None + + req = ChannelRequest( + channel="fake", + operation="message.create", + input="hi", + session=ChannelSession(isolation_key="bob"), + ) + stream = ch.context.run_stream(req) + async for _ in stream: + pass + await stream.get_final_response() + + scoped = tmp_path / "bob" + assert scoped.exists() + assert any(scoped.iterdir()) + + @pytest.mark.asyncio + async def test_caller_supplied_checkpoint_storage_used_as_is(self, tmp_path: Any) -> None: + from agent_framework import InMemoryCheckpointStorage + + from tests._workflow_fixtures import build_upper_workflow + + 
storage = InMemoryCheckpointStorage() + workflow = build_upper_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch], checkpoint_location=storage) + _ = host.app + assert ch.context is not None + assert host._checkpoint_location is storage + + req = ChannelRequest( + channel="fake", + operation="message.create", + input="hi", + session=ChannelSession(isolation_key="carol"), + ) + await ch.context.run(req) + + # The caller-owned storage is used directly (no per-user scoping + # applied by the host); a checkpoint should appear in it. + checkpoints = await storage.list_checkpoints(workflow_name=workflow.name) + assert checkpoints, "expected the caller-supplied storage to receive a checkpoint" + # And nothing should have been written into the tmp_path tree. + assert list(tmp_path.iterdir()) == [] + + +# --------------------------------------------------------------------------- # +# Delivery routing # +# --------------------------------------------------------------------------- # + + +def _make_host_with_two_channels() -> tuple[AgentFrameworkHost, _RecordingChannel, _RecordingChannel, ChannelContext]: + agent = _FakeAgent() + a = _RecordingChannel(name="responses", path="/r") + b = _RecordingChannel(name="telegram", path="/t") + host = AgentFrameworkHost(target=agent, channels=[a, b]) + _ = host.app + assert a.context is not None + return host, a, b, a.context + + +def _record_identity_on(host: AgentFrameworkHost, isolation_key: str, channel: str, native_id: str) -> None: + """Pre-seed the host's identity registry by running a request.""" + host._identities.setdefault(isolation_key, {})[channel] = ChannelIdentity(channel=channel, native_id=native_id) + host._active[isolation_key] = channel + + +class TestDeliverResponse: + @pytest.mark.asyncio + async def test_originating_returns_include_originating(self) -> None: + _, _, _, ctx = _make_host_with_two_channels() + req = ChannelRequest(channel="responses", operation="op", 
input="x") + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + assert report.pushed == () + assert report.skipped == () + + @pytest.mark.asyncio + async def test_none_suppresses_everything(self) -> None: + _, _, _, ctx = _make_host_with_two_channels() + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + response_target=ResponseTarget.none, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is False + assert report.pushed == () + assert report.skipped == () + + @pytest.mark.asyncio + async def test_active_pushes_to_other_channel(self) -> None: + host, a, b, ctx = _make_host_with_two_channels() + # Alice was last seen on telegram. + _record_identity_on(host, "alice", "telegram", "42") + # Now she sends a message via responses; ResponseTarget.active should + # push to telegram, not back to responses. + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.active, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is False + assert report.pushed == ("telegram:42",) + assert b.pushes and b.pushes[0][0].native_id == "42" + + @pytest.mark.asyncio + async def test_active_falls_back_to_originating_when_self(self) -> None: + host, _a, _b, ctx = _make_host_with_two_channels() + _record_identity_on(host, "alice", "responses", "user:1") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.active, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + + @pytest.mark.asyncio + async def 
test_channels_with_unknown_identity_skipped(self) -> None: + _, _, _, ctx = _make_host_with_two_channels() + # No prior identity seeded for telegram on alice. + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.channel("telegram"), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + # Skipped → fallback to originating. + assert report.include_originating is True + assert report.skipped == ("telegram",) + assert report.pushed == () + + @pytest.mark.asyncio + async def test_channels_with_explicit_native_id_token(self) -> None: + _, _, b, ctx = _make_host_with_two_channels() + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + response_target=ResponseTarget.channel("telegram:99"), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.pushed == ("telegram:99",) + assert report.include_originating is False + assert b.pushes[0][0].native_id == "99" + + @pytest.mark.asyncio + async def test_channels_originating_pseudo_includes_origin(self) -> None: + host, _a, _b, ctx = _make_host_with_two_channels() + _record_identity_on(host, "alice", "telegram", "42") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.channels(["originating", "telegram"]), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + assert report.pushed == ("telegram:42",) + + @pytest.mark.asyncio + async def test_channels_unknown_channel_name_skipped(self) -> None: + _, _, _, ctx = _make_host_with_two_channels() + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + response_target=ResponseTarget.channel("nope"), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert 
report.include_originating is True # fallback + assert report.skipped == ("nope",) + + @pytest.mark.asyncio + async def test_no_push_capability_skipped(self) -> None: + agent = _FakeAgent() + a = _RecordingChannel(name="responses", path="/r") + b = _NoPushChannel(name="nopush", path="/n") + host = AgentFrameworkHost(target=agent, channels=[a, b]) + _ = host.app + assert a.context is not None + # Pre-seed identity on the no-push channel so we get past the + # identity check and hit the ChannelPush check. + host._identities.setdefault("alice", {})["nopush"] = ChannelIdentity(channel="nopush", native_id="42") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.channel("nopush"), + ) + report = await a.context.deliver_response(req, HostedRunResult(text="reply")) + assert report.skipped == ("nopush:42",) + assert report.include_originating is True # fallback + + @pytest.mark.asyncio + async def test_all_linked_pushes_to_every_other_channel(self) -> None: + host, _a, b, ctx = _make_host_with_two_channels() + # Alice on responses (originating) and telegram. 
+ host._identities.setdefault("alice", {}) + host._identities["alice"]["responses"] = ChannelIdentity(channel="responses", native_id="user:1") + host._identities["alice"]["telegram"] = ChannelIdentity(channel="telegram", native_id="42") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.all_linked, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + assert report.pushed == ("telegram:42",) + assert b.pushes and b.pushes[0][1].text == "reply" + + @pytest.mark.asyncio + async def test_all_linked_no_other_channels_falls_back(self) -> None: + host, _a, _b, ctx = _make_host_with_two_channels() + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.all_linked, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + assert report.pushed == () + + @pytest.mark.asyncio + async def test_push_exception_marks_skipped(self) -> None: + host, _a, b, ctx = _make_host_with_two_channels() + b._push_raises = RuntimeError("boom") # type: ignore[attr-defined] + host._identities.setdefault("alice", {})["telegram"] = ChannelIdentity(channel="telegram", native_id="42") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.channel("telegram"), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.skipped == ("telegram:42",) + assert report.include_originating is True # fallback diff --git a/python/packages/hosting/tests/test_types.py b/python/packages/hosting/tests/test_types.py new file mode 100644 index 0000000000..76531dfe49 --- /dev/null 
+++ b/python/packages/hosting/tests/test_types.py @@ -0,0 +1,105 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for the channel-neutral envelope types in :mod:`agent_framework_hosting._types`.""" + +from __future__ import annotations + +from agent_framework_hosting import ( + ChannelIdentity, + ChannelRequest, + ChannelSession, + ResponseTarget, + ResponseTargetKind, +) + + +class TestResponseTarget: + def test_originating_default_singleton(self) -> None: + target = ResponseTarget.originating # type: ignore[attr-defined] + assert target.kind is ResponseTargetKind.ORIGINATING + assert target.targets == () + + def test_active_singleton(self) -> None: + target = ResponseTarget.active # type: ignore[attr-defined] + assert target.kind is ResponseTargetKind.ACTIVE + assert target.targets == () + + def test_all_linked_singleton(self) -> None: + target = ResponseTarget.all_linked # type: ignore[attr-defined] + assert target.kind is ResponseTargetKind.ALL_LINKED + + def test_none_singleton(self) -> None: + target = ResponseTarget.none # type: ignore[attr-defined] + assert target.kind is ResponseTargetKind.NONE + + def test_channel_builder_single(self) -> None: + target = ResponseTarget.channel("teams") + assert target.kind is ResponseTargetKind.CHANNELS + assert target.targets == ("teams",) + + def test_channels_builder_list(self) -> None: + target = ResponseTarget.channels(["teams", "telegram", "originating"]) + assert target.kind is ResponseTargetKind.CHANNELS + assert target.targets == ("teams", "telegram", "originating") + + def test_channels_builder_accepts_tuple(self) -> None: + target = ResponseTarget.channels(("a", "b")) + assert target.targets == ("a", "b") + + def test_target_is_hashable(self) -> None: + # Plain class — hashing falls back to identity, which is fine here: + # the two keys below are different instances (singleton vs builder). 
+ d = {ResponseTarget.originating: 1, ResponseTarget.channel("t"): 2} # type: ignore[attr-defined] + assert len(d) == 2 + + +class TestChannelRequest: + def test_required_fields_only(self) -> None: + req = ChannelRequest(channel="responses", operation="message.create", input="hi") + assert req.channel == "responses" + assert req.operation == "message.create" + assert req.input == "hi" + assert req.session is None + assert req.options is None + assert req.session_mode == "auto" + assert req.metadata == {} + assert req.attributes == {} + assert req.stream is False + assert req.identity is None + # Default response target is the originating singleton. + assert req.response_target.kind is ResponseTargetKind.ORIGINATING + + def test_default_response_target_is_originating_singleton(self) -> None: + # Every new request shares the module-level ``originating`` singleton + # by default — instances are intended to be treated as immutable, so + # sharing is safe and avoids per-request allocation. + a = ChannelRequest(channel="a", operation="op", input="x") + b = ChannelRequest(channel="b", operation="op", input="y") + assert a.response_target is ResponseTarget.originating # type: ignore[attr-defined] + assert a.response_target is b.response_target + + def test_with_session_and_identity(self) -> None: + req = ChannelRequest( + channel="telegram", + operation="message.create", + input="hi", + session=ChannelSession(isolation_key="user:42"), + identity=ChannelIdentity(channel="telegram", native_id="42"), + response_target=ResponseTarget.active, # type: ignore[attr-defined] + ) + assert req.session is not None + assert req.session.isolation_key == "user:42" + assert req.identity is not None + assert req.identity.channel == "telegram" + assert req.identity.native_id == "42" + assert req.response_target.kind is ResponseTargetKind.ACTIVE + + +class TestChannelIdentity: + def test_attributes_default_empty_mapping(self) -> None: + ident = ChannelIdentity(channel="teams", 
native_id="abc") + assert dict(ident.attributes) == {} + + def test_attributes_passthrough(self) -> None: + ident = ChannelIdentity(channel="teams", native_id="abc", attributes={"role": "user"}) + assert dict(ident.attributes) == {"role": "user"} diff --git a/python/pyproject.toml b/python/pyproject.toml index b788f48e71..406aaf6efc 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -85,6 +85,7 @@ agent-framework-foundry-hosting = { workspace = true } agent-framework-foundry-local = { workspace = true } agent-framework-gemini = { workspace = true } agent-framework-github-copilot = { workspace = true } +agent-framework-hosting = { workspace = true } agent-framework-hyperlight = { workspace = true } agent-framework-lab = { workspace = true } agent-framework-mem0 = { workspace = true } @@ -205,6 +206,7 @@ executionEnvironments = [ { root = "packages/foundry/tests", reportPrivateUsage = "none" }, { root = "packages/foundry_local/tests", reportPrivateUsage = "none" }, { root = "packages/github_copilot/tests", reportPrivateUsage = "none" }, + { root = "packages/hosting/tests", reportPrivateUsage = "none" }, { root = "packages/lab/gaia/tests", reportPrivateUsage = "none" }, { root = "packages/lab/lightning/tests", reportPrivateUsage = "none" }, { root = "packages/lab/tau2/tests", reportPrivateUsage = "none" }, diff --git a/python/uv.lock b/python/uv.lock index 85a968174a..a4b58d2270 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -47,6 +47,7 @@ members = [ "agent-framework-foundry-local", "agent-framework-gemini", "agent-framework-github-copilot", + "agent-framework-hosting", "agent-framework-hyperlight", "agent-framework-lab", "agent-framework-mem0", @@ -599,6 +600,36 @@ requires-dist = [ { name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = ">=0.2.1,<=0.2.1" }, ] +[[package]] +name = "agent-framework-hosting" +version = "1.0.0a260424" +source = { editable = "packages/hosting" } +dependencies = [ + { name = 
"agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "starlette", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.optional-dependencies] +serve = [ + { name = "hypercorn", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.dev-dependencies] +dev = [ + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.metadata] +requires-dist = [ + { name = "agent-framework-core", editable = "packages/core" }, + { name = "hypercorn", marker = "extra == 'serve'", specifier = ">=0.17" }, + { name = "starlette", specifier = ">=0.37" }, +] +provides-extras = ["serve"] + +[package.metadata.requires-dev] +dev = [{ name = "httpx", specifier = ">=0.28.1" }] + [[package]] name = "agent-framework-hyperlight" version = "1.0.0a260429" @@ -1636,7 +1667,7 @@ name = "clr-loader" version = "0.2.10" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cffi", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "cffi", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/18/24/c12faf3f61614b3131b5c98d3bf0d376b49c7feaa73edca559aeb2aee080/clr_loader-0.2.10.tar.gz", hash = "sha256:81f114afbc5005bafc5efe5af1341d400e22137e275b042a8979f3feb9fc9446", size = 83605, upload-time = "2026-01-03T23:13:06.984Z" } wheels = [ @@ -5132,8 +5163,8 @@ name = "powerfx" version = "0.0.34" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cffi", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, - { name = 
"pythonnet", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "cffi", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, + { name = "pythonnet", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/9f/fb/6c4bf87e0c74ca1c563921ce89ca1c5785b7576bca932f7255cdf81082a7/powerfx-0.0.34.tar.gz", hash = "sha256:956992e7afd272657ed16d80f4cad24ec95d9e4a79fb9dfa4a068a09e136af32", size = 3237555, upload-time = "2025-12-22T15:50:59.682Z" } wheels = [ @@ -5806,7 +5837,7 @@ name = "pythonnet" version = "3.0.5" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "clr-loader", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "clr-loader", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/9a/d6/1afd75edd932306ae9bd2c2d961d603dc2b52fcec51b04afea464f1f6646/pythonnet-3.0.5.tar.gz", hash = "sha256:48e43ca463941b3608b32b4e236db92d8d40db4c58a75ace902985f76dac21cf", size = 239212, upload-time = "2024-12-13T08:30:44.393Z" } wheels = [ From c3ee720b40074914ad34bc124a36e7cbcad1f3d2 Mon Sep 17 00:00:00 2001 From: eavanvalkenburg Date: Mon, 4 May 2026 21:23:29 +0200 Subject: [PATCH 2/3] feat(hosting-entra): add Entra (Azure AD) identity-linking channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New ``agent-framework-hosting-entra`` package implementing a Microsoft 
Entra OAuth-based identity-linking channel for the Hosting framework. Mounts a pair of routes (``/auth/start``, ``/auth/callback``) that walk a user through an Entra/Azure AD authorization-code flow and persist the resulting verified identity (the Graph object id, ``oid``) in the host's identity store so later requests on any other channel (Responses, Telegram, …) can be linked to the same user. Surface (re-exported from ``agent_framework_hosting_entra``): - ``EntraIdentityLinkChannel`` -- concrete ``Channel`` implementation. Owns the two Starlette routes, mints one-shot short-lived ``state`` tokens to bind the round-trip to the originating channel, exchanges the authorization code for a token via MSAL (client secret or PEM certificate), reads the user's ``oid`` from Microsoft Graph ``/me``, and writes the ``<channel>:<id> → entra:<oid>`` mapping into the identity store so cross-channel push (e.g. send a Telegram message to the user who completed the link from Responses) works without the channels having to coordinate directly. - ``EntraIdentityStore`` -- JSON-file-backed identity mapping with async-locked writes; ``entra_isolation_key`` -- canonical ``entra:<oid>`` key builder. - 14 unit tests covering route wiring, ``state`` issue / verify, callback exchange happy + failure paths, and identity-store write. Registers the package in ``python/pyproject.toml`` ``[tool.uv.sources]`` and adds the matching pyright ``executionEnvironments`` entry. Stacks on PR-2 (Hosting core); independent of PR-3 / PR-4 / PR-6. The cross-channel sample (``local_identity_link/``) that demonstrates this end-to-end alongside Responses + Telegram lands in PR-8 (samples).
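The only coupling a chat channel needs is handing the user a link into ``/auth/start``. A minimal sketch of building that URL from a channel command handler (the ``build_link_url`` helper name is illustrative, not part of this package):

```python
from urllib.parse import urlencode


def build_link_url(public_base_url: str, channel: str, native_id: str) -> str:
    """Build the URL a user visits to bind this channel id to their Entra account."""
    # Query values are URL-encoded; the channel passes its own name plus the
    # channel-native id (e.g. a Telegram chat id) straight through.
    query = urlencode({"channel": channel, "id": native_id})
    return f"{public_base_url.rstrip('/')}/auth/start?{query}"
```

A Telegram ``/link`` command, for example, would reply with ``build_link_url(host_url, "telegram", str(chat_id))``.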
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- python/packages/hosting-entra/LICENSE | 21 ++ python/packages/hosting-entra/README.md | 39 ++ .../agent_framework_hosting_entra/__init__.py | 15 + .../agent_framework_hosting_entra/_channel.py | 342 ++++++++++++++++++ python/packages/hosting-entra/pyproject.toml | 108 ++++++ .../packages/hosting-entra/tests/__init__.py | 0 .../hosting-entra/tests/test_channel.py | 197 ++++++++++ python/pyproject.toml | 2 + python/uv.lock | 22 ++ 9 files changed, 746 insertions(+) create mode 100644 python/packages/hosting-entra/LICENSE create mode 100644 python/packages/hosting-entra/README.md create mode 100644 python/packages/hosting-entra/agent_framework_hosting_entra/__init__.py create mode 100644 python/packages/hosting-entra/agent_framework_hosting_entra/_channel.py create mode 100644 python/packages/hosting-entra/pyproject.toml create mode 100644 python/packages/hosting-entra/tests/__init__.py create mode 100644 python/packages/hosting-entra/tests/test_channel.py diff --git a/python/packages/hosting-entra/LICENSE b/python/packages/hosting-entra/LICENSE new file mode 100644 index 0000000000..9e841e7a26 --- /dev/null +++ b/python/packages/hosting-entra/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. 
+ + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/python/packages/hosting-entra/README.md b/python/packages/hosting-entra/README.md new file mode 100644 index 0000000000..6e0073812b --- /dev/null +++ b/python/packages/hosting-entra/README.md @@ -0,0 +1,39 @@ +# agent-framework-hosting-entra + +Microsoft Entra (Azure AD) identity-linking sidecar channel for +[agent-framework-hosting](../hosting). Owns the OAuth 2.0 Authorization Code +flow that binds a per-channel id (e.g. a Telegram chat id) to the user's +Entra object id, so multiple non-Entra channels can share a single +`entra:<oid>` isolation key. + +## Usage + +```python +from pathlib import Path +from agent_framework_hosting import AgentFrameworkHost +from agent_framework_hosting_entra import ( + EntraIdentityLinkChannel, + EntraIdentityStore, +) + +store = EntraIdentityStore(Path("./identity_links.json")) + +host = AgentFrameworkHost( + target=my_agent, + channels=[ + EntraIdentityLinkChannel( + store=store, + tenant_id="<tenant-id>", + client_id="<client-id>", + client_secret="<client-secret>", + public_base_url="https://your.host", + ), + # ... other channels whose run hooks call store.lookup(...) + ], +) +host.serve() +``` + +For tenants that disallow client secrets, pass `certificate_path=` (and +optionally `certificate_password=`) instead of `client_secret`. The PEM +layout matches the one used by `agent-framework-hosting-teams`.
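The `# ... other channels whose run hooks call store.lookup(...)` line above glosses over the lookup side. A minimal sketch of that fallback logic (the `resolve_isolation_key` name is illustrative, and `store` is duck-typed so the sketch stands alone; any object with a `lookup` method works):

```python
def resolve_isolation_key(store, channel: str, native_id: str) -> str:
    """Prefer the linked Entra key; fall back to the channel-native key."""
    channel_key = f"{channel}:{native_id}"
    # EntraIdentityStore.lookup returns the linked "entra:<oid>" key, or None
    # when the user has not completed the /auth/start flow yet.
    linked = store.lookup(channel_key)
    return linked if linked is not None else channel_key
```

A channel run hook would call this per request and rewrite the request's isolation key, so linked users share one history across channels while unlinked users keep their channel-scoped key.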
diff --git a/python/packages/hosting-entra/agent_framework_hosting_entra/__init__.py b/python/packages/hosting-entra/agent_framework_hosting_entra/__init__.py new file mode 100644 index 0000000000..6e1bba53b8 --- /dev/null +++ b/python/packages/hosting-entra/agent_framework_hosting_entra/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Microsoft Entra (Azure AD) identity channel for :mod:`agent_framework_hosting`.""" + +from ._channel import ( + EntraIdentityLinkChannel, + EntraIdentityStore, + entra_isolation_key, +) + +__all__ = [ + "EntraIdentityLinkChannel", + "EntraIdentityStore", + "entra_isolation_key", +] diff --git a/python/packages/hosting-entra/agent_framework_hosting_entra/_channel.py b/python/packages/hosting-entra/agent_framework_hosting_entra/_channel.py new file mode 100644 index 0000000000..8b592b35ee --- /dev/null +++ b/python/packages/hosting-entra/agent_framework_hosting_entra/_channel.py @@ -0,0 +1,342 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Microsoft Entra (Azure AD) identity-linking sidecar channel. + +Implements the OAuth 2.0 Authorization Code flow against Entra so users on +non-Entra channels (Telegram, Responses callers without a verified token, +etc.) can bind their per-channel id to a stable ``entra:<oid>`` isolation +key. Once the link is established, channel run-hooks can call +:meth:`EntraIdentityStore.lookup` and rewrite the request to use the Entra +key instead of the channel-native id. + +Two credential modes are supported: + +* ``client_secret`` — confidential-client secret. +* ``certificate_path`` — PEM bundle (private key + cert) for tenants that + disallow secrets. The Teams channel uses the same PEM layout; see + :mod:`agent_framework_hosting_teams` for the openssl recipe.
+""" + +from __future__ import annotations + +import asyncio +import json +import secrets +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import httpx +import msal +from agent_framework_hosting import ( + ChannelContext, + ChannelContribution, + logger, +) +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from starlette.requests import Request +from starlette.responses import HTMLResponse, RedirectResponse, Response +from starlette.routing import Route + + +def entra_isolation_key(oid: str) -> str: + """Canonical isolation key for a user identified by Entra object id.""" + return f"entra:{oid}" + + +class EntraIdentityStore: + """Tiny JSON-backed mapping ``: → entra:``. + + Production deployments should swap this for a real KV store. Single-file + JSON is fine for samples because writes are infrequent (only during the + OAuth callback) and we serialize them under an asyncio lock. + """ + + def __init__(self, path: Path) -> None: + """Open an identity store backed by ``path``. + + Loads any existing JSON document; an unreadable or corrupt file is + logged and replaced with an empty in-memory map so callers always + get a usable store. + """ + self._path = path + self._lock = asyncio.Lock() + self._data: dict[str, str] = {} + if path.exists(): + try: + self._data = json.loads(path.read_text()) + except Exception: + logger.exception("identity store load failed; starting empty") + + def lookup(self, channel_key: str) -> str | None: + """Return the linked ``entra:`` key for a per-channel id, or ``None``.""" + return self._data.get(channel_key) + + async def link(self, channel_key: str, oid: str) -> None: + """Bind ``channel_key`` (e.g. ``telegram:123``) to the Entra ``oid`` and persist. + + Overwrites any existing mapping for ``channel_key`` and rewrites the + backing JSON file under the lock so concurrent callers cannot race. 
+ """ + async with self._lock: + self._data[channel_key] = entra_isolation_key(oid) + self._path.write_text(json.dumps(self._data, indent=2, sort_keys=True)) + + async def unlink(self, channel_key: str) -> None: + """Remove the mapping for ``channel_key``; no-op if absent. + + The file is only rewritten when an entry actually existed so we + don't churn disk on idempotent unlink calls. + """ + async with self._lock: + if self._data.pop(channel_key, None) is not None: + self._path.write_text(json.dumps(self._data, indent=2, sort_keys=True)) + + +@dataclass +class _PendingAuth: + """In-memory record of an authorize redirect waiting for its OAuth callback.""" + + channel: str + channel_id: str + expires_at: float + return_to: str | None = None + + +def _link_html(body: str, *, status: int = 200) -> HTMLResponse: + """Wrap ``body`` in a minimal HTML shell suitable for browser link UIs.""" + return HTMLResponse( + f"{body}", + status_code=status, + ) + + +def _load_certificate_credential(certificate_path: str | Path, certificate_password: bytes | None) -> dict[str, str]: + """Build the ``msal`` certificate credential dict from a PEM bundle. + + Expects ``certificate_path`` to point at a single PEM containing the + private key followed by the X.509 certificate (the layout produced by + ``cat key.pem cert.pem > combined.pem``). + """ + pem_bytes = Path(certificate_path).read_bytes() + private_key = serialization.load_pem_private_key(pem_bytes, password=certificate_password) + cert = x509.load_pem_x509_certificate(pem_bytes) + + private_key_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode() + public_cert_pem = cert.public_bytes(serialization.Encoding.PEM).decode() + # SHA-1 thumbprint is required by the Entra ``client_assertion`` spec for cert auth — not a security choice. 
+ thumbprint = cert.fingerprint(hashes.SHA1()).hex() # noqa: S303 + return { + "private_key": private_key_pem, + "thumbprint": thumbprint, + "public_certificate": public_cert_pem, + } + + +class EntraIdentityLinkChannel: + """Sidecar Channel exposing ``GET /auth/start`` and ``GET /auth/callback``. + + Demonstrates that ``Channel`` is a general extensibility point — not just + for chat surfaces. Owns the Entra OAuth Authorization Code flow used to + bind a per-channel id (e.g. Telegram chat id) to the user's Entra object + id. + + Two credential modes are supported (mutually exclusive): + + * ``client_secret`` — classic confidential-client secret. + * ``certificate_path`` — PEM bundle (private key + certificate) for + tenants that disallow secrets. See ``teams.py`` module docstring for + an ``openssl`` recipe; the same PEM works here. + + Flow (OAuth 2.0 Authorization Code, confidential client): + + 1. ``GET /auth/start?channel=<channel>&id=<id>`` mints a one-shot + ``state`` token and 302s to the Entra ``authorize`` endpoint. + 2. User signs in; Entra calls ``GET /auth/callback?code=...&state=...``. + 3. We exchange the code for a token (via ``msal`` so secret + cert auth + look identical at the call site), call Microsoft Graph ``/me`` to + read ``id`` (oid), persist ``<channel>:<id> → entra:<oid>``, and + respond with a friendly HTML page (or 302 to ``return_to``). + + Tokens never leave the host process; only the ``oid`` claim is stored.
+ """ + + name = "identity" + path = "/auth" + + _AUTHORITY_TEMPLATE = "https://login.microsoftonline.com/{tenant}" + _GRAPH_ME = "https://graph.microsoft.com/v1.0/me" + _PENDING_TTL_SECONDS = 600 # 10 minutes + + def __init__( + self, + *, + store: EntraIdentityStore, + tenant_id: str, + client_id: str, + public_base_url: str, + client_secret: str | None = None, + certificate_path: str | Path | None = None, + certificate_password: bytes | None = None, + scope: str = "openid profile User.Read", + ) -> None: + if bool(client_secret) == bool(certificate_path): + raise ValueError("IdentityLinkChannel: pass exactly one of client_secret or certificate_path.") + if certificate_path is not None: + credential: str | dict[str, str] = _load_certificate_credential(certificate_path, certificate_password) + self._auth_kind = "certificate" + else: + credential = client_secret # type: ignore[assignment] + self._auth_kind = "client_secret" + + self._store = store + self._tenant_id = tenant_id + self._client_id = client_id + self._public_base_url = public_base_url.rstrip("/") + self._scopes = [s for s in scope.split() if s and s.lower() not in {"openid", "profile", "offline_access"}] + # MSAL ConfidentialClientApplication is sync; we wrap blocking calls + # in ``asyncio.to_thread`` because token endpoint calls do real I/O. + self._msal_app = msal.ConfidentialClientApplication( + client_id=client_id, + authority=self._AUTHORITY_TEMPLATE.format(tenant=tenant_id), + client_credential=credential, + ) + self._pending: dict[str, _PendingAuth] = {} + self._http: httpx.AsyncClient | None = None + + @property + def redirect_uri(self) -> str: + """The fully-qualified OAuth redirect URI registered with Entra ID. + + Computed from ``public_base_url`` plus the channel's mount path so + operators can copy it straight into the app registration's reply URLs. 
+ """ + return f"{self._public_base_url}{self.path}/callback" + + def contribute(self, context: "ChannelContext") -> "ChannelContribution": + """Mount the ``/start`` and ``/callback`` routes plus lifecycle hooks.""" + return ChannelContribution( + routes=[ + Route("/start", self._handle_start, methods=["GET"]), + Route("/callback", self._handle_callback, methods=["GET"]), + ], + on_startup=[self._on_startup], + on_shutdown=[self._on_shutdown], + ) + + async def _on_startup(self) -> None: + """Open the shared HTTP client used for Microsoft Graph calls.""" + self._http = httpx.AsyncClient(timeout=15.0) + logger.info( + "IdentityLinkChannel ready (auth=%s); redirect_uri=%s", + self._auth_kind, + self.redirect_uri, + ) + + async def _on_shutdown(self) -> None: + """Close the Graph HTTP client; safe to call when never started.""" + if self._http is not None: + await self._http.aclose() + + def authorize_url_for(self, channel: str, channel_id: str, return_to: str | None = None) -> str: + """Mint a one-shot authorize URL the user can visit to bind their account.""" + state = secrets.token_urlsafe(24) + self._gc_pending() + self._pending[state] = _PendingAuth( + channel=channel, + channel_id=str(channel_id), + expires_at=time.monotonic() + self._PENDING_TTL_SECONDS, + return_to=return_to, + ) + return str( + self._msal_app.get_authorization_request_url( + scopes=self._scopes, + redirect_uri=self.redirect_uri, + state=state, + prompt="select_account", + ) + ) + + def _gc_pending(self) -> None: + """Drop expired pending-auth entries so the in-memory map cannot grow unbounded.""" + now = time.monotonic() + for key, entry in list(self._pending.items()): + if entry.expires_at < now: + self._pending.pop(key, None) + + async def _handle_start(self, request: Request) -> Response: + """``GET /start?channel=&id=&return_to=`` — redirect the user to Entra to sign in. 
+ + The caller (typically a channel command like ``/link`` in Telegram or + a Teams adaptive-card button) hands the user this URL; we mint the + authorize URL and 302 to it. + """ + channel = request.query_params.get("channel") + channel_id = request.query_params.get("id") + return_to = request.query_params.get("return_to") + if not channel or not channel_id: + return _link_html("Missing 'channel' or 'id' query parameter.", status=400) + url = self.authorize_url_for(channel, channel_id, return_to=return_to) + return RedirectResponse(url, status_code=302) + + async def _handle_callback(self, request: Request) -> Response: + """``GET /callback`` — finish the OAuth flow and persist the link. + + Exchanges the authorization code for a token, reads the user's + ``id``/``userPrincipalName`` from Microsoft Graph, then stores the + ``channel:channel_id -> entra:<oid>`` mapping in the identity store. + Renders a small HTML page so a browser-based flow has something to + show; if ``return_to`` was supplied it appears as a deep link. + """ + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("entra identity channel not started") + if error := request.query_params.get("error"): + description = request.query_params.get("error_description", "") + return _link_html(f"Sign-in failed: {error}<br>
{description}", status=400) + + code = request.query_params.get("code") + state = request.query_params.get("state") + pending = self._pending.pop(state or "", None) + if not code or pending is None or pending.expires_at < time.monotonic(): + return _link_html("Invalid or expired sign-in state. Please retry.", status=400) + + # MSAL handles client_secret vs client_assertion (cert) under the hood. + result: dict[str, Any] = await asyncio.to_thread( + self._msal_app.acquire_token_by_authorization_code, + code, + scopes=self._scopes, + redirect_uri=self.redirect_uri, + ) + if "access_token" not in result: + logger.warning("Entra token exchange failed: %s", result) + return _link_html( + f"Token exchange failed: {result.get('error_description', result.get('error'))}", + status=502, + ) + access_token = result["access_token"] + + me = await self._http.get(self._GRAPH_ME, headers={"Authorization": f"Bearer {access_token}"}) + if me.status_code != 200: + return _link_html("Could not read user profile from Microsoft Graph.", status=502) + profile = me.json() + oid = profile.get("id") + upn = profile.get("userPrincipalName") or profile.get("displayName") or oid + if not oid: + return _link_html("Profile response missing 'id'.", status=502) + + channel_key = f"{pending.channel}:{pending.channel_id}" + await self._store.link(channel_key, oid) + logger.info("Linked %s → entra:%s (%s)", channel_key, oid, upn) + + if pending.return_to: + return RedirectResponse(pending.return_to, status_code=302) + return _link_html( + f"

<h1>Linked</h1><p><code>{channel_key}</code> is now bound to <b>{upn}</b>.</p>" + "<p>You can close this window and return to your chat.</p>
" + ) diff --git a/python/packages/hosting-entra/pyproject.toml b/python/packages/hosting-entra/pyproject.toml new file mode 100644 index 0000000000..45264741d5 --- /dev/null +++ b/python/packages/hosting-entra/pyproject.toml @@ -0,0 +1,108 @@ +[project] +name = "agent-framework-hosting-entra" +description = "Microsoft Entra (Azure AD) OAuth-based identity-linking channel for agent-framework-hosting." +authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0a260424" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.2.0,<2", + "agent-framework-hosting==1.0.0a260424", + "httpx>=0.27,<1", + "msal>=1.28,<2", + "cryptography>=42", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'", + "sys_platform == 'win32'" +] + +[tool.uv-dynamic-versioning] +fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +filterwarnings = [] +timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = 
"../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +include = ["agent_framework_hosting_entra"] +exclude = ['tests'] +# Bot Framework activities arrive as loosely-typed JSON-ish maps. Strict +# ``Unknown`` reporting on every ``.get(...)`` adds noise without catching +# real bugs — narrowing happens via runtime isinstance checks instead. +reportUnknownArgumentType = "none" +reportUnknownMemberType = "none" +reportUnknownVariableType = "none" +reportUnknownLambdaType = "none" +reportOptionalMemberAccess = "none" + +[tool.mypy] +plugins = ['pydantic.mypy'] +strict = true +python_version = "3.10" +ignore_missing_imports = true +disallow_untyped_defs = true +no_implicit_optional = true +check_untyped_defs = true +warn_return_any = true +show_error_codes = true +warn_unused_ignores = false +disallow_incomplete_defs = true +disallow_untyped_decorators = true + +[tool.bandit] +targets = ["agent_framework_hosting_entra"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks.mypy] +help = "Run MyPy for this package." +cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hosting_entra" + +[tool.poe.tasks.test] +help = "Run the default unit test suite for this package." +cmd = 'pytest -m "not integration" --cov=agent_framework_hosting_entra --cov-report=term-missing:skip-covered tests' + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" diff --git a/python/packages/hosting-entra/tests/__init__.py b/python/packages/hosting-entra/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/packages/hosting-entra/tests/test_channel.py b/python/packages/hosting-entra/tests/test_channel.py new file mode 100644 index 0000000000..d37df98676 --- /dev/null +++ b/python/packages/hosting-entra/tests/test_channel.py @@ -0,0 +1,197 @@ +# Copyright (c) Microsoft. 
All rights reserved. + +"""Unit tests for :mod:`agent_framework_hosting_entra`. + +The MSAL ``ConfidentialClientApplication`` and Microsoft Graph calls are +mocked out so no network access is required. Live OAuth, certificate auth, +and the full interactive sign-in flow are out of scope here. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from starlette.applications import Starlette +from starlette.testclient import TestClient + +from agent_framework_hosting_entra import ( + EntraIdentityLinkChannel, + EntraIdentityStore, + entra_isolation_key, +) + + +def test_entra_isolation_key_format() -> None: + assert entra_isolation_key("abc123") == "entra:abc123" + + +class TestEntraIdentityStore: + @pytest.mark.asyncio + async def test_link_writes_entra_namespaced_value(self, tmp_path: Path) -> None: + store = EntraIdentityStore(tmp_path / "links.json") + await store.link("telegram:42", "oid-xyz") + assert store.lookup("telegram:42") == "entra:oid-xyz" + # Persisted to disk.
+ saved = json.loads((tmp_path / "links.json").read_text()) + assert saved == {"telegram:42": "entra:oid-xyz"} + + @pytest.mark.asyncio + async def test_unlink_removes_entry(self, tmp_path: Path) -> None: + store = EntraIdentityStore(tmp_path / "links.json") + await store.link("telegram:42", "oid") + await store.unlink("telegram:42") + assert store.lookup("telegram:42") is None + assert json.loads((tmp_path / "links.json").read_text()) == {} + + @pytest.mark.asyncio + async def test_unlink_unknown_is_noop(self, tmp_path: Path) -> None: + store = EntraIdentityStore(tmp_path / "links.json") + await store.unlink("telegram:never") # must not raise + assert not (tmp_path / "links.json").exists() + + def test_loads_existing_file(self, tmp_path: Path) -> None: + path = tmp_path / "links.json" + path.write_text(json.dumps({"telegram:1": "entra:abc"})) + store = EntraIdentityStore(path) + assert store.lookup("telegram:1") == "entra:abc" + + def test_corrupt_file_starts_empty(self, tmp_path: Path) -> None: + path = tmp_path / "links.json" + path.write_text("not-json") + store = EntraIdentityStore(path) + assert store.lookup("anything") is None + + +class TestEntraIdentityLinkChannelConfig: + def test_rejects_neither_credential(self, tmp_path: Path) -> None: + with pytest.raises(ValueError, match="exactly one"): + EntraIdentityLinkChannel( + store=EntraIdentityStore(tmp_path / "x.json"), + tenant_id="t", + client_id="c", + public_base_url="https://example.com", + ) + + def test_rejects_both_credentials(self, tmp_path: Path) -> None: + with pytest.raises(ValueError, match="exactly one"): + EntraIdentityLinkChannel( + store=EntraIdentityStore(tmp_path / "x.json"), + tenant_id="t", + client_id="c", + public_base_url="https://example.com", + client_secret="s", + certificate_path="/tmp/does-not-exist.pem", + ) + + def test_redirect_uri_strips_trailing_slash(self, tmp_path: Path) -> None: + with patch( + "agent_framework_hosting_entra._channel.msal.ConfidentialClientApplication", + 
+            MagicMock(),
+        ):
+            ch = EntraIdentityLinkChannel(
+                store=EntraIdentityStore(tmp_path / "x.json"),
+                tenant_id="t",
+                client_id="c",
+                public_base_url="https://example.com/",
+                client_secret="s",
+            )
+        assert ch.redirect_uri == "https://example.com/auth/callback"
+
+
+class TestEntraIdentityLinkChannelRoutes:
+    def _make_channel(self, tmp_path: Path, msal_app: MagicMock) -> tuple[EntraIdentityLinkChannel, EntraIdentityStore]:
+        store = EntraIdentityStore(tmp_path / "links.json")
+        with patch(
+            "agent_framework_hosting_entra._channel.msal.ConfidentialClientApplication",
+            return_value=msal_app,
+        ):
+            ch = EntraIdentityLinkChannel(
+                store=store,
+                tenant_id="tenant-1",
+                client_id="client-1",
+                public_base_url="https://example.com",
+                client_secret="s",
+            )
+        return ch, store
+
+    def _mount_app(self, ch: EntraIdentityLinkChannel) -> Starlette:
+        # We don't depend on AgentFrameworkHost here — wire the routes
+        # directly so we can exercise the channel in isolation.
+        from starlette.routing import Mount
+
+        contribution = ch.contribute(MagicMock())
+        return Starlette(routes=[Mount(ch.path, routes=contribution.routes)])
+
+    def test_start_missing_params_returns_400(self, tmp_path: Path) -> None:
+        msal_app = MagicMock()
+        ch, _ = self._make_channel(tmp_path, msal_app)
+        with TestClient(self._mount_app(ch)) as client:
+            r = client.get("/auth/start", follow_redirects=False)
+        assert r.status_code == 400
+
+    def test_start_redirects_to_authorize_url(self, tmp_path: Path) -> None:
+        msal_app = MagicMock()
+        msal_app.get_authorization_request_url.return_value = (
+            "https://login.microsoftonline.com/tenant-1/oauth2/v2.0/authorize?state=X"
+        )
+        ch, _ = self._make_channel(tmp_path, msal_app)
+        with TestClient(self._mount_app(ch)) as client:
+            r = client.get(
+                "/auth/start",
+                params={"channel": "telegram", "id": "42"},
+                follow_redirects=False,
+            )
+        assert r.status_code == 302
+        assert "login.microsoftonline.com" in r.headers["location"]
+
+    def test_callback_invalid_state_returns_400(self, tmp_path: Path) -> None:
+        msal_app = MagicMock()
+        ch, _ = self._make_channel(tmp_path, msal_app)
+        ch._http = MagicMock(aclose=AsyncMock())
+        with TestClient(self._mount_app(ch)) as client:
+            r = client.get("/auth/callback", params={"code": "c", "state": "unknown"})
+        assert r.status_code == 400
+
+    def test_callback_links_oid_on_success(self, tmp_path: Path) -> None:
+        msal_app = MagicMock()
+        msal_app.get_authorization_request_url.return_value = (
+            "https://login.microsoftonline.com/tenant-1/authorize?state=X"
+        )
+        msal_app.acquire_token_by_authorization_code.return_value = {"access_token": "t"}
+        ch, store = self._make_channel(tmp_path, msal_app)
+
+        # Fake the Graph /me call.
+        graph_response = MagicMock()
+        graph_response.status_code = 200
+        graph_response.json = MagicMock(return_value={"id": "oid-xyz", "userPrincipalName": "user@x"})
+        ch._http = MagicMock()
+        ch._http.get = AsyncMock(return_value=graph_response)
+        ch._http.aclose = AsyncMock()
+
+        # Mint a real state via the public API so the pending dict is populated.
+        ch.authorize_url_for("telegram", "42")
+        state = next(iter(ch._pending.keys()))
+
+        with TestClient(self._mount_app(ch)) as client:
+            r = client.get("/auth/callback", params={"code": "abc", "state": state})
+        assert r.status_code == 200
+        assert store.lookup("telegram:42") == "entra:oid-xyz"
+
+    def test_callback_token_failure_returns_502(self, tmp_path: Path) -> None:
+        msal_app = MagicMock()
+        msal_app.get_authorization_request_url.return_value = "https://x"
+        msal_app.acquire_token_by_authorization_code.return_value = {
+            "error": "invalid_grant",
+            "error_description": "expired",
+        }
+        ch, store = self._make_channel(tmp_path, msal_app)
+        ch._http = MagicMock(aclose=AsyncMock())
+        ch.authorize_url_for("telegram", "42")
+        state = next(iter(ch._pending.keys()))
+        with TestClient(self._mount_app(ch)) as client:
+            r = client.get("/auth/callback", params={"code": "c", "state": state})
+        assert r.status_code == 502
+        assert store.lookup("telegram:42") is None
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 406aaf6efc..65cba40963 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -86,6 +86,7 @@ agent-framework-foundry-local = { workspace = true }
 agent-framework-gemini = { workspace = true }
 agent-framework-github-copilot = { workspace = true }
 agent-framework-hosting = { workspace = true }
+agent-framework-hosting-entra = { workspace = true }
 agent-framework-hyperlight = { workspace = true }
 agent-framework-lab = { workspace = true }
 agent-framework-mem0 = { workspace = true }
@@ -207,6 +208,7 @@ executionEnvironments = [
     { root = "packages/foundry_local/tests", reportPrivateUsage = "none" },
     { root = "packages/github_copilot/tests", reportPrivateUsage = "none" },
     { root = "packages/hosting/tests", reportPrivateUsage = "none" },
+    { root = "packages/hosting-entra/tests", reportPrivateUsage = "none" },
     { root = "packages/lab/gaia/tests", reportPrivateUsage = "none" },
     { root = "packages/lab/lightning/tests", reportPrivateUsage = "none" },
     { root = "packages/lab/tau2/tests", reportPrivateUsage = "none" },
diff --git a/python/uv.lock b/python/uv.lock
index a4b58d2270..528fa8aa5d 100644
--- a/python/uv.lock
+++ b/python/uv.lock
@@ -48,6 +48,7 @@ members = [
     "agent-framework-gemini",
     "agent-framework-github-copilot",
     "agent-framework-hosting",
+    "agent-framework-hosting-entra",
     "agent-framework-hyperlight",
     "agent-framework-lab",
     "agent-framework-mem0",
@@ -630,6 +631,27 @@ provides-extras = ["serve"]

 [package.metadata.requires-dev]
 dev = [{ name = "httpx", specifier = ">=0.28.1" }]

+[[package]]
+name = "agent-framework-hosting-entra"
+version = "1.0.0a260424"
+source = { editable = "packages/hosting-entra" }
+dependencies = [
+    { name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
+    { name = "agent-framework-hosting", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
+    { name = "cryptography", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
+    { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
+    { name = "msal", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
+]
+
+[package.metadata]
+requires-dist = [
+    { name = "agent-framework-core", editable = "packages/core" },
+    { name = "agent-framework-hosting", editable = "packages/hosting" },
+    { name = "cryptography", specifier = ">=42" },
+    { name = "httpx", specifier = ">=0.27,<1" },
+    { name = "msal", specifier = ">=1.28,<2" },
+]
+
 [[package]]
 name = "agent-framework-hyperlight"
 version = "1.0.0a260429"

From 487a7451ad1d786bb393bf809084e41b80a1233a Mon Sep 17 00:00:00 2001
From: eavanvalkenburg
Date: Thu, 7 May 2026 16:48:10 +0200
Subject: [PATCH 3/3] fix(hosting-entra): close IDOR + reflected-XSS + open-redirect on the OAuth flow
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three SECURITY-CRITICAL fixes flagged in round-2 review.

1. IDOR on /auth/start (3198518308). Without authentication the endpoint
   accepted (channel, channel_id) from the query string and bound *whoever
   signed in* to that pair. An attacker could bind their own Entra oid to
   a victim's per-channel id (e.g. `telegram:`), redirecting all of the
   victim's future inbound traffic to the attacker's isolation key.

   Fix: introduce link_token_secret + mint_start_url(channel, id, ...).
   When set, /auth/start requires `exp` + `sig` (HMAC-SHA256 over
   `channel|channel_id|expires_at`) before issuing the redirect. Channels
   that hand out start URLs (a Telegram /link command after verifying the
   inbound webhook signature) call mint_start_url so the token proves the
   (channel, id) pair was authorised by the channel that owns the surface.
   Unsigned mode is opt-in and logs a loud WARNING at startup *and* on
   every accepted request.

2. Reflected XSS on /auth/callback (3198520256, 3198527896). `error`,
   `error_description`, channel_key (from the unauthenticated /start
   query), and `upn` (from a Graph response) flowed straight into the
   text/html response body unescaped. With the IDOR above, an attacker
   could stash `<script>` payloads in any of those fields.

+                "error_description": "",
+            },
+        )
+        assert r.status_code == 400
+        assert "@x"}
+        )
+        ch._http = MagicMock(aclose=AsyncMock())
+        ch._http.get = AsyncMock(return_value=graph_response)
+        # Mint a binding via authorize_url_for (channel-side trusted call).
+        ch.authorize_url_for("", "42")
+        state = next(iter(ch._pending.keys()))
+        with TestClient(self._mount(ch)) as client:
+            r = client.get("/auth/callback", params={"code": "abc", "state": state})
+        assert r.status_code == 200
+        assert "