Skip to content

[RFC] 关于将pipeline重构为workflow的草案 #1948

@Raven95676

Description

@Raven95676

非常好的更新,看了一下 #1821 的 issue,思考了一下,目前还有很多其他的功能都需要这种会话级别的隔离方式(比如主动回复、MCP、function tools)。所以我在想是否还可以考虑从更加顶层更加通用的的视角去实现这个功能?@advent259141 @Raven95676 @anka-afk @IGCrystal

Originally posted by @Soulter in #1803 (comment)


为提升系统灵活性和实现更精细的会话隔离,现提出将现有pipeline架构重构为workflow模式的草案(如草案通过,预计最早在4.0版本推出预览)。新架构将为用户提供更高的自定义自由度,同时在底层实现更好的会话管理机制。

新架构

graph TD
    subgraph "入口"
        A_Event(AstrMessageEvent) -- "被取出" --> EventProcessor["唤起检查(合并whitelist/waking/rate_limit阶段)"]
        EventProcessor --> ContinueDecision{继续处理?}
        ContinueDecision -- "否" --> Drop(丢弃事件)
    end

    subgraph "调度器"
        ContinueDecision -- "是" --> Scheduler[调度器]
        Loader
        SessionManager
        Scheduler -- "请求加载工作流定义" --> Loader
        Loader -- "实例化" --> WorkflowData((Workflow 数据类))
        Scheduler -- "调用" --> SessionManager
        SessionManager -- "根据 unified_msg_origin 获取/创建" --> WorkflowSession((WorkflowSession))
        Scheduler -- "创建本次运行实例" --> WorkFlowRun((WorkflowRun))
    end
    
    subgraph "执行"
        Engine[工作流引擎]
        Plugin[内置节点/节点插件]
        Engine -- "读取DAG" --> WorkFlowRun
        Engine -- "更新状态" --> WorkFlowRun
        Engine --> Plugin
        Plugin --> Engine
    end

    subgraph "出口"
        Engine --> OutputDecision{成功执行?}
        OutputDecision -- "是" --> ResultDecorate[装饰结果(原decorate阶段)]
        OutputDecision -- "否" --> ErrorHandler[错误处理]
        ErrorHandler--> ErrorHandleDecision{报错信息发送?}
        ErrorHandleDecision -- "是" -->  Send(发送到平台)
        ErrorHandleDecision -- "否" -->  Stop(停止处理)
        ResultDecorate --> Send
    end

    WorkFlowRun -."引用".->  WorkflowData
    WorkFlowRun -."引用".-> WorkflowSession
    WorkFlowRun --"提交至"--> Engine
Loading

常规非节点插件将在对应功能节点中正常处理。每个Workflow都将明确声明其依赖的节点插件以及可触发的非节点插件。用户可在开始节点配置工作流触发条件,在结束节点设定平台发送方式。LLM工具(包括MCP)将在各个LLM节点独立配置。

计划中Workflow需要支持会话变量。考虑到会话变量修改可能引发的竞争条件问题,计划将Workflow分为两类:有状态的Chatflow需要在执行期间持有会话锁以确保数据一致性,无状态的Workflow则不持有会话锁。

已知问题

  • 此次重构将替代现有的洋葱模型,所有基于该模型的功能模块均需重写,包括会话控制等。插件将不再支持通过yield方式发送消息。
  • Workflow可能包含多个LLM节点,on_llm_requeston_llm_response两个事件钩子暂定对单个工作流内的所有节点生效(另一种方案是只在最后一个LLM节点生效)。会话管理、会话到对话的映射、对话持久化逻辑都需要与工作流状态管理深度集成。
  • 人格信息、Identifier、时间、回复内容等System Prompt的注入逻辑需要整合到Workflow架构中。工具调用逻辑和流式输出方式需要适配新的工作流引擎。
  • 内置命令系统需要重新设计,部分命令(如:/llm)需要移除,同时需要增加适用于工作流的新命令。内置命令的实现方式也可能需要调整。
  • WebUI需重构以适配工作流的新架构与状态视图。

鉴于改动范围广、涉及模块众多,需要在主线外建立专用开发分支。本草案为初步设计方案,欢迎各位贡献者及社区成员共同参与协作修订与细节


UPDATE:

实际落地为chain架构,简化了原有执行模型,并为后续可能的扩展为完整workflow预留了基础

尽可能兼容了现有的插件生态

 graph LR
      subgraph "入口"
          A_Event(AstrMessageEvent) -- "event_queue" --> EventBus[EventBus.dispatch]
          EventBus --> WaitCheck{WaitRegistry 检查}
          WaitCheck -- "有等待中的节点" --> ResumeChain[恢复 Chain 执行]
          WaitCheck -- "无" --> ChainRouter[ChainRouter.route]
          ChainRouter -- "匹配 ChainConfig" --> PipelineExec[PipelineExecutor]
      end

      subgraph "前置处理 (PipelineExecutor)"
          PipelineExec --> EventPreprocessor[EventPreprocessor<br/>管理员检测/会话ID生成]
          EventPreprocessor --> CommandDispatcher[CommandDispatcher.match<br/>过滤器匹配 Star 处理器]
          CommandDispatcher --> WakeDetect[唤醒检测<br/>前缀/At/私聊]
          WakeDetect --> SystemMech[系统机制<br/>RateLimiter/AccessController]
          SystemMech --> CmdExec[CommandDispatcher.execute<br/>执行匹配的 Star 处理器]
      end

    subgraph "Chain 执行引擎"
          CmdExec --> ChainExecutor[ChainExecutor.execute]
          ChainExecutor --> NodeLoop{遍历 chain_config.nodes}
          NodeLoop --> CreateCtx[创建 NodeContext<br/>推入 context_stack]
          CreateCtx --> LoadConfig[加载 AstrBotNodeConfig<br/>节点级持久化配置]
          LoadConfig --> NodeInit[node_initialize<br/>首次初始化]
          NodeInit --> NodeProcess[node.process<br/>返回 NodeResult]
          NodeProcess --> ResultCheck{NodeResult?}
          ResultCheck -- "CONTINUE" --> NextNode[继续下一节点]
          ResultCheck -- "STOP" --> StopChain[停止 Chain]
          ResultCheck -- "WAIT" --> SaveWait[保存到 WaitRegistry<br/>等待下次消息]
          ResultCheck -- "SKIP" --> NextNode
          NextNode --> NodeLoop
      end

      subgraph "出口"
          StopChain --> SendService[SendService.send]
          ChainExecutor -- "所有节点完成" --> SendService
          SendService --> Platform[发送到平台]
          SendService --> AfterHook[OnAfterMessageSentEvent]
      end
Loading

Metadata

Metadata

Labels

No labels
No labels

Type

Projects

Status

Approved

Relationships

None yet

Development

No branches or pull requests

Issue actions