diff --git a/vertexai/preview/reasoning_engines/templates/a2a.py b/vertexai/preview/reasoning_engines/templates/a2a.py index fb02b94ccc..8e89a1d762 100644 --- a/vertexai/preview/reasoning_engines/templates/a2a.py +++ b/vertexai/preview/reasoning_engines/templates/a2a.py @@ -16,6 +16,7 @@ import os from typing import Any, Callable, Dict, List, Mapping, Optional, TYPE_CHECKING +from collections.abc import AsyncIterator if TYPE_CHECKING: @@ -55,6 +56,7 @@ def create_agent_card( agent_card: Optional[Dict[str, Any]] = None, default_input_modes: Optional[List[str]] = None, default_output_modes: Optional[List[str]] = None, + streaming: bool = False, ) -> "AgentCard": """Creates an AgentCard object. @@ -73,6 +75,8 @@ def create_agent_card( default to ["text/plain"]. default_output_modes (Optional[List[str]]): A list of output modes, default to ["application/json"]. + streaming (bool): Whether to enable streaming for the agent. + Defaults to False. Returns: AgentCard: A fully constructed AgentCard object. @@ -96,8 +100,7 @@ def create_agent_card( version="1.0.0", default_input_modes=default_input_modes or ["text/plain"], default_output_modes=default_output_modes or ["application/json"], - # Agent Engine does not support streaming yet - capabilities=AgentCapabilities(streaming=False), + capabilities=AgentCapabilities(streaming=streaming), skills=skills, preferred_transport=TransportProtocol.http_json, # Http Only. supports_authenticated_extended_card=True, @@ -185,8 +188,6 @@ def __init__( raise ValueError( "Only HTTP+JSON is supported for preferred transport on agent card " ) - if agent_card.capabilities and agent_card.capabilities.streaming: - raise ValueError("Streaming is not supported by Agent Engine") self._tmpl_attrs: dict[str, Any] = { "project": initializer.global_config.project, @@ -334,6 +335,27 @@ def register_operations(self) -> Dict[str, List[str]]: "on_cancel_task", ] } + if self.agent_card.capabilities and self.agent_card.capabilities.streaming: + routes["a2a_extension"].append("on_message_send_stream") + routes["a2a_extension"].append("on_resubscribe_to_task") if self.agent_card.supports_authenticated_extended_card: routes["a2a_extension"].append("handle_authenticated_agent_card") return routes + + async def on_message_send_stream( + self, + request: "Request", + context: "ServerCallContext", + ) -> AsyncIterator[str]: + """Handles A2A streaming requests via SSE.""" + async for chunk in self.rest_handler.on_message_send_stream(request, context): + yield chunk + + async def on_resubscribe_to_task( + self, + request: "Request", + context: "ServerCallContext", + ) -> AsyncIterator[str]: + """Handles A2A task resubscription requests via SSE.""" + async for chunk in self.rest_handler.on_resubscribe_to_task(request, context): + yield chunk