Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ dev = [
"wheel>=0.45.1",
"strands-agents>=1.18.0",
"strands-agents-evals>=0.1.0",
"a2a-sdk[http-server]>=0.3",
]

[project.optional-dependencies]
a2a = ["a2a-sdk[http-server]>=0.3"]
strands-agents = [
"strands-agents>=1.1.0"
]
Expand Down
4 changes: 4 additions & 0 deletions src/bedrock_agentcore/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- BedrockAgentCoreContext: Agent identity context
"""

from .a2a import BedrockCallContextBuilder, build_a2a_app, serve_a2a
from .agent_core_runtime_client import AgentCoreRuntimeClient
from .app import BedrockAgentCoreApp
from .context import BedrockAgentCoreContext, RequestContext
Expand All @@ -14,7 +15,10 @@
__all__ = [
"AgentCoreRuntimeClient",
"BedrockAgentCoreApp",
"BedrockCallContextBuilder",
"RequestContext",
"BedrockAgentCoreContext",
"PingStatus",
"build_a2a_app",
"serve_a2a",
]
256 changes: 256 additions & 0 deletions src/bedrock_agentcore/runtime/a2a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
"""A2A protocol support for Bedrock AgentCore Runtime.

Provides Bedrock-specific glue around the official a2a-sdk, handling header
extraction, health checks, and Docker host detection.
"""

import logging
import time
import uuid
from typing import Any, Callable, Optional

from .context import BedrockAgentCoreContext
from .models import (
ACCESS_TOKEN_HEADER,
AGENTCORE_RUNTIME_URL_ENV,
AUTHORIZATION_HEADER,
CUSTOM_HEADER_PREFIX,
OAUTH2_CALLBACK_URL_HEADER,
REQUEST_ID_HEADER,
SESSION_HEADER,
PingStatus,
)

logger = logging.getLogger(__name__)


def _check_a2a_sdk() -> None:
"""Raise ImportError with install instructions if a2a-sdk is missing."""
try:
import a2a # noqa: F401
except ImportError:
raise ImportError(
'a2a-sdk is required for A2A protocol support. Install it with: pip install "bedrock-agentcore[a2a]"'
) from None


def _build_agent_card(executor: Any, url: str) -> Any:
"""Build an AgentCard by introspecting a StrandsA2AExecutor.

Extracts name/description from ``executor.agent``. Falls back to generic
defaults for other executors.
"""
from a2a.types import AgentCapabilities, AgentCard, AgentSkill

name = "agent"
description = "A Bedrock AgentCore agent"

agent = getattr(executor, "agent", None)
if agent is not None:
name = getattr(agent, "name", None) or name
description = getattr(agent, "description", None) or description

return AgentCard(
name=name,
description=description,
url=url,
version="0.1.0",
capabilities=AgentCapabilities(streaming=True),
skills=[AgentSkill(id="main", name=name, description=description, tags=["main"])],
default_input_modes=["text"],
default_output_modes=["text"],
)


class BedrockCallContextBuilder:
"""Extracts Bedrock runtime headers and propagates them into BedrockAgentCoreContext.

Implements the a2a-sdk CallContextBuilder ABC so the A2A server
automatically calls ``build()`` on every incoming request.
"""

def build(self, request: Any) -> Any:
"""Build a ServerCallContext from a Starlette Request.

Args:
request: A Starlette Request object.

Returns:
A ServerCallContext with Bedrock headers stored in ``state``.
"""
from a2a.server.context import ServerCallContext

headers = request.headers

request_id = headers.get(REQUEST_ID_HEADER) or str(uuid.uuid4())
session_id = headers.get(SESSION_HEADER)
BedrockAgentCoreContext.set_request_context(request_id, session_id)

workload_access_token = headers.get(ACCESS_TOKEN_HEADER)
if workload_access_token:
BedrockAgentCoreContext.set_workload_access_token(workload_access_token)

oauth2_callback_url = headers.get(OAUTH2_CALLBACK_URL_HEADER)
if oauth2_callback_url:
BedrockAgentCoreContext.set_oauth2_callback_url(oauth2_callback_url)

request_headers: dict[str, str] = {}
authorization_header = headers.get(AUTHORIZATION_HEADER)
if authorization_header is not None:
request_headers[AUTHORIZATION_HEADER] = authorization_header
for header_name, header_value in headers.items():
if header_name.lower().startswith(CUSTOM_HEADER_PREFIX.lower()):
request_headers[header_name] = header_value
if request_headers:
BedrockAgentCoreContext.set_request_headers(request_headers)

state = {
"request_id": request_id,
"session_id": session_id,
}
if workload_access_token:
state["workload_access_token"] = workload_access_token
if oauth2_callback_url:
state["oauth2_callback_url"] = oauth2_callback_url

return ServerCallContext(state=state)


# Register as a virtual subclass so isinstance checks pass without
# requiring a2a-sdk to be importable at class-definition time.
try:
from a2a.server.apps import CallContextBuilder

CallContextBuilder.register(BedrockCallContextBuilder)
except Exception: # pragma: no cover
pass


def build_a2a_app(
executor: Any,
agent_card: Any = None,
*,
task_store: Any = None,
context_builder: Any = None,
ping_handler: Optional[Callable[[], PingStatus]] = None,
) -> Any:
"""Build a Starlette app wired for A2A protocol with Bedrock extras.

