forked from modelcontextprotocol/python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtool_manager.py
More file actions
125 lines (106 loc) · 4.29 KB
/
tool_manager.py
File metadata and controls
125 lines (106 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from __future__ import annotations as _annotations
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from mcp.server.fastmcp.exceptions import ToolError
from mcp.server.fastmcp.tools.base import Tool
from mcp.server.fastmcp.utilities.logging import get_logger
from mcp.shared.context import LifespanContextT, RequestT
from mcp.types import Icon, ToolAnnotations
if TYPE_CHECKING:
from mcp.server.fastmcp.server import Context
from mcp.server.session import ServerSessionT
logger = get_logger(__name__)
class ToolManager:
"""Manages FastMCP tools."""
def __init__(
self,
warn_on_duplicate_tools: bool = True,
*,
tools: list[Tool] | None = None,
):
self._tools: dict[str, Tool] = {}
if tools is not None:
for tool in tools:
if warn_on_duplicate_tools and tool.name in self._tools:
logger.warning(f"Tool already exists: {tool.name}")
self._tools[tool.name] = tool
self.warn_on_duplicate_tools = warn_on_duplicate_tools
def get_tool(self, name: str) -> Tool | None:
"""Get tool by name."""
return self._tools.get(name)
def _include_tools(self, tools: dict[str, Tool], include: list[str]) -> list[Tool]:
"""Filter tools to include only the specified tool names."""
filtered_tools: list[Tool] = []
for tool_name in include:
tool = tools.get(tool_name)
if tool is None:
raise ValueError(f"Tool '{tool_name}' not found in available tools, cannot be included.")
filtered_tools.append(tool)
return filtered_tools
def _exclude_tools(self, tools: dict[str, Tool], exclude: list[str]) -> list[Tool]:
"""Filter tools to exclude the specified tool names."""
exclude_set = set(exclude)
for tool_name in exclude:
if tool_name not in tools:
raise ValueError(f"Tool '{tool_name}' not found in available tools, cannot be excluded.")
return [tool for name, tool in tools.items() if name not in exclude_set]
def list_tools(
self,
*,
include: list[str] | None = None,
exclude: list[str] | None = None,
) -> list[Tool]:
"""List all registered tools, optionally filtered by include or exclude parameters."""
if include is not None and exclude is not None:
raise ValueError("Cannot specify both 'include' and 'exclude' parameters")
elif include is not None:
return self._include_tools(self._tools, include)
elif exclude is not None:
return self._exclude_tools(self._tools, exclude)
return list(self._tools.values())
def add_tool(
self,
fn: Callable[..., Any],
name: str | None = None,
title: str | None = None,
description: str | None = None,
annotations: ToolAnnotations | None = None,
icons: list[Icon] | None = None,
meta: dict[str, Any] | None = None,
structured_output: bool | None = None,
) -> Tool:
"""Add a tool to the server."""
tool = Tool.from_function(
fn,
name=name,
title=title,
description=description,
annotations=annotations,
icons=icons,
meta=meta,
structured_output=structured_output,
)
existing = self._tools.get(tool.name)
if existing:
if self.warn_on_duplicate_tools:
logger.warning(f"Tool already exists: {tool.name}")
return existing
self._tools[tool.name] = tool
return tool
def remove_tool(self, name: str) -> None:
"""Remove a tool by name."""
if name not in self._tools:
raise ToolError(f"Unknown tool: {name}")
del self._tools[name]
async def call_tool(
self,
name: str,
arguments: dict[str, Any],
context: Context[ServerSessionT, LifespanContextT, RequestT] | None = None,
convert_result: bool = False,
) -> Any:
"""Call a tool by name with arguments."""
tool = self.get_tool(name)
if not tool:
raise ToolError(f"Unknown tool: {name}")
return await tool.run(arguments, context=context, convert_result=convert_result)