diff --git a/python/packages/hosting-telegram/LICENSE b/python/packages/hosting-telegram/LICENSE new file mode 100644 index 0000000000..9e841e7a26 --- /dev/null +++ b/python/packages/hosting-telegram/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/python/packages/hosting-telegram/README.md b/python/packages/hosting-telegram/README.md new file mode 100644 index 0000000000..aa97e07417 --- /dev/null +++ b/python/packages/hosting-telegram/README.md @@ -0,0 +1,29 @@ +# agent-framework-hosting-telegram + +Telegram channel for [agent-framework-hosting](../hosting). Supports both +**polling** (default — no public URL required, perfect for local dev) and +**webhook** transports, multi-content messages (text + media), command +registration, and end-to-end SSE-style streaming via Telegram message edits. 
+
+## Usage
+
+```python
+from agent_framework_hosting import AgentFrameworkHost
+from agent_framework_hosting_telegram import TelegramChannel
+
+host = AgentFrameworkHost(
+    target=my_agent,
+    channels=[TelegramChannel(bot_token="...")],
+)
+host.serve()
+```
+
+For production, configure `webhook_url="https://…"` and the channel will
+register the webhook on startup and receive updates over HTTPS.
+
+## Identity & sessions
+
+Each Telegram chat is mapped to an opaque isolation key
+(`telegram:<chat_id>`) so other channels can opt into the same per-chat
+session by reusing the same key. The helper `telegram_isolation_key(chat_id)`
+is exported for that purpose.
diff --git a/python/packages/hosting-telegram/agent_framework_hosting_telegram/__init__.py b/python/packages/hosting-telegram/agent_framework_hosting_telegram/__init__.py
new file mode 100644
index 0000000000..0be4a26381
--- /dev/null
+++ b/python/packages/hosting-telegram/agent_framework_hosting_telegram/__init__.py
@@ -0,0 +1,7 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+"""Telegram channel for :mod:`agent_framework_hosting`."""
+
+from ._channel import TelegramChannel, telegram_isolation_key
+
+__all__ = ["TelegramChannel", "telegram_isolation_key"]
diff --git a/python/packages/hosting-telegram/agent_framework_hosting_telegram/_channel.py b/python/packages/hosting-telegram/agent_framework_hosting_telegram/_channel.py
new file mode 100644
index 0000000000..27d9237a07
--- /dev/null
+++ b/python/packages/hosting-telegram/agent_framework_hosting_telegram/_channel.py
@@ -0,0 +1,714 @@
+# Copyright (c) Microsoft. All rights reserved.
+
+"""Built-in channel: Telegram (polling + webhook transports).
+
+Inspired by PR #5393's Telegram sample. Two transports are supported:
+
+- ``polling`` (default when no ``webhook_url`` is set): the channel runs a
+  background ``getUpdates`` long-poll loop. No public URL required —
+  perfect for local development. This is what ``python-telegram-bot``
+  uses by default.
+- ``webhook``: when ``webhook_url`` is set, the channel registers it via + ``setWebhook`` on startup and receives updates over HTTPS POSTs to the + mounted ``/webhook`` route. This is the production-recommended mode. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import time +from collections.abc import Awaitable, Callable, Mapping, Sequence +from typing import Any, Literal + +import httpx +from agent_framework import ( + AgentResponse, + AgentResponseUpdate, + Content, + Message, + ResponseStream, +) +from agent_framework_hosting import ( + ChannelCommand, + ChannelCommandContext, + ChannelContext, + ChannelContribution, + ChannelIdentity, + ChannelRequest, + ChannelRunHook, + ChannelSession, + ChannelStreamTransformHook, + HostedRunResult, + apply_run_hook, + logger, +) +from starlette.requests import Request +from starlette.responses import JSONResponse, Response +from starlette.routing import BaseRoute, Route + +# Telegram update parsing ------------------------------------------------------ +# +# A Telegram message can carry text, a caption, and one of several media kinds +# (photo, document, voice, audio, video). For media we resolve the file_id +# into a public bot-file URL via ``getFile`` and emit a ``Content.from_uri``; +# the agent then receives a multi-content Message with text + media side by +# side, the same as it would over the Responses API. + +_TELEGRAM_MEDIA_DEFAULT_MIMETYPE = { + "photo": "image/jpeg", + "document": "application/octet-stream", + "voice": "audio/ogg", + "audio": "audio/mpeg", + "video": "video/mp4", +} + +# Telegram's hard limit on a single message body. Past this, sendMessage / +# editMessageText return 400. We truncate interim and final edits at this +# boundary; if the agent emits more, callers can split into a follow-up +# sendMessage in their run hook. 
+_TELEGRAM_MAX_TEXT_LEN = 4096 + + +def telegram_isolation_key(chat_id: Any) -> str: + """Build the namespaced isolation key the Telegram channel writes under. + + Exposed at module scope so other channels' ``run_hook`` callbacks can opt + into the same per-chat session (e.g. a Responses caller resuming a + Telegram conversation by passing the chat id). + """ + return f"telegram:{chat_id}" + + +def _telegram_media_file_id(message: Mapping[str, Any]) -> tuple[str, str] | None: + """Return ``(file_id, fallback_media_type)`` for any media on the message.""" + photo = message.get("photo") + if isinstance(photo, list) and photo: + # Telegram delivers photos as an array of progressively-larger sizes. + largest = photo[-1] + if isinstance(largest, Mapping) and (fid := largest.get("file_id")): + return str(fid), _TELEGRAM_MEDIA_DEFAULT_MIMETYPE["photo"] + for kind in ("document", "voice", "audio", "video"): + media = message.get(kind) + if media and isinstance(media, Mapping) and (fid := media.get("file_id")): + return str(fid), str(media.get("mime_type") or _TELEGRAM_MEDIA_DEFAULT_MIMETYPE[kind]) + return None + + +async def _parse_telegram_message( + message: Mapping[str, Any], + resolve_file_url: Callable[[str], Awaitable[str | None]], +) -> Message: + """Translate one Telegram ``message`` object into an Agent Framework Message.""" + parts: list[Content] = [] + if (text := message.get("text") or message.get("caption")) and isinstance(text, str): + parts.append(Content.from_text(text=text)) + + if (media := _telegram_media_file_id(message)) is not None: + file_id, media_type = media + if (uri := await resolve_file_url(file_id)) is not None: + parts.append(Content.from_uri(uri=uri, media_type=media_type)) + + if not parts: + # Edge case: no recognizable content — emit an empty placeholder so the + # agent contract still receives a Message and can react gracefully. 
+ parts.append(Content.from_text(text="")) + return Message("user", parts) + + +class TelegramChannel: + """Telegram channel with both polling and webhook transports. + + Update kinds handled (both transports): + - ``message`` / ``edited_message`` — text, captions, and media + (photo/document/voice/audio/video). + - ``callback_query`` — inline-button presses; the ``data`` payload is + treated as the user's next utterance and the click is acknowledged. + + Streaming + --------- + The channel defaults to ``stream=True`` on every ``ChannelRequest``: it + drives ``ChannelContext.run_stream`` and progressively edits a single + Telegram message as ``AgentResponseUpdate`` chunks arrive (Telegram has + no native streaming primitive). Pass ``stream=False`` on the constructor + to opt out for all messages, or override per-request inside the + ``run_hook`` (set ``ChannelRequest.stream = False``). A ``stream_transform_hook`` + can rewrite or drop individual updates before they hit the wire — useful + for redaction, formatting, or merging tool-call deltas. 
+ """ + + name = "telegram" + + def __init__( + self, + *, + bot_token: str, + path: str = "/telegram", + commands: Sequence[ChannelCommand] = (), + register_native_commands: bool = True, + run_hook: ChannelRunHook | None = None, + api_base: str = "https://api.telegram.org", + webhook_url: str | None = None, + secret_token: str | None = None, + parse_mode: str | None = None, + send_typing_action: bool = True, + transport: Literal["auto", "polling", "webhook"] = "auto", + polling_timeout: int = 30, + stream: bool = True, + stream_transform_hook: ChannelStreamTransformHook | None = None, + stream_edit_min_interval: float = 0.4, + ) -> None: + self.path = path + self._token = bot_token + self._commands = list(commands) + self._register = register_native_commands + self._hook = run_hook + self._stream_default = stream + self._stream_transform_hook = stream_transform_hook + self._stream_edit_min_interval = stream_edit_min_interval + self._api = f"{api_base}/bot{bot_token}" + self._webhook_url = webhook_url + self._secret_token = secret_token + self._parse_mode = parse_mode + self._send_typing_action = send_typing_action + if transport == "auto": + transport = "webhook" if webhook_url else "polling" + if transport == "webhook" and not webhook_url: + raise ValueError("transport='webhook' requires webhook_url") + self._transport: Literal["polling", "webhook"] = transport + self._polling_timeout = polling_timeout + self._ctx: ChannelContext | None = None + self._http: httpx.AsyncClient | None = None + self._poll_task: asyncio.Task[None] | None = None + self._update_tasks: set[asyncio.Task[None]] = set() + + def contribute(self, context: ChannelContext) -> ChannelContribution: + """Register the webhook route (only in ``webhook`` transport) plus lifecycle hooks. + + Polling-mode hosts intentionally expose no HTTP route — adding one + would just confuse readers who expect inbound HTTP traffic to do + something. 
+ """ + self._ctx = context + routes: list[BaseRoute] = [] + if self._transport == "webhook": + routes.append(Route("/webhook", self._handle, methods=["POST"])) + return ChannelContribution( + routes=routes, + commands=self._commands, + on_startup=[self._on_startup], + on_shutdown=[self._on_shutdown], + ) + + # -- lifecycle --------------------------------------------------------- # + + async def _on_startup(self) -> None: + """Open the HTTP client, optionally register slash commands, and start the transport. + + - Polling: clears any previously-set webhook (Telegram refuses + ``getUpdates`` while one is registered) and launches the + long-poll task. + - Webhook: ``setWebhook`` to the configured URL, including the + optional secret token used to authenticate inbound calls. + """ + # ``getUpdates`` blocks for up to ``polling_timeout`` seconds, so the + # client timeout has to comfortably exceed it. Skip when a client has + # been pre-injected (e.g. by tests). + if self._http is None: + self._http = httpx.AsyncClient(timeout=self._polling_timeout + 15) + if self._register and self._commands: + cmd_payload: dict[str, Any] = { + "commands": [{"command": c.name, "description": c.description} for c in self._commands] + } + await self._http.post(f"{self._api}/setMyCommands", json=cmd_payload) + logger.info("Registered %d Telegram commands", len(self._commands)) + + if self._transport == "webhook": + payload: dict[str, Any] = { + "url": self._webhook_url, + "allowed_updates": ["message", "edited_message", "callback_query"], + } + if self._secret_token: + payload["secret_token"] = self._secret_token + response = await self._http.post(f"{self._api}/setWebhook", json=payload) + response.raise_for_status() + logger.info("Telegram webhook registered: %s", self._webhook_url) + else: + # Telegram refuses getUpdates while a webhook is set, so clear it. 
+ await self._http.post(f"{self._api}/deleteWebhook", json={"drop_pending_updates": False}) + self._poll_task = asyncio.create_task(self._poll_loop(), name="telegram-poll") + logger.info("Telegram polling started (long-poll timeout=%ss)", self._polling_timeout) + + async def _on_shutdown(self) -> None: + """Stop the polling task, drop the webhook registration, close the HTTP client. + + Webhook teardown is best-effort — failures (e.g. revoked token at + shutdown) are logged but never raised so app shutdown can complete. + """ + if self._poll_task is not None: + self._poll_task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await self._poll_task + self._poll_task = None + if self._http is not None: + if self._transport == "webhook": + try: + await self._http.post(f"{self._api}/deleteWebhook") + except Exception: # pragma: no cover - best-effort cleanup + logger.exception("deleteWebhook failed") + await self._http.aclose() + + # -- polling loop ------------------------------------------------------ # + + async def _poll_loop(self) -> None: + """Long-poll ``getUpdates`` until cancelled. + + Each batch advances the ``offset`` by the highest seen + ``update_id`` so processed updates aren't redelivered. Updates are + dispatched to per-update tasks so a slow agent invocation cannot + block the next poll iteration; ordering inside a chat is still + preserved by Telegram-side queueing. Transient errors back off for + 2 seconds before retrying. 
+ """ + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + offset: int | None = None + while True: + try: + params: dict[str, Any] = { + "timeout": self._polling_timeout, + "allowed_updates": '["message","edited_message","callback_query"]', + } + if offset is not None: + params["offset"] = offset + response = await self._http.get(f"{self._api}/getUpdates", params=params) + response.raise_for_status() + payload = response.json() + if not payload.get("ok"): + logger.warning("Telegram getUpdates returned error: %s", payload) + await asyncio.sleep(1.0) + continue + for update in payload.get("result", []) or []: + update_id = update.get("update_id") + if isinstance(update_id, int): + offset = update_id + 1 + # Each update is processed in its own task so a slow agent + # call doesn't stall the next poll. Order within a chat is + # still preserved by Telegram-side queueing. + task = asyncio.create_task(self._safe_process_update(update)) + self._update_tasks.add(task) + task.add_done_callback(self._update_tasks.discard) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("Telegram polling iteration failed; retrying in 2s") + await asyncio.sleep(2.0) + + async def _safe_process_update(self, update: Mapping[str, Any]) -> None: + """Wrap :meth:`_process_update` so a failure on one update never escapes a task.""" + try: + await self._process_update(update) + except Exception: + logger.exception("Telegram update processing failed: %s", update.get("update_id")) + + # -- request handling -------------------------------------------------- # + + async def _handle(self, request: Request) -> Response: + """Webhook endpoint — verifies the secret token then queues the update. + + Telegram includes the configured secret in the + ``X-Telegram-Bot-Api-Secret-Token`` header on every webhook delivery; + we reject mismatches so leaked URLs alone aren't enough to inject + traffic. 
+ """ + if self._secret_token is not None: + received = request.headers.get("x-telegram-bot-api-secret-token") + if received != self._secret_token: + logger.warning("Telegram webhook secret token mismatch — rejecting update") + return JSONResponse({"ok": False, "error": "invalid secret"}, status_code=401) + + update = await request.json() + await self._safe_process_update(update) + return JSONResponse({"ok": True}) + + async def _process_update(self, update: Mapping[str, Any]) -> None: + """Convert one Telegram update into a :class:`ChannelRequest` and dispatch. + + Branches: + - ``callback_query`` — inline-button click; handled separately so we + can ack the click and treat the button payload as the next user + utterance. + - ``message`` / ``edited_message`` — the common text-and-attachment + case; runs slash commands when present, otherwise builds a + message and dispatches to the agent. + """ + if self._ctx is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + + # Inline-button presses: ack the click, treat the payload as input. + if (callback := update.get("callback_query")) is not None: + await self._handle_callback_query(callback) + return + + # message and edited_message share the same shape. + message = update.get("message") or update.get("edited_message") or {} + chat_id = (message.get("chat") or {}).get("id") + text = message.get("text") or message.get("caption") + has_media = any(k in message for k in ("photo", "document", "voice", "audio", "video")) + if chat_id is None or (not isinstance(text, str) and not has_media): + return # Nothing actionable. + + # Native command dispatch — bypasses the agent. 
+ if isinstance(text, str) and text.startswith("/"): + command_name = text[1:].split()[0].split("@", 1)[0] + handler = next((c for c in self._commands if c.name == command_name), None) + if handler is not None: + channel_request = ChannelRequest( + channel=self.name, + operation="command.invoke", + input=text, + session=ChannelSession(isolation_key=telegram_isolation_key(chat_id)), + attributes={"chat_id": chat_id}, + identity=ChannelIdentity(channel=self.name, native_id=str(chat_id)), + ) + ctx = ChannelCommandContext( + request=channel_request, + reply=lambda body, cid=chat_id: self._send(cid, body), + ) + await handler.handle(ctx) + return + + # Plain message → agent run. Build a multi-content Message with the + # text/caption alongside any attached media (photo, document, ...). + parsed = await _parse_telegram_message(message, self._resolve_file_url) + channel_request = ChannelRequest( + channel=self.name, + operation="message.create", + input=[parsed], + session=ChannelSession(isolation_key=telegram_isolation_key(chat_id)), + attributes={"chat_id": chat_id}, + stream=self._stream_default, + identity=ChannelIdentity(channel=self.name, native_id=str(chat_id)), + ) + if self._hook is not None: + channel_request = await apply_run_hook( + self._hook, + channel_request, + target=self._ctx.target, + protocol_request=update, + ) + + await self._dispatch(chat_id, channel_request) + + async def _handle_callback_query(self, callback: Mapping[str, Any]) -> None: + """Handle an inline-button click. + + Always answers the callback query to clear the spinner on the user's + client, then treats the button's ``data`` payload as the user's + next utterance and dispatches it as if they had typed it. + Callbacks without a chat or string ``data`` are silently dropped. 
+ """ + if self._ctx is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + callback_id = callback.get("id") + data = callback.get("data") + message = callback.get("message") or {} + chat_id = (message.get("chat") or {}).get("id") + + if callback_id is not None: + # Always answer to remove the loading spinner on the user's client. + try: + await self._http.post(f"{self._api}/answerCallbackQuery", json={"callback_query_id": callback_id}) + except Exception: # pragma: no cover - defensive + logger.exception("answerCallbackQuery failed") + + if chat_id is None or not isinstance(data, str): + return + + channel_request = ChannelRequest( + channel=self.name, + operation="message.create", + input=data, + session=ChannelSession(isolation_key=telegram_isolation_key(chat_id)), + attributes={"chat_id": chat_id, "callback_query_id": callback_id}, + stream=self._stream_default, + identity=ChannelIdentity(channel=self.name, native_id=str(chat_id)), + ) + if self._hook is not None: + channel_request = await apply_run_hook( + self._hook, + channel_request, + target=self._ctx.target, + protocol_request=callback, + ) + + await self._dispatch(chat_id, channel_request) + + async def _resolve_file_url(self, file_id: str) -> str | None: + """Resolve a Telegram file_id into an HTTPS URL via getFile.""" + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + try: + response = await self._http.get(f"{self._api}/getFile", params={"file_id": file_id}) + response.raise_for_status() + file_path = response.json().get("result", {}).get("file_path") + except Exception: # pragma: no cover - defensive: bad token, network, etc. 
+ logger.exception("getFile failed for %s", file_id) + return None + return f"{self._api.replace('/bot', '/file/bot')}/{file_path}" if file_path else None + + # -- outbound helpers -------------------------------------------------- # + + async def _dispatch(self, chat_id: int, request: ChannelRequest) -> None: + """Run the request and forward results to ``chat_id``.""" + if self._ctx is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + if not request.stream: + if self._send_typing_action: + await self._send_chat_action(chat_id, "typing") + result = await self._ctx.run(request) + await self._reply_with_result(chat_id, result) + return + + stream = self._ctx.run_stream(request) + await self._stream_to_chat(chat_id, stream) + + async def _stream_to_chat( + self, + chat_id: int, + stream: ResponseStream[AgentResponseUpdate, AgentResponse], + ) -> None: + """Iterate the agent's ResponseStream and progressively edit a Telegram message. + + Smoothness recipe: + + 1. Send the placeholder message up front so the user sees instant + activity (a "…" bubble) instead of waiting for the first edit. + 2. Token consumption never awaits the network — a background + ``edit_worker`` watches an asyncio.Event, coalesces accumulated + text, rate-limits itself to ``stream_edit_min_interval`` (default + 0.4s — well under Telegram's per-chat edit limits), and only sends + when the text actually changed. + 3. Interim edits are sent as **plain text** even if a ``parse_mode`` + is configured. Partial Markdown/HTML mid-stream is invalid and + Telegram rejects it with 400 ``can't parse entities``. The final + edit re-applies the configured ``parse_mode`` so the user ends up + with formatted output. + 4. ``sendChatAction("typing")`` is re-issued every 4s while the + stream is live so the typing bubble doesn't disappear on long + responses (Telegram clears it after ~5s). 
+ """ + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + # Pin to a local so mypy narrows inside the nested closures below. + http = self._http + + accumulated = "" + last_sent = "" + last_edit_at = 0.0 + message_id: int | None = None + worker_done = asyncio.Event() + wake = asyncio.Event() + + async def send_initial_placeholder() -> None: + nonlocal message_id, last_edit_at + try: + response = await http.post( + f"{self._api}/sendMessage", + json={"chat_id": chat_id, "text": "…"}, + ) + response.raise_for_status() + message_id = response.json().get("result", {}).get("message_id") + last_edit_at = time.monotonic() + except Exception: # pragma: no cover - placeholder is best-effort + logger.exception("Telegram placeholder send failed") + + async def edit_worker() -> None: + nonlocal last_sent, last_edit_at + while not (worker_done.is_set() and accumulated == last_sent): + await wake.wait() + wake.clear() + if message_id is None or accumulated == last_sent: + continue + elapsed = time.monotonic() - last_edit_at + if elapsed < self._stream_edit_min_interval: + try: + await asyncio.wait_for(wake.wait(), timeout=self._stream_edit_min_interval - elapsed) + wake.clear() + except asyncio.TimeoutError: + pass + snapshot = accumulated[:_TELEGRAM_MAX_TEXT_LEN] + if snapshot == last_sent: + continue + # Interim edits go out as plain text — partial Markdown/HTML + # is invalid mid-stream and Telegram returns 400. 
+ try: + await http.post( + f"{self._api}/editMessageText", + json={"chat_id": chat_id, "message_id": message_id, "text": snapshot}, + ) + except Exception: # pragma: no cover - keep streaming on error + logger.exception("Telegram interim edit failed") + last_sent = snapshot + last_edit_at = time.monotonic() + + async def typing_worker() -> None: + while not worker_done.is_set(): + await self._send_chat_action(chat_id, "typing") + try: + await asyncio.wait_for(worker_done.wait(), timeout=4.0) + except asyncio.TimeoutError: + continue + + await send_initial_placeholder() + edit_task = asyncio.create_task(edit_worker(), name="telegram-edit-worker") + typing_task = asyncio.create_task(typing_worker(), name="telegram-typing-worker") + + try: + async for update in stream: + if self._stream_transform_hook is not None: + transformed = self._stream_transform_hook(update) + if isinstance(transformed, Awaitable): + transformed = await transformed + if transformed is None: + continue + update = transformed + chunk = getattr(update, "text", None) + if chunk: + accumulated += chunk + wake.set() + except Exception: + logger.exception("Telegram streaming consumption failed") + finally: + worker_done.set() + wake.set() + try: + await edit_task + except Exception: # pragma: no cover + logger.exception("Telegram edit worker crashed") + typing_task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await typing_task + + # Always finalize so context providers / history hooks run. + try: + final = await stream.get_final_response() + except Exception: # pragma: no cover - finalize is best-effort + logger.exception("Stream finalize failed") + final = None + + # Final edit applies parse_mode (if configured) to the full text. 
+ final_text = (accumulated or last_sent)[:_TELEGRAM_MAX_TEXT_LEN] + if message_id is not None and final_text and final_text != last_sent: + payload: dict[str, Any] = { + "chat_id": chat_id, + "message_id": message_id, + "text": final_text, + } + if self._parse_mode: + payload["parse_mode"] = self._parse_mode + try: + response = await self._http.post(f"{self._api}/editMessageText", json=payload) + # If parse_mode rejected the final edit, retry as plain text + # so the user still sees the answer. + if response.status_code == 400 and self._parse_mode: + payload.pop("parse_mode", None) + await self._http.post(f"{self._api}/editMessageText", json=payload) + except Exception: # pragma: no cover + logger.exception("Telegram final edit failed") + + # If nothing ever streamed (no text chunks at all), fall back to the + # full result so images / tool outputs still reach the user. + if not accumulated: + await self._reply_with_result(chat_id, final) + + async def _reply_with_result(self, chat_id: int, result: Any) -> None: + """Forward an AgentRunResponse back to Telegram. + + Sends any image attachments on the last assistant message as photos, + then the text body via ``sendMessage``. Falls back to a ``"(no + response)"`` placeholder if neither text nor images are present so + the user is never left hanging. 
+ """ + sent_photo = False + last_message = None + messages = getattr(result, "messages", None) or [] + for msg in reversed(messages): + if getattr(msg, "role", None) == "assistant": + last_message = msg + break + + if last_message is not None: + for content in getattr(last_message, "contents", []) or []: + uri = getattr(content, "uri", None) + media_type = getattr(content, "media_type", "") or "" + if uri and isinstance(media_type, str) and media_type.startswith("image/"): + await self._send_photo(chat_id, uri) + sent_photo = True + + text = getattr(result, "text", None) + if text: + await self._send(chat_id, text) + elif not sent_photo: + await self._send(chat_id, "(no response)") + + async def _send(self, chat_id: int, text: str, **extra: Any) -> None: + """POST a ``sendMessage`` to Telegram, applying the configured ``parse_mode`` by default. + + Extra kwargs are merged into the payload after ``parse_mode`` so + callers can override any field per-call (e.g. drop ``parse_mode`` + for a known-unsafe interim text). + """ + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + payload: dict[str, Any] = {"chat_id": chat_id, "text": text} + if self._parse_mode and "parse_mode" not in extra: + payload["parse_mode"] = self._parse_mode + payload.update(extra) + await self._http.post(f"{self._api}/sendMessage", json=payload) + + # -- ChannelPush -------------------------------------------------------- # + + async def push(self, identity: ChannelIdentity, payload: HostedRunResult) -> None: + """Proactive delivery to a Telegram chat. + + Implements :class:`host.ChannelPush` so other channels' callers can + target Telegram via ``ChannelRequest.response_target`` + (e.g. ``ResponseTarget.channels(["telegram:8741188429"])`` from a + ``/responses`` request). ``identity.native_id`` is the Telegram + chat id. 
+ """ + try: + chat_id = int(identity.native_id) + except ValueError as exc: + raise ValueError(f"Telegram push requires an int chat_id, got {identity.native_id!r}") from exc + if self._http is None: + raise RuntimeError("TelegramChannel.push called before startup") + await self._send(chat_id, payload.text) + + async def _send_photo(self, chat_id: int, photo_url: str, caption: str | None = None) -> None: + """POST a ``sendPhoto`` to Telegram with an optional caption.""" + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + payload: dict[str, Any] = {"chat_id": chat_id, "photo": photo_url} + if caption: + payload["caption"] = caption + await self._http.post(f"{self._api}/sendPhoto", json=payload) + + async def _send_chat_action(self, chat_id: int, action: str) -> None: + """Fire a ``sendChatAction`` (typing, upload_photo, …); errors are logged and swallowed. + + Chat actions are pure UX hints — Telegram clears them after ~5s + — so failures should never propagate to the caller. + """ + if self._http is None: # pragma: no cover - guarded by lifecycle + raise RuntimeError("telegram channel not started") + try: + await self._http.post(f"{self._api}/sendChatAction", json={"chat_id": chat_id, "action": action}) + except Exception: # pragma: no cover - non-critical UX + logger.exception("sendChatAction failed") + + +__all__ = ["TelegramChannel", "telegram_isolation_key"] diff --git a/python/packages/hosting-telegram/pyproject.toml b/python/packages/hosting-telegram/pyproject.toml new file mode 100644 index 0000000000..41f8d7ea0c --- /dev/null +++ b/python/packages/hosting-telegram/pyproject.toml @@ -0,0 +1,107 @@ +[project] +name = "agent-framework-hosting-telegram" +description = "Telegram channel for agent-framework-hosting." 
+authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0a260424" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.2.0,<2", + "agent-framework-hosting==1.0.0a260424", + "httpx>=0.27,<1", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'", + "sys_platform == 'win32'" +] + +[tool.uv-dynamic-versioning] +fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +filterwarnings = [] +timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = "../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +include = ["agent_framework_hosting_telegram"] +exclude = ['tests'] +# Telegram's API delivers loosely-typed JSON-ish maps (chat, message, photo, +# media, callback_query). 
Strict ``Unknown`` reporting on every ``.get(...)`` +# adds noise without catching real bugs — narrowing happens via runtime +# isinstance checks instead. Other type checks remain strict. +reportUnknownArgumentType = "none" +reportUnknownMemberType = "none" +reportUnknownVariableType = "none" +reportUnknownLambdaType = "none" +reportOptionalMemberAccess = "none" + +[tool.mypy] +plugins = ['pydantic.mypy'] +strict = true +python_version = "3.10" +ignore_missing_imports = true +disallow_untyped_defs = true +no_implicit_optional = true +check_untyped_defs = true +warn_return_any = true +show_error_codes = true +warn_unused_ignores = false +disallow_incomplete_defs = true +disallow_untyped_decorators = true + +[tool.bandit] +targets = ["agent_framework_hosting_telegram"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks.mypy] +help = "Run MyPy for this package." +cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hosting_telegram" + +[tool.poe.tasks.test] +help = "Run the default unit test suite for this package." +cmd = 'pytest -m "not integration" --cov=agent_framework_hosting_telegram --cov-report=term-missing:skip-covered tests' + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" diff --git a/python/packages/hosting-telegram/tests/__init__.py b/python/packages/hosting-telegram/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/packages/hosting-telegram/tests/test_channel.py b/python/packages/hosting-telegram/tests/test_channel.py new file mode 100644 index 0000000000..b94155f8a7 --- /dev/null +++ b/python/packages/hosting-telegram/tests/test_channel.py @@ -0,0 +1,209 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Unit tests for :mod:`agent_framework_hosting_telegram`. + +These tests exercise the internal parsing helpers and the webhook entry-point +without spinning up a real Telegram bot. 
The polling loop and HTTP-side +helpers are excluded from coverage because they require a live bot token. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest +from agent_framework_hosting import ( + AgentFrameworkHost, + ChannelCommand, + ChannelCommandContext, + HostedRunResult, +) +from starlette.testclient import TestClient + +from agent_framework_hosting_telegram import TelegramChannel, telegram_isolation_key +from agent_framework_hosting_telegram._channel import ( + _parse_telegram_message, + _telegram_media_file_id, +) + +# --------------------------------------------------------------------------- # +# Pure helpers # +# --------------------------------------------------------------------------- # + + +def test_telegram_isolation_key_format() -> None: + assert telegram_isolation_key(42) == "telegram:42" + assert telegram_isolation_key("abc") == "telegram:abc" + + +class TestMediaFileId: + def test_no_media(self) -> None: + assert _telegram_media_file_id({"text": "hi"}) is None + + def test_photo_picks_largest(self) -> None: + assert _telegram_media_file_id({"photo": [{"file_id": "small"}, {"file_id": "large"}]}) == ( + "large", + "image/jpeg", + ) + + def test_photo_empty_list(self) -> None: + assert _telegram_media_file_id({"photo": []}) is None + + def test_document_uses_mime_type(self) -> None: + result = _telegram_media_file_id({"document": {"file_id": "f1", "mime_type": "application/pdf"}}) + assert result == ("f1", "application/pdf") + + def test_voice_default_mime(self) -> None: + result = _telegram_media_file_id({"voice": {"file_id": "v1"}}) + assert result == ("v1", "audio/ogg") + + +@pytest.mark.asyncio +class TestParseTelegramMessage: + async def test_text_only(self) -> None: + async def resolve(_: str) -> str | None: + return None + + msg = await _parse_telegram_message({"text": "hello"}, resolve) + assert msg.role == "user" + 
assert msg.text == "hello" + + async def test_text_and_photo(self) -> None: + async def resolve(file_id: str) -> str | None: + return f"https://files.telegram.org/{file_id}" + + msg = await _parse_telegram_message({"caption": "look", "photo": [{"file_id": "p1"}]}, resolve) + assert msg.text == "look" + # Image content present. + assert any((getattr(c, "uri", None) or "").endswith("/p1") for c in msg.contents) + + async def test_unresolvable_media_falls_back_to_text(self) -> None: + async def resolve(_: str) -> str | None: + return None + + msg = await _parse_telegram_message({"text": "x", "voice": {"file_id": "v1"}}, resolve) + # Resolver returned None — the contents should still include the + # text without crashing. + assert msg.text == "x" + + +# --------------------------------------------------------------------------- # +# Webhook entry point # +# --------------------------------------------------------------------------- # + + +@dataclass +class _FakeAgentResponse: + text: str + + +class _FakeAgent: + def __init__(self, reply: str = "ok") -> None: + self._reply = reply + self.runs: list[Any] = [] + + def create_session(self, *, session_id: str | None = None) -> Any: + return {"session_id": session_id} + + def run(self, messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any: + self.runs.append({"messages": messages, "stream": stream, "kwargs": kwargs}) + + async def _coro() -> _FakeAgentResponse: + return _FakeAgentResponse(text=self._reply) + + return _coro() + + +def _make_telegram(stream_default: bool = False) -> tuple[TelegramChannel, _FakeAgent]: + agent = _FakeAgent("hi") + ch = TelegramChannel( + bot_token="123:abc", + webhook_url="https://example.com/hook", + secret_token="s3cr3t", + stream=stream_default, + ) + # Replace the internal HTTP client with an AsyncMock so the channel + # never tries to call the real Telegram API. + fake_http = MagicMock() + # post() returns a response object whose raise_for_status() is sync. 
+ response_mock = MagicMock() + response_mock.json = MagicMock(return_value={"ok": True, "result": {}}) + fake_http.post = AsyncMock(return_value=response_mock) + fake_http.get = AsyncMock(return_value=response_mock) + fake_http.aclose = AsyncMock() + ch._http = fake_http + return ch, agent + + +class TestTelegramWebhook: + def test_webhook_accepts_text_message_and_dispatches_to_agent(self) -> None: + ch, agent = _make_telegram() + host = AgentFrameworkHost(target=agent, channels=[ch]) + # Skip lifespan so polling/setWebhook are not invoked. + with TestClient(host.app) as client: + r = client.post( + "/telegram/webhook", + json={"update_id": 1, "message": {"chat": {"id": 99}, "text": "hello"}}, + headers={"x-telegram-bot-api-secret-token": "s3cr3t"}, + ) + assert r.status_code == 200 + assert agent.runs, "expected the agent to be invoked" + + def test_webhook_rejects_bad_secret(self) -> None: + ch, agent = _make_telegram() + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app) as client: + r = client.post( + "/telegram/webhook", + json={"update_id": 1, "message": {"chat": {"id": 99}, "text": "hi"}}, + headers={"x-telegram-bot-api-secret-token": "WRONG"}, + ) + assert r.status_code == 401 + assert not agent.runs + + +@pytest.mark.asyncio +class TestPushAndCommand: + async def test_push_calls_send(self) -> None: + ch, _agent = _make_telegram() + from agent_framework_hosting import ChannelIdentity + + await ch.push(ChannelIdentity(channel="telegram", native_id="42"), HostedRunResult(text="hi")) + assert ch._http is not None + ch._http.post.assert_called() # type: ignore[attr-defined] + args, kwargs = ch._http.post.call_args # type: ignore[attr-defined] + assert args[0].endswith("/sendMessage") + assert kwargs["json"]["chat_id"] in ("42", 42) + assert kwargs["json"]["text"] == "hi" + + async def test_command_handler_invoked(self) -> None: + captured: list[ChannelCommandContext] = [] + + async def handler(ctx: ChannelCommandContext) -> 
None: + captured.append(ctx) + await ctx.reply("pong") + + ch = TelegramChannel( + bot_token="123:abc", + webhook_url="https://example.com/hook", + commands=[ChannelCommand(name="ping", description="ping", handle=handler)], + register_native_commands=False, + ) + fake_http = MagicMock() + response_mock = MagicMock() + response_mock.json = MagicMock(return_value={"ok": True, "result": {}}) + fake_http.post = AsyncMock(return_value=response_mock) + fake_http.get = AsyncMock(return_value=response_mock) + fake_http.aclose = AsyncMock() + ch._http = fake_http + host = AgentFrameworkHost(target=_FakeAgent(), channels=[ch]) + + with TestClient(host.app) as client: + r = client.post( + "/telegram/webhook", + json={"update_id": 2, "message": {"chat": {"id": 7}, "text": "/ping"}}, + ) + assert r.status_code == 200 + assert captured and captured[0].request.operation == "command.invoke" diff --git a/python/packages/hosting/LICENSE b/python/packages/hosting/LICENSE new file mode 100644 index 0000000000..9e841e7a26 --- /dev/null +++ b/python/packages/hosting/LICENSE @@ -0,0 +1,21 @@ + MIT License + + Copyright (c) Microsoft Corporation. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE diff --git a/python/packages/hosting/README.md b/python/packages/hosting/README.md new file mode 100644 index 0000000000..a2690b5705 --- /dev/null +++ b/python/packages/hosting/README.md @@ -0,0 +1,59 @@ +# agent-framework-hosting + +Multi-channel hosting for Microsoft Agent Framework agents. + +`agent-framework-hosting` lets you serve a single agent (or workflow) +target through one or more **channels** — pluggable adapters that +expose the target over different transports. The result is a single +Starlette ASGI application you can host anywhere (local Hypercorn, +Azure Container Apps, Foundry Hosted Agents, …). + +The base package contains only the channel-neutral plumbing: + +- `AgentFrameworkHost` — the Starlette host +- `Channel` / `ChannelPush` — the channel protocols +- `ChannelRequest` / `ChannelSession` / `ChannelIdentity` / `ResponseTarget` + — the request envelope and routing primitives +- `ChannelContext` / `ChannelContribution` / `ChannelCommand` — the + channel-side hooks for invoking the target and contributing routes, + commands, and lifecycle callbacks +- `ChannelRunHook` / `ChannelStreamTransformHook` — the per-request + customization seams + +Concrete channels live in their own packages so you only install what +you use: + +| Package | Transport | +|---|---| +| `agent-framework-hosting-responses` | OpenAI Responses API | +| `agent-framework-hosting-invocations` | Foundry-native invocation envelope | +| `agent-framework-hosting-telegram` | Telegram Bot API | +| `agent-framework-hosting-activity-protocol` | Bot Framework Activity Protocol (Teams, Direct Line, Web Chat, …) | +| `agent-framework-hosting-teams` | Microsoft Teams (Teams SDK) | +| `agent-framework-hosting-entra` | Entra 
(OAuth) identity-link sidecar | + +## Install + +```bash +pip install agent-framework-hosting agent-framework-hosting-responses +# or with uvicorn pre-installed for the demo `host.serve(...)` helper +pip install "agent-framework-hosting[serve]" agent-framework-hosting-responses +``` + +## Quickstart + +```python +from agent_framework import ChatAgent +from agent_framework.openai import OpenAIChatClient +from agent_framework_hosting import AgentFrameworkHost +from agent_framework_hosting_responses import ResponsesChannel + +agent = ChatAgent(name="Assistant", chat_client=OpenAIChatClient()) + +host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel()]) +host.serve(port=8000) +``` + +See the [hosting samples](https://github.com/microsoft/agent-framework/tree/main/python/samples/04-hosting/af-hosting) +for richer multi-channel apps (Telegram + Teams + Responses fan-out, +identity linking, `ResponseTarget` routing, etc.). diff --git a/python/packages/hosting/agent_framework_hosting/__init__.py b/python/packages/hosting/agent_framework_hosting/__init__.py new file mode 100644 index 0000000000..9a7cbcadad --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/__init__.py @@ -0,0 +1,74 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Multi-channel hosting for Microsoft Agent Framework agents. + +Serve a single agent target through one or more **channels** — pluggable +adapters that expose the target over different transports such as the +OpenAI Responses API, Microsoft Teams, Telegram, and others. The base +package contains only the channel-neutral plumbing; concrete channels +ship in their own packages (``agent-framework-hosting-responses``, +``agent-framework-hosting-telegram``, …) so users install only what +they need. 
+""" + +import importlib.metadata + +from ._host import AgentFrameworkHost, ChannelContext, logger +from ._isolation import ( + ISOLATION_HEADER_CHAT, + ISOLATION_HEADER_USER, + IsolationKeys, + get_current_isolation_keys, + reset_current_isolation_keys, + set_current_isolation_keys, +) +from ._types import ( + Channel, + ChannelCommand, + ChannelCommandContext, + ChannelContribution, + ChannelIdentity, + ChannelPush, + ChannelRequest, + ChannelRunHook, + ChannelSession, + ChannelStreamTransformHook, + DeliveryReport, + HostedRunResult, + ResponseTarget, + ResponseTargetKind, + apply_run_hook, +) + +try: + __version__ = importlib.metadata.version(__name__) +except importlib.metadata.PackageNotFoundError: + __version__ = "0.0.0" + +__all__ = [ + "ISOLATION_HEADER_CHAT", + "ISOLATION_HEADER_USER", + "AgentFrameworkHost", + "Channel", + "ChannelCommand", + "ChannelCommandContext", + "ChannelContext", + "ChannelContribution", + "ChannelIdentity", + "ChannelPush", + "ChannelRequest", + "ChannelRunHook", + "ChannelSession", + "ChannelStreamTransformHook", + "DeliveryReport", + "HostedRunResult", + "IsolationKeys", + "ResponseTarget", + "ResponseTargetKind", + "__version__", + "apply_run_hook", + "get_current_isolation_keys", + "logger", + "reset_current_isolation_keys", + "set_current_isolation_keys", +] diff --git a/python/packages/hosting/agent_framework_hosting/_host.py b/python/packages/hosting/agent_framework_hosting/_host.py new file mode 100644 index 0000000000..92a0b1c45c --- /dev/null +++ b/python/packages/hosting/agent_framework_hosting/_host.py @@ -0,0 +1,946 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""The :class:`AgentFrameworkHost` and its :class:`ChannelContext` bridge. + +The host is a tiny Starlette wrapper: + +- ``__init__`` accepts a hostable target (``SupportsAgentRun`` agent or + ``Workflow``) and a sequence of channels. 
+- :meth:`AgentFrameworkHost.app` lazily builds a Starlette app by calling + every channel's ``contribute`` and mounting the returned routes under + the channel's ``path`` (empty path → mount at the app root). +- :class:`ChannelContext` exposes ``run`` / ``run_stream`` / + ``deliver_response`` for channels to invoke; the host handles + per-``isolation_key`` session caching, identity tracking, and + :class:`ResponseTarget` fan-out. + +Per SPEC-002 (and ADR-0026), the host is intentionally thin so the bulk +of channel-specific behaviour stays in the channel package. Identity +linking, link policies, response targets, background runs, and the like +are pluggable extensions that the future identity/foundry packages will +contribute on top of this surface. +""" + +from __future__ import annotations + +import logging +import os +import uuid +from collections.abc import Awaitable, Callable, Sequence +from contextlib import AbstractContextManager, ExitStack, asynccontextmanager +from pathlib import Path +from typing import TYPE_CHECKING, Any, AsyncIterator, cast + +from agent_framework import ( + AgentResponse, + AgentResponseUpdate, + CheckpointStorage, + Content, + FileCheckpointStorage, + Message, + ResponseStream, + SupportsAgentRun, + Workflow, + WorkflowEvent, +) +from starlette.applications import Starlette +from starlette.middleware import Middleware +from starlette.requests import Request +from starlette.responses import PlainTextResponse +from starlette.routing import BaseRoute, Mount, Route +from starlette.types import ASGIApp, Receive, Scope, Send + +from ._isolation import ( + ISOLATION_HEADER_CHAT, + ISOLATION_HEADER_USER, + IsolationKeys, + reset_current_isolation_keys, + set_current_isolation_keys, +) +from ._types import ( + Channel, + ChannelIdentity, + ChannelPush, + ChannelRequest, + DeliveryReport, + HostedRunResult, + ResponseTargetKind, +) + +if TYPE_CHECKING: + pass + +logger = logging.getLogger("agent_framework.hosting") + + +def 
_workflow_output_to_text(value: Any) -> str: + """Render a single workflow ``output`` payload as plain text. + + ``AgentResponse`` and ``AgentResponseUpdate`` carry text natively; + everything else is best-effort ``str()``. + """ + text = getattr(value, "text", None) + if isinstance(text, str): + return text + return str(value) + + +def _workflow_event_to_update(event: WorkflowEvent[Any]) -> AgentResponseUpdate | None: + """Map a :class:`WorkflowEvent` to a channel-friendly :class:`AgentResponseUpdate`. + + Returns ``None`` for events the host should drop (anything that is not + user-visible output). The original event is preserved on the update's + ``raw_representation`` so consumers can recover full workflow context. + """ + if event.type != "output": + return None + payload: Any = event.data + if isinstance(payload, AgentResponseUpdate): + # Already a streaming update — pass through but tag the source so + # downstream hooks can tell it came from a workflow executor. + if payload.raw_representation is None: + payload.raw_representation = event + return payload + text = _workflow_output_to_text(payload) + return AgentResponseUpdate( + contents=[Content.from_text(text=text)], + role="assistant", + author_name=event.executor_id, + raw_representation=event, + ) + + +@asynccontextmanager +async def _suppress_already_consumed() -> AsyncIterator[None]: # noqa: RUF029 + """Yield, swallowing the ``RuntimeError`` ``ResponseStream`` raises on double-consume. + + The bridge stream calls ``get_final_response()`` after iterating the + workflow stream so the workflow's cleanup hooks run; on some paths the + stream considers itself already finalized and raises, which we treat + as benign — we're only after the side effect. 
+ """ + try: + yield + except RuntimeError as exc: + logger.debug("workflow stream finalize skipped: %s", exc) + except Exception: # pragma: no cover - defensive: never let cleanup hide the real result + logger.exception("workflow stream finalize failed") + + +class _BoundResponseStream: + """Adapter that keeps an :class:`ExitStack` open across stream iteration. + + Streaming runs return a :class:`ResponseStream` synchronously, but + consumption happens later (the channel iterates). For host-bound + request context (e.g. Foundry response-id binding) to survive that + gap, we hold the stack open until the underlying stream is exhausted + or :meth:`close` is called. We forward awaitable + async-iterator + + ``get_final_response`` semantics so the channel sees a normal + ``ResponseStream``-shaped object. + """ + + def __init__(self, inner: Any, stack: ExitStack) -> None: + self._inner = inner + self._stack = stack + self._closed = False + + def _close(self) -> None: + if self._closed: + return + self._closed = True + self._stack.close() + + def __await__(self) -> Any: + # ``__await__`` returns a generator; closing here would be too + # eager — we close in ``__aiter__`` finally instead. Awaitable + # consumers (rare for streams) call ``aclose()`` separately. + return self._inner.__await__() + + def __aiter__(self) -> AsyncIterator[Any]: + return self._wrap() + + async def _wrap(self) -> AsyncIterator[Any]: + try: + async for item in self._inner: + yield item + finally: + self._close() + + async def get_final_response(self) -> Any: + try: + return await self._inner.get_final_response() + finally: + self._close() + + def __getattr__(self, name: str) -> Any: + return getattr(self._inner, name) + + +class ChannelContext: + """Host-owned bridge that channels call to invoke the target.""" + + def __init__(self, host: "AgentFrameworkHost") -> None: + """Bind the context to its owning :class:`AgentFrameworkHost`. 
+ + The host instance is the source of truth for the target, registered + channels, identity stores, sessions, and lifecycle state. Channels + only ever receive a context; they never see the host directly. + """ + self._host = host + + @property + def target(self) -> SupportsAgentRun | Workflow: + """The hostable target the channel should invoke.""" + return self._host.target + + async def run(self, request: ChannelRequest) -> HostedRunResult: + """Invoke the target for ``request`` and return a channel-neutral result.""" + return await self._host._invoke(request) # pyright: ignore[reportPrivateUsage] + + def run_stream(self, request: ChannelRequest) -> ResponseStream[AgentResponseUpdate, AgentResponse]: + """Invoke the target with ``stream=True`` and return the agent's ResponseStream. + + Channels iterate the stream directly (it acts like an AsyncGenerator) + and are responsible for delivering updates to their wire protocol. + Apply per-channel ``transform_hook`` callables during iteration to + rewrite or drop individual updates before they hit the wire. + """ + return self._host._invoke_stream(request) # pyright: ignore[reportPrivateUsage] + + async def deliver_response(self, request: ChannelRequest, payload: HostedRunResult) -> DeliveryReport: + """Resolve ``request.response_target`` and push ``payload`` to each destination. + + Returns a :class:`DeliveryReport` so the originating channel knows + whether to render the agent reply on its own wire (``originating`` + included in or implied by the target) or just acknowledge dispatch. + """ + return await self._host._deliver_response(request, payload) # pyright: ignore[reportPrivateUsage] + + +class _FoundryIsolationASGIMiddleware: + """Lift the two well-known Foundry isolation headers into a contextvar. + + The Foundry Hosted Agents runtime injects + ``x-agent-{user,chat}-isolation-key`` on every inbound HTTP request. 
+ Storage providers that need partition-aware writes (notably + :class:`FoundryHostedAgentHistoryProvider`) read those keys via + :func:`get_current_isolation_keys` to avoid every channel having to + parse Foundry-specific headers itself. We intentionally inspect + only HTTP scopes; lifespan/websocket scopes are forwarded + untouched. When neither header is present the contextvar stays at + its default ``None``, so local-dev requests behave as before. + """ + + def __init__(self, app: ASGIApp) -> None: + self.app = app + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] != "http": + await self.app(scope, receive, send) + return + user_key: str | None = None + chat_key: str | None = None + for raw_name, raw_value in scope.get("headers") or (): + name = raw_name.decode("latin-1").lower() + if name == ISOLATION_HEADER_USER: + user_key = raw_value.decode("latin-1") or None + elif name == ISOLATION_HEADER_CHAT: + chat_key = raw_value.decode("latin-1") or None + if user_key is None and chat_key is None: + await self.app(scope, receive, send) + return + token = set_current_isolation_keys(IsolationKeys(user_key=user_key, chat_key=chat_key)) + try: + await self.app(scope, receive, send) + finally: + reset_current_isolation_keys(token) + + +class AgentFrameworkHost: + """Owns one Starlette app, one hostable target, and a sequence of channels.""" + + def __init__( + self, + target: SupportsAgentRun | Workflow, + *, + channels: Sequence[Channel], + debug: bool = False, + checkpoint_location: str | os.PathLike[str] | CheckpointStorage | None = None, + ) -> None: + """Create a host for ``target`` and its channels. + + Args: + target: The hostable target to invoke from channels — either a + ``SupportsAgentRun``-compatible agent or a ``Workflow``. The + host detects the kind and dispatches to the appropriate + execution seam (``agent.run(...)`` vs ``workflow.run(message=...)``). 
+ For workflow targets, channels (or their ``run_hook``) are + responsible for shaping ``ChannelRequest.input`` into the + workflow start executor's typed input. + channels: The channels to expose. Each channel contributes routes + and commands that are mounted under ``channel.path`` (defaulting + to the channel name). + debug: Whether to enable Starlette's debug mode (stack traces in + responses, etc.) and per-channel debug logging. + checkpoint_location: When ``target`` is a :class:`Workflow`, the + location used to persist workflow checkpoints across requests. + Either a filesystem path (``str`` / ``PathLike``) — the host + creates a per-conversation + :class:`~agent_framework.FileCheckpointStorage` rooted at + ``checkpoint_location / <isolation_key>`` — or a + :class:`~agent_framework.CheckpointStorage` instance the host + uses as-is (caller owns scoping). Per-request behaviour: + requests without ``ChannelRequest.session.isolation_key`` + are run without checkpointing. When set on a workflow that + already has its own checkpoint storage configured + (``WorkflowBuilder(checkpoint_storage=...)``), the host + refuses to start so ownership of checkpointing is + unambiguous. Ignored for ``SupportsAgentRun`` targets (a + warning is emitted). + """ + self.target: SupportsAgentRun | Workflow = target + self._is_workflow = isinstance(target, Workflow) + self.channels = list(channels) + self._debug = debug + self._app: Starlette | None = None + self._checkpoint_location: Path | CheckpointStorage | None = None + if checkpoint_location is not None: + if not self._is_workflow: + logger.warning("checkpoint_location is set but target is not a Workflow; ignoring.") + else: + workflow: Workflow = target # type: ignore[assignment] + if workflow._runner_context.has_checkpointing(): # type: ignore[reportPrivateUsage] + raise RuntimeError( + "Workflow already has checkpoint storage configured " + "(WorkflowBuilder(checkpoint_storage=...)). 
The host " + "manages checkpoints when checkpoint_location is set; " + "remove one of the two configurations." + ) + if isinstance(checkpoint_location, (str, os.PathLike)): + self._checkpoint_location = Path(os.fspath(checkpoint_location)) + else: + # Anything else is treated as a CheckpointStorage instance. + # ``CheckpointStorage`` is a non-runtime-checkable Protocol, + # so we cannot ``isinstance``-check it directly. + self._checkpoint_location = checkpoint_location + # Per-isolation_key session cache. The real spec backs this with a + # pluggable session store; this base host keeps it in-process. + self._sessions: dict[str, Any] = {} + # ``isolation_key -> active session_id``. Normally identical to the + # isolation_key, but ``reset_session`` rotates this to a fresh id so + # the next turn starts a new ``AgentSession`` while the old history + # remains on disk under its original session_id. + self._session_aliases: dict[str, str] = {} + # Per-isolation_key identity registry: which channels we've seen this + # user on, and which native_id they used on each. Powers + # ResponseTarget.active / .channel(name) / .channels([...]) / + # .all_linked. + # Shape: { isolation_key: { channel_name: ChannelIdentity } }. + self._identities: dict[str, dict[str, ChannelIdentity]] = {} + # (isolation_key -> last-seen channel name) for ResponseTarget.active. + self._active: dict[str, str] = {} + + @property + def app(self) -> Starlette: + """Lazily build (and cache) the Starlette application.""" + if self._app is None: + self._app = self._build_app() + return self._app + + def serve( + self, + *, + host: str = "127.0.0.1", + port: int = 8000, + workers: int = 1, + **config_kwargs: Any, + ) -> None: + """Start the host on ``host:port`` using Hypercorn. + + Hypercorn is the same ASGI server the Foundry Hosted Agents + runtime uses for production deployments, so running locally with + the same server keeps dev/prod parity (Trio fallbacks, lifespan + semantics, HTTP/2 support, …). 
Install with the ``serve`` extra + (``pip install agent-framework-hosting[serve]``). + + Args: + host: Interface to bind. Defaults to ``127.0.0.1``. + port: TCP port to bind. Defaults to ``8000``. + workers: Number of worker processes. Defaults to ``1``; + Hypercorn's process model only kicks in for ``>1``. + **config_kwargs: Forwarded to :class:`hypercorn.config.Config` + via attribute assignment, so any documented Hypercorn + config field (e.g. ``keep_alive_timeout=...``, + ``access_log_format=...``) can be set directly. + """ + try: + import asyncio + from typing import cast as _cast + + from hypercorn.asyncio import ( # pyright: ignore[reportMissingImports] + serve as _hypercorn_serve, # pyright: ignore[reportUnknownVariableType] + ) + from hypercorn.config import Config # pyright: ignore[reportMissingImports, reportUnknownVariableType] + except ImportError as exc: # pragma: no cover - exercised at runtime + raise RuntimeError( + "AgentFrameworkHost.serve() requires hypercorn. " + "Install with `pip install agent-framework-hosting[serve]` or `pip install hypercorn`." + ) from exc + + config = Config() # pyright: ignore[reportUnknownVariableType] + config.bind = [f"{host}:{port}"] # pyright: ignore[reportUnknownMemberType] + config.workers = workers # pyright: ignore[reportUnknownMemberType] + for key, value in config_kwargs.items(): + setattr(config, key, value) # pyright: ignore[reportUnknownArgumentType] + + # Touch ``self.app`` so the lifespan startup log fires once before + # we hand off to hypercorn — gives a single, readable banner of + # what the host is exposing without requiring channels to log + # individually. 
+ app = self.app + self._log_startup(host=host, port=port, workers=workers) + + # ``hypercorn.asyncio.serve`` has a complex partially-typed signature + # (multiple ASGI/WSGI app overloads) and its ``Scope`` definition + # diverges from Starlette's; cast both sides to ``Any`` to keep the + # call site readable without sprinkling per-error suppressions. + serve_callable = _cast(Any, _hypercorn_serve) + asyncio.run(serve_callable(app, config)) + + def reset_session(self, isolation_key: str) -> None: + """Rotate ``isolation_key`` to a fresh session id without deleting history. + + Old turns are preserved on disk under their original session id and + remain accessible by passing that id explicitly (e.g. as + ``previous_response_id``). Future requests using ``isolation_key`` + get a new, empty ``AgentSession``. + """ + new_id = f"{isolation_key}#{uuid.uuid4().hex[:8]}" + self._session_aliases[isolation_key] = new_id + self._sessions.pop(isolation_key, None) + + # -- internals --------------------------------------------------------- # + + def _log_startup(self, *, host: str, port: int, workers: int) -> None: + """Emit a single human-friendly startup banner. + + Mirrors the ``AgentServerHost`` convention from + ``azure.ai.agentserver.core``: one INFO line that captures the + target type, every channel + its mount path, the bind address, + whether we're running inside a Foundry Hosted Agents container, + and the worker count. Keeps log noise low while still giving an + operator a single grep-able anchor when triaging. 
+ """ + target_kind = "Workflow" if isinstance(self.target, Workflow) else type(self.target).__name__ + target_name = getattr(self.target, "name", None) or target_kind + channels_repr = ", ".join( + f"{ch.name}@{ch.path or '/'}" # blank path means "mounted at root" + for ch in self.channels + ) + is_hosted = bool(os.environ.get("FOUNDRY_HOSTING_ENVIRONMENT")) + logger.info( + "AgentFrameworkHost starting: target=%s (%s) bind=%s:%d workers=%d hosted=%s channels=[%s]", + target_name, + target_kind, + host, + port, + workers, + is_hosted, + channels_repr or "", + ) + + def _build_app(self) -> Starlette: + context = ChannelContext(self) + routes: list[BaseRoute] = [] + on_startup: list[Callable[[], Awaitable[None]]] = [] + on_shutdown: list[Callable[[], Awaitable[None]]] = [] + + # ``/readiness`` is the standard probe path the Foundry Hosted Agents + # runtime hits to gate traffic. We expose it unconditionally — once the + # ASGI app is up the host considers itself ready (channels register + # their own startup hooks and may run before the first request, but + # readiness is intentionally cheap so the platform's probe never times + # out on transient channel work). Mounted first so a channel cannot + # accidentally shadow it. + async def _readiness(_request: Request) -> PlainTextResponse: # noqa: RUF029 + """Liveness/readiness probe handler used by Foundry Hosted Agents.""" + return PlainTextResponse("ok") + + routes.append(Route("/readiness", _readiness, methods=["GET"])) + + for channel in self.channels: + contribution = channel.contribute(context) + # Channels publish routes relative to their root; mount under channel.path. + # An empty path means "mount at the app root" — useful for single-channel hosts + # that don't want a prefix (e.g. ResponsesChannel exposing POST /responses directly). 
+ if contribution.routes: + if channel.path: + routes.append(Mount(channel.path, routes=list(contribution.routes))) + else: + routes.extend(contribution.routes) + on_startup.extend(contribution.on_startup) + on_shutdown.extend(contribution.on_shutdown) + + @asynccontextmanager + async def lifespan(_app: Starlette) -> AsyncIterator[None]: + for cb in on_startup: + await cb() + try: + yield + finally: + for cb in on_shutdown: + await cb() + + return Starlette( + debug=self._debug, + routes=routes, + lifespan=lifespan, + middleware=[Middleware(_FoundryIsolationASGIMiddleware)], + ) + + def _build_run_kwargs(self, request: ChannelRequest) -> dict[str, Any]: + # The full spec resolves a ChannelSession into an AgentSession here, + # honors session_mode, and consults LinkPolicy / ResponseTarget. This + # base host keys a per-isolation_key AgentSession off the channel's + # session hint so context providers (FileHistoryProvider, …) on the + # target see one session per end user. + session = None + if request.session_mode != "disabled" and request.session is not None: + isolation_key = request.session.isolation_key + if isolation_key is not None and hasattr(self.target, "create_session"): + session_id = self._session_aliases.get(isolation_key, isolation_key) + session = self._sessions.get(isolation_key) + if session is None: + # ``create_session`` lives on agent-typed targets but not on + # ``Workflow``; the ``hasattr`` above guards the call site. 
+ session = self.target.create_session( # pyright: ignore[reportAttributeAccessIssue, reportUnknownVariableType, reportUnknownMemberType] + session_id=session_id + ) + self._sessions[isolation_key] = session # pyright: ignore[reportUnknownArgumentType] + + run_kwargs: dict[str, Any] = {} + if session is not None: + run_kwargs["session"] = session + if request.options: + run_kwargs["options"] = request.options + return run_kwargs + + def _log_incoming(self, request: ChannelRequest, *, stream: bool) -> None: + """Emit a one-line INFO summary for every incoming target invocation. + + When ``debug=True`` is set on the host, also dump the channel-native + settings the channel attached to the ``ChannelRequest`` — ``options`` + (the ChatOptions-shaped fields the channel parsed from its protocol + payload, e.g. temperature/tools/tool_choice for Responses), plus + ``attributes`` / ``metadata`` (the channel's protocol-specific bag, + e.g. ``chat_id`` / ``callback_query_id`` for Telegram). + """ + isolation_key = request.session.isolation_key if request.session is not None else None + logger.info( + "channel=%s op=%s stream=%s session=%s session_mode=%s", + request.channel, + request.operation, + stream, + isolation_key, + request.session_mode, + ) + logger.debug( + " ↳ options=%s attributes=%s metadata=%s", + dict(request.options) if request.options else {}, + dict(request.attributes) if request.attributes else {}, + dict(request.metadata) if request.metadata else {}, + ) + + def _flat_context_providers(self) -> list[Any]: + """Flatten ``target.context_providers`` one level for duck-typed hooks. + + ``ContextProviderBase`` aggregates child providers under a + ``providers`` attribute when wrapped (e.g. by ``ChatClientAgent``). + We descend one level so the host catches both styles without + forcing a particular wiring on the agent. 
+ """ + providers = getattr(self.target, "context_providers", None) or () + flat: list[Any] = [] + for entry in providers: + children = getattr(entry, "providers", None) + if children: + flat.extend(children) + else: + flat.append(entry) + return flat + + def _bind_request_context(self, request: ChannelRequest) -> ExitStack: + """Bind any per-request anchors a target's context-providers expose. + + Channels announce per-request anchors (currently ``response_id`` + and ``previous_response_id``) via ``ChannelRequest.attributes``. + Some history providers — notably the Foundry hosted-agent history + provider — need to write storage under the same ``response_id`` + the channel surfaces on its envelope so the next turn's + ``previous_response_id`` walks the chain. Rather than the host + knowing about specific provider classes, we duck-type: any + context provider on the target that exposes a + ``bind_request_context(response_id=..., previous_response_id=..., + **_)`` context-manager gets it called with the request's + attribute values. Per-request platform isolation keys are handled + separately by :class:`_FoundryIsolationASGIMiddleware` (lifted + off the inbound headers into a contextvar) so providers don't + depend on channels to forward them. Bindings are scoped to the + returned :class:`ExitStack` which the caller must enter before + invoking the target and leave after the run completes. 
+ """ + stack = ExitStack() + attrs = request.attributes or {} + response_id = attrs.get("response_id") + if not isinstance(response_id, str) or not response_id: + return stack + previous_response_id = attrs.get("previous_response_id") + if previous_response_id is not None and not isinstance(previous_response_id, str): + previous_response_id = None + + flat: list[Any] = self._flat_context_providers() + + for provider in flat: + bind = getattr(provider, "bind_request_context", None) + if not callable(bind): + continue + stack.enter_context( + cast( + "AbstractContextManager[Any]", + bind( + response_id=response_id, + previous_response_id=previous_response_id, + ), + ) + ) + return stack + + async def _invoke(self, request: ChannelRequest) -> HostedRunResult: + self._log_incoming(request, stream=False) + self._record_identity(request) + if self._is_workflow: + return await self._invoke_workflow(request) + run_kwargs = self._build_run_kwargs(request) + with self._bind_request_context(request): + # ``_is_workflow`` is False here so ``self.target`` is an + # ``Agent``-shaped target whose ``.run`` returns + # :class:`AgentResponse`. Narrow back to keep ``result.text`` + # well-typed without conditional imports of ``Agent``. + agent_target = cast("SupportsAgentRun", self.target) + result = await agent_target.run(self._wrap_input(request), **run_kwargs) + return HostedRunResult(text=result.text) + + def _invoke_stream(self, request: ChannelRequest) -> ResponseStream[AgentResponseUpdate, AgentResponse]: + self._log_incoming(request, stream=True) + self._record_identity(request) + if self._is_workflow: + return self._invoke_workflow_stream(request) + run_kwargs = self._build_run_kwargs(request) + # ``run(stream=True)`` returns a ResponseStream synchronously (it is + # itself awaitable / async-iterable). We hand it back to the channel + # so the channel can drive iteration and apply its transform hook. 
+ # Streaming flows iterate after this method returns, which is + # *outside* a sync ``with`` block — so we wrap the underlying + # stream in an adapter that holds the binding open across the + # iteration lifecycle. + binder = self._bind_request_context(request) + return _BoundResponseStream( # type: ignore[return-value] + self.target.run(self._wrap_input(request), stream=True, **run_kwargs), + binder, + ) + + def _resolve_checkpoint_storage(self, request: ChannelRequest) -> CheckpointStorage | None: + """Build (or return) the per-request checkpoint storage, or ``None``. + + Returns ``None`` when no ``checkpoint_location`` is configured or + when the request lacks a stable session key — without a key we + cannot scope checkpoints per conversation, and we'd rather skip + checkpointing than pollute a single shared store. + """ + if self._checkpoint_location is None: + return None + if request.session is None or not request.session.isolation_key: + return None + if isinstance(self._checkpoint_location, Path): + return FileCheckpointStorage(str(self._checkpoint_location / request.session.isolation_key)) + # Caller-supplied storage — used as-is; caller owns scoping. + return self._checkpoint_location + + async def _invoke_workflow(self, request: ChannelRequest) -> HostedRunResult: + """Dispatch to ``Workflow.run`` and collapse outputs into a ``HostedRunResult``. + + The channel's ``run_hook`` is the canonical adapter for shaping + ``request.input`` into the workflow start executor's typed input + (free-form text from a Telegram message, structured ``Responses`` + ``input`` items, …). When no hook is wired, ``request.input`` is + forwarded verbatim — appropriate for workflows whose start executor + accepts the channel's native input type (commonly ``str``). 
def _invoke_workflow_stream(self, request: ChannelRequest) -> ResponseStream[AgentResponseUpdate, AgentResponse]:
    """Bridge ``Workflow.run(stream=True)`` onto an agent-style stream.

    Events are mapped by ``_workflow_event_to_update``: ``output`` events
    pass through (or get wrapped into a text update) while every other
    event type is filtered out — channels only care about user-visible
    text, and hooks can recover the full timeline from
    ``raw_representation`` on the produced updates. Checkpoint
    restoration (when configured) runs before the real input stream
    opens, so the new turn observes the restored state.
    """
    workflow: Workflow = self.target  # type: ignore[assignment]
    storage = self._resolve_checkpoint_storage(request)

    async def _restore_latest() -> None:
        # Drain a streamed no-input run of the latest checkpoint so the
        # no-op invocation actually rehydrates state before the real run.
        if storage is None:
            return
        latest = await storage.get_latest(workflow_name=workflow.name)
        if latest is None:
            return
        async for _ in workflow.run(
            stream=True,
            checkpoint_id=latest.checkpoint_id,
            checkpoint_storage=storage,
        ):
            pass

    async def _pump() -> AsyncIterator[AgentResponseUpdate]:
        await _restore_latest()
        inner = workflow.run(request.input, stream=True, checkpoint_storage=storage)
        try:
            async for event in inner:
                update = _workflow_event_to_update(event)
                if update is not None:
                    yield update
        finally:
            async with _suppress_already_consumed():
                await inner.get_final_response()

    async def _collect(updates: Sequence[AgentResponseUpdate]) -> AgentResponse:  # noqa: RUF029
        return AgentResponse.from_updates(updates)

    return ResponseStream[AgentResponseUpdate, AgentResponse](_pump(), finalizer=_collect)

def _wrap_input(self, request: ChannelRequest) -> Message | list[Message]:
    """Promote ``request.input`` to ``Message``(s) tagged with hosting metadata.

    A ``hosting`` block (channel name, identity, response target) is
    attached under ``additional_properties`` so channel provenance
    survives any history provider that serializes via ``to_dict``. For a
    list of messages the block lands on the LAST message — typically the
    persisted user turn — keeping a single searchable record of where the
    inbound message came from.
    """
    meta: dict[str, Any] = {"channel": request.channel}
    if request.identity is not None:
        meta["identity"] = {
            "channel": request.identity.channel,
            "native_id": request.identity.native_id,
            "attributes": dict(request.identity.attributes) if request.identity.attributes else {},
        }
    rt = request.response_target
    meta["response_target"] = {
        "kind": rt.kind.value,
        "targets": list(rt.targets),
    }

    payload = request.input
    if isinstance(payload, Message):
        payload.additional_properties = {**(payload.additional_properties or {}), "hosting": meta}
        return payload
    if isinstance(payload, list) and payload and all(isinstance(m, Message) for m in payload):
        typed: list[Message] = [m for m in payload if isinstance(m, Message)]
        tail = typed[-1]
        tail.additional_properties = {**(tail.additional_properties or {}), "hosting": meta}
        return typed
    # Remaining ``AgentRunInputs`` cases (str / Content / mapping): wrap as
    # a single user message carrying the hosting block.
    return Message(
        role="user",
        contents=[payload],  # type: ignore[list-item]
        additional_properties={"hosting": meta},
    )
+ + Channels deliver inputs as plain text, a single ``Message``, or a list + of ``Message`` (e.g. a Responses-API request that includes a ``system`` + instruction plus the user turn). To preserve channel provenance + + identity + ``response_target`` on the persisted history record (and + make it visible to context providers, evals, audits), we attach a + ``hosting`` block under ``additional_properties``. AF's + ``Message.to_dict`` round-trips ``additional_properties`` through any + ``HistoryProvider`` that serializes via ``to_dict`` (e.g. + ``FileHistoryProvider``) and the framework explicitly does *not* + forward these fields to model providers, so they are safe to attach. + + For a list of messages we attach the metadata to the LAST message that + will be persisted (typically the user turn) — this keeps a single, + searchable record of where the inbound message came from. + """ + hosting_meta: dict[str, Any] = {"channel": request.channel} + if request.identity is not None: + hosting_meta["identity"] = { + "channel": request.identity.channel, + "native_id": request.identity.native_id, + "attributes": dict(request.identity.attributes) if request.identity.attributes else {}, + } + target = request.response_target + hosting_meta["response_target"] = { + "kind": target.kind.value, + "targets": list(target.targets), + } + + raw = request.input + if isinstance(raw, Message): + raw.additional_properties = {**(raw.additional_properties or {}), "hosting": hosting_meta} + return raw + if isinstance(raw, list) and raw and all(isinstance(m, Message) for m in raw): + messages: list[Message] = [m for m in raw if isinstance(m, Message)] + last = messages[-1] + last.additional_properties = {**(last.additional_properties or {}), "hosting": hosting_meta} + return messages + # ``raw`` is typed as ``AgentRunInputs`` (str | Content | Message | Sequence[…]). + # The remaining cases are str / Content / Mapping — wrap as a single user message. 
+ return Message( + role="user", + contents=[raw], # type: ignore[list-item] + additional_properties={"hosting": hosting_meta}, + ) + + def _record_identity(self, request: ChannelRequest) -> None: + """Update the per-``isolation_key`` identity registry + active-channel hint. + + Called on every successful resolve. ``ResponseTarget.active`` + consumes ``self._active``; ``ResponseTarget.channel(name)`` / + ``.channels([...])`` / ``.all_linked`` consume ``self._identities``. + """ + if request.identity is None or request.session is None: + return + key = request.session.isolation_key + if not key: + return + self._identities.setdefault(key, {})[request.identity.channel] = request.identity + self._active[key] = request.identity.channel + + async def _deliver_response(self, request: ChannelRequest, payload: HostedRunResult) -> DeliveryReport: + """Resolve ``request.response_target`` and call ``ChannelPush.push`` on each. + + Per SPEC-002 §"ResponseTarget": for any non-``originating`` target, + the originating channel returns an acknowledgment and the actual + agent reply lands on the destination channel(s). When a destination + cannot be resolved (no known native id) or doesn't implement + ``ChannelPush``, it is dropped and surfaced in + :class:`DeliveryReport.skipped`. If every destination drops, we + fall back to delivering on the originating channel (matching the + spec's policy default). + """ + target = request.response_target + kind = target.kind + + # Fast paths for the trivial variants. + if kind == ResponseTargetKind.ORIGINATING: + return DeliveryReport(include_originating=True) + if kind == ResponseTargetKind.NONE: + # Background-only — drop the reply on the floor for now (no + # ContinuationToken in the prototype). + return DeliveryReport(include_originating=False) + + # Build the destination set. + include_originating = False + # Each entry is (channel_name, identity_override_or_None_to_lookup). 
# Public names re-exported from this module.
__all__ = ["AgentFrameworkHost", "ChannelContext", "logger"]
"""Per-request isolation keys read from inbound HTTP headers.

The Foundry Hosted Agents runtime injects two well-known headers on every
request it forwards to the user's container:

* ``x-agent-user-isolation-key`` — opaque per-user partition key
* ``x-agent-chat-isolation-key`` — opaque per-conversation partition key

When the headers are present we are running inside (or being driven by)
the Foundry runtime; when absent we are in plain local dev. The host's
ASGI middleware (installed in ``AgentFrameworkHost._build_app``) lifts
both headers off each inbound request into the
:data:`current_isolation_keys` contextvar for the request's duration and
then resets it. Partition-aware providers read the contextvar via
:func:`get_current_isolation_keys`, so app authors wire no middleware and
channels stay free of Foundry-specific header knowledge.

The contextvar holds a plain :class:`IsolationKeys`; conversion to
provider-specific types happens at the consuming provider, so this module
has no provider dependencies.
"""

from __future__ import annotations

from contextvars import ContextVar, Token

__all__ = [
    "ISOLATION_HEADER_CHAT",
    "ISOLATION_HEADER_USER",
    "IsolationKeys",
    "current_isolation_keys",
    "get_current_isolation_keys",
    "reset_current_isolation_keys",
    "set_current_isolation_keys",
]


# Header names the Foundry runtime injects (lower-cased, as ASGI sees them).
ISOLATION_HEADER_USER = "x-agent-user-isolation-key"
ISOLATION_HEADER_CHAT = "x-agent-chat-isolation-key"


class IsolationKeys:
    """Per-request Foundry isolation keys lifted off the inbound headers."""

    def __init__(self, user_key: str | None = None, chat_key: str | None = None) -> None:
        # Either key may be absent; both absent means "not under Foundry".
        self.user_key = user_key
        self.chat_key = chat_key

    @property
    def is_empty(self) -> bool:
        """True when neither header was present on the request."""
        return self.user_key is None and self.chat_key is None


# Bound per-request by the host's ASGI middleware; ``None`` outside a request.
current_isolation_keys: ContextVar[IsolationKeys | None] = ContextVar(
    "agent_framework_hosting_isolation_keys",
    default=None,
)


def get_current_isolation_keys() -> IsolationKeys | None:
    """Return the isolation keys bound to the current request, if any."""
    return current_isolation_keys.get()


def set_current_isolation_keys(keys: IsolationKeys | None) -> Token[IsolationKeys | None]:
    """Bind ``keys`` to the current async context and return a reset token."""
    return current_isolation_keys.set(keys)


def reset_current_isolation_keys(token: Token[IsolationKeys | None]) -> None:
    """Restore the isolation contextvar to its prior value."""
    current_isolation_keys.reset(token)
# ``ChannelRequest`` is the only intentional dataclass here (callers use
# ``dataclasses.replace`` on it in run hooks).  The other types are plain
# Python classes by preference, so the "could be a dataclass" lint is muted
# at the file level.
# ruff: noqa: B903

"""Channel-neutral request envelope and channel protocol types.

These types form the boundary between the host and individual channels.
A channel parses its native payload, builds a :class:`ChannelRequest`, and
hands it to :class:`ChannelContext.run` (or ``run_stream``) on the host.
The host normalizes the request into a single agent invocation and either
returns the result to the originating channel or fans out via
:class:`ResponseTarget` to other channels that implement
:class:`ChannelPush`.

See ``docs/specs/002-python-hosting-channels.md`` for the full design.
"""

from __future__ import annotations

from collections.abc import Awaitable, Callable, Mapping, Sequence
from dataclasses import dataclass, field
from enum import Enum
from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable

from agent_framework import (
    AgentResponse,
    AgentResponseUpdate,
    AgentRunInputs,
    ResponseStream,
    SupportsAgentRun,
    Workflow,
)
from starlette.routing import BaseRoute

if TYPE_CHECKING:
    from ._host import ChannelContext


# --------------------------------------------------------------------------- #
# Channel-neutral request envelope
# --------------------------------------------------------------------------- #


# Shared read-only default for absent attribute bags.
_EMPTY_MAPPING: Mapping[str, Any] = {}


class ChannelSession:
    """Channel-supplied session hint.

    The host turns this into an ``AgentSession`` keyed by
    ``isolation_key`` so every distinct end user gets their own
    context-provider state (e.g. one ``FileHistoryProvider`` JSONL file
    per user).
    """

    def __init__(self, isolation_key: str | None = None) -> None:
        self.isolation_key = isolation_key


class ChannelIdentity:
    """Channel-native identity the host sees on each request.

    Consumed by the host's identity registry for two things:

    1. Recording the active channel for an ``isolation_key`` so
       ``ResponseTarget.active`` resolves correctly.
    2. Telling :class:`ChannelPush` ``push`` recipients **where** in their
       native namespace to deliver — Telegram uses ``native_id`` as the
       chat id, Teams as the conversation/AAD id, etc.
    """

    def __init__(
        self,
        channel: str,
        native_id: str,
        attributes: Mapping[str, Any] | None = None,
    ) -> None:
        self.channel = channel
        self.native_id = native_id
        # Fall back to a shared empty mapping rather than allocating per instance.
        self.attributes: Mapping[str, Any] = attributes if attributes is not None else _EMPTY_MAPPING
class ResponseTargetKind(str, Enum):
    """Discriminator for :class:`ResponseTarget` variants."""

    ORIGINATING = "originating"
    ACTIVE = "active"
    CHANNELS = "channels"
    ALL_LINKED = "all_linked"
    NONE = "none"


class ResponseTarget:
    """Per-request directive for **where** the host delivers the agent reply.

    Independent of ``session_mode``. Construct via the classmethod helpers
    or use the module-level singletons rather than touching ``kind``
    directly. Variants:

    - ``ResponseTarget.originating`` (default) — synchronous response on
      the originating channel only.
    - ``ResponseTarget.active`` — push to the channel most recently
      observed for the resolved ``isolation_key``.
    - ``ResponseTarget.channel("teams")`` / ``.channels([...])`` — push to
      named destinations; entries are bare channel names (native id looked
      up in the host's identity registry) or ``"channel:native_id"``
      tokens (used verbatim). The pseudo-name ``"originating"`` includes
      the originating channel in the fan-out.
    - ``ResponseTarget.all_linked`` — push everywhere the resolved
      ``isolation_key`` has been observed.
    - ``ResponseTarget.none`` — background-only; in the prototype this
      just suppresses the originating reply.

    Instances are treated as immutable; the singletons are shared
    process-wide.
    """

    def __init__(
        self,
        kind: ResponseTargetKind = ResponseTargetKind.ORIGINATING,
        targets: tuple[str, ...] = (),
    ) -> None:
        self.kind = kind
        self.targets = targets

    # -- builders ---------------------------------------------------------- #

    @classmethod
    def channel(cls, name: str) -> "ResponseTarget":
        """Target a single named destination channel."""
        return cls(kind=ResponseTargetKind.CHANNELS, targets=(name,))

    @classmethod
    def channels(cls, names: Sequence[str]) -> "ResponseTarget":
        """Target an explicit list of destination channels."""
        return cls(kind=ResponseTargetKind.CHANNELS, targets=tuple(names))

    # -- value semantics --------------------------------------------------- #
    # Treated as immutable: equal ``kind`` + ``targets`` means
    # interchangeable. Tests and channel parsers compare with ``==`` and
    # use instances as dict keys, so ``__eq__``/``__hash__`` pair up.

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ResponseTarget):
            return NotImplemented
        return self.kind is other.kind and self.targets == other.targets

    def __hash__(self) -> int:
        return hash((self.kind, self.targets))

    def __repr__(self) -> str:
        if self.kind is ResponseTargetKind.CHANNELS:
            return f"ResponseTarget.channels({list(self.targets)!r})"
        return f"ResponseTarget.{self.kind.value}"


# Module-level singletons so callers can write ``ResponseTarget.originating``
# (the spec's classmethod-style notation) without fighting Python's lack of
# zero-arg classmethod properties.
ResponseTarget.originating = ResponseTarget(kind=ResponseTargetKind.ORIGINATING)  # type: ignore[attr-defined]
ResponseTarget.active = ResponseTarget(kind=ResponseTargetKind.ACTIVE)  # type: ignore[attr-defined]
ResponseTarget.all_linked = ResponseTarget(kind=ResponseTargetKind.ALL_LINKED)  # type: ignore[attr-defined]
ResponseTarget.none = ResponseTarget(kind=ResponseTargetKind.NONE)  # type: ignore[attr-defined]


@dataclass
class ChannelRequest:
    """Uniform invocation envelope every channel produces from its native payload.

    Kept as a dataclass so app authors can use ``dataclasses.replace(...)``
    in run hooks to produce a modified envelope without re-listing fields.
    """

    channel: str
    operation: str  # e.g. "message.create", "command.invoke"
    input: AgentRunInputs
    session: ChannelSession | None = None
    options: Mapping[str, Any] | None = None
    session_mode: str = "auto"  # "auto" | "required" | "disabled"
    metadata: Mapping[str, Any] = field(default_factory=dict)
    attributes: Mapping[str, Any] = field(default_factory=dict)
    stream: bool = False
    identity: ChannelIdentity | None = None
    response_target: ResponseTarget = field(default_factory=lambda: ResponseTarget.originating)  # type: ignore[attr-defined]


class ChannelCommand:
    """A discoverable command a channel exposes to its users (e.g. ``/reset``)."""

    def __init__(
        self,
        name: str,
        description: str,
        handle: Callable[["ChannelCommandContext"], Awaitable[None]],
    ) -> None:
        self.name = name
        self.description = description
        self.handle = handle


class ChannelCommandContext:
    """Context passed to a :class:`ChannelCommand` handler."""

    def __init__(
        self,
        request: ChannelRequest,
        reply: Callable[[str], Awaitable[None]],
    ) -> None:
        self.request = request
        self.reply = reply


# Shared empty defaults for ChannelContribution.
_EMPTY_ROUTES: tuple[BaseRoute, ...] = ()
_EMPTY_COMMANDS: tuple[ChannelCommand, ...] = ()
_EMPTY_LIFECYCLE: tuple[Callable[[], Awaitable[None]], ...] = ()
= () + + +class ChannelContribution: + """Routes, commands, and lifecycle hooks a channel contributes to the host.""" + + def __init__( + self, + routes: Sequence[BaseRoute] = _EMPTY_ROUTES, + commands: Sequence[ChannelCommand] = _EMPTY_COMMANDS, + on_startup: Sequence[Callable[[], Awaitable[None]]] = _EMPTY_LIFECYCLE, + on_shutdown: Sequence[Callable[[], Awaitable[None]]] = _EMPTY_LIFECYCLE, + ) -> None: + self.routes = routes + self.commands = commands + self.on_startup = on_startup + self.on_shutdown = on_shutdown + + +class HostedRunResult: + """Channel-neutral result of an agent invocation routed through the host.""" + + def __init__(self, text: str) -> None: + self.text = text + + +class DeliveryReport: + """What :meth:`ChannelContext.deliver_response` did with a payload. + + The originating channel uses ``include_originating`` to decide whether + to render the agent reply on its own wire (``True`` — default for the + ``originating`` target, or when ``"originating"`` is one of the listed + destinations) or to return only an acknowledgement (``False`` — when + the target lists only out-of-band destinations). + """ + + def __init__( + self, + include_originating: bool, + pushed: tuple[str, ...] = (), + skipped: tuple[str, ...] = (), + ) -> None: + self.include_originating = include_originating + self.pushed = pushed # destination tokens delivered to (e.g. "telegram:123") + self.skipped = skipped # destinations resolved but skipped (no push, failed, …) + + +# A transform hook runs over each AgentResponseUpdate as the channel consumes +# the stream. It can return a replacement update, ``None`` to drop the update, +# or be async. Channels apply it during iteration so that channel-specific +# concerns (e.g. masking, redaction, formatting for the wire) live close to +# the channel rather than on the agent. 
+ChannelStreamTransformHook = Callable[ + [AgentResponseUpdate], + "AgentResponseUpdate | Awaitable[AgentResponseUpdate | None] | None", +] + + +# --------------------------------------------------------------------------- # +# Channel run hook +# --------------------------------------------------------------------------- # + + +# Run hooks accept the channel-built ``ChannelRequest`` and return a +# (possibly modified) replacement. Channels invoke the hook with both the +# request and the channel-side context as keyword arguments — the call +# convention is ``await hook(request, target=..., protocol_request=...)``. +# +# The ergonomic minimum for a hook implementation is therefore a function +# accepting ``request`` positionally plus ``**kwargs`` and returning a +# (possibly mutated) :class:`ChannelRequest`. Hooks that need the agent +# target or the raw channel-native payload pull them off the keyword +# arguments by name (``target`` / ``protocol_request``). +# +# ``protocol_request`` is the raw, channel-native payload the channel +# parsed (the JSON body for Responses, the Telegram ``Update`` dict, the +# Bot Framework ``Activity`` for Teams). Use it when the hook needs a +# field the channel did not lift onto ``ChannelRequest`` (e.g. OpenAI's +# ``safety_identifier``, Teams' ``from.aadObjectId``, …). +ChannelRunHook = Callable[..., "Awaitable[ChannelRequest] | ChannelRequest"] + + +async def apply_run_hook( + hook: ChannelRunHook, + request: ChannelRequest, + *, + target: SupportsAgentRun | Workflow, + protocol_request: Any | None, +) -> ChannelRequest: + """Channel-side helper to invoke a :data:`ChannelRunHook` with the standard kwargs. + + Channels call this rather than calling the hook directly so the + invocation convention (``request`` positional, ``target`` / + ``protocol_request`` keyword) is enforced in one place. 
+ """ + result = hook(request, target=target, protocol_request=protocol_request) + if isinstance(result, Awaitable): + return await result + return result + + +# --------------------------------------------------------------------------- # +# Channel protocols +# --------------------------------------------------------------------------- # + + +@runtime_checkable +class Channel(Protocol): + """A pluggable adapter that exposes one transport on the host. + + Channels publish their routes, commands, and lifecycle callbacks via + :meth:`contribute`. The host mounts them under the channel's ``path`` + (or at the app root when ``path == ""``) and gives the channel a + :class:`ChannelContext` so it can call back into the host to invoke + the agent target and deliver responses. + """ + + name: str + path: str # default mount path (e.g. "/responses"); use "" to mount routes at the app root + + def contribute(self, context: "ChannelContext") -> ChannelContribution: ... + + +@runtime_checkable +class ChannelPush(Protocol): + """Optional capability: a channel that can deliver outbound messages without a prior request. + + Per SPEC-002 (req #13), channels that can do proactive delivery + (Telegram bot proactive message, Teams proactive bot message, + webhook callbacks, SSE broadcasts) implement ``push`` on top of the + base :class:`Channel` protocol. Channels without push can only be + addressed as the ``originating`` :class:`ResponseTarget`. + """ + + name: str + + async def push(self, identity: ChannelIdentity, payload: HostedRunResult) -> None: ... 
+ + +__all__ = [ + "AgentResponse", + "AgentResponseUpdate", + "Channel", + "ChannelCommand", + "ChannelCommandContext", + "ChannelContribution", + "ChannelIdentity", + "ChannelPush", + "ChannelRequest", + "ChannelRunHook", + "ChannelSession", + "ChannelStreamTransformHook", + "DeliveryReport", + "HostedRunResult", + "ResponseStream", + "ResponseTarget", + "ResponseTargetKind", + "apply_run_hook", +] diff --git a/python/packages/hosting/pyproject.toml b/python/packages/hosting/pyproject.toml new file mode 100644 index 0000000000..57c79854bb --- /dev/null +++ b/python/packages/hosting/pyproject.toml @@ -0,0 +1,107 @@ +[project] +name = "agent-framework-hosting" +description = "Multi-channel hosting for Microsoft Agent Framework agents." +authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}] +readme = "README.md" +requires-python = ">=3.10" +version = "1.0.0a260424" +license-files = ["LICENSE"] +urls.homepage = "https://aka.ms/agent-framework" +urls.source = "https://github.com/microsoft/agent-framework/tree/main/python" +urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true" +urls.issues = "https://github.com/microsoft/agent-framework/issues" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", + "Typing :: Typed", +] +dependencies = [ + "agent-framework-core>=1.2.0,<2", + "starlette>=0.37", +] + +[project.optional-dependencies] +serve = [ + "hypercorn>=0.17", +] + +[tool.uv] +prerelease = "if-necessary-or-explicit" +environments = [ + "sys_platform == 'darwin'", + "sys_platform == 'linux'", + "sys_platform == 'win32'" +] + +[tool.uv-dynamic-versioning] 
+fallback-version = "0.0.0" + +[tool.pytest.ini_options] +testpaths = 'tests' +addopts = "-ra -q -r fEX" +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +filterwarnings = [] +timeout = 120 +markers = [ + "integration: marks tests as integration tests that require external services", +] + +[tool.ruff] +extend = "../../pyproject.toml" + +[tool.coverage.run] +omit = [ + "**/__init__.py" +] + +[tool.pyright] +extends = "../../pyproject.toml" +include = ["agent_framework_hosting"] +exclude = ['tests'] + +[tool.mypy] +plugins = ['pydantic.mypy'] +strict = true +python_version = "3.10" +ignore_missing_imports = true +disallow_untyped_defs = true +no_implicit_optional = true +check_untyped_defs = true +warn_return_any = true +show_error_codes = true +warn_unused_ignores = false +disallow_incomplete_defs = true +disallow_untyped_decorators = true + +[tool.bandit] +targets = ["agent_framework_hosting"] +exclude_dirs = ["tests"] + +[tool.poe] +executor.type = "uv" +include = "../../shared_tasks.toml" + +[tool.poe.tasks.mypy] +help = "Run MyPy for this package." +cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hosting" + +[tool.poe.tasks.test] +help = "Run the default unit test suite for this package." +cmd = 'pytest -m "not integration" --cov=agent_framework_hosting --cov-report=term-missing:skip-covered tests' + +[build-system] +requires = ["flit-core >= 3.11,<4.0"] +build-backend = "flit_core.buildapi" + +[dependency-groups] +dev = [ + "httpx>=0.28.1", +] diff --git a/python/packages/hosting/tests/__init__.py b/python/packages/hosting/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/packages/hosting/tests/_workflow_fixtures.py b/python/packages/hosting/tests/_workflow_fixtures.py new file mode 100644 index 0000000000..f59bb8cab8 --- /dev/null +++ b/python/packages/hosting/tests/_workflow_fixtures.py @@ -0,0 +1,43 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Workflow fixtures for hosting tests. + +Defined in a module that does not use ``from __future__ import annotations`` +because the workflow handler validation reflects on real annotation objects +rather than stringified forms. +""" + +from agent_framework import Executor, Workflow, WorkflowBuilder, WorkflowContext, handler + + +class _UpperExecutor(Executor): + @handler + async def handle(self, text: str, ctx: WorkflowContext[str]) -> None: + await ctx.yield_output(text.upper()) + + +class _EchoExecutor(Executor): + @handler + async def handle(self, text: str, ctx: WorkflowContext[str]) -> None: + await ctx.yield_output(text) + + +def build_upper_workflow() -> Workflow: + return WorkflowBuilder(start_executor=_UpperExecutor(id="upper")).build() + + +def build_echo_workflow() -> Workflow: + return WorkflowBuilder(start_executor=_EchoExecutor(id="echo")).build() + + +class _MultiChunkExecutor(Executor): + """Yields three separate ``output`` events so streaming has something to chew on.""" + + @handler + async def handle(self, text: str, ctx: WorkflowContext[str]) -> None: + for chunk in (f"{text}-1", f"{text}-2", f"{text}-3"): + await ctx.yield_output(chunk) + + +def build_multi_chunk_workflow() -> Workflow: + return WorkflowBuilder(start_executor=_MultiChunkExecutor(id="multi")).build() diff --git a/python/packages/hosting/tests/test_host.py b/python/packages/hosting/tests/test_host.py new file mode 100644 index 0000000000..bb6d7d5963 --- /dev/null +++ b/python/packages/hosting/tests/test_host.py @@ -0,0 +1,714 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +"""Tests for :class:`AgentFrameworkHost` invocation, session, and delivery routing.""" + +from __future__ import annotations + +from collections.abc import AsyncIterator, Sequence +from dataclasses import dataclass, field +from typing import Any + +import pytest +from agent_framework import AgentResponseUpdate +from starlette.requests import Request +from starlette.responses import JSONResponse +from starlette.routing import BaseRoute, Route +from starlette.testclient import TestClient + +from agent_framework_hosting import ( + AgentFrameworkHost, + Channel, + ChannelContext, + ChannelContribution, + ChannelIdentity, + ChannelPush, + ChannelRequest, + ChannelSession, + HostedRunResult, + ResponseTarget, +) + + +async def _ping(_request: Request) -> JSONResponse: + return JSONResponse({"ok": True}) + + +# --------------------------------------------------------------------------- # +# Fakes # +# --------------------------------------------------------------------------- # + + +@dataclass +class _FakeAgentSession: + session_id: str | None = None + service_session_id: str | None = None + + +@dataclass +class _FakeAgentResponse: + text: str + + +class _FakeAgent: + """Minimal :class:`SupportsAgentRun` implementation that records invocations.""" + + def __init__(self, reply: str = "ok") -> None: + self._reply = reply + self.calls: list[dict[str, Any]] = [] + self.created_sessions: list[_FakeAgentSession] = [] + + def create_session(self, *, session_id: str | None = None) -> _FakeAgentSession: + s = _FakeAgentSession(session_id=session_id) + self.created_sessions.append(s) + return s + + async def run(self, messages: Any = None, *, stream: bool = False, session: Any = None, **kwargs: Any) -> Any: + self.calls.append({"messages": messages, "stream": stream, "session": session, "kwargs": kwargs}) + if stream: # pragma: no cover - not used by these tests + + async def _gen() -> AsyncIterator[Any]: + yield self._reply + + return _gen() + return 
_FakeAgentResponse(text=self._reply) + + +class _RecordingChannel: + """Minimal :class:`Channel` + :class:`ChannelPush` for routing tests.""" + + def __init__(self, name: str = "fake", path: str = "/fake", supports_push: bool = True) -> None: + self.name = name + self.path = path + self.context: ChannelContext | None = None + self.pushes: list[tuple[ChannelIdentity, HostedRunResult]] = [] + self._push_raises: Exception | None = None + self._supports_push = supports_push + # Provide a single trivial route so contribute() exercises the mount path. + self._routes: Sequence[BaseRoute] = (Route("/ping", _ping),) + + def contribute(self, context: ChannelContext) -> ChannelContribution: + self.context = context + return ChannelContribution(routes=self._routes) + + async def push(self, identity: ChannelIdentity, payload: HostedRunResult) -> None: + if self._push_raises is not None: + raise self._push_raises + self.pushes.append((identity, payload)) + + +class _NoPushChannel: + """A channel that does NOT implement :class:`ChannelPush`.""" + + def __init__(self, name: str = "nopush", path: str = "/nopush") -> None: + self.name = name + self.path = path + + def contribute(self, context: ChannelContext) -> ChannelContribution: + return ChannelContribution() + + +@dataclass +class _LifecycleChannel: + name: str = "lifecycle" + path: str = "" + started: list[str] = field(default_factory=list) + stopped: list[str] = field(default_factory=list) + + def contribute(self, context: ChannelContext) -> ChannelContribution: + async def on_start() -> None: + self.started.append("up") + + async def on_stop() -> None: + self.stopped.append("down") + + return ChannelContribution(on_startup=[on_start], on_shutdown=[on_stop]) + + +# --------------------------------------------------------------------------- # +# Host wiring # +# --------------------------------------------------------------------------- # + + +class TestHostWiring: + def test_channel_is_recognized(self) -> None: + ch = 
_RecordingChannel() + assert isinstance(ch, Channel) + assert isinstance(ch, ChannelPush) + + def test_app_mounts_channel_routes_under_path(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel(path="/fake") + host = AgentFrameworkHost(target=agent, channels=[ch]) + + with TestClient(host.app) as client: + r = client.get("/fake/ping") + assert r.status_code == 200 + assert r.json() == {"ok": True} + + def test_app_mounts_at_root_when_path_is_empty(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel(path="") + host = AgentFrameworkHost(target=agent, channels=[ch]) + + with TestClient(host.app) as client: + r = client.get("/ping") + assert r.status_code == 200 + + def test_app_is_cached(self) -> None: + host = AgentFrameworkHost(target=_FakeAgent(), channels=[_RecordingChannel()]) + assert host.app is host.app + + def test_lifespan_invokes_startup_and_shutdown(self) -> None: + agent = _FakeAgent() + ch = _LifecycleChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + with TestClient(host.app): + assert ch.started == ["up"] + assert ch.stopped == ["down"] + + def test_app_exposes_readiness_probe(self) -> None: + host = AgentFrameworkHost(target=_FakeAgent(), channels=[_RecordingChannel()]) + with TestClient(host.app) as client: + r = client.get("/readiness") + assert r.status_code == 200 + assert r.text == "ok" + + +# --------------------------------------------------------------------------- # +# Invoke + sessions # +# --------------------------------------------------------------------------- # + + +class TestHostInvoke: + @pytest.mark.asyncio + async def test_invoke_wraps_input_with_hosting_metadata(self) -> None: + agent = _FakeAgent(reply="hello") + ch = _RecordingChannel(name="responses") + host = AgentFrameworkHost(target=agent, channels=[ch]) + # Force ``app`` build to trigger ``contribute``. 
+ _ = host.app + assert ch.context is not None + + req = ChannelRequest( + channel="responses", + operation="message.create", + input="hi", + session=ChannelSession(isolation_key="user:1"), + identity=ChannelIdentity(channel="responses", native_id="user:1"), + ) + result = await ch.context.run(req) + + assert result.text == "hello" + assert len(agent.calls) == 1 + msg = agent.calls[0]["messages"] + assert msg.role == "user" + assert msg.additional_properties["hosting"]["channel"] == "responses" + assert msg.additional_properties["hosting"]["identity"] == { + "channel": "responses", + "native_id": "user:1", + "attributes": {}, + } + assert msg.additional_properties["hosting"]["response_target"] == { + "kind": "originating", + "targets": [], + } + + @pytest.mark.asyncio + async def test_invoke_caches_session_per_isolation_key(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + _ = host.app + assert ch.context is not None + + req_a = ChannelRequest( + channel=ch.name, operation="op", input="1", session=ChannelSession(isolation_key="alice") + ) + req_b = ChannelRequest( + channel=ch.name, operation="op", input="2", session=ChannelSession(isolation_key="alice") + ) + req_c = ChannelRequest(channel=ch.name, operation="op", input="3", session=ChannelSession(isolation_key="bob")) + + await ch.context.run(req_a) + await ch.context.run(req_b) + await ch.context.run(req_c) + + # Two distinct sessions created (alice, bob) — never re-created. 
+ assert len(agent.created_sessions) == 2 + assert agent.calls[0]["session"] is agent.calls[1]["session"] + assert agent.calls[0]["session"] is not agent.calls[2]["session"] + + @pytest.mark.asyncio + async def test_session_disabled_does_not_create_session(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest( + channel=ch.name, + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + session_mode="disabled", + ) + await ch.context.run(req) + assert agent.created_sessions == [] + assert agent.calls[0]["session"] is None + + @pytest.mark.asyncio + async def test_reset_session_rotates_id_and_drops_cache(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest(channel=ch.name, operation="op", input="x", session=ChannelSession(isolation_key="alice")) + await ch.context.run(req) + first_session = agent.calls[-1]["session"] + assert first_session.session_id == "alice" + + host.reset_session("alice") + await ch.context.run(req) + second_session = agent.calls[-1]["session"] + # New session, new id (alias rotation), distinct object. 
+ assert second_session is not first_session + assert second_session.session_id != "alice" + assert second_session.session_id.startswith("alice#") + + @pytest.mark.asyncio + async def test_options_propagates_to_target_run(self) -> None: + agent = _FakeAgent() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=agent, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest( + channel=ch.name, + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + options={"temperature": 0.4}, + ) + await ch.context.run(req) + assert agent.calls[0]["kwargs"]["options"] == {"temperature": 0.4} + + +# --------------------------------------------------------------------------- # +# Workflow target # +# --------------------------------------------------------------------------- # + + +class TestHostWorkflowTarget: + """The host accepts a ``Workflow`` and dispatches to ``workflow.run(...)``.""" + + @pytest.mark.asyncio + async def test_invoke_workflow_collapses_outputs_to_hosted_run_result(self) -> None: + from tests._workflow_fixtures import build_upper_workflow + + workflow = build_upper_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch]) + _ = host.app + assert ch.context is not None + + # The channel's run_hook is the canonical adapter from a free-form input + # to a workflow's typed input; here the start executor accepts ``str`` + # already so the channel forwards ``input`` verbatim. + req = ChannelRequest(channel="fake", operation="message.create", input="hello") + result = await ch.context.run(req) + + assert result.text == "HELLO" + # No session caching for workflow targets — Workflow has no + # ``create_session`` and the host must not invent one. 
+ assert host._sessions == {} + + @pytest.mark.asyncio + async def test_stream_workflow_yields_updates_and_finalizes(self) -> None: + from tests._workflow_fixtures import build_echo_workflow + + workflow = build_echo_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest(channel="fake", operation="message.create", input="hi") + stream = ch.context.run_stream(req) + + updates: list[AgentResponseUpdate] = [] + async for update in stream: + updates.append(update) + + # The echo workflow yields a single ``output`` event whose payload is + # the original string; the host wraps non-update payloads into a + # one-shot ``AgentResponseUpdate`` carrying the text. + assert [u.text for u in updates] == ["hi"] + # ``raw_representation`` preserves the source ``WorkflowEvent`` so + # advanced consumers (telemetry, debug UIs) can recover the full + # workflow timeline. + assert all(u.raw_representation is not None for u in updates) + + final = await stream.get_final_response() + assert final.text == "hi" + + @pytest.mark.asyncio + async def test_stream_workflow_yields_one_update_per_output_event(self) -> None: + from tests._workflow_fixtures import build_multi_chunk_workflow + + workflow = build_multi_chunk_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch]) + _ = host.app + assert ch.context is not None + + req = ChannelRequest(channel="fake", operation="message.create", input="x") + stream = ch.context.run_stream(req) + + chunks: list[str] = [] + async for update in stream: + chunks.append(update.text) + # The originating ``executor_id`` is propagated via author_name so + # multi-agent workflows can route per-author rendering downstream. 
+ assert update.author_name == "multi" + + assert chunks == ["x-1", "x-2", "x-3"] + final = await stream.get_final_response() + assert final.text == "x-1x-2x-3" + + +class TestHostWorkflowCheckpointing: + """The host scopes per-conversation checkpoints when ``checkpoint_location`` is set.""" + + def test_rejects_workflow_with_existing_checkpoint_storage(self, tmp_path: Any) -> None: + from agent_framework import InMemoryCheckpointStorage, WorkflowBuilder + + from tests._workflow_fixtures import _UpperExecutor + + workflow = WorkflowBuilder( + start_executor=_UpperExecutor(id="upper"), + checkpoint_storage=InMemoryCheckpointStorage(), + ).build() + with pytest.raises(RuntimeError, match="already has checkpoint storage"): + AgentFrameworkHost( + target=workflow, + channels=[_RecordingChannel()], + checkpoint_location=tmp_path, + ) + + def test_warns_when_target_is_agent(self, tmp_path: Any, caplog: Any) -> None: + import logging as _logging + + agent = _FakeAgent() + with caplog.at_level(_logging.WARNING, logger="agent_framework.hosting"): + host = AgentFrameworkHost(target=agent, channels=[_RecordingChannel()], checkpoint_location=tmp_path) + assert host._checkpoint_location is None + assert any("checkpoint_location" in rec.message for rec in caplog.records) + + @pytest.mark.asyncio + async def test_invoke_skips_checkpointing_when_no_isolation_key(self, tmp_path: Any) -> None: + from tests._workflow_fixtures import build_upper_workflow + + workflow = build_upper_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch], checkpoint_location=tmp_path) + _ = host.app + assert ch.context is not None + + # No session -> no scoping key -> no checkpoint storage written. 
+        req = ChannelRequest(channel="fake", operation="message.create", input="hi")
+        result = await ch.context.run(req)
+
+        assert result.text == "HI"
+        assert list(tmp_path.iterdir()) == []
+
+    @pytest.mark.asyncio
+    async def test_invoke_writes_checkpoint_under_isolation_key(self, tmp_path: Any) -> None:
+        from tests._workflow_fixtures import build_upper_workflow
+
+        workflow = build_upper_workflow()
+        ch = _RecordingChannel()
+        host = AgentFrameworkHost(target=workflow, channels=[ch], checkpoint_location=tmp_path)
+        _ = host.app
+        assert ch.context is not None
+
+        req = ChannelRequest(
+            channel="fake",
+            operation="message.create",
+            input="hi",
+            session=ChannelSession(isolation_key="alice"),
+        )
+        result = await ch.context.run(req)
+        assert result.text == "HI"
+
+        # FileCheckpointStorage rooted at <checkpoint_location>/<isolation_key> should
+        # have produced at least one checkpoint file scoped to that user.
+        scoped = tmp_path / "alice"
+        assert scoped.exists()
+        assert any(scoped.iterdir()), "expected at least one checkpoint to be written under the per-user dir"
+
+    @pytest.mark.asyncio
+    async def test_stream_writes_checkpoint_under_isolation_key(self, tmp_path: Any) -> None:
+        from tests._workflow_fixtures import build_echo_workflow
+
+        workflow = build_echo_workflow()
+        ch = _RecordingChannel()
+        host = AgentFrameworkHost(target=workflow, channels=[ch], checkpoint_location=tmp_path)
+        _ = host.app
+        assert ch.context is not None
+
+        req = ChannelRequest(
+            channel="fake",
+            operation="message.create",
+            input="hi",
+            session=ChannelSession(isolation_key="bob"),
+        )
+        stream = ch.context.run_stream(req)
+        async for _ in stream:
+            pass
+        await stream.get_final_response()
+
+        scoped = tmp_path / "bob"
+        assert scoped.exists()
+        assert any(scoped.iterdir())
+
+    @pytest.mark.asyncio
+    async def test_caller_supplied_checkpoint_storage_used_as_is(self, tmp_path: Any) -> None:
+        from agent_framework import InMemoryCheckpointStorage
+
+        from tests._workflow_fixtures import build_upper_workflow
+
+        
storage = InMemoryCheckpointStorage() + workflow = build_upper_workflow() + ch = _RecordingChannel() + host = AgentFrameworkHost(target=workflow, channels=[ch], checkpoint_location=storage) + _ = host.app + assert ch.context is not None + assert host._checkpoint_location is storage + + req = ChannelRequest( + channel="fake", + operation="message.create", + input="hi", + session=ChannelSession(isolation_key="carol"), + ) + await ch.context.run(req) + + # The caller-owned storage is used directly (no per-user scoping + # applied by the host); a checkpoint should appear in it. + checkpoints = await storage.list_checkpoints(workflow_name=workflow.name) + assert checkpoints, "expected the caller-supplied storage to receive a checkpoint" + # And nothing should have been written into the tmp_path tree. + assert list(tmp_path.iterdir()) == [] + + +# --------------------------------------------------------------------------- # +# Delivery routing # +# --------------------------------------------------------------------------- # + + +def _make_host_with_two_channels() -> tuple[AgentFrameworkHost, _RecordingChannel, _RecordingChannel, ChannelContext]: + agent = _FakeAgent() + a = _RecordingChannel(name="responses", path="/r") + b = _RecordingChannel(name="telegram", path="/t") + host = AgentFrameworkHost(target=agent, channels=[a, b]) + _ = host.app + assert a.context is not None + return host, a, b, a.context + + +def _record_identity_on(host: AgentFrameworkHost, isolation_key: str, channel: str, native_id: str) -> None: + """Pre-seed the host's identity registry by running a request.""" + host._identities.setdefault(isolation_key, {})[channel] = ChannelIdentity(channel=channel, native_id=native_id) + host._active[isolation_key] = channel + + +class TestDeliverResponse: + @pytest.mark.asyncio + async def test_originating_returns_include_originating(self) -> None: + _, _, _, ctx = _make_host_with_two_channels() + req = ChannelRequest(channel="responses", operation="op", 
input="x") + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + assert report.pushed == () + assert report.skipped == () + + @pytest.mark.asyncio + async def test_none_suppresses_everything(self) -> None: + _, _, _, ctx = _make_host_with_two_channels() + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + response_target=ResponseTarget.none, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is False + assert report.pushed == () + assert report.skipped == () + + @pytest.mark.asyncio + async def test_active_pushes_to_other_channel(self) -> None: + host, a, b, ctx = _make_host_with_two_channels() + # Alice was last seen on telegram. + _record_identity_on(host, "alice", "telegram", "42") + # Now she sends a message via responses; ResponseTarget.active should + # push to telegram, not back to responses. + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.active, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is False + assert report.pushed == ("telegram:42",) + assert b.pushes and b.pushes[0][0].native_id == "42" + + @pytest.mark.asyncio + async def test_active_falls_back_to_originating_when_self(self) -> None: + host, _a, _b, ctx = _make_host_with_two_channels() + _record_identity_on(host, "alice", "responses", "user:1") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.active, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + + @pytest.mark.asyncio + async def 
test_channels_with_unknown_identity_skipped(self) -> None: + _, _, _, ctx = _make_host_with_two_channels() + # No prior identity seeded for telegram on alice. + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.channel("telegram"), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + # Skipped → fallback to originating. + assert report.include_originating is True + assert report.skipped == ("telegram",) + assert report.pushed == () + + @pytest.mark.asyncio + async def test_channels_with_explicit_native_id_token(self) -> None: + _, _, b, ctx = _make_host_with_two_channels() + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + response_target=ResponseTarget.channel("telegram:99"), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.pushed == ("telegram:99",) + assert report.include_originating is False + assert b.pushes[0][0].native_id == "99" + + @pytest.mark.asyncio + async def test_channels_originating_pseudo_includes_origin(self) -> None: + host, _a, _b, ctx = _make_host_with_two_channels() + _record_identity_on(host, "alice", "telegram", "42") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.channels(["originating", "telegram"]), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + assert report.pushed == ("telegram:42",) + + @pytest.mark.asyncio + async def test_channels_unknown_channel_name_skipped(self) -> None: + _, _, _, ctx = _make_host_with_two_channels() + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + response_target=ResponseTarget.channel("nope"), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert 
report.include_originating is True # fallback + assert report.skipped == ("nope",) + + @pytest.mark.asyncio + async def test_no_push_capability_skipped(self) -> None: + agent = _FakeAgent() + a = _RecordingChannel(name="responses", path="/r") + b = _NoPushChannel(name="nopush", path="/n") + host = AgentFrameworkHost(target=agent, channels=[a, b]) + _ = host.app + assert a.context is not None + # Pre-seed identity on the no-push channel so we get past the + # identity check and hit the ChannelPush check. + host._identities.setdefault("alice", {})["nopush"] = ChannelIdentity(channel="nopush", native_id="42") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.channel("nopush"), + ) + report = await a.context.deliver_response(req, HostedRunResult(text="reply")) + assert report.skipped == ("nopush:42",) + assert report.include_originating is True # fallback + + @pytest.mark.asyncio + async def test_all_linked_pushes_to_every_other_channel(self) -> None: + host, _a, b, ctx = _make_host_with_two_channels() + # Alice on responses (originating) and telegram. 
+ host._identities.setdefault("alice", {}) + host._identities["alice"]["responses"] = ChannelIdentity(channel="responses", native_id="user:1") + host._identities["alice"]["telegram"] = ChannelIdentity(channel="telegram", native_id="42") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.all_linked, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + assert report.pushed == ("telegram:42",) + assert b.pushes and b.pushes[0][1].text == "reply" + + @pytest.mark.asyncio + async def test_all_linked_no_other_channels_falls_back(self) -> None: + host, _a, _b, ctx = _make_host_with_two_channels() + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.all_linked, # type: ignore[attr-defined] + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.include_originating is True + assert report.pushed == () + + @pytest.mark.asyncio + async def test_push_exception_marks_skipped(self) -> None: + host, _a, b, ctx = _make_host_with_two_channels() + b._push_raises = RuntimeError("boom") # type: ignore[attr-defined] + host._identities.setdefault("alice", {})["telegram"] = ChannelIdentity(channel="telegram", native_id="42") + req = ChannelRequest( + channel="responses", + operation="op", + input="x", + session=ChannelSession(isolation_key="alice"), + response_target=ResponseTarget.channel("telegram"), + ) + report = await ctx.deliver_response(req, HostedRunResult(text="reply")) + assert report.skipped == ("telegram:42",) + assert report.include_originating is True # fallback diff --git a/python/packages/hosting/tests/test_types.py b/python/packages/hosting/tests/test_types.py new file mode 100644 index 0000000000..76531dfe49 --- /dev/null 
+++ b/python/packages/hosting/tests/test_types.py @@ -0,0 +1,105 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tests for the channel-neutral envelope types in :mod:`agent_framework_hosting._types`.""" + +from __future__ import annotations + +from agent_framework_hosting import ( + ChannelIdentity, + ChannelRequest, + ChannelSession, + ResponseTarget, + ResponseTargetKind, +) + + +class TestResponseTarget: + def test_originating_default_singleton(self) -> None: + target = ResponseTarget.originating # type: ignore[attr-defined] + assert target.kind is ResponseTargetKind.ORIGINATING + assert target.targets == () + + def test_active_singleton(self) -> None: + target = ResponseTarget.active # type: ignore[attr-defined] + assert target.kind is ResponseTargetKind.ACTIVE + assert target.targets == () + + def test_all_linked_singleton(self) -> None: + target = ResponseTarget.all_linked # type: ignore[attr-defined] + assert target.kind is ResponseTargetKind.ALL_LINKED + + def test_none_singleton(self) -> None: + target = ResponseTarget.none # type: ignore[attr-defined] + assert target.kind is ResponseTargetKind.NONE + + def test_channel_builder_single(self) -> None: + target = ResponseTarget.channel("teams") + assert target.kind is ResponseTargetKind.CHANNELS + assert target.targets == ("teams",) + + def test_channels_builder_list(self) -> None: + target = ResponseTarget.channels(["teams", "telegram", "originating"]) + assert target.kind is ResponseTargetKind.CHANNELS + assert target.targets == ("teams", "telegram", "originating") + + def test_channels_builder_accepts_tuple(self) -> None: + target = ResponseTarget.channels(("a", "b")) + assert target.targets == ("a", "b") + + def test_target_is_hashable(self) -> None: + # Plain class — hashing falls back to identity, which is fine here: + # the two keys below are different instances (singleton vs builder). 
+ d = {ResponseTarget.originating: 1, ResponseTarget.channel("t"): 2} # type: ignore[attr-defined] + assert len(d) == 2 + + +class TestChannelRequest: + def test_required_fields_only(self) -> None: + req = ChannelRequest(channel="responses", operation="message.create", input="hi") + assert req.channel == "responses" + assert req.operation == "message.create" + assert req.input == "hi" + assert req.session is None + assert req.options is None + assert req.session_mode == "auto" + assert req.metadata == {} + assert req.attributes == {} + assert req.stream is False + assert req.identity is None + # Default response target is the originating singleton. + assert req.response_target.kind is ResponseTargetKind.ORIGINATING + + def test_default_response_target_is_originating_singleton(self) -> None: + # Every new request shares the module-level ``originating`` singleton + # by default — instances are intended to be treated as immutable, so + # sharing is safe and avoids per-request allocation. + a = ChannelRequest(channel="a", operation="op", input="x") + b = ChannelRequest(channel="b", operation="op", input="y") + assert a.response_target is ResponseTarget.originating # type: ignore[attr-defined] + assert a.response_target is b.response_target + + def test_with_session_and_identity(self) -> None: + req = ChannelRequest( + channel="telegram", + operation="message.create", + input="hi", + session=ChannelSession(isolation_key="user:42"), + identity=ChannelIdentity(channel="telegram", native_id="42"), + response_target=ResponseTarget.active, # type: ignore[attr-defined] + ) + assert req.session is not None + assert req.session.isolation_key == "user:42" + assert req.identity is not None + assert req.identity.channel == "telegram" + assert req.identity.native_id == "42" + assert req.response_target.kind is ResponseTargetKind.ACTIVE + + +class TestChannelIdentity: + def test_attributes_default_empty_mapping(self) -> None: + ident = ChannelIdentity(channel="teams", 
native_id="abc") + assert dict(ident.attributes) == {} + + def test_attributes_passthrough(self) -> None: + ident = ChannelIdentity(channel="teams", native_id="abc", attributes={"role": "user"}) + assert dict(ident.attributes) == {"role": "user"} diff --git a/python/pyproject.toml b/python/pyproject.toml index b788f48e71..e43d330c9a 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -85,6 +85,8 @@ agent-framework-foundry-hosting = { workspace = true } agent-framework-foundry-local = { workspace = true } agent-framework-gemini = { workspace = true } agent-framework-github-copilot = { workspace = true } +agent-framework-hosting = { workspace = true } +agent-framework-hosting-telegram = { workspace = true } agent-framework-hyperlight = { workspace = true } agent-framework-lab = { workspace = true } agent-framework-mem0 = { workspace = true } @@ -205,6 +207,8 @@ executionEnvironments = [ { root = "packages/foundry/tests", reportPrivateUsage = "none" }, { root = "packages/foundry_local/tests", reportPrivateUsage = "none" }, { root = "packages/github_copilot/tests", reportPrivateUsage = "none" }, + { root = "packages/hosting/tests", reportPrivateUsage = "none" }, + { root = "packages/hosting-telegram/tests", reportPrivateUsage = "none" }, { root = "packages/lab/gaia/tests", reportPrivateUsage = "none" }, { root = "packages/lab/lightning/tests", reportPrivateUsage = "none" }, { root = "packages/lab/tau2/tests", reportPrivateUsage = "none" }, diff --git a/python/uv.lock b/python/uv.lock index 85a968174a..acddc409d0 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -47,6 +47,8 @@ members = [ "agent-framework-foundry-local", "agent-framework-gemini", "agent-framework-github-copilot", + "agent-framework-hosting", + "agent-framework-hosting-telegram", "agent-framework-hyperlight", "agent-framework-lab", "agent-framework-mem0", @@ -599,6 +601,53 @@ requires-dist = [ { name = "github-copilot-sdk", marker = "python_full_version >= '3.11'", specifier = 
">=0.2.1,<=0.2.1" }, ] +[[package]] +name = "agent-framework-hosting" +version = "1.0.0a260424" +source = { editable = "packages/hosting" } +dependencies = [ + { name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "starlette", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.optional-dependencies] +serve = [ + { name = "hypercorn", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.dev-dependencies] +dev = [ + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.metadata] +requires-dist = [ + { name = "agent-framework-core", editable = "packages/core" }, + { name = "hypercorn", marker = "extra == 'serve'", specifier = ">=0.17" }, + { name = "starlette", specifier = ">=0.37" }, +] +provides-extras = ["serve"] + +[package.metadata.requires-dev] +dev = [{ name = "httpx", specifier = ">=0.28.1" }] + +[[package]] +name = "agent-framework-hosting-telegram" +version = "1.0.0a260424" +source = { editable = "packages/hosting-telegram" } +dependencies = [ + { name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "agent-framework-hosting", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, +] + +[package.metadata] +requires-dist = [ + { name = "agent-framework-core", editable = "packages/core" }, + { name = "agent-framework-hosting", editable = "packages/hosting" }, + { name = "httpx", specifier = ">=0.27,<1" }, +] + [[package]] name = "agent-framework-hyperlight" version = "1.0.0a260429" @@ -1636,7 +1685,7 @@ name = "clr-loader" version = "0.2.10" source = { registry = 
"https://pypi.org/simple" } dependencies = [ - { name = "cffi", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "cffi", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/18/24/c12faf3f61614b3131b5c98d3bf0d376b49c7feaa73edca559aeb2aee080/clr_loader-0.2.10.tar.gz", hash = "sha256:81f114afbc5005bafc5efe5af1341d400e22137e275b042a8979f3feb9fc9446", size = 83605, upload-time = "2026-01-03T23:13:06.984Z" } wheels = [ @@ -5132,8 +5181,8 @@ name = "powerfx" version = "0.0.34" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "cffi", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, - { name = "pythonnet", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, + { name = "cffi", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, + { name = "pythonnet", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/9f/fb/6c4bf87e0c74ca1c563921ce89ca1c5785b7576bca932f7255cdf81082a7/powerfx-0.0.34.tar.gz", hash = "sha256:956992e7afd272657ed16d80f4cad24ec95d9e4a79fb9dfa4a068a09e136af32", size = 3237555, upload-time = "2025-12-22T15:50:59.682Z" } wheels = [ @@ -5806,7 +5855,7 @@ name = "pythonnet" version = "3.0.5" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "clr-loader", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or 
sys_platform == 'win32'" }, + { name = "clr-loader", marker = "(python_full_version < '3.14' and sys_platform == 'darwin') or (python_full_version < '3.14' and sys_platform == 'linux') or (python_full_version < '3.14' and sys_platform == 'win32')" }, ] sdist = { url = "https://files.pythonhosted.org/packages/9a/d6/1afd75edd932306ae9bd2c2d961d603dc2b52fcec51b04afea464f1f6646/pythonnet-3.0.5.tar.gz", hash = "sha256:48e43ca463941b3608b32b4e236db92d8d40db4c58a75ace902985f76dac21cf", size = 239212, upload-time = "2024-12-13T08:30:44.393Z" } wheels = [