Skip to content

Latest commit

 

History

History
568 lines (410 loc) · 15.4 KB

File metadata and controls

568 lines (410 loc) · 15.4 KB

19. 模型上下文协议(Model Context Protocol, MCP)

原文链接: https://docs.langchain.com/oss/python/langchain/mcp

Model Context Protocol(MCP)是一个开放协议,用于标准化应用向 LLM 提供工具和上下文的方式。借助 langchain-mcp-adapters 库,LangChain 智能体可以使用定义在 MCP 服务器上的工具。


快速开始(Quickstart)

安装 langchain-mcp-adapters

pip install langchain-mcp-adapters

langchain-mcp-adapters 让智能体可以使用一个或多个 MCP 服务器上定义的工具。

MultiServerMCPClient 默认是无状态的:每次工具调用都会创建新的 MCP ClientSession,执行工具后清理会话。关于有状态会话的更多内容见后文“Stateful sessions”。

访问多个 MCP 服务器

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent


client = MultiServerMCPClient(
    {
        "math": {
            "transport": "stdio",  # 本地子进程通信
            "command": "python",
            # math_server.py 的绝对路径
            "args": ["/path/to/math_server.py"],
        },
        "weather": {
            "transport": "http",  # 基于 HTTP 的远程服务器
            # 确保你的天气服务监听在 8000 端口
            "url": "http://localhost:8000/mcp",
        },
    }
)

tools = await client.get_tools()  # 从所有 MCP 服务器加载工具
agent = create_agent(
    "claude-sonnet-4-5-20250929",
    tools,
)

math_response = await agent.ainvoke(
    {"messages": [{"role": "user", "content": "what's (3 + 5) x 12?"}]}
)

weather_response = await agent.ainvoke(
    {"messages": [{"role": "user", "content": "what is the weather in nyc?"}]}
)

自定义服务器(Custom servers)

要创建自定义 MCP 服务器,可以使用 FastMCP 库:

pip install fastmcp

可以使用如下示例测试你的智能体与 MCP 工具服务器的集成。

数学服务器(stdio 传输)

from fastmcp import FastMCP

mcp = FastMCP("Math")


