Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions contributing/samples/streaming_tools_non_live_agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Streaming Tools Non-Live Agent

This agent demonstrates streaming tools in non-live mode (run_async/SSE).

## Features

- **monitor_stock_price**: Monitors stock prices with real-time updates
- **process_large_dataset**: Processes datasets with progress updates
- **monitor_system_health**: Monitors system health metrics continuously

## Testing

### With ADK Web UI

```bash
cd contributing/samples
adk web .
```

Then try:
- "Monitor the stock price for AAPL"
- "Process a large dataset at /tmp/data.csv"
- "Monitor system health"

### With ADK CLI

```bash
cd contributing/samples/streaming_tools_non_live_agent
adk run .
```

### With API Server (SSE)

```bash
cd contributing/samples
adk api_server .
```

Then send a POST request to the `/run_sse` endpoint with `"streaming": true` in the JSON request body to see the intermediate events emitted by the streaming tools.
15 changes: 15 additions & 0 deletions contributing/samples/streaming_tools_non_live_agent/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import agent
128 changes: 128 additions & 0 deletions contributing/samples/streaming_tools_non_live_agent/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example agent demonstrating streaming tools in non-live mode (run_async/SSE).

This agent shows how to use streaming tools that yield intermediate results
in non-live mode. Streaming tools work with both run_async and SSE endpoints.
"""

from __future__ import annotations

import asyncio
from typing import AsyncGenerator

from google.adk.agents import Agent


async def monitor_stock_price(
    symbol: str, update_interval: float = 1.0
) -> AsyncGenerator[dict, None]:
  """Monitor stock price with real-time updates.

  This is a streaming tool that yields intermediate results as the stock
  price changes. The agent can react to these intermediate results.

  Args:
    symbol: The stock symbol to monitor (e.g., 'AAPL', 'GOOGL').
    update_interval: Seconds to wait between simulated updates. Defaults to
      1.0; pass 0 for instant iteration (useful in tests).

  Yields:
    Dictionary containing stock price updates with status indicators. The
    last update carries status 'complete'; all earlier ones 'streaming'.
  """
  # Simulated price sequence; this sample does not hit a real market feed.
  prices = [100, 105, 110, 108, 112, 115]
  last_index = len(prices) - 1
  for i, price in enumerate(prices):
    await asyncio.sleep(update_interval)  # Simulate real-time updates.
    yield {
        'symbol': symbol,
        'price': price,
        'update': i + 1,
        # Mark only the final tick as complete so consumers can stop.
        'status': 'streaming' if i < last_index else 'complete',
    }


async def process_large_dataset(
    file_path: str, update_interval: float = 0.5
) -> AsyncGenerator[dict, None]:
  """Process dataset with progress updates.

  This streaming tool demonstrates how to provide progress feedback
  for long-running operations.

  Args:
    file_path: Path to the dataset file to process. (Simulated — the file is
      never actually opened in this sample.)
    update_interval: Seconds to wait between simulated batches. Defaults to
      0.5; pass 0 for instant iteration (useful in tests).

  Yields:
    Dictionary containing progress information after each batch, followed by
    one final dictionary with status 'complete'.
  """
  total_rows = 100
  batch_size = 10
  processed = 0

  # Simulate processing in fixed-size batches.
  for _ in range(total_rows // batch_size):
    await asyncio.sleep(update_interval)  # Simulate processing time.
    processed += batch_size
    yield {
        'progress': processed / total_rows,
        'processed': processed,
        'total': total_rows,
        'status': 'streaming',
        'message': f'Processed {processed}/{total_rows} rows',
    }

  # Final result: signals the tool is done so the agent can summarize.
  yield {
      'result': 'Processing complete',
      'status': 'complete',
      'file_path': file_path,
      'total_processed': total_rows,
  }


async def monitor_system_health(
    update_interval: float = 2.0,
) -> AsyncGenerator[dict, None]:
  """Monitor system health metrics with continuous updates.

  This streaming tool demonstrates continuous monitoring that can be
  stopped by the agent when thresholds are reached.

  Args:
    update_interval: Seconds to wait between simulated health checks.
      Defaults to 2.0; pass 0 for instant iteration (useful in tests).

  Yields:
    Dictionary containing system health metrics. An 'alert' of 'high' is
    raised when simulated CPU usage exceeds 55%.
  """
  # Canned metric snapshots; this sample does not read real system stats.
  metrics = [
      {'cpu': 45, 'memory': 60, 'disk': 70},
      {'cpu': 50, 'memory': 65, 'disk': 72},
      {'cpu': 55, 'memory': 70, 'disk': 75},
      {'cpu': 60, 'memory': 75, 'disk': 78},
  ]
  last_index = len(metrics) - 1

  for i, metric in enumerate(metrics):
    await asyncio.sleep(update_interval)  # Simulated check cadence.
    yield {
        'metrics': metric,
        'timestamp': i + 1,
        'status': 'streaming' if i < last_index else 'complete',
        # CPU threshold of 55% triggers a high alert.
        'alert': 'high' if metric['cpu'] > 55 else 'normal',
    }


# Instruction for the sample agent: explains how to use the streaming tools
# and how to react to the intermediate results they produce.
_STREAMING_TOOLS_INSTRUCTION = (
    'You are a helpful assistant that can monitor stock prices, process'
    ' datasets, and monitor system health using streaming tools. When'
    ' using streaming tools, you will receive intermediate results that'
    ' you can react to. For example, if monitoring stock prices, you can'
    ' alert the user when prices change significantly. If processing a'
    ' dataset, you can provide progress updates. If monitoring system'
    ' health, you can alert when metrics exceed thresholds.'
)

# Root agent exposing the three streaming tools defined above.
root_agent = Agent(
    model='gemini-2.5-flash-lite',
    name='streaming_tools_agent',
    instruction=_STREAMING_TOOLS_INSTRUCTION,
    tools=[
        monitor_stock_price,
        process_large_dataset,
        monitor_system_health,
    ],
)
96 changes: 73 additions & 23 deletions src/google/adk/flows/llm_flows/base_llm_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -678,35 +678,85 @@ async def _postprocess_handle_function_calls_async(
function_call_event: Event,
llm_request: LlmRequest,
) -> AsyncGenerator[Event, None]:
if function_response_event := await functions.handle_function_calls_async(
invocation_context, function_call_event, llm_request.tools_dict
):
auth_event = functions.generate_auth_event(
invocation_context, function_response_event
)
if auth_event:
yield auth_event
function_calls = function_call_event.get_function_calls()
if not function_calls:
return

# Check if any tools are streaming tools
has_streaming_tools = any(
functions._is_streaming_tool(tool)
for call in function_calls
if (tool := llm_request.tools_dict.get(call.name))
)

tool_confirmation_event = functions.generate_request_confirmation_event(
invocation_context, function_call_event, function_response_event
if has_streaming_tools:
# Use streaming handler
tool_confirmation_dict = (
invocation_context.tool_confirmation_dict
if hasattr(invocation_context, 'tool_confirmation_dict')
else None
)
if tool_confirmation_event:
yield tool_confirmation_event
async for event in functions.handle_function_calls_async_with_streaming(
invocation_context,
function_calls,
llm_request.tools_dict,
tool_confirmation_dict,
):
auth_event = functions.generate_auth_event(invocation_context, event)
if auth_event:
yield auth_event

# Always yield the function response event first
yield function_response_event
tool_confirmation_event = functions.generate_request_confirmation_event(
invocation_context, function_call_event, event
)
if tool_confirmation_event:
yield tool_confirmation_event

# Check if this is a set_model_response function response
if json_response := _output_schema_processor.get_structured_model_response(
function_response_event
yield event

# Check if this is a set_model_response function response
if json_response := (
_output_schema_processor.get_structured_model_response(event)
):
final_event = (
_output_schema_processor.create_final_model_response_event(
invocation_context, json_response
)
)
yield final_event
else:
Comment thread
sarojrout marked this conversation as resolved.
# Use regular handler
if function_response_event := await functions.handle_function_calls_async(
invocation_context, function_call_event, llm_request.tools_dict
):
# Create and yield a final model response event
final_event = (
_output_schema_processor.create_final_model_response_event(
invocation_context, json_response
)
auth_event = functions.generate_auth_event(
invocation_context, function_response_event
)
yield final_event
if auth_event:
yield auth_event

tool_confirmation_event = functions.generate_request_confirmation_event(
invocation_context, function_call_event, function_response_event
)
if tool_confirmation_event:
yield tool_confirmation_event

# Always yield the function response event first
yield function_response_event

# Check if this is a set_model_response function response
if json_response := (
_output_schema_processor.get_structured_model_response(
function_response_event
)
):
# Create and yield a final model response event
final_event = (
_output_schema_processor.create_final_model_response_event(
invocation_context, json_response
)
)
yield final_event
Comment thread
sarojrout marked this conversation as resolved.
Outdated
transfer_to_agent = function_response_event.actions.transfer_to_agent
if transfer_to_agent:
agent_to_run = self._get_agent_to_run(
Expand Down
Loading