Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 69 additions & 9 deletions src/google/adk/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,11 @@ class Runner:
resumability_config: The resumability config for the application.
"""

# Semaphore to limit concurrent event compaction tasks to prevent resource
# exhaustion under high concurrency. Limits concurrent LLM calls and DB writes.
# Shared across all Runner instances for global concurrency control.
_compaction_semaphore: Optional[asyncio.Semaphore] = None

app_name: str
"""The app name of the runner."""
agent: BaseAgent
Expand Down Expand Up @@ -150,6 +155,7 @@ def __init__(
credential_service: Optional[BaseCredentialService] = None,
plugin_close_timeout: float = 5.0,
auto_create_session: bool = False,
max_concurrent_compactions: int = 10,
):
"""Initializes the Runner.

Expand Down Expand Up @@ -179,6 +185,11 @@ def __init__(
auto_create_session: Whether to automatically create a session when
not found. Defaults to False. If False, a missing session raises
ValueError with a helpful message.
max_concurrent_compactions: Maximum number of concurrent event
compaction tasks allowed. Defaults to 10. This limit is shared across
all Runner instances to prevent resource exhaustion. Higher values
allow more concurrent compactions but consume more resources (LLM
API calls, database connections).

Raises:
ValueError: If `app` is provided along with `agent` or `plugins`, or if
Expand Down Expand Up @@ -206,6 +217,8 @@ def __init__(
) = self._infer_agent_origin(self.agent)
self._app_name_alignment_hint: Optional[str] = None
self._enforce_app_name_alignment()
# Initialize or update the shared compaction semaphore
self._initialize_compaction_semaphore(max_concurrent_compactions)
Comment thread
lwangverizon marked this conversation as resolved.
Outdated

def _validate_runner_params(
self,
Expand Down Expand Up @@ -320,6 +333,27 @@ def _infer_agent_origin(
return None, origin_dir
return origin_name, origin_dir

@classmethod
def _initialize_compaction_semaphore(cls, limit: int) -> None:
"""Initializes or updates the shared compaction semaphore.

This method ensures the class-level semaphore is initialized with the
specified limit. If a semaphore already exists, it creates a new one
with the updated limit (the old one will be garbage collected once all
pending tasks complete).

Args:
limit: Maximum number of concurrent compaction tasks allowed.
"""
if limit <= 0:
raise ValueError(
f'max_concurrent_compactions must be positive, got {limit}'
)
# Note: We can't use async lock here since this is called from __init__.
# The semaphore creation itself is thread-safe, and in practice Runner
# instances are created in the same event loop, so this is safe.
cls._compaction_semaphore = asyncio.Semaphore(limit)
Comment thread
lwangverizon marked this conversation as resolved.
Outdated

def _enforce_app_name_alignment(self) -> None:
origin_name = self._agent_origin_app_name
origin_dir = self._agent_origin_dir
Expand Down Expand Up @@ -401,8 +435,8 @@ def run(

If event compaction is enabled in the App configuration, it will be
performed after all agent events for the current invocation have been
yielded. The generator will only finish iterating after event
compaction is complete.
yielded. Compaction runs as a background task and does not block the
generator from completing.

Args:
user_id: The user ID of the session.
Expand Down Expand Up @@ -464,9 +498,10 @@ async def run_async(

If event compaction is enabled in the App configuration, it will be
performed after all agent events for the current invocation have been
yielded. The async generator will only finish iterating after event
compaction is complete. However, this does not block new `run_async`
calls for subsequent user queries, which can be started concurrently.
yielded. Compaction runs as a background task and does not block the
generator from completing, allowing the frontend to receive responses
without delay. However, this does not block new `run_async` calls for
subsequent user queries, which can be started concurrently.

Args:
user_id: The user ID of the session.
Expand Down Expand Up @@ -552,11 +587,36 @@ async def execute(ctx: InvocationContext) -> AsyncGenerator[Event]:
# Run compaction after all events are yielded from the agent.
# (We don't compact in the middle of an invocation, we only compact at
# the end of an invocation.)
# Run compaction as a background task to avoid blocking the generator
# completion, which causes delays on the frontend. Use a semaphore to
# limit concurrent compactions and prevent resource exhaustion under
# high concurrency.
if self.app and self.app.events_compaction_config:
logger.debug('Running event compactor.')
await _run_compaction_for_sliding_window(
self.app, session, self.session_service
)
logger.debug('Scheduling event compactor in background.')

async def _run_compaction_with_error_handling():
try:
# Ensure semaphore is initialized (should always be after __init__)
if self._compaction_semaphore is None:
logger.warning(
'Compaction semaphore not initialized, using default limit.'
)
self._initialize_compaction_semaphore(10)
Comment thread
lwangverizon marked this conversation as resolved.
Outdated
async with self._compaction_semaphore:
await _run_compaction_for_sliding_window(
self.app, session, self.session_service
)
except asyncio.CancelledError:
logger.debug('Event compaction cancelled.')
raise
except Exception as e:
logger.error(
'Event compaction failed but not blocking response: %s',
e,
exc_info=True,
)

asyncio.create_task(_run_compaction_with_error_handling())
Comment thread
lwangverizon marked this conversation as resolved.
Outdated

async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen:
async for event in agen:
Expand Down
Loading