@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers"""
    return a + b


@mcp.tool()
def multiply(a: int, b: int) -> int:
    """Multiply two numbers"""
    return a * b


if __name__ == "__main__":
    mcp.run(transport="stdio")

传输(Transports)

MCP 支持多种客户端-服务器通信传输方式。

HTTP

http 传输(也称为 streamable-http)使用 HTTP 请求进行通信。具体协议细节可见 MCP HTTP 传输规范。

client = MultiServerMCPClient(
    {
        "weather": {
            "transport": "http",
            "url": "http://localhost:8000/mcp",
        }
    }
)

传递 headers(Passing headers)

连接到 HTTP MCP 服务器时,可以通过配置中的 headers 字段添加自定义请求头(例如认证或链路追踪)。这对 sse(已在 MCP 规范中弃用)和 streamable_http 传输都适用。

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent


client = MultiServerMCPClient(
    {
        "weather": {
            "transport": "http",
            "url": "http://localhost:8000/mcp",
            "headers": {
                "Authorization": "Bearer YOUR_TOKEN",
                "X-Custom-Header": "custom-value",
            },
        }
    }
)

tools = await client.get_tools()
agent = create_agent("openai:gpt-4.1", tools)
response = await agent.ainvoke({"messages": "what is the weather in nyc?"})

认证(Authentication)

langchain-mcp-adapters 库在底层使用官方 MCP SDK,你可以通过实现 httpx.Auth 接口来提供自定义认证机制:

from langchain_mcp_adapters.client import MultiServerMCPClient


client = MultiServerMCPClient(
    {
        "weather": {
            "transport": "http",
            "url": "http://localhost:8000/mcp",
            "auth": auth,  # 自定义 httpx.Auth 实例
        }
    }
)

你可以实现自定义认证类,也可以使用内置 OAuth 流程。

stdio

stdio 模式下,客户端以子进程方式启动服务器,并通过标准输入/输出通信。适合本地工具和简单部署。

与 HTTP 传输不同,stdio 连接在进程层面是 有状态的——子进程会在客户端连接生命周期内持续存在。但如果只使用默认的 MultiServerMCPClient 而不做显式会话管理,每次工具调用仍然会创建新会话。关于持久会话管理见下节。

client = MultiServerMCPClient(
    {
        "math": {
            "transport": "stdio",
            "command": "python",
            "args": ["/path/to/math_server.py"],
        }
    }
)

有状态会话(Stateful sessions)

默认情况下,MultiServerMCPClient无状态 的:

  • 每次工具调用都会创建新的 MCP 会话;
  • 执行工具后立即清理。

如果你需要控制 MCP 会话的生命周期(例如服务器在多次调用间保持上下文),可以通过 client.session() 创建持久化的 ClientSession

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain.agents import create_agent


client = MultiServerMCPClient({...})

# 显式创建会话
async with client.session("server_name") as session:
    # 使用该会话加载工具、资源或提示
    tools = await load_mcp_tools(session)
    agent = create_agent(
        "anthropic:claude-3-7-sonnet-latest",
        tools,
    )

核心特性(Core features)

工具(Tools)

MCP 服务器通过工具向 LLM 暴露可执行函数,例如:

  • 查询数据库
  • 调用外部 API
  • 与外部系统交互

LangChain 会将 MCP 工具转换为 LangChain 工具,从而可以直接在任意 LangChain 智能体或工作流中使用。

加载工具(Loading tools)

使用 client.get_tools() 从 MCP 服务器加载工具并传给智能体:

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent


client = MultiServerMCPClient({...})
tools = await client.get_tools()
agent = create_agent("claude-sonnet-4-5-20250929", tools)

结构化内容(Structured content)

MCP 工具可以在返回给模型的人类可读文本之外,同时返回结构化内容(如 JSON)。

  • 当 MCP 工具返回 structuredContent 时,适配器会将其包装为 MCPToolArtifact,并作为工具的 artifact 暴露;
  • 你可以在 ToolMessage 上通过 artifact 字段访问它;
  • 也可以通过拦截器自动处理或转换结构化内容。

(具体示例略,可在需要时参考原文档。)

资源(Resources)与提示(Prompts)

MCP 还支持:

  • Resources:如文档、文件等可供模型访问的资源;
  • Prompts:由 MCP 服务器提供的可重用提示模板。

通过 load_mcp_tools 及相关 API,可以一并加载工具、资源与提示,并在 LangChain 智能体中使用。


高级特性(Advanced features)

工具拦截器(Tool interceptors)

工具拦截器允许你:

  • 在 MCP 工具调用前后插入自定义逻辑;
  • 记录日志、修改请求 / 响应、重试失败调用等。

访问运行时上下文(Accessing runtime context)

在拦截器中可以访问 MCPToolCallRequest,并查看:

  • 工具名
  • 参数
  • 服务器信息等

状态更新与命令(State updates and commands)

你也可以结合 LangGraph 的 Command、状态更新等机制,在 MCP 工具调用完成后更新智能体的 state / store。

自定义拦截器(Custom interceptors)

拦截器函数通常签名类似:

async def interceptor(request: MCPToolCallRequest, handler):
    # 调用前逻辑
    result = await handler(request)
    # 调用后逻辑
    return result

将拦截器注册到 MultiServerMCPClient

client = MultiServerMCPClient(
    {...},
    tool_interceptors=[logging_interceptor],  # 自定义拦截器列表
)

修改请求(Modifying requests)

使用 request.override() 创建修改后的请求,遵循不可变模式,原始请求不会被改变。

示例:修改工具参数

async def double_args_interceptor(
    request: MCPToolCallRequest,
    handler,
):
    """在执行前将所有数值参数翻倍。"""
    modified_args = {k: v * 2 for k, v in request.args.items()}
    modified_request = request.override(args=modified_args)
    return await handler(modified_request)

# 原始调用:add(a=2, b=3) 会变为 add(a=4, b=6)

示例:在运行时修改 HTTP headers

async def auth_header_interceptor(
    request: MCPToolCallRequest,
    handler,
):
    """根据工具名称动态添加认证头。"""
    token = get_token_for_tool(request.name)
    modified_request = request.override(
        headers={"Authorization": f"Bearer {token}"}
    )
    return await handler(modified_request)

组合拦截器(Composing interceptors)

多个拦截器按“洋葱模型”顺序组合:拦截器列表中第一个是最外层。

async def outer_interceptor(request, handler):
    print("outer: before")
    result = await handler(request)
    print("outer: after")
    return result


async def inner_interceptor(request, handler):
    print("inner: before")
    result = await handler(request)
    print("inner: after")
    return result


client = MultiServerMCPClient(
    {...},
    tool_interceptors=[outer_interceptor, inner_interceptor],
)

# 执行顺序:
# outer: before -> inner: before -> 工具执行 -> inner: after -> outer: after

错误处理(Error handling)

拦截器可以捕获工具执行错误并实现重试逻辑:

import asyncio


async def retry_interceptor(
    request: MCPToolCallRequest,
    handler,
    max_retries: int = 3,
    delay: float = 1.0,
):
    """在错误时按指数退避重试工具调用。"""
    last_error = None
    for attempt in range(max_retries):
        try:
            return await handler(request)
        except Exception as e:
            last_error = e
            if attempt < max_retries - 1:
                wait_time = delay * (2 ** attempt)
                print(f"Tool {request.name} failed (attempt {attempt + 1}), retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
    raise last_error


client = MultiServerMCPClient(
    {...},
    tool_interceptors=[retry_interceptor],
)

也可以对特定错误做回退处理:

async def fallback_interceptor(
    request: MCPToolCallRequest,
    handler,
):
    """在工具执行失败时返回回退结果。"""
    try:
        return await handler(request)
    except TimeoutError:
        return f"Tool {request.name} timed out. Please try again later."
    except ConnectionError:
        return f"Could not connect to {request.name} service. Using cached data."

进度通知(Progress notifications)

可以订阅长时间运行工具的进度更新:

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.callbacks import Callbacks, CallbackContext


async def on_progress(
    progress: float,
    total: float | None,
    message: str | None,
    context: CallbackContext,
):
    """处理来自 MCP 服务器的进度更新。"""
    percent = (progress / total * 100) if total else progress
    tool_info = f" ({context.tool_name})" if context.tool_name else ""
    print(f"[{context.server_name}{tool_info}] Progress: {percent:.1f}% - {message}")


client = MultiServerMCPClient(
    {...},
    callbacks=Callbacks(on_progress=on_progress),
)

CallbackContext 提供:

  • server_name:MCP 服务器名称
  • tool_name:正在执行的工具名称(仅在工具调用期间可用)

日志(Logging)

MCP 协议支持服务器端日志通知。可以通过 Callbacks 订阅这些事件:

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.callbacks import Callbacks, CallbackContext
from mcp.types import LoggingMessageNotificationParams


async def on_logging_message(
    params: LoggingMessageNotificationParams,
    context: CallbackContext,
):
    """处理来自 MCP 服务器的日志消息。"""
    print(f"[{context.server_name}] {params.level}: {params.data}")


client = MultiServerMCPClient(
    {...},
    callbacks=Callbacks(on_logging_message=on_logging_message),
)

引导式补充输入(Elicitation)

Elicitation 允许 MCP 服务器在工具执行期间向用户请求额外输入,而不是一开始就要求所有参数。服务器可以按需交互式询问信息。

服务器端示例(Server setup)

from pydantic import BaseModel
from mcp.server.fastmcp import Context, FastMCP


server = FastMCP("Profile")


class UserDetails(BaseModel):
    email: str
    age: int


@server.tool()
async def create_profile(name: str, ctx: Context) -> str:
    """创建用户档案,并通过引导式提问获取详情。"""
    result = await ctx.elicit(
        message=f"Please provide details for {name}'s profile:",
        schema=UserDetails,
    )
    if result.action == "accept" and result.data:
        return f"Created profile for {name}: email={result.data.email}, age={result.data.age}"
    if result.action == "decline":
        return f"User declined. Created minimal profile for {name}."
    return "Profile creation cancelled."


if __name__ == "__main__":
    server.run(transport="http")

客户端示例(Client setup)

from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.callbacks import Callbacks, CallbackContext
from mcp.shared.context import RequestContext
from mcp.types import ElicitRequestParams, ElicitResult


async def on_elicitation(
    mcp_context: RequestContext,
    params: ElicitRequestParams,
    context: CallbackContext,
) -> ElicitResult:
    """处理来自 MCP 服务器的引导式输入请求。"""
    # 在真实应用中,你会根据 params.message 和 params.requestedSchema
    # 提示真实用户输入。这里用固定值示例。
    return ElicitResult(
        action="accept",
        content={"email": "[email protected]", "age": 25},
    )


client = MultiServerMCPClient(
    {
        "profile": {
            "url": "http://localhost:8000/mcp",
            "transport": "http",
        }
    },
    callbacks=Callbacks(on_elicitation=on_elicitation),
)

响应动作(Response actions)

引导式回调可以返回三种动作(action):

Action 描述
accept 用户提供了有效输入,数据放在 content 字段中
decline 用户选择不提供请求的信息
cancel 用户取消整个操作

示例:

# 接受并带有数据
ElicitResult(action="accept", content={"email": "[email protected]", "age": 25})

# 拒绝(用户不想提供信息)
ElicitResult(action="decline")

# 取消(终止操作)
ElicitResult(action="cancel")

更多资源(Additional resources)

  • MCP documentation — MCP 协议完整文档
  • MCP Transport documentation — MCP 传输方式详解
  • langchain-mcp-adapters — LangChain MCP 适配器库文档

本文档由 LangChain 官方文档翻译而来