Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
5fb1882
WIP
eavanvalkenburg Jan 20, 2026
b8b49bb
big update to new ResponseStream model
eavanvalkenburg Jan 22, 2026
20cc47c
lots of test fixes
eavanvalkenburg Jan 23, 2026
d8dc08e
fixed tests and typing
eavanvalkenburg Jan 23, 2026
f504b67
fixed tools typevar import
eavanvalkenburg Jan 23, 2026
057507f
fix
eavanvalkenburg Jan 23, 2026
fd641ac
mypy fix
eavanvalkenburg Jan 23, 2026
c695fe2
mypy fixes and some cleanup
eavanvalkenburg Jan 23, 2026
373f375
fix missing quoted names
eavanvalkenburg Jan 23, 2026
df575e0
and client
eavanvalkenburg Jan 23, 2026
992f887
fix imports agui
eavanvalkenburg Jan 23, 2026
01dd35f
fix anthropic override
eavanvalkenburg Jan 23, 2026
7795164
fix agui
eavanvalkenburg Jan 23, 2026
8d7e77b
fix ag ui
eavanvalkenburg Jan 23, 2026
3aa56bb
fix import
eavanvalkenburg Jan 23, 2026
d83b8e7
fix anthropic types
eavanvalkenburg Jan 23, 2026
f8d1950
fix mypy
eavanvalkenburg Jan 23, 2026
c56de7e
refactoring
eavanvalkenburg Jan 23, 2026
ea04778
updated typing
eavanvalkenburg Jan 29, 2026
da94138
fix 3.11
eavanvalkenburg Jan 29, 2026
60ab6ee
fixes
eavanvalkenburg Jan 29, 2026
8206573
redid layering of chat clients and agents
eavanvalkenburg Jan 29, 2026
b5fd3e3
redid layering of chat clients and agents
eavanvalkenburg Jan 30, 2026
5c78d91
Fix lint, type, and test issues after rebase
eavanvalkenburg Jan 30, 2026
61e3adf
Fix AgentExecutionException import error in test_agents.py
eavanvalkenburg Jan 30, 2026
3111315
Fix test import and asyncio deprecation issues
eavanvalkenburg Jan 30, 2026
e5f5e93
Fix azure-ai test failures
eavanvalkenburg Jan 30, 2026
ae3dc5e
Convert ag-ui utils_test_ag_ui.py to conftest.py
eavanvalkenburg Jan 30, 2026
9e89b76
fix: use relative imports for ag-ui test utilities
eavanvalkenburg Jan 30, 2026
21a95c0
fix agui
eavanvalkenburg Jan 30, 2026
dc2a757
Rename Bare*Client to Raw*Client and BaseChatClient
eavanvalkenburg Jan 30, 2026
cdc9df6
Fix layer ordering: FunctionInvocationLayer before ChatTelemetryLayer
eavanvalkenburg Jan 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/decisions/0012-python-typeddict-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,4 @@ response = await client.get_response(

Chosen option: **"Option 2: TypedDict with Generic Type Parameters"**, because it provides full type safety, excellent IDE support with autocompletion, and allows users to extend provider-specific options for their use cases. Extended this Generic to ChatAgents in order to also properly type the options used in agent construction and run methods.

See [typed_options.py](../../python/samples/getting_started/chat_client/typed_options.py) for a complete example demonstrating the usage of typed options with custom extensions.
See [typed_options.py](../../python/samples/concepts/typed_options.py) for a complete example demonstrating the usage of typed options with custom extensions.
2 changes: 2 additions & 0 deletions python/.cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
"endregion",
"entra",
"faiss",
"finalizer",
"finalizers",
"genai",
"generativeai",
"hnsw",
Expand Down
2 changes: 1 addition & 1 deletion python/.github/instructions/python.instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ applyTo: '**/agent-framework/python/**'
- Do not use `Optional`; use `Type | None` instead.
- Before running any commands to execute or test the code, ensure that all problems, compilation errors, and warnings are resolved.
- When formatting files, format only the files you changed or are currently working on; do not format the entire codebase.
- Do not mark new tests with `@pytest.mark.asyncio`.
- Do not mark new tests with `@pytest.mark.asyncio`, they are marked automatically, so you can just set the test to `async def`.
- If you need debug information to understand an issue, use print statements as needed and remove them when testing is complete.
- Avoid adding excessive comments.
- When working with samples, make sure to update the associated README files with the latest information. These files are usually located in the same folder as the sample or in one of its parent folders.
Expand Down
92 changes: 70 additions & 22 deletions python/packages/a2a/agent_framework_a2a/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import json
import re
import uuid
from collections.abc import AsyncIterable, Sequence
from typing import Any, Final, cast
from collections.abc import AsyncIterable, Awaitable, Sequence
from typing import Any, Final, Literal, cast, overload

import httpx
from a2a.client import Client, ClientConfig, ClientFactory, minimal_agent_card
Expand All @@ -29,14 +29,15 @@
AgentResponse,
AgentResponseUpdate,
AgentThread,
BaseAgent,
BareAgent,
ChatMessage,
Content,
ResponseStream,
Role,
normalize_messages,
prepend_agent_framework_to_user_agent,
)
from agent_framework.observability import use_agent_instrumentation
from agent_framework.observability import AgentTelemetryLayer

__all__ = ["A2AAgent"]

Expand All @@ -57,13 +58,12 @@ def _get_uri_data(uri: str) -> str:
return match.group("base64_data")


@use_agent_instrumentation
class A2AAgent(BaseAgent):
class A2AAgent(AgentTelemetryLayer, BareAgent):
"""Agent2Agent (A2A) protocol implementation.

Wraps an A2A Client to connect the Agent Framework with external A2A-compliant agents
via HTTP/JSON-RPC. Converts framework ChatMessages to A2A Messages on send, and converts
A2A responses (Messages/Tasks) back to framework types. Inherits BaseAgent capabilities
A2A responses (Messages/Tasks) back to framework types. Inherits BareAgent capabilities
while managing the underlying A2A protocol communication.

Can be initialized with a URL, AgentCard, or existing A2A Client instance.
Expand Down Expand Up @@ -99,7 +99,7 @@ def __init__(
timeout: Request timeout configuration. Can be a float (applied to all timeout components),
httpx.Timeout object (for full control), or None (uses 10.0s connect, 60.0s read,
10.0s write, 5.0s pool - optimized for A2A operations).
kwargs: any additional properties, passed to BaseAgent.
kwargs: any additional properties, passed to BareAgent.
"""
super().__init__(id=id, name=name, description=description, **kwargs)
self._http_client: httpx.AsyncClient | None = http_client
Expand Down Expand Up @@ -185,44 +185,92 @@ async def __aexit__(
if self._http_client is not None and self._close_http_client:
await self._http_client.aclose()

async def run(
@overload
def run(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
stream: Literal[False] = ...,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentResponse:
) -> Awaitable[AgentResponse[Any]]: ...

@overload
def run(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
stream: Literal[True],
thread: AgentThread | None = None,
**kwargs: Any,
) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]: ...

def run(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
stream: bool = False,
thread: AgentThread | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse[Any]] | ResponseStream[AgentResponseUpdate, AgentResponse[Any]]:
"""Get a response from the agent.

This method returns the final result of the agent's execution
as a single AgentResponse object. The caller is blocked until
the final result is available.
as a single AgentResponse object when stream=False. When stream=True,
it returns a ResponseStream that yields AgentResponseUpdate objects.

Args:
messages: The message(s) to send to the agent.

Keyword Args:
stream: Whether to stream the response. Defaults to False.
thread: The conversation thread associated with the message(s).
kwargs: Additional keyword arguments.

Returns:
An agent response item.
When stream=False: An Awaitable[AgentResponse].
When stream=True: A ResponseStream of AgentResponseUpdate items.
"""
if stream:
return self._run_stream_impl(messages=messages, thread=thread, **kwargs)
return self._run_impl(messages=messages, thread=thread, **kwargs)

async def _run_impl(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentResponse[Any]:
"""Non-streaming implementation of run."""
# Collect all updates and use framework to consolidate updates into response
updates = [update async for update in self.run_stream(messages, thread=thread, **kwargs)]
updates: list[AgentResponseUpdate] = []
async for update in self._stream_updates(messages, thread=thread, **kwargs):
updates.append(update)
return AgentResponse.from_agent_run_response_updates(updates)

async def run_stream(
def _run_stream_impl(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
"""Run the agent as a stream.
) -> ResponseStream[AgentResponseUpdate, AgentResponse[Any]]:
"""Streaming implementation of run."""

def _finalize(updates: Sequence[AgentResponseUpdate]) -> AgentResponse[Any]:
return AgentResponse.from_agent_run_response_updates(list(updates))

return ResponseStream(self._stream_updates(messages, thread=thread, **kwargs), finalizer=_finalize)

This method will return the intermediate steps and final results of the
agent's execution as a stream of AgentResponseUpdate objects to the caller.
async def _stream_updates(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
"""Internal method to stream updates from the A2A agent.

Args:
messages: The message(s) to send to the agent.
Expand All @@ -232,10 +280,10 @@ async def run_stream(
kwargs: Additional keyword arguments.

Yields:
An agent response item.
AgentResponseUpdate items from the A2A agent.
"""
messages = normalize_messages(messages)
a2a_message = self._prepare_message_for_a2a(messages[-1])
normalized_messages = normalize_messages(messages)
a2a_message = self._prepare_message_for_a2a(normalized_messages[-1])

response_stream = self.client.send_message(a2a_message)

Expand Down
6 changes: 3 additions & 3 deletions python/packages/a2a/tests/test_a2a_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,13 @@ def test_prepare_message_for_a2a_empty_contents_raises_error(a2a_agent: A2AAgent
a2a_agent._prepare_message_for_a2a(message)


async def test_run_stream_with_message_response(a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient) -> None:
"""Test run_stream() method with immediate Message response."""
async def test_run_streaming_with_message_response(a2a_agent: A2AAgent, mock_a2a_client: MockA2AClient) -> None:
"""Test run(stream=True) method with immediate Message response."""
mock_a2a_client.add_message_response("msg-stream-123", "Streaming response from agent!", "agent")

# Collect streaming updates
updates: list[AgentResponseUpdate] = []
async for update in a2a_agent.run_stream("Hello agent"):
async for update in a2a_agent.run("Hello agent", stream=True):
updates.append(update)

# Verify streaming response
Expand Down
Loading
Loading