原文链接: https://docs.langchain.com/oss/python/langchain/multi-agent/custom-workflow
在 Custom workflow 架构中,你可以使用 LangGraph 定义“完全自定义”的执行流程:
- 你对图结构拥有完整控制权;
- 可以自由组合:顺序步骤、条件分支、循环、并行执行等;
- 可以在节点中混合“确定性逻辑”和“智能体行为”。
- 完全掌控图结构(Complete control over graph structure);
- 混合确定性逻辑与智能体逻辑(Mix deterministic logic with agentic behavior);
- 支持顺序 / 分支 / 循环 / 并行(Sequential steps, conditional branches, loops, parallel execution);
- 可以将其他多智能体模式(subagents、router 等)嵌入为工作流中的节点。
在以下情况适合使用自定义工作流:
- 标准模式(Subagents、Skills 等)无法很好满足需求;
- 需要在一个流程中混合“强约束的确定性步骤”和“灵活的智能体推理”;
- 需要复杂路由或多阶段处理(multi-stage processing)。
在自定义工作流中:
- 每个节点可以是:
- 普通 Python 函数;
- 一次 LLM 调用;
- 一个带工具的完整智能体;
- 你甚至可以将“整个多智能体系统”视作一个节点嵌入到更大工作流中。
官方教程中,“多源知识库 + 路由”(multi-source KB with routing)就是一个典型的自定义工作流示例。
核心思想:
- 你可以在任意 LangGraph 节点内部直接调用一个 LangChain 智能体;
- 这样既能享受自定义工作流带来的灵活性,又能复用已有的智能体与工具。
from langchain.agents import create_agent
from langgraph.graph import StateGraph, START, END
agent = create_agent(model="openai:gpt-4o", tools=[...])
def agent_node(state: State) -> dict:
"""在 LangGraph 节点中调用 LangChain 智能体。"""
result = agent.invoke({
"messages": [{"role": "user", "content": state["query"]}],
})
return {"answer": result["messages"][-1].content}
# 构建一个简单的工作流
workflow = (
StateGraph(State)
.add_node("agent", agent_node)
.add_edge(START, "agent")
.add_edge("agent", END)
.compile()
)一个常见用例是把“检索(Retrieval)”与智能体结合起来。下面的示例构建了一个 WNBA 数据助手:
- 从知识库中检索数据;
- 还可以通过工具获取最新新闻;
- 使用一个多步骤的 LangGraph 工作流串联整条 RAG 管线。
自定义 RAG 工作流包含三类节点:
- Model 节点(Rewrite):
- 使用结构化输出重写用户问题,使其更适合检索;
- 确定性节点(Retrieve):
- 执行向量相似度检索,不涉及 LLM;
- Agent 节点(Agent):
- 在检索到的上下文基础上进行推理;
- 可通过工具(如
get_latest_news)获取额外信息。
通过 LangGraph 的 State,你可以在工作流各步骤间传递结构化信息:
- 每个节点都可以读写
State中的字段; - 方便在不同节点之间共享数据与上下文。
from typing import TypedDict
from pydantic import BaseModel
from langgraph.graph import StateGraph, START, END
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
class State(TypedDict):
question: str
rewritten_query: str
documents: list[str]
answer: str
# WNBA 知识库:包含阵容、比赛结果和球员数据
embeddings = OpenAIEmbeddings()
vector_store = InMemoryVectorStore(embeddings)
vector_store.add_texts([
# 阵容
"New York Liberty 2024 roster: Breanna Stewart, Sabrina Ionescu, Jonquel Jones, Courtney Vandersloot.",
"Las Vegas Aces 2024 roster: A'ja Wilson, Kelsey Plum, Jackie Young, Chelsea Gray.",
"Indiana Fever 2024 roster: Caitlin Clark, Aliyah Boston, Kelsey Mitchell, NaLyssa Smith.",
# 比赛结果
"2024 WNBA Finals: New York Liberty defeated Minnesota Lynx 3-2 to win the championship.",
"June 15, 2024: Indiana Fever 85, Chicago Sky 79. Caitlin Clark had 23 points and 8 assists.",
"August 20, 2024: Las Vegas Aces 92, Phoenix Mercury 84. A'ja Wilson scored 35 points.",
# 球员数据
"A'ja Wilson 2024 season stats: 26.9 PPG, 11.9 RPG, 2.6 BPG. Won MVP award.",
"Caitlin Clark 2024 rookie stats: 19.2 PPG, 8.4 APG, 5.7 RPG. Won Rookie of the Year.",
"Breanna Stewart 2024 stats: 20.4 PPG, 8.5 RPG, 3.5 APG.",
])
retriever = vector_store.as_retriever(search_kwargs={"k": 5})
@tool
def get_latest_news(query: str) -> str:
"""获取最新的 WNBA 新闻和更新。"""
# 这里应调用真实新闻 API
return "Latest: The WNBA announced expanded playoff format for 2025..."
agent = create_agent(
model="openai:gpt-4o",
tools=[get_latest_news],
)
model = ChatOpenAI(model="gpt-4o")
class RewrittenQuery(BaseModel):
query: str
def rewrite_query(state: State) -> dict:
"""重写用户查询,使其更利于检索。"""
system_prompt = """Rewrite this query to retrieve relevant WNBA information.
The knowledge base contains: team rosters, game results with scores, and player statistics (PPG, RPG, APG).
Focus on specific player names, team names, or stat categories mentioned."""
response = model.with_structured_output(RewrittenQuery).invoke([
{"role": "system", "content": system_prompt},
{"role": "user", "content": state["question"]},
])
return {"rewritten_query": response.query}
def retrieve(state: State) -> dict:
"""基于重写后的查询进行检索。"""
docs = retriever.invoke(state["rewritten_query"])
return {"documents": [doc.page_content for doc in docs]}
def call_agent(state: State) -> dict:
"""在检索到的上下文上生成最终回答。"""
context = "\n\n".join(state["documents"])
prompt = f"Context:\n{context}\n\nQuestion: {state['question']}"
response = agent.invoke({"messages": [{"role": "user", "content": prompt}]})
return {"answer": response["messages"][-1].content_blocks}
workflow = (
StateGraph(State)
.add_node("rewrite", rewrite_query)
.add_node("retrieve", retrieve)
.add_node("agent", call_agent)
.add_edge(START, "rewrite")
.add_edge("rewrite", "retrieve")
.add_edge("retrieve", "agent")
.add_edge("agent", END)
.compile()
)
result = workflow.invoke({"question": "Who won the 2024 WNBA Championship?"})
print(result["answer"])本文档由 LangChain 官方文档翻译而来