Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/google/adk/tools/mcp_tool/mcp_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ def __init__(
StreamableHTTPConnectionParams,
],
errlog: TextIO = sys.stderr,
sampling_callback: Optional[Any] = None,
sampling_capabilities: Optional[Any] = None,
):
"""Initializes the MCP session manager.

Expand All @@ -205,6 +207,9 @@ def __init__(
errlog: (Optional) TextIO stream for error logging. Use only for
initializing a local stdio MCP session.
"""
self._sampling_callback = sampling_callback
self._sampling_capabilities = sampling_capabilities

if isinstance(connection_params, StdioServerParameters):
# So far timeout is not configurable. Given MCP is still evolving, we
# would expect stdio_client to evolve to accept timeout parameter like
Expand Down Expand Up @@ -475,6 +480,8 @@ async def create_session(
timeout=timeout_in_seconds,
sse_read_timeout=sse_read_timeout_in_seconds,
is_stdio=is_stdio,
sampling_callback=self._sampling_callback,
sampling_capabilities=self._sampling_capabilities,
)
),
timeout=timeout_in_seconds,
Expand Down
7 changes: 7 additions & 0 deletions src/google/adk/tools/mcp_tool/mcp_toolset.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ def __init__(
Union[ProgressFnT, ProgressCallbackFactory]
] = None,
use_mcp_resources: Optional[bool] = False,
sampling_callback: Optional[SamplingFnT] = None,
sampling_capabilities: Optional[Any] = None,
):
"""Initializes the McpToolset.

Expand Down Expand Up @@ -154,6 +156,9 @@ def __init__(

super().__init__(tool_filter=tool_filter, tool_name_prefix=tool_name_prefix)

self._sampling_callback = sampling_callback
self._sampling_capabilities = sampling_capabilities

if not connection_params:
raise ValueError("Missing connection params in McpToolset.")

Expand All @@ -166,6 +171,8 @@ def __init__(
self._mcp_session_manager = MCPSessionManager(
connection_params=self._connection_params,
errlog=self._errlog,
sampling_callback=self._sampling_callback,
sampling_capabilities=self._sampling_capabilities,
)
self._auth_scheme = auth_scheme
self._auth_credential = auth_credential
Expand Down
8 changes: 8 additions & 0 deletions src/google/adk/tools/mcp_tool/session_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def __init__(
timeout: Optional[float],
sse_read_timeout: Optional[float],
is_stdio: bool = False,
sampling_callback: Optional[Any] = None,
sampling_capabilities: Optional[Any] = None,
):
"""
Args:
Expand All @@ -73,6 +75,8 @@ def __init__(
self._close_event = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._task_lock = asyncio.Lock()
self._sampling_callback = sampling_callback
self._sampling_capabilities = sampling_capabilities

@property
def session(self) -> Optional[ClientSession]:
Expand Down Expand Up @@ -165,6 +169,8 @@ async def _run(self):
read_timeout_seconds=timedelta(seconds=self._timeout)
if self._timeout is not None
else None,
sampling_callback=self._sampling_callback,
sampling_capabilities=self._sampling_capabilities,
)
)
else:
Expand All @@ -176,6 +182,8 @@ async def _run(self):
read_timeout_seconds=timedelta(seconds=self._sse_read_timeout)
if self._sse_read_timeout is not None
else None,
sampling_callback=self._sampling_callback,
sampling_capabilities=self._sampling_capabilities,
)
)
await asyncio.wait_for(session.initialize(), timeout=self._timeout)
Expand Down
45 changes: 45 additions & 0 deletions tests/unittests/tools/mcp_tool/test_mcp_sampling_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import pytest
from fastmcp.client.sampling import SamplingMessage
from google.adk.tools.mcp_tool.mcp_toolset import McpToolset
# from google.adk.tools.mcp_tool.mcp_toolset import StreamableHTTPConnectionParams
from google.adk.tools.mcp_tool.mcp_session_manager import StreamableHTTPConnectionParams


@pytest.mark.asyncio
async def test_sampling_callback_invoked():

called = {"value": False}

async def mock_sampling_handler(messages, params=None, context=None):
called["value"] = True

assert isinstance(messages, list)
assert messages[0].role == "user"

return {
"model": "test-model",
"role": "assistant",
"content": {"type": "text", "text": "sampling response"},
"stopReason": "endTurn",
}

toolset = McpToolset(
connection_params=StreamableHTTPConnectionParams(
url="http://localhost:9999",
timeout=10,
),
sampling_callback=mock_sampling_handler,
)

messages = [
SamplingMessage(
role="user",
content={"type": "text", "text": "hello"},
)
]

result = await toolset._sampling_callback(messages)

assert called["value"] is True
assert result["role"] == "assistant"
assert result["content"]["text"] == "sampling response"