From 9a029344d780e3ec0c030612cab90c964d0f66ff Mon Sep 17 00:00:00 2001
From: Madhavi <144771026+madhavi-joshi-nutrien@users.noreply.github.com>
Date: Thu, 7 May 2026 18:14:19 +0530
Subject: [PATCH] feat(bidi): add configurable event_queue_size to BidiAgent
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The internal event queue between the model receive loop and the output
handler was hardcoded to maxsize=1. This causes the model receive loop
to block whenever the output handler performs any async I/O (e.g.,
websocket.send_json()), so audio chunks pile up and are delivered in
bursts, which is perceived as choppy audio. This is particularly
noticeable with Gemini Live, which sends many small audio chunks
rapidly (~50/sec), unlike Nova Sonic, which sends fewer, larger chunks.

Add a configurable event_queue_size parameter to BidiAgent (default: 1
to preserve existing behavior). Users experiencing choppy audio with
fast-delivering models can increase this value to provide buffering
between the model receive loop and the output handler. (A standalone
sketch of the blocking behavior and an example usage follow the diff.)
---
 src/strands/experimental/bidi/agent/agent.py | 7 +++++++
 src/strands/experimental/bidi/agent/loop.py  | 5 ++++-
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/src/strands/experimental/bidi/agent/agent.py b/src/strands/experimental/bidi/agent/agent.py
index 8c68e780e..b3951d8e3 100644
--- a/src/strands/experimental/bidi/agent/agent.py
+++ b/src/strands/experimental/bidi/agent/agent.py
@@ -74,6 +74,7 @@ def __init__(
         state: AgentState | dict | None = None,
         session_manager: "SessionManager | None" = None,
         tool_executor: ToolExecutor | None = None,
+        event_queue_size: int = 1,
         **kwargs: Any,
     ):
         """Initialize bidirectional agent.
@@ -93,6 +94,11 @@ def __init__(
             session_manager: Manager for handling agent sessions including conversation history and
                 state. If provided, enables session-based persistence and state management.
             tool_executor: Definition of tool execution strategy (e.g., sequential, concurrent, etc.).
+            event_queue_size: Maximum size of the internal event queue between the model receive loop
+                and the output handler. Higher values provide more buffering for bursty audio delivery
+                (e.g., Gemini Live) at the cost of slightly higher memory usage; lower values apply
+                more backpressure. Defaults to 1, which preserves the original strict backpressure but
+                may cause choppy audio with fast-delivering models (32 buffers ~640ms at ~50 chunks/sec).
             **kwargs: Additional configuration for future extensibility.
 
         Raises:
@@ -108,6 +114,7 @@ def __init__(
 
         self.system_prompt = system_prompt
         self.messages = messages or []
+        self._event_queue_size = event_queue_size
 
         # Agent identification
         self.agent_id = _identifier.validate(agent_id or _DEFAULT_AGENT_ID, _identifier.Identifier.AGENT)
diff --git a/src/strands/experimental/bidi/agent/loop.py b/src/strands/experimental/bidi/agent/loop.py
index 79818ae7c..638e89bc0 100644
--- a/src/strands/experimental/bidi/agent/loop.py
+++ b/src/strands/experimental/bidi/agent/loop.py
@@ -49,6 +49,8 @@ class _BidiAgentLoop:
         _invocation_state: Optional context to pass to tools during execution. This allows passing
             custom data (user_id, session_id, database connections, etc.) that tools can access
             via their invocation_state parameter.
+        _event_queue_size: Maximum size of the internal event queue. Controls backpressure
+            between the model receive loop and the output handler.
         _send_gate: Gate the sending of events to the model. Blocks when agent is
             reseting the model connection after timeout.
""" @@ -65,6 +67,7 @@ def __init__(self, agent: "BidiAgent") -> None: self._started = False self._task_pool = _TaskPool() self._event_queue: asyncio.Queue + self._event_queue_size = agent._event_queue_size self._invocation_state: dict[str, Any] self._send_gate = asyncio.Event() @@ -94,7 +97,7 @@ async def start(self, invocation_state: dict[str, Any] | None = None) -> None: messages=self._agent.messages, ) - self._event_queue = asyncio.Queue(maxsize=1) + self._event_queue = asyncio.Queue(maxsize=self._event_queue_size) self._task_pool = _TaskPool() self._task_pool.create(self._run_model())