-
Notifications
You must be signed in to change notification settings - Fork 381
feat: implement rich gRPC error details per A2A v1.0 spec #790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 1.0-dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,12 @@ | ||
| import logging | ||
|
|
||
| from collections.abc import AsyncGenerator, Callable | ||
| from collections.abc import AsyncGenerator, Callable, Iterable | ||
| from functools import wraps | ||
| from typing import Any, NoReturn | ||
| from typing import Any, NoReturn, cast | ||
|
|
||
| from a2a.client.errors import A2AClientError, A2AClientTimeoutError | ||
| from a2a.client.middleware import ClientCallContext | ||
| from a2a.utils.errors import JSON_RPC_ERROR_CODE_MAP | ||
| from a2a.utils.errors import JSON_RPC_ERROR_CODE_MAP, A2AError | ||
|
|
||
|
|
||
| try: | ||
|
|
@@ -18,8 +19,12 @@ | |
| ) from e | ||
|
|
||
|
|
||
| from google.rpc import ( # type: ignore[reportMissingModuleSource] | ||
| error_details_pb2, | ||
| status_pb2, | ||
| ) | ||
|
|
||
| from a2a.client.client import ClientConfig | ||
| from a2a.client.errors import A2AClientError, A2AClientTimeoutError | ||
| from a2a.client.middleware import ClientCallInterceptor | ||
| from a2a.client.optionals import Channel | ||
| from a2a.client.transports.base import ClientTransport | ||
|
|
@@ -44,6 +49,7 @@ | |
| TaskPushNotificationConfig, | ||
| ) | ||
| from a2a.utils.constants import PROTOCOL_VERSION_CURRENT, VERSION_HEADER | ||
| from a2a.utils.errors import A2A_REASON_TO_ERROR | ||
| from a2a.utils.telemetry import SpanKind, trace_class | ||
|
|
||
|
|
||
|
|
@@ -54,47 +60,77 @@ | |
| } | ||
|
|
||
|
|
||
| def _parse_rich_grpc_error( | ||
| value: bytes, original_error: grpc.aio.AioRpcError | ||
| ) -> None: | ||
| try: | ||
| status = status_pb2.Status.FromString(value) | ||
| for detail in status.details: | ||
| if detail.Is(error_details_pb2.ErrorInfo.DESCRIPTOR): | ||
| error_info = error_details_pb2.ErrorInfo() | ||
| detail.Unpack(error_info) | ||
|
|
||
| if error_info.domain == 'a2a-protocol.org': | ||
| exception_cls = A2A_REASON_TO_ERROR.get(error_info.reason) | ||
| if exception_cls: | ||
| raise exception_cls(status.message) from original_error # noqa: TRY301 | ||
| except Exception as parse_e: | ||
| # Don't swallow A2A errors generated above | ||
| if isinstance(parse_e, (A2AError, A2AClientError)): | ||
| raise parse_e | ||
| logger.warning( | ||
| 'Failed to parse grpc-status-details-bin', exc_info=parse_e | ||
| ) | ||
|
|
||
|
|
||
| def _map_grpc_error(e: grpc.aio.AioRpcError) -> NoReturn: | ||
| if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED: | ||
| raise A2AClientTimeoutError('Client Request timed out') from e | ||
|
|
||
| metadata = e.trailing_metadata() | ||
| if metadata: | ||
| iterable_metadata = cast('Iterable[tuple[str, str | bytes]]', metadata) | ||
| for key, value in iterable_metadata: | ||
| if key == 'grpc-status-details-bin' and isinstance(value, bytes): | ||
| _parse_rich_grpc_error(value, e) | ||
|
|
||
| details = e.details() | ||
| if isinstance(details, str) and ': ' in details: | ||
| error_type_name, error_message = details.split(': ', 1) | ||
| # TODO(#723): Resolving imports by name is temporary until proper error handling structure is added in #723. | ||
| # Leaving as fallback for errors that don't use the rich error details. | ||
| exception_cls = _A2A_ERROR_NAME_TO_CLS.get(error_type_name) | ||
| if exception_cls: | ||
| raise exception_cls(error_message) from e | ||
|
Comment on lines
98
to
103
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code was added in |
||
| raise A2AClientError(f'gRPC Error {e.code().name}: {e.details()}') from e | ||
|
|
||
|
|
||
| def _handle_grpc_exception(func: Callable[..., Any]) -> Callable[..., Any]: | ||
| @wraps(func) | ||
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | ||
| try: | ||
| return await func(*args, **kwargs) | ||
| except grpc.aio.AioRpcError as e: | ||
| _map_grpc_error(e) | ||
|
|
||
| return wrapper | ||
|
|
||
|
|
||
| def _handle_grpc_stream_exception( | ||
| func: Callable[..., Any], | ||
| ) -> Callable[..., Any]: | ||
| @wraps(func) | ||
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | ||
| try: | ||
| async for item in func(*args, **kwargs): | ||
| yield item | ||
| except grpc.aio.AioRpcError as e: | ||
| _map_grpc_error(e) | ||
|
|
||
| return wrapper | ||
|
|
||
|
|
||
| @trace_class(kind=SpanKind.CLIENT) | ||
| class GrpcTransport(ClientTransport): | ||
| """A gRPC transport for the A2A client.""" | ||
|
|
||
| def __init__( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,8 @@ | |
| import logging | ||
|
|
||
| from abc import ABC, abstractmethod | ||
| from collections.abc import AsyncIterable, Awaitable | ||
| from collections.abc import AsyncIterable, Awaitable, Callable | ||
| from typing import cast | ||
|
|
||
|
|
||
| try: | ||
|
|
@@ -16,9 +17,8 @@ | |
| "'pip install a2a-sdk[grpc]'" | ||
| ) from e | ||
|
|
||
| from collections.abc import Callable | ||
|
|
||
| from google.protobuf import empty_pb2, message | ||
| from google.protobuf import any_pb2, empty_pb2, message | ||
| from google.rpc import error_details_pb2, status_pb2 | ||
|
|
||
| import a2a.types.a2a_pb2_grpc as a2a_grpc | ||
|
|
||
|
|
@@ -33,7 +33,7 @@ | |
| from a2a.types import a2a_pb2 | ||
| from a2a.types.a2a_pb2 import AgentCard | ||
| from a2a.utils import proto_utils | ||
| from a2a.utils.errors import A2AError, TaskNotFoundError | ||
| from a2a.utils.errors import A2A_ERROR_REASONS, A2AError, TaskNotFoundError | ||
| from a2a.utils.helpers import maybe_await, validate, validate_async_generator | ||
|
|
||
|
|
||
|
|
@@ -419,31 +419,58 @@ | |
| ) -> None: | ||
| """Sets the grpc errors appropriately in the context.""" | ||
| code = _ERROR_CODE_MAP.get(type(error)) | ||
|
|
||
| status_value = code.value if code else grpc.StatusCode.UNKNOWN.value | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to use https://grpc.github.io/grpc/python/grpc_status.html here? |
||
| status_code = ( | ||
| status_value[0] if isinstance(status_value, tuple) else status_value | ||
| ) | ||
| error_msg = error.message if hasattr(error, 'message') else str(error) | ||
| status = status_pb2.Status(code=status_code, message=error_msg) | ||
|
|
||
| if code: | ||
| reason = A2A_ERROR_REASONS.get(type(error), 'UNKNOWN_ERROR') | ||
|
|
||
| error_info = error_details_pb2.ErrorInfo( | ||
| reason=reason, | ||
| domain='a2a-protocol.org', | ||
| ) | ||
|
|
||
| detail = any_pb2.Any() | ||
| detail.Pack(error_info) | ||
| status.details.append(detail) | ||
|
|
||
| context.set_trailing_metadata( | ||
| cast( | ||
| 'tuple[tuple[str, str | bytes], ...]', | ||
| (('grpc-status-details-bin', status.SerializeToString()),), | ||
| ) | ||
| ) | ||
|
|
||
| if code: | ||
| await context.abort( | ||
| code, | ||
| f'{type(error).__name__}: {error.message}', | ||
| status.message, | ||
| ) | ||
| else: | ||
| await context.abort( | ||
| grpc.StatusCode.UNKNOWN, | ||
| f'Unknown error type: {error}', | ||
| ) | ||
|
|
||
| def _set_extension_metadata( | ||
| self, | ||
| context: grpc.aio.ServicerContext, | ||
| server_context: ServerCallContext, | ||
| ) -> None: | ||
| if server_context.activated_extensions: | ||
| context.set_trailing_metadata( | ||
| [ | ||
| (HTTP_EXTENSION_HEADER.lower(), e) | ||
| for e in sorted(server_context.activated_extensions) | ||
| ] | ||
| ) | ||
|
|
||
| def _build_call_context( | ||
| self, | ||
| context: grpc.aio.ServicerContext, | ||
| request: message.Message, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -82,11 +82,26 @@ | |
| message = 'Method not found' | ||
|
|
||
|
|
||
| class ExtensionSupportRequiredError(A2AError): | ||
| """Exception raised when extension support is required but not present.""" | ||
|
|
||
| message = 'Extension support required' | ||
|
|
||
|
|
||
| class VersionNotSupportedError(A2AError): | ||
| """Exception raised when the requested version is not supported.""" | ||
|
|
||
| message = 'Version not supported' | ||
|
|
||
|
|
||
| # For backward compatibility if needed, or just aliases for clean refactor | ||
| # We remove the Pydantic models here. | ||
|
|
||
| __all__ = [ | ||
| 'A2A_ERROR_REASONS', | ||
| 'A2A_REASON_TO_ERROR', | ||
| 'JSON_RPC_ERROR_CODE_MAP', | ||
| 'ExtensionSupportRequiredError', | ||
| 'InternalError', | ||
| 'InvalidAgentResponseError', | ||
| 'InvalidParamsError', | ||
|
|
@@ -96,19 +111,39 @@ | |
| 'TaskNotCancelableError', | ||
| 'TaskNotFoundError', | ||
| 'UnsupportedOperationError', | ||
| 'VersionNotSupportedError', | ||
| ] | ||
|
|
||
|
|
||
| JSON_RPC_ERROR_CODE_MAP: dict[type[A2AError], int] = { | ||
| TaskNotFoundError: -32001, | ||
| TaskNotCancelableError: -32002, | ||
| PushNotificationNotSupportedError: -32003, | ||
| UnsupportedOperationError: -32004, | ||
| ContentTypeNotSupportedError: -32005, | ||
| InvalidAgentResponseError: -32006, | ||
| AuthenticatedExtendedCardNotConfiguredError: -32007, | ||
| InvalidParamsError: -32602, | ||
| InvalidRequestError: -32600, | ||
| MethodNotFoundError: -32601, | ||
| InternalError: -32603, | ||
| } | ||
|
|
||
|
|
||
| A2A_ERROR_REASONS = { | ||
| TaskNotFoundError: 'TASK_NOT_FOUND', | ||
| TaskNotCancelableError: 'TASK_NOT_CANCELABLE', | ||
| PushNotificationNotSupportedError: 'PUSH_NOTIFICATION_NOT_SUPPORTED', | ||
| UnsupportedOperationError: 'UNSUPPORTED_OPERATION', | ||
| ContentTypeNotSupportedError: 'CONTENT_TYPE_NOT_SUPPORTED', | ||
| InvalidAgentResponseError: 'INVALID_AGENT_RESPONSE', | ||
| AuthenticatedExtendedCardNotConfiguredError: 'EXTENDED_AGENT_CARD_NOT_CONFIGURED', | ||
| ExtensionSupportRequiredError: 'EXTENSION_SUPPORT_REQUIRED', | ||
| VersionNotSupportedError: 'VERSION_NOT_SUPPORTED', | ||
| InvalidParamsError: 'INVALID_PARAMS', | ||
| InvalidRequestError: 'INVALID_REQUEST', | ||
| MethodNotFoundError: 'METHOD_NOT_FOUND', | ||
| InternalError: 'INTERNAL_ERROR', | ||
|
Comment on lines
+143
to
+146
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe those are JSON-RPC leftovers, we can keep A2A errors only here: https://a2a-protocol.org/latest/specification/#54-error-code-mappings. |
||
| } | ||
|
|
||
| A2A_REASON_TO_ERROR = {reason: cls for cls, reason in A2A_ERROR_REASONS.items()} | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to use https://grpc.github.io/grpc/python/grpc_status.html here?