Args:
executor: An ``AgentExecutor`` that implements the agent logic.
agent_card: Optional ``a2a.types.AgentCard`` describing the agent.
If ``None``, one is built automatically by introspecting the executor.
task_store: Optional ``TaskStore``; defaults to ``InMemoryTaskStore``.
context_builder: Optional ``CallContextBuilder``; defaults to
``BedrockCallContextBuilder``.
ping_handler: Optional callback returning a ``PingStatus``.

Returns:
A Starlette application.
"""
import os

_check_a2a_sdk()

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from starlette.responses import JSONResponse
from starlette.routing import Route

runtime_url = os.environ.get(AGENTCORE_RUNTIME_URL_ENV, "http://localhost:9000/")

if agent_card is None:
agent_card = _build_agent_card(executor, runtime_url)
elif os.environ.get(AGENTCORE_RUNTIME_URL_ENV):
agent_card.url = runtime_url

if task_store is None:
task_store = InMemoryTaskStore()
if context_builder is None:
context_builder = BedrockCallContextBuilder()

http_handler = DefaultRequestHandler(
agent_executor=executor,
task_store=task_store,
)

a2a_app = A2AStarletteApplication(
agent_card=agent_card,
http_handler=http_handler,
context_builder=context_builder,
)

app = a2a_app.build()

last_status_update_time = time.time()

def _handle_ping(request: Any) -> JSONResponse:
nonlocal last_status_update_time
try:
if ping_handler is not None:
status = ping_handler()
else:
status = PingStatus.HEALTHY
last_status_update_time = time.time()
except Exception:
logger.exception("Custom ping handler failed, falling back to Healthy")
status = PingStatus.HEALTHY
return JSONResponse({"status": status.value, "time_of_last_update": int(last_status_update_time)})

app.routes.append(Route("/ping", _handle_ping, methods=["GET"]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: if future Starlette version compiles routes eagerly in build(), will this break ? I am good if we can rely on this implementation detail.


return app


def serve_a2a(
executor: Any,
agent_card: Any = None,
*,
port: int = 9000,
host: Optional[str] = None,
task_store: Any = None,
context_builder: Any = None,
ping_handler: Optional[Callable[[], PingStatus]] = None,
**kwargs: Any,
) -> None:
"""Start a Bedrock-compatible A2A server.

Args:
executor: An ``AgentExecutor`` that implements the agent logic.
agent_card: Optional ``a2a.types.AgentCard`` describing the agent.
If ``None``, one is built automatically by introspecting the executor.
port: Port to serve on (default 9000).
host: Host to bind to; auto-detected if ``None``.
task_store: Optional ``TaskStore``; defaults to ``InMemoryTaskStore``.
context_builder: Optional ``CallContextBuilder``; defaults to
``BedrockCallContextBuilder``.
ping_handler: Optional callback returning a ``PingStatus``.
**kwargs: Additional arguments forwarded to ``uvicorn.run()``.
"""
import os

import uvicorn

app = build_a2a_app(
executor,
agent_card,
task_store=task_store,
context_builder=context_builder,
ping_handler=ping_handler,
)

if host is None:
if os.path.exists("/.dockerenv") or os.environ.get("DOCKER_CONTAINER"):
host = "0.0.0.0" # nosec B104 - Container needs this to expose the port
else:
host = "127.0.0.1"

uvicorn_params: dict[str, Any] = {
"host": host,
"port": port,
"log_level": "info",
}
uvicorn_params.update(kwargs)

uvicorn.run(app, **uvicorn_params)
1 change: 1 addition & 0 deletions src/bedrock_agentcore/runtime/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class PingStatus(str, Enum):
OAUTH2_CALLBACK_URL_HEADER = "OAuth2CallbackUrl"
AUTHORIZATION_HEADER = "Authorization"
CUSTOM_HEADER_PREFIX = "X-Amzn-Bedrock-AgentCore-Runtime-Custom-"
AGENTCORE_RUNTIME_URL_ENV = "AGENTCORE_RUNTIME_URL"

# Task action constants
TASK_ACTION_PING_STATUS = "ping_status"
Expand Down
Loading
Loading