Skip to content

Latest commit

 

History

History
333 lines (264 loc) · 11 KB

File metadata and controls

333 lines (264 loc) · 11 KB

20. 人在回路(Human-in-the-loop, HITL)

原文链接: https://docs.langchain.com/oss/python/langchain/human-in-the-loop

Human-in-the-Loop(HITL)中间件允许你在智能体的工具调用上加入人工监督。当模型提出的动作可能需要人工复核时(例如写文件、执行 SQL),中间件可以暂停执行并等待决策。

它的工作方式是:

  • 对每次工具调用,根据可配置的策略进行检查;
  • 如果需要干预,则发出一次“中断(interrupt)”并暂停执行;
  • 使用 LangGraph 的持久化层保存图状态,确保执行可以安全暂停并在之后恢复;
  • 人工给出决策后,再决定下一步是:
    • 原样执行(approve),
    • 修改后执行(edit),
    • 或拒绝并给出反馈(reject)。

中断决策类型(Interrupt decision types)

中间件内置三种决策类型来响应一次中断:

决策类型 描述 示例用法
✅ approve 按原样批准该动作并直接执行 将邮件草稿原样发送
✏️ edit 对工具调用进行修改后再执行 修改收件人后再发送邮件
❌ reject 拒绝执行该工具调用,并在对话中附加解释性反馈 拒绝邮件草稿,并说明应该如何重写

每个工具可允许的决策类型由 interrupt_on 配置决定。当多个工具调用同时被暂停时,每个动作都需要单独决策,且决策顺序必须与中断请求中的动作顺序一致。

编辑(edit) 工具参数时,建议尽量“保守修改”。如果修改过大,模型可能会重新评估整个方案,从而多次执行工具或采取意外操作。


配置中断(Configuring interrupts)

在创建智能体时,将 HITL 中间件加入 middleware 列表即可。通过 interrupt_on 指定哪些工具需要人工干预,以及允许的决策类型。

from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver


agent = create_agent(
    model="gpt-4o",
    tools=[write_file_tool, execute_sql_tool, read_data_tool],
    middleware=[
        HumanInTheLoopMiddleware(
            interrupt_on={
                "write_file": True,  # 允许 approve / edit / reject
                "execute_sql": {"allowed_decisions": ["approve", "reject"]},  # 不允许编辑
                # 安全操作,无需审批
                "read_data": False,
            },
            # 中断消息前缀,会与工具名和参数组合成完整描述
            # 例如:"Tool execution pending approval: execute_sql with query='DELETE FROM...'"
            # 单个工具可以通过在配置中设置 "description" 覆盖默认描述
            description_prefix="Tool execution pending approval",
        ),
    ],
    # 人在回路依赖检查点来处理中断。
    # 生产环境推荐使用持久化 checkpointer(如 AsyncPostgresSaver),
    # 这里示例使用内存版本 InMemorySaver。
    checkpointer=InMemorySaver(),
)

你必须配置一个 checkpointer 才能在中断间持久化图状态。生产环境应使用持久化 checkpointer(例如 AsyncPostgresSaver),而在测试或原型阶段可以使用 InMemorySaver

调用智能体时,需要在 config 中提供 thread_id,用来将执行与某个对话线程绑定。更多细节参见 LangGraph interrupts 文档。

配置选项:

  • interrupt_on(必填)

    • 工具名到审批配置的映射。
    • 值可以是:
      • True:使用默认配置中断;
      • False:自动批准;
      • InterruptOnConfig 对象:精细化配置。
  • description_prefix(字符串,默认 "Tool execution requires approval"

    • 用作中断请求描述的前缀。

InterruptOnConfig 选项:

  • allowed_decisions(字符串列表)

    • 允许的决策类型:"approve""edit""reject"
  • description(字符串或可调用)

    • 静态描述或返回描述的函数,用于自定义某个工具的中断说明。

响应中断(Responding to interrupts)

调用智能体时,它会运行直到:

  • 正常完成;
  • 或触发中断。

当某个工具调用匹配 interrupt_on 策略时,本次调用结果中会包含一个 __interrupt__ 字段,描述所有待审核的动作。你可以:

  • 将这些动作展示给审核人;
  • 根据人工决策,再调用智能体恢复执行。
from langgraph.types import Command

# 人在回路依赖 LangGraph 的持久化层。
# 必须提供 thread_id,将执行与某个对话线程关联,
# 以便在人工审核时可以暂停 / 恢复。
config = {"configurable": {"thread_id": "some_id"}}

# 运行图直到遇到中断
result = agent.invoke(
    {
        "messages": [
            {
                "role": "user",
                "content": "Delete old records from the database",
            }
        ]
    },
    config=config,
)

# 中断对象包含完整的 HITL 请求,包括 action_requests 和 review_configs
print(result["__interrupt__"])
# > [
# >    Interrupt(
# >       value={
# >          'action_requests': [
# >             {
# >                'name': 'execute_sql',
# >                'arguments': {'query': "DELETE FROM records WHERE created_at < NOW() - INTERVAL '30 days';"},
# >                'description': 'Tool execution pending approval\n\nTool: execute_sql\nArgs: {...}'
# >             }
# >          ],
# >          'review_configs': [
# >             {
# >                'action_name': 'execute_sql',
# >                'allowed_decisions': ['approve', 'reject']
# >             }
# >          ]
# >       }
# >    )
# > ]


# 通过审批决策恢复执行
agent.invoke(
    Command(
        resume={"decisions": [{"type": "approve"}]},  # 或 "reject"
    ),
    config=config,  # 使用相同 thread_id 恢复暂停的对话
)

决策类型(Decision types)

  • approve
  • ✏️ edit
  • reject

approve:直接批准

使用 approve 按原样批准工具调用并执行:

agent.invoke(
    Command(
        # 决策以列表形式提供,每个待审核动作对应一个决策。
        # 决策顺序必须与 `__interrupt__` 请求中的 action 顺序一致。
        resume={
            "decisions": [
                {
                    "type": "approve",
                }
            ]
        },
    ),
    config=config,  # 使用相同 thread_id 恢复
)

edit:修改后执行

使用 edit 在执行前修改工具调用。需要在 edited_action 字段中提供新的工具名与参数:

agent.invoke(
    Command(
        resume={
            "decisions": [
                {
                    "type": "edit",
                    # 修改后的动作(工具名与参数)
                    "edited_action": {
                        # 要调用的工具名(通常与原动作一致)
                        "name": "new_tool_name",
                        # 传递给工具的新参数
                        "args": {"key1": "new_value", "key2": "original_value"},
                    },
                }
            ]
        },
    ),
    config=config,
)

再次提醒:编辑参数时务必保守,过大改动可能让模型重新规划,从而多次调用工具或产生不可预期行为。

reject:拒绝并提供反馈

使用 reject 拒绝工具调用,并给出一段解释:

agent.invoke(
    Command(
        resume={
            "decisions": [
                {
                    "type": "reject",
                    # 说明为什么拒绝,以及应当如何改写
                    "message": "No, this is wrong because ..., instead do this ...",
                }
            ]
        },
    ),
    config=config,
)

message 内容会作为反馈加入对话,帮助智能体理解该动作为何被拒绝,以及之后应如何调整行为。

多个决策(Multiple decisions)

当一次中断中包含多个待审核动作时,需要按顺序为每个动作提供决策:

{
  "decisions": [
    {"type": "approve"},
    {
      "type": "edit",
      "edited_action": {
        "name": "tool_name",
        "args": {"param": "new_value"}
      }
    },
    {
      "type": "reject",
      "message": "This action is not allowed"
    }
  ]
}

与流式处理结合使用(Streaming with human-in-the-loop)

你可以使用 stream() 替代 invoke(),在智能体运行和处理中断的同时获得实时更新。通过 stream_mode=["updates", "messages"] 同时流式传输:

  • 智能体进度(updates)
  • LLM 令牌(messages)
from langgraph.types import Command

config = {"configurable": {"thread_id": "some_id"}}

# 流式输出智能体进度与 LLM 令牌,直到遇到中断
for mode, chunk in agent.stream(
    {"messages": [{"role": "user", "content": "Delete old records from the database"}]},
    config=config,
    stream_mode=["updates", "messages"],
):
    if mode == "messages":
        # LLM token
        token, metadata = chunk
        if token.content:
            print(token.content, end="", flush=True)
    elif mode == "updates":
        # 检查是否有中断
        if "__interrupt__" in chunk:
            print(f"\n\nInterrupt: {chunk['__interrupt__']}")

# 在人工决策后继续以流式方式恢复执行
for mode, chunk in agent.stream(
    Command(resume={"decisions": [{"type": "approve"}]}),
    config=config,
    stream_mode=["updates", "messages"],
):
    if mode == "messages":
        token, metadata = chunk
        if token.content:
            print(token.content, end="", flush=True)

更多关于流式模式的细节,参见 Streaming 指南。


执行生命周期(Execution lifecycle)

HITL 中间件在模型生成响应后、工具调用执行前,通过一个 after_model 钩子介入:

  1. 智能体调用模型生成响应;
  2. 中间件检查响应中的工具调用;
  3. 对于需要人工输入的工具调用,中间件构造 HITLRequest(包含 action_requestsreview_configs),并触发中断;
  4. 智能体等待人工决策;
  5. 根据 HITLResponse 的决策:
    • 执行被批准或编辑后的工具调用;
    • 为被拒绝的调用合成对应的 ToolMessage
    • 然后继续恢复执行流程。

自定义 HITL 逻辑(Custom HITL logic)

对于更复杂、定制化的工作流,你可以直接使用“中断原语(interrupt primitive)”和中间件抽象构建自己的 HITL 逻辑。

建议先理解上文的执行生命周期,再根据业务需求在:

  • 模型前 / 后;
  • 工具前 / 后;
  • 或更高层级的控制流

中植入你自己的审核与决策流程。


本文档由 LangChain 官方文档翻译而来