[Feat]: Add RetryTransport for automatic retry with exponential backoff #871

@cchinchilla-dev

Description

Is your feature request related to a problem? Please describe.

The SDK's transports (JsonRpcTransport, RestTransport, GrpcTransport) raise exceptions immediately on transient failures — network errors, timeouts, and server-side errors — with no built-in retry mechanism. Callers must implement their own retry logic at every call site:

import asyncio

import httpx  # needed below to classify the chained cause

# A2AClientError / A2AClientTimeoutError are the SDK's client exception types
transport = JsonRpcTransport(httpx_client=client, agent_card=card)

# Caller is responsible for retry logic every time
for attempt in range(max_retries):
    try:
        result = await transport.send_message(params)
        break
    except A2AClientTimeoutError:
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)
            continue
        raise
    except A2AClientError as e:
        # No programmatic way to distinguish transient (429, 503) from permanent (400) errors
        # Must inspect the chained __cause__ exception for status codes
        cause = e.__cause__
        if isinstance(cause, httpx.HTTPStatusError) and cause.response.status_code in (429, 502, 503, 504):
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
                continue
        raise

This pattern is error-prone: each call site must independently decide which errors are retriable, implement correct backoff, handle edge cases like Retry-After headers, and deal with the fact that the current error hierarchy does not expose transport-level metadata (like HTTP status codes) directly on A2AClientError.

Why this matters

Transient failures are common in distributed agent-to-agent communication:

  • Rate limiting (429): Agents behind API gateways routinely return 429 Too Many Requests.
  • Temporary unavailability (502/503/504): Agents scaling up, redeploying, or behind load balancers.
  • Network instability: Connection resets and timeouts in cross-network agent communication.
  • gRPC transient errors: UNAVAILABLE, DEADLINE_EXCEEDED, and RESOURCE_EXHAUSTED status codes.

All three transports already normalize errors into the SDK's exception hierarchy (A2AClientError, A2AClientTimeoutError, domain errors) — see http_helpers.py for HTTP transports and _map_grpc_error() in grpc.py — which makes it possible to build a retry layer on top without modifying them.

Describe the solution you'd like

A new RetryTransport class in src/a2a/client/transports/retry.py that wraps any ClientTransport using the decorator pattern:

from a2a.client.transports.retry import RetryTransport

inner = JsonRpcTransport(httpx_client=client, agent_card=card)
transport = RetryTransport(
    transport=inner,
    max_retries=3,
    base_delay=1.0,
    max_delay=30.0,
)

# Retries are handled transparently
async with transport:
    result = await transport.send_message(params)

Key design decisions

  1. Decorator over ClientTransport, not via interceptors

The existing ClientCallInterceptor has both before() and after() hooks (src/a2a/client/interceptors.py). However, the after() hook is only invoked on successful responses — if transport_call raises an exception, it propagates directly without calling after(). Since interceptors never see exceptions, they cannot implement retry logic. A transport wrapper is the correct abstraction level.

This follows the existing pattern: TenantTransportDecorator (src/a2a/client/transports/tenant_decorator.py) already wraps ClientTransport to attach tenant metadata to requests.

  2. Retriable error classification

The current error hierarchy maps all transport errors to A2AClientError (generic) or A2AClientTimeoutError, with domain errors (TaskNotFoundError, InvalidParamsError, etc.) as separate subclasses. Notably, A2AClientError does not expose transport-level metadata like HTTP status codes — these are only available via the chained __cause__ exception.

The default retry predicate would classify errors by inspecting the exception chain:

import httpx

# Assumes httpx is installed and the SDK's A2AClientError /
# A2AClientTimeoutError exception types are in scope.
def _default_retry_predicate(exc: Exception) -> bool:
    # Timeouts are always retriable
    if isinstance(exc, A2AClientTimeoutError):
        return True

    # Domain errors (TaskNotFoundError, InvalidParamsError, etc.) are never retriable
    if not isinstance(exc, A2AClientError):
        return False

    cause = exc.__cause__

    # HTTP transports: inspect the original httpx exception
    if isinstance(cause, httpx.HTTPStatusError):
        return cause.response.status_code in (429, 502, 503, 504)
    if isinstance(cause, httpx.RequestError):
        return True  # Network errors are transient

    # gRPC transport: inspect the original grpc exception
    try:
        import grpc
        if isinstance(cause, grpc.aio.AioRpcError):
            return cause.code() in (
                grpc.StatusCode.UNAVAILABLE,
                grpc.StatusCode.RESOURCE_EXHAUSTED,
                # DEADLINE_EXCEEDED is already mapped to A2AClientTimeoutError
                # by _map_grpc_error(), so it won't reach here
            )
    except ImportError:
        pass

    return False

This approach works across all three transports because they all chain the original exception via raise ... from e. The conditional grpc import follows the pattern already used in the SDK for optional dependencies.

Note: A cleaner long-term solution would be for A2AClientError to expose retry-relevant metadata (e.g., an is_transient flag or status_code attribute), eliminating the need to inspect __cause__. This is out of scope for this PR but would simplify any cross-transport error handling in the future.

This classification is exposed as a configurable retry_predicate callback for users who need custom logic, with the above as a sensible default.

  3. Streaming behavior

send_message_streaming and subscribe return AsyncGenerator[StreamResponse]. The proposed approach: retry only if the connection fails before the first event is yielded. Once events have started flowing, the stream is considered established and errors are propagated as-is. This avoids the complexity of partial-stream replay.

  4. Exponential backoff with jitter

Standard full-jitter strategy to avoid thundering herd:

delay = min(max_delay, base_delay * (2 ** attempt))
actual_delay = random.uniform(0, delay)
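Wrapped in a helper, the full-jitter computation is straightforward (a sketch; the function name is illustrative):

```python
import random


def full_jitter_delay(
    attempt: int, base_delay: float = 1.0, max_delay: float = 30.0
) -> float:
    """Return a randomized delay in [0, min(max_delay, base_delay * 2**attempt)]."""
    ceiling = min(max_delay, base_delay * (2 ** attempt))
    return random.uniform(0, ceiling)
```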

  5. Methods that bypass retry

close() and get_extended_agent_card() delegate directly to the inner transport without retry logic — they are lifecycle/discovery operations, not data exchange.

Proposed public API

class RetryTransport(ClientTransport):
    def __init__(
        self,
        transport: ClientTransport,
        *,
        max_retries: int = 3,
        base_delay: float = 1.0,
        max_delay: float = 30.0,
        retry_predicate: Callable[[Exception], bool] | None = None,
        on_retry: Callable[[int, Exception, float], Awaitable[None]] | None = None,
    ) -> None: ...

  • transport: The inner ClientTransport to wrap.
  • max_retries: Maximum number of retry attempts (default: 3).
  • base_delay: Initial delay in seconds before the first retry (default: 1.0).
  • max_delay: Maximum delay cap in seconds (default: 30.0).
  • retry_predicate: Optional custom function to determine if an exception is retriable. Defaults to the classification described above.
  • on_retry: Optional async callback invoked before each retry with (attempt, exception, delay), useful for logging or metrics.

Integration with ClientFactory

An initial implementation would not modify ClientFactory. Users would wrap the transport manually. A follow-up could add a retry_config parameter to ClientConfig if maintainers prefer built-in integration.

Describe alternatives you've considered

  1. Retry via ClientCallInterceptor

Not feasible with the current design. The interceptor's after() hook only receives successful results (AfterArgs.result) — it is never invoked when the transport raises an exception. Retry would require the interceptor to control the call lifecycle (e.g., an intercept(next_handler) pattern), which would be a breaking change to the middleware API.

  2. Transport-specific retry mechanisms

Both httpx and grpc support retry natively, but at different layers:

  • httpx: Supports custom transports with retry logic. Covers JsonRpcTransport and RestTransport, but not GrpcTransport.
  • gRPC: Supports retry via service config (retryPolicy) or grpc.aio.UnaryUnaryClientInterceptor. Covers GrpcTransport only.

Both operate below the SDK's exception layer — they see raw HTTP/gRPC status codes, not the application-level classification that determines whether a retry makes sense semantically. Using transport-specific retry means maintaining two separate retry configurations with different APIs, different error taxonomies, and no shared policy.
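For illustration, a gRPC-side retryPolicy lives in the channel's service config, entirely separate from anything httpx can express (the service name below is a placeholder):

```python
import json

# Placeholder service name; a real deployment would use the A2A gRPC service's
# fully qualified name. The JSON string is passed to the channel via the
# "grpc.service_config" channel option.
service_config = json.dumps({
    "methodConfig": [{
        "name": [{"service": "example.AgentService"}],
        "retryPolicy": {
            "maxAttempts": 4,
            "initialBackoff": "1s",
            "maxBackoff": "30s",
            "backoffMultiplier": 2.0,
            "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED"],
        },
    }]
})
# e.g. grpc.aio.insecure_channel(target,
#          options=[("grpc.service_config", service_config)])
```

Keeping this in sync with a second, httpx-specific retry configuration is exactly the duplication the decorator approach avoids.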

  3. Retry logic inside each transport implementation

Feasible but requires modifying JsonRpcTransport, RestTransport, and GrpcTransport independently. This triples the retry logic across three files, introduces retry configuration into each transport's constructor, and means any future transport must reimplement it. The decorator approach keeps retry in a single class, applies to any transport, and avoids modifying existing code.

Additional context

Compatibility

  • Purely additive: New retry.py file + tests + export update, zero changes to existing code.
  • No new dependencies: Uses only asyncio, random, and logging from the standard library. The grpc conditional import in the default predicate follows the SDK's existing pattern for optional dependencies.
  • Non-breaking: Existing transports continue to work exactly as before. RetryTransport is opt-in.
  • Follows existing patterns: The decorator/wrapper approach is already used in the codebase — see TenantTransportDecorator in src/a2a/client/transports/tenant_decorator.py.

Scope

I'm happy to open a PR targeting 1.0-dev for this. The implementation would include:

  • src/a2a/client/transports/retry.py — the RetryTransport class
  • tests/client/transports/test_retry.py — unit tests covering retry logic, backoff timing, error classification, streaming edge cases, and the on_retry callback
  • Public export update in src/a2a/client/transports/__init__.py

Happy to adjust the design based on feedback — particularly around the __cause__ inspection strategy for error classification and whether ClientFactory integration should be part of the initial PR or a follow-up.

Code of Conduct

  • I agree to follow this project's Code of Conduct
