原文链接: https://docs.langchain.com/oss/python/langchain/streaming/overview
实时流式传输智能体运行的更新
LangChain 实现了一个流式处理系统来展示实时更新。流式处理对于增强基于 LLM 构建的应用程序的响应性至关重要。通过逐步显示输出,甚至在完整响应准备好之前,流式处理显著改善了用户体验(UX),特别是在处理 LLM 的延迟时。
LangChain 的流式处理系统让您可以将智能体运行的实时反馈展示到您的应用程序中。LangChain 流式处理可以实现的功能:
- 流式传输智能体进度 — 在每个智能体步骤后获取状态更新。
- 流式传输 LLM 令牌 — 在生成时流式传输语言模型令牌。
- 流式传输自定义更新 — 发出用户定义的信号(例如,
"已获取 10/100 条记录")。 - 流式传输多种模式 — 从
updates(智能体进度)、messages(LLM 令牌 + 元数据)或custom(任意用户数据)中选择。
请参阅下面的常见模式部分以获取其他端到端示例。
将一个或多个以下流模式作为列表传递给 stream 或 astream 方法:
| 模式 | 描述 |
|---|---|
| updates | 在每个智能体步骤后流式传输状态更新。如果在同一步骤中进行了多个更新(例如,运行了多个节点),这些更新会分别流式传输。 |
| messages | 从调用 LLM 的任何图节点流式传输(令牌,元数据)元组。 |
| custom | 使用流写入器从图节点内部流式传输自定义数据。 |
要流式传输智能体进度,请使用 stream_mode="updates" 的 stream 或 astream 方法。这会在每个智能体步骤后发出一个事件。例如,如果您有一个调用一次工具的智能体,您应该看到以下更新:
- LLM 节点:带有工具调用请求的 AIMessage
- 工具节点:带有执行结果的 ToolMessage
- LLM 节点:最终的 AI 响应
流式传输智能体进度:
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""获取指定城市的天气。"""
return f"It's always sunny in {city}!"
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
for chunk in agent.stream( # 流式传输智能体运行
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="updates",
):
for step, data in chunk.items():
print(f"step: {step}")
print(f"content: {data['messages'][-1].content_blocks}")输出:
step: model
content: [{'type': 'tool_call', 'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': 'call_OW2NYNsNSKhRZpjW0wm2Aszd'}]
step: tools
content: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]
step: model
content: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]
要在 LLM 生成令牌时流式传输令牌,请使用 stream_mode="messages"。下面您可以看到智能体流式传输工具调用和最终响应的输出。
流式传输 LLM 令牌:
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""获取指定城市的天气。"""
return f"It's always sunny in {city}!"
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
for token, metadata in agent.stream( # 流式传输令牌和元数据
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="messages",
):
print(f"node: {metadata['langgraph_node']}")
print(f"content: {token.content_blocks}")
print("\n")输出:
node: model
content: [{'type': 'tool_call_chunk', 'id': 'call_vbCyBcP8VuneUzyYlSBZZsVa', 'name': 'get_weather', 'args': '', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '{"', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'city', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '":"', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': 'San', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': ' Francisco', 'index': 0}]
node: model
content: [{'type': 'tool_call_chunk', 'id': None, 'name': None, 'args': '"}', 'index': 0}]
node: model
content: []
node: tools
content: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]
node: model
content: []
node: model
content: [{'type': 'text', 'text': 'Here'}]
node: model
content: [{'type': 'text', 'text': ''s'}]
node: model
content: [{'type': 'text', 'text': ' what'}]
node: model
content: [{'type': 'text', 'text': ' I'}]
node: model
content: [{'type': 'text', 'text': ' got'}]
node: model
content: [{'type': 'text', 'text': ':'}]
node: model
content: [{'type': 'text', 'text': ' "'}]
node: model
content: [{'type': 'text', 'text': "It's"}]
node: model
content: [{'type': 'text', 'text': ' always'}]
node: model
content: [{'type': 'text', 'text': ' sunny'}]
node: model
content: [{'type': 'text', 'text': ' in'}]
node: model
content: [{'type': 'text', 'text': ' San'}]
node: model
content: [{'type': 'text', 'text': ' Francisco'}]
node: model
content: [{'type': 'text', 'text': '!"\n\n'}]
要在工具执行时流式传输更新,您可以使用 get_stream_writer。
流式传输自定义更新:
from langchain.agents import create_agent
from langchain.tools import tool, ToolRuntime, get_stream_writer
@tool
def process_data(data: str, runtime: ToolRuntime) -> str:
"""处理数据并流式传输进度更新。"""
writer = get_stream_writer(runtime)
# 流式传输进度更新
writer("开始处理数据...")
writer("已处理 50% 的数据...")
writer("处理完成!")
return f"处理后的数据: {data}"
agent = create_agent(
model="gpt-5-nano",
tools=[process_data],
)
for update in agent.stream(
{"messages": [{"role": "user", "content": "处理这些数据: example"}]},
stream_mode="custom",
):
print(update)您可以同时流式传输多种模式。只需将多个模式作为列表传递:
for stream_type, stream_mode, data in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode=["updates", "messages", "custom"], # 同时流式传输多种模式
):
if stream_mode == "updates":
# 处理智能体进度更新
print(f"更新: {data}")
elif stream_mode == "messages":
# 处理 LLM 令牌
token, metadata = data
print(f"令牌: {token.content}")
elif stream_mode == "custom":
# 处理自定义更新
print(f"自定义: {data}")当智能体调用工具时,您可以在工具执行期间流式传输进度更新:
from langchain.agents import create_agent
from langchain.tools import tool, ToolRuntime
@tool
def long_running_task(task_name: str, runtime: ToolRuntime) -> str:
"""执行长时间运行的任务并流式传输进度。"""
from langchain.tools import get_stream_writer
writer = get_stream_writer(runtime)
writer(f"开始执行任务: {task_name}")
# 模拟长时间运行的任务
import time
for i in range(5):
time.sleep(0.5)
writer(f"进度: {i+1}/5")
writer(f"任务 {task_name} 完成!")
return f"任务 {task_name} 已完成"
agent = create_agent(
model="gpt-5-nano",
tools=[long_running_task],
)
for update in agent.stream(
{"messages": [{"role": "user", "content": "执行任务: 数据分析"}]},
stream_mode="custom",
):
print(update)在流式传输过程中,您可能想要访问已完成的消息。您可以使用 stream_mode="updates" 来获取每个步骤的完整消息:
from langchain.agents import create_agent
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="updates",
):
for step, data in chunk.items():
if step == "model":
last_message = data["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
print(f"工具调用: {last_message.tool_calls}")
elif step == "tools":
last_message = data["messages"][-1]
print(f"工具响应: {last_message.content}")当使用人在回路功能时,您仍然可以流式传输智能体进度。这允许您在等待人工输入时显示实时更新:
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
checkpointer=InMemorySaver(),
)
config = {"configurable": {"thread_id": "1"}}
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
config=config,
stream_mode="updates",
):
for step, data in chunk.items():
print(f"步骤: {step}")
# 处理更新...当智能体内部有多个 LLM 时,通常需要消除消息来源的歧义。为此,在创建智能体时为每个智能体传递一个名称。当以 "messages" 模式流式传输时,此名称可通过元数据中的 lc_agent_name 键获得。下面,我们更新流式传输工具调用示例:
- 我们将工具替换为在内部调用智能体的
call_weather_agent工具 - 我们为每个智能体添加一个
name - 我们在创建流时指定
subgraphs=True - 我们的流处理与之前相同,但添加了使用
create_agent的name参数跟踪哪个智能体处于活动状态的逻辑
当您在智能体上设置 name 时,该名称也会附加到该智能体生成的任何 AIMessage。
首先我们构建智能体:
from typing import Any
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.messages import AIMessage, AnyMessage
def get_weather(city: str) -> str:
"""获取指定城市的天气。"""
return f"It's always sunny in {city}!"
weather_model = init_chat_model("openai:gpt-5.2")
weather_agent = create_agent(
model=weather_model,
tools=[get_weather],
name="weather_agent", # 为天气智能体设置名称
)
def call_weather_agent(query: str) -> str:
"""查询天气智能体。"""
result = weather_agent.invoke({
"messages": [{"role": "user", "content": query}]
})
return result["messages"][-1].text
supervisor_model = init_chat_model("openai:gpt-5.2")
agent = create_agent(
model=supervisor_model,
tools=[call_weather_agent],
name="supervisor", # 为监督智能体设置名称
)接下来,我们向流式传输循环添加逻辑以报告哪个智能体正在发出令牌:
def _render_message_chunk(token: AIMessageChunk) -> None:
"""渲染消息块。"""
if token.text:
print(token.text, end="|")
if token.tool_call_chunks:
print(token.tool_call_chunks)
def _render_completed_message(message: AnyMessage) -> None:
"""渲染已完成的消息。"""
if isinstance(message, AIMessage) and message.tool_calls:
print(f"工具调用: {message.tool_calls}")
if isinstance(message, ToolMessage):
print(f"工具响应: {message.content_blocks}")
input_message = {"role": "user", "content": "What is the weather in Boston?"}
current_agent = None
for _, stream_mode, data in agent.stream(
{"messages": [input_message]},
stream_mode=["messages", "updates"],
subgraphs=True, # 启用子图流式传输
):
if stream_mode == "messages":
token, metadata = data
if agent_name := metadata.get("lc_agent_name"): # 获取智能体名称
if agent_name != current_agent: # 如果智能体切换了
print(f"🤖 {agent_name}: ") # 打印智能体名称
current_agent = agent_name # 更新当前智能体
if isinstance(token, AIMessage):
_render_message_chunk(token)
if stream_mode == "updates":
for source, update in data.items():
if source in ("model", "tools"):
_render_completed_message(update["messages"][-1])输出:
🤖 supervisor:
[{'name': 'call_weather_agent', 'args': '', 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'query', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'Boston', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' weather', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' right', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' now', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' and', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': " today's", 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': ' forecast', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
Tool calls: [{'name': 'call_weather_agent', 'args': {'query': "Boston weather right now and today's forecast"}, 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'type': 'tool_call'}]
🤖 weather_agent:
[{'name': 'get_weather', 'args': '', 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'city', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': 'Boston', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
[{'name': None, 'args': '"}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
Tool calls: [{'name': 'get_weather', 'args': {'city': 'Boston'}, 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'type': 'tool_call'}]
Tool response: [{'type': 'text', 'text': "It's always sunny in Boston!"}]
Boston| weather| right| now|:| **|Sunny|**|.
|Today|'s| forecast| for| Boston|:| **|Sunny| all| day|**|.|Tool response: [{'type': 'text', 'text': 'Boston weather right now: **Sunny**.\n\nToday's forecast for Boston: **Sunny all day**.'}]
🤖 supervisor:
Boston| weather| right| now|:| **|Sunny|**|.
|Today|'s| forecast| for| Boston|:| **|Sunny| all| day|**|.|
在某些应用程序中,您可能需要禁用给定模型的单个令牌流式传输。这在以下情况下很有用:
- 使用多智能体系统来控制哪些智能体流式传输其输出
- 混合支持流式传输和不支持流式传输的模型
- 部署到 LangSmith 并希望防止某些模型输出被流式传输到客户端
在初始化模型时设置 streaming=False。
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="gpt-4o",
streaming=False # 禁用流式传输
)当部署到 LangSmith 时,在您不想流式传输到客户端的任何模型上设置 streaming=False。这在部署之前在您的图代码中配置。
并非所有聊天模型集成都支持 streaming 参数。如果您的模型不支持它,请改用 disable_streaming=True。此参数通过基类在所有聊天模型上可用。
有关更多详细信息,请参阅 LangGraph 流式传输指南。
- 前端流式传输 — 使用
useStream构建 React UI 以实现实时智能体交互 - 使用聊天模型流式传输 — 直接从聊天模型流式传输令牌,无需使用智能体或图
- 使用人在回路流式传输 — 在处理人工审查中断时流式传输智能体进度
- LangGraph 流式传输 — 高级流式传输选项,包括
values、debug模式和子图流式传输
本文档由 LangChain 官方文档翻译而来