简体中文 | English
Dataflow-LoopAI 是一个具备自优化能力的智能系统,能够自动检测并评估特定领域大语言模型的生成缺陷。它通过对话式主动数据获取与自驱优化机制,实现数据与模型的持续协同演进。
用户 ⇄ 管理器(控制逻辑) ⇄ LangGraph(状态机)
│
├── 普通问答:直接返回
└── 复杂任务:进入图(评估 → 挖掘 → 训练)下面是经过整理与美化的项目目录结构,展示了主要模块的职责:
Dataflow-LoopAI/
├── api/ # WebUI 后端,FastAPI 服务与静态前端 dist 托管
│ ├── app/controllers/ # config / task / resource / starter 等 API 路由
│ ├── app/utils/ # Starter 进程、资源预览、硬件监控等后端工具
│ ├── db/ # SQLite 数据库目录
│ └── dist/ # 发布版前端产物,生产环境由 FastAPI 直接托管
│
├── examples/ # 示例脚本与运行用例
│ └── scripts/ # 启动、测试等脚本
│
├── loopai/ # 项目核心目录
│ ├── agents/ # 各类智能 Agent(每个 Agent 是一个子状态机)
│ │ ├── BaseAgent/ # 基础 Agent 定义
│ │ ├── Starter/ # 主入口 Agent
│ │ ├── Analyzer/ # 模型评估/挖掘 Agent
│ │ ├── Obtainer/ # 数据获取 Agent
│ │ └── ... # 其他自定义 Agent
│ │
│ ├── common/ # 全局工具
│ │ ├── prompts/ # 通用 Prompt 模板
│ │ └── ... # 其它通用组件
│ │
│ ├── memory/ # 持久化存储(当前使用简单存储,未来可扩展数据库)
│ │
│ ├── states/ # 状态定义 & 事件定义
│ │
│ ├── utils/ # 通用工具类与辅助代码
│ │
│ └── ... # 其它框架相关内容
│
├── scripts/ # 发布、下载等项目脚本
│ ├── download_ui_release.py # 下载 GitHub 发布页中的前端 dist 到 api/dist
│ └── release_ui.sh # 打 UI 标签并触发 GitHub Actions 发布
│
├── ui/ # Vue 3 + Vite 前端源码
│ └── src/ # 页面、组件、路由、API 调用封装
│
└── docs/ # 文档与资源
└── assets/ # 图片与素材
🤖 已实现的核心 Agent
目前 Dataflow-LoopAI 已实现以下核心 Agent,每个 Agent 均作为一个 可独立运行、可组合调度的子图(subgraph):
作为系统的 总调度器,负责:
- 与用户对话
- 解析任务意图
- 自动选择并调用其他 Agent
- 管理任务的整体执行流程
用于自动评测待测试模型,主要功能包括:
- 自动生成代码(调用 LLM)
- 提交到 OJ(在线判题)系统执行
- 收集运行结果与评测数据
基于 JudgerAgent 的评测结果,负责:
- 统计与分析模型表现
- 挖掘错误类别与模式
- 生成可读性强的分析报告
作为系统的交互式配置专家,负责:
- 与用户对话修改配置信息
- 缺失信息反馈和修改再校验(待实现)
- 继续执行中断节点(待实现)
作为系统的数据获取单元,负责:
- 将用户的需求进行分析并调研
- 收集相关数据集信息
- 收集相关网页数据信息(待实现)
- 整理各种格式的数据至可以直接用于训练的格式
pip install -e .-
将
examples/config/starter.yaml复制到./starter.yaml,并修改其中的system配置参数。 -
运行
run_starter.py脚本启动 LoopAI。
python examples/scripts/run_starter.py生产环境推荐直接下载已发布的前端 dist:
python scripts/download_ui_release.py本节只用于需要修改或调试 ui/ 源码的开发场景。
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.7/install.sh | bashsource ~/.bashrc # 或 ~/.zshrcnvm install 20
nvm use 20
nvm alias default 20node -v
npm -vcorepack enable
corepack prepare yarn@stable --activatecd ui
yarn如果后端没有运行在 127.0.0.1:8855,请编辑 ui/vite.config.js:
server: {
host: '0.0.0.0',
proxy: {
'/api': {
target: 'http://<host>:8855/',
changeOrigin: true,
rewrite: path => path.replace(/^\/api/, '')
}
}
}yarn devVite 开发服务器会将 /api/* 代理到 FastAPI 后端;生产环境则由 python api/start.py 直接从 api/dist 托管静态前端。
前端改动准备好后,在仓库根目录运行发布脚本:
bash scripts/release_ui.sh [可选版本号]如果省略版本号,脚本会交互式要求输入版本。它会更新 ui/package.json,创建 ui-v<version> 标签,并推送分支和标签;随后 GitHub Action 会构建并发布前端 dist 产物。
conda activate vllm
bash examples/scripts/run_manager_vllm.sh修改脚本中的模型路径与配置:
{
'eval_model_path': '/home/lpc/models/glm-4-9b-chat/',
'eval_base_url': 'http://127.0.0.1:8911/v1',
'eval_api_key': api_key,
'eval_test_case_path': '/home/lpc/repos/Dataflow-LoopAI/output/test.json',
'eval_problem_path': '/home/lpc/repos/Dataflow-LoopAI/data/human-eval-v2-20210705.jsonl',
'eval_result_path': '/home/lpc/repos/Dataflow-LoopAI/output/result.json',
}运行:
python examples/scripts/run_judger.py-
模型配置:将
examples/config/starter.yaml复制到./starter.yaml,并修改 Obtainer 相关模型路径与参数。- 注意:RAG 当前仅支持基于 API 的嵌入模型。
-
必要 API 密钥:
bash examples/scripts/run_obtainer.sh在 Dataflow-LoopAI 中,每个 Agent 实质上是一个子图(subgraph),由多个节点函数与边逻辑构成,并会被自动整合到 StarterAgent 中进行协调调度。
所有自定义 Agent 需继承 BaseAgent,其提供:
- 基础事件记录机制(
agent_event) - 可选通用 LLM 对话节点构建方法:
create_llm_node - 标准的图初始化入口:
init_graph - 统一的调用协议:
__call__
def init_graph(self, **kwargs):
builder = StateGraph(LoopAIState)
...
self.graph = builder.compile(
checkpointer=self.checkpointer,
store=self.store,
**kwargs
)- 子图模式:
self.init_graph(**kwargs)
return self.graph- StarterAgent 中的流式调用:
for res in self.graph.stream(
Command(resume=input),
subgraphs=True,
stream_mode=["updates", "messages"],
**invoke_args
):
yield res为了保持项目可维护性与一致性,推荐遵循以下规范:
- 命名规范
- Agent 类名:大写开头驼峰(如
AnalyzerAgent) - 文件夹:大写开头驼峰
- Python 文件:小写 + 下划线(如
eval_model.py)
- 代码组织结构
当节点逻辑较复杂时,推荐:
-
✅ 节点放入
nodes/子文件夹: 示例:loopai/agents/Analyzer/nodes/eval_model.py -
✅ 工具函数放入
utils/: 示例:loopai/agents/Analyzer/utils/llmaj.py -
✅ LLM 工具调用放入
tools/: 示例:loopai/agents/Starter/tools/check_motivation.py -
✅ Agent 本体保持“薄”,节点逻辑不要堆积在类中,便于维护。
为了保证整个系统中 Prompt 的统一性与可维护性,Dataflow-LoopAI 提供了 全局 Prompt 模板管理机制。所有通用 Prompt 均在 common/prompts/ 中定义,并通过统一的加载器进行管理。
默认全局 Prompt 模板文件位于:
loopai/common/prompts/
由以下工具类负责加载:
prompt_loader.py
在 BaseAgent 中统一初始化:
self.prompt_loader = PromptLoader(prompt_template_dir)你可以通过修改 prompt_template_dir 来指定不同的 Prompt 扫描路径,实现自定义扩展。
由于我们重写了ReAct节点, 我们观察到, 尽管Sub-Agent采用不同LLM_Node时可能限定定义了不同的工具, 但是Sub-Agent仍然可能受到上下文影响调用本不属于它可使用的工具。因此, 如果你需要自定义工具, 切记返回的为对象dict, 避免造成StarterAgent及其它Sub-Agent在校验时无法解析结果而报错。
每个继承 BaseAgent 的自定义 Agent,需要通过以下两个抽象属性指定自身的系统 Prompt:
@property
@abstractmethod
def system_prompt_type(self) -> str:
"""系统 Prompt 类型"""
return "system"
@property
@abstractmethod
def system_prompt_name(self) -> str:
"""系统 Prompt 名称"""
pass-
用于指定 Prompt 的角色类型,如:
"system""user""assistant"
-
其对应的文件存储格式为:
<prompt_type>_prompt.json
例如:
system_prompt.json
user_prompt.json
assistant_prompt.json
- 用于指定具体要加载的 Prompt 名称,例如
"default_prompt"。 - 加载方式为在对应的
<prompt_type>_prompt.json文件中查找同名字段:
{
"default_prompt": "..."
}系统将自动从对应 JSON 中读取该模板,作为 Agent 的系统 Prompt 注入运行流程。
BaseAgent 内置了 AgentEvent,用于完整追踪 Agent 的执行过程。
- 当前事件类型(
stream_mode) - 当前执行节点(
node) - 状态更新(
state) - 消息流(
stream_message) - 执行路径(
node_path) - 自定义事件字典(
custom_info)
虽然每个 Agent 理论上都可以维护自己的 AgentEvent,但在 LoopAI 中,为了统一管理,我们只使用 StarterAgent 中的 AgentEvent。
- ✅
update—— 节点执行结束后的状态更新事件 - ✅
message—— LLM 或节点返回的消息 - ✅
custom—— 用户自定义事件
基于这些事件类型,LoopAI 将监测事件分为两类:
包括 update 与 message:
update:仅在节点执行完成后触发message:捕获基于 ChatOpenAI 的消息,并支持流式返回
- 缺乏 实时性(无法在节点执行过程中触发)
- 只能记录存放在
LoopAIState中的字段变化
为提升实时性与灵活性,我们引入了自定义事件机制:
- 可在节点执行过程中实时触发
- 可记录无需写入
LoopAIState的临时信息 - 支持更灵活的业务扩展和状态监控
在子图中,有些参数无需保存到 LoopAIState,但仍需监测。这类信息可以通过 自定义事件(get_stream_writer)进行流式返回。
LoopAI 使用 StreamEvent 规范自定义事件的格式。所有自定义事件都会被记录进 AgentEvent 的 custom_info 字段,并最终展示在可视化工具中。
| 字段 | 含义 | 可选 |
|---|---|---|
current |
当前节点名称 | 必填 |
progress |
进度百分比 | 可选 |
progress_num |
当前进度数值 | 可选 |
total |
总进度 | 可选 |
message |
输出文本消息 | 可选 |
data |
任意自定义数据 | 可选 |
假设我们希望在 AnalyzerAgent 的 eval_model 节点中,实时监测 configer_error 的变化。
示例代码:
from langgraph.config import get_stream_writer
from loopai.schema.events import StreamEvent
from loopai.schema.states import LoopAIState
from loopai.agents import BaseAgent
writer = get_stream_writer()
@BaseAgent.set_current
def node(state: LoopAIState):
writer(StreamEvent(
current=state['current'],
data={'configer_error': state['configer_error']}
).json())@BaseAgent.set_current 会在执行前设置当前节点名称,这样事件中便能记录正确的 current 字段。
StarterAgent 接收到该自定义事件后,会将其写入:
AgentEvent.custom_info[state['current']]
其中 value 为数组,每触发一次 writer,就会追加一个事件对象。
如果节点检测到必要参数缺失,可以触发异常处理流程:
required_fields = {
"analyzer": [
"analyze_model_path", "analyze_base_url", "analyze_api_key", "analyze_temperature", "analyze_top_p",
"output_brief", "analyze_task_type", "analyze_sampling_top_k", "output_suggestion", "analyze_batch_size"
],
"judger": ["eval_result_path"],
"default": ["output_dir"]
} # 在这里可以定义当前Agent下需要的参数, 并在下面的代码中进行检查
missing_fields = {}
for key in required_fields:
for field in required_fields[key]:
if key == 'default':
if field not in state:
missing_fields.setdefault(key, []).append(field)
else:
if field not in state.get(key, {}):
missing_fields.setdefault(key, []).append(field)
if missing_fields:
state['exception'] = 'ConfigerError'
state['next_to'] = 'config_node'
state['automated_query'] = self.prompt_loader("automated_query", "analyzer_missing_fields_prompt") # 使用相应的自动查询Prompt, 以便在跳出ConfigerAgent后向Agent进行通报
state.setdefault('configer', {})['configer_error'] = missing_fields
goto_node = runtime.context['exception_navigate'] # 在context中定义了StarterAgent中异常处理要跳转来进行处理的节点(默认为route_node)
logger.info(f'found missing fields, goto {goto_node}')
return Command(
update=state,
goto=goto_node,
graph=Command.PARENT
)如果需要在子图中统一定义, 建议统一命名为check_required_fields, 为了获取当前Agent下需要的参数, 可以定义嵌套函数来返回该节点:
def get_check_required_fields_node(self):
@BaseAgent.set_current
def check_required_fields(state: LoopAIState, runtime: Runtime[RuntimeContext]):
...
return check_required_fields字段说明:
exception:异常类型next_to:需要跳转到的异常处理节点,如config_nodeautomated_query:自动生成的查询,用于在完成参数不全后提示 StarterAgent 用户已经补全了哪些参数, 并自动向Agent进行通报configer.configer_error:传递给 ConfigerAgent 的缺失参数字段goto_node:异常处理跳转节点,如外部配置的exception_navigate
如果异常无法通过 Configer 修复,但仍希望继续流程,可以将 next_to 设置为 query_node,并定义相应的 automated_query。StarterAgent 会根据该提示引导用户进行必要的手动操作。