Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions src/a2a/client/transports/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
TaskPushNotificationConfig,
)
from a2a.utils.constants import PROTOCOL_VERSION_CURRENT, VERSION_HEADER
from a2a.utils.errors import A2A_REASON_TO_ERROR
from a2a.utils.errors import A2A_REASON_TO_ERROR, A2AError, InvalidParamsError
from a2a.utils.telemetry import SpanKind, trace_class


Expand All @@ -61,17 +61,30 @@ def _map_grpc_error(e: grpc.aio.AioRpcError) -> NoReturn:

# Use grpc_status to cleanly extract the rich Status from the call
status = rpc_status.from_call(cast('grpc.Call', e))
data = None

if status is not None:
exception_cls: type[A2AError] | None = None
for detail in status.details:
if detail.Is(error_details_pb2.BadRequest.DESCRIPTOR):
bad_request = error_details_pb2.BadRequest()
detail.Unpack(bad_request)
errors = [
{'field': v.field, 'message': v.description}
for v in bad_request.field_violations
]
data = {'errors': errors}
exception_cls = InvalidParamsError
break
if detail.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
error_info = error_details_pb2.ErrorInfo()
detail.Unpack(error_info)

if error_info.domain == 'a2a-protocol.org':
exception_cls = A2A_REASON_TO_ERROR.get(error_info.reason)
if exception_cls:
raise exception_cls(status.message) from e
break

if exception_cls:
raise exception_cls(status.message, data=data) from e

raise A2AClientError(f'gRPC Error {e.code().name}: {e.details()}') from e

Expand Down
3 changes: 2 additions & 1 deletion src/a2a/client/transports/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,10 @@ def _create_jsonrpc_error(self, error_dict: dict[str, Any]) -> Exception:
"""Creates the appropriate A2AError from a JSON-RPC error dictionary."""
code = error_dict.get('code')
message = error_dict.get('message', str(error_dict))
data = error_dict.get('data')

if isinstance(code, int) and code in _JSON_RPC_ERROR_CODE_TO_A2A_ERROR:
return _JSON_RPC_ERROR_CODE_TO_A2A_ERROR[code](message)
return _JSON_RPC_ERROR_CODE_TO_A2A_ERROR[code](message, data=data)

# Fallback to general A2AClientError
return A2AClientError(f'JSON-RPC Error {code}: {message}')
Expand Down
2 changes: 1 addition & 1 deletion src/a2a/server/apps/rest/rest_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ async def event_generator(
yield json.dumps(item)

return EventSourceResponse(
event_generator(method(request, call_context))
event_generator(await method(request, call_context))
)

async def handle_get_agent_card(
Expand Down
15 changes: 14 additions & 1 deletion src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
InMemoryQueueManager,
QueueManager,
)
from a2a.server.request_handlers.request_handler import RequestHandler
from a2a.server.request_handlers.request_handler import (
RequestHandler,
validate_request_params,
)
from a2a.server.tasks import (
PushNotificationConfigStore,
PushNotificationEvent,
Expand Down Expand Up @@ -118,6 +121,7 @@ def __init__( # noqa: PLR0913
# asyncio tasks and to surface unexpected exceptions.
self._background_tasks = set()

@validate_request_params
async def on_get_task(
self,
params: GetTaskRequest,
Expand All @@ -133,6 +137,7 @@ async def on_get_task(

return apply_history_length(task, params)

@validate_request_params
async def on_list_tasks(
self,
params: ListTasksRequest,
Expand All @@ -154,6 +159,7 @@ async def on_list_tasks(

return page

@validate_request_params
async def on_cancel_task(
self,
params: CancelTaskRequest,
Expand Down Expand Up @@ -317,6 +323,7 @@ async def _send_push_notification_if_needed(
):
await self._push_sender.send_notification(task_id, event)

@validate_request_params
async def on_message_send(
self,
params: SendMessageRequest,
Expand Down Expand Up @@ -386,6 +393,7 @@ async def push_notification_callback(event: Event) -> None:

return result

@validate_request_params
async def on_message_send_stream(
self,
params: SendMessageRequest,
Expand Down Expand Up @@ -474,6 +482,7 @@ async def _cleanup_producer(
async with self._running_agents_lock:
self._running_agents.pop(task_id, None)

@validate_request_params
async def on_create_task_push_notification_config(
self,
params: TaskPushNotificationConfig,
Expand All @@ -499,6 +508,7 @@ async def on_create_task_push_notification_config(

return params

@validate_request_params
async def on_get_task_push_notification_config(
self,
params: GetTaskPushNotificationConfigRequest,
Expand Down Expand Up @@ -530,6 +540,7 @@ async def on_get_task_push_notification_config(

raise InternalError(message='Push notification config not found')

@validate_request_params
async def on_subscribe_to_task(
self,
params: SubscribeToTaskRequest,
Expand Down Expand Up @@ -572,6 +583,7 @@ async def on_subscribe_to_task(
async for event in result_aggregator.consume_and_emit(consumer):
yield event

@validate_request_params
async def on_list_task_push_notification_configs(
self,
params: ListTaskPushNotificationConfigsRequest,
Expand All @@ -597,6 +609,7 @@ async def on_list_task_push_notification_configs(
configs=push_notification_config_list
)

@validate_request_params
async def on_delete_task_push_notification_config(
self,
params: DeleteTaskPushNotificationConfigRequest,
Expand Down
24 changes: 20 additions & 4 deletions src/a2a/server/request_handlers/grpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,16 +403,32 @@ async def abort_context(
error.message if hasattr(error, 'message') else str(error)
)

# Create standard Status and pack the ErrorInfo
# Create standard Status
status = status_pb2.Status(code=status_code, message=error_msg)
detail = any_pb2.Any()
detail.Pack(error_info)
status.details.append(detail)

if (
isinstance(error, types.InvalidParamsError)
and error.data
and error.data.get('errors')
):
bad_request = error_details_pb2.BadRequest()
for err_dict in error.data['errors']:
violation = bad_request.field_violations.add()
violation.field = err_dict.get('field', '')
violation.description = err_dict.get('message', '')
any_bad_request = any_pb2.Any()
any_bad_request.Pack(bad_request)
status.details.append(any_bad_request)
else:
detail = any_pb2.Any()
detail.Pack(error_info)
status.details.append(detail)

# Use grpc_status to safely generate standard trailing metadata
rich_status = rpc_status.to_status(status)

new_metadata: list[tuple[str, str | bytes]] = []

trailing = context.trailing_metadata()
if trailing:
for k, v in trailing:
Expand Down
1 change: 1 addition & 0 deletions src/a2a/server/request_handlers/jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def _build_error_response(
jsonrpc_error = model_class(
code=code,
message=str(error),
data=error.data,
)
else:
jsonrpc_error = JSONRPCInternalError(message=str(error))
Expand Down
42 changes: 41 additions & 1 deletion src/a2a/server/request_handlers/request_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import functools
import inspect

from abc import ABC, abstractmethod
from collections.abc import AsyncGenerator
from collections.abc import AsyncGenerator, Callable
from typing import Any

from google.protobuf.message import Message as ProtoMessage

from a2a.server.context import ServerCallContext
from a2a.server.events.event_queue import Event
Expand All @@ -19,6 +25,7 @@
TaskPushNotificationConfig,
)
from a2a.utils.errors import UnsupportedOperationError
from a2a.utils.proto_utils import validate_proto_required_fields


class RequestHandler(ABC):
Expand Down Expand Up @@ -218,3 +225,36 @@ async def on_delete_task_push_notification_config(
Returns:
None
"""


def validate_request_params(method: Callable) -> Callable:
"""Decorator for RequestHandler methods to validate required fields on incoming requests."""
if inspect.iscoroutinefunction(method):

@functools.wraps(method)
async def async_wrapper(
self: RequestHandler,
params: ProtoMessage,
context: ServerCallContext,
*args: Any,
**kwargs: Any,
) -> Any:
if params is not None:
validate_proto_required_fields(params)
return await method(self, params, context, *args, **kwargs)

return async_wrapper

@functools.wraps(method)
def sync_wrapper(
self: RequestHandler,
params: ProtoMessage,
context: ServerCallContext,
*args: Any,
**kwargs: Any,
) -> Any:
if params is not None:
validate_proto_required_fields(params)
return method(self, params, context, *args, **kwargs)

return sync_wrapper
24 changes: 16 additions & 8 deletions src/a2a/server/request_handlers/rest_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,14 @@ async def on_message_send_stream(
body = await request.body()
params = a2a_pb2.SendMessageRequest()
Parse(body, params)
async for event in self.request_handler.on_message_send_stream(
params, context
):
response = proto_utils.to_stream_response(event)
yield MessageToDict(response)
stream = self.request_handler.on_message_send_stream(params, context)

async def _generator() -> AsyncIterator[dict[str, Any]]:
async for event in stream:
response = proto_utils.to_stream_response(event)
yield MessageToDict(response)

return _generator()

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def on_cancel_task(
Expand Down Expand Up @@ -167,10 +170,15 @@ async def on_subscribe_to_task(
JSON serialized objects containing streaming events
"""
task_id = request.path_params['id']
async for event in self.request_handler.on_subscribe_to_task(
stream = self.request_handler.on_subscribe_to_task(
SubscribeToTaskRequest(id=task_id), context
):
yield MessageToDict(proto_utils.to_stream_response(event))
)

async def _generator() -> AsyncIterator[dict[str, Any]]:
async for event in stream:
yield MessageToDict(proto_utils.to_stream_response(event))

return _generator()

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def get_push_notification(
Expand Down
3 changes: 2 additions & 1 deletion src/a2a/utils/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ class A2AError(Exception):
message: str = 'A2A Error'
data: dict | None = None

def __init__(self, message: str | None = None):
def __init__(self, message: str | None = None, data: dict | None = None):
if message:
self.message = message
self.data = data
super().__init__(self.message)


Expand Down
Loading
Loading