Refactor RL disagg producer flow with deeper manager-side abstraction #1769
Open
jayhenry wants to merge 11 commits into InternLM:rl_design
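Motivation

The core goal of this refactor is to move the hidden information currently scattered across `AgentLoopManager` into better-suited local abstractions, so that the manager is responsible only for flow orchestration. The main pieces of hidden information are:

- `ProduceProgress` hides how cumulative producer/consumer progress is accounted for, including target, consumed, and the rules for advancing future steps.
- `ReplayBuffer` hides buffer operation details: whether a batch is ready, how to fetch a batch, how to collect status statistics, and how to refresh staleness.
- `ProduceContext` hides the task context a strategy needs at runtime, including the sampler, replay buffer, progress, abort signal, model step, and the order in which generated results are written to the buffer.
- `_PendingTasks` hides the concurrency protocol for async pending rollout tasks, including schedule, wait, claim, and cancel, so that the manager does not read strategy-private state directly.

As a result, `AgentLoopManager` no longer needs to understand these low-level details. It keeps only the flow control for colocate/disagg production, consumption, pausing, and resuming.

Main changes

- `ProduceStrategy.produce_batch(ctx)` / `pause_produce(ctx)`: the parameter is a `ProduceContext`.
- `ProduceProgress`: maintains production and consumption progress in one place.
- `ReplayBuffer`: takes over the generic batch and staleness operations.
- `_PendingTasks`: encapsulates operations on the set of async pending tasks.
- `AgentLoopManager.save()`: checks pending tasks through `ProduceStrategy.pending_task_count()` and no longer accesses strategy-private fields.
- `_produce_batch_to_buffer`: `task_batch_sizes` is passed in explicitly, and the single-task and multi-task production paths are unified.
- `_get_batch_from_buffer`: the logging and result-construction logic.
- `produce_loop`: the state-waiting branch.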
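To make the schedule / wait / claim / cancel protocol more concrete, here is a minimal sketch of what a `_PendingTasks`-style container could look like on top of `asyncio`. This is not the code from this PR: the class name `PendingTasksSketch`, its method signatures, and the `fake_rollout` helper are all assumptions made for illustration.

```python
import asyncio
from typing import Any, Coroutine, List, Optional, Set


class PendingTasksSketch:
    """Hypothetical stand-in for _PendingTasks: owns the set of in-flight tasks."""

    def __init__(self) -> None:
        self._tasks: Set["asyncio.Task[Any]"] = set()

    def __len__(self) -> int:
        # Number of tasks that are scheduled but not yet claimed or cancelled.
        return len(self._tasks)

    def schedule(self, coro: Coroutine[Any, Any, Any]) -> "asyncio.Task[Any]":
        # Must be called from inside a running event loop.
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        return task

    async def wait(self, timeout: Optional[float] = None) -> None:
        # Block until at least one pending task finishes (or the timeout expires).
        if self._tasks:
            await asyncio.wait(
                self._tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
            )

    def claim(self) -> List[Any]:
        # Remove finished tasks from the set and return their successful results.
        done = {t for t in self._tasks if t.done()}
        self._tasks -= done
        return [
            t.result() for t in done if not t.cancelled() and t.exception() is None
        ]

    async def cancel(self) -> int:
        # Cancel everything still pending and wait for the cancellations to settle.
        tasks = list(self._tasks)
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        self._tasks.clear()
        return len(tasks)


async def fake_rollout(sample_id: int, delay: float) -> int:
    # Hypothetical rollout: sleeps, then returns its id.
    await asyncio.sleep(delay)
    return sample_id


async def demo():
    pending = PendingTasksSketch()
    pending.schedule(fake_rollout(1, 0.0))   # finishes almost immediately
    pending.schedule(fake_rollout(2, 10.0))  # still running when we pause
    await pending.wait()
    claimed = pending.claim()
    remaining = len(pending)
    cancelled = await pending.cancel()
    return claimed, remaining, cancelled, len(pending)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a container like this, the caller only ever sees counts and claimed results, which is the kind of boundary that lets a manager avoid touching strategy-private task sets.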
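The interface change described above (strategies receiving a single context object, and `save()` asking the strategy for a pending-task count) could be sketched roughly as follows. The field names in `ProduceContextSketch`, the async signatures, the `GreedyStrategy` class, and the `save_allowed` helper are assumptions for illustration only; the actual classes in this PR may differ.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Iterator, List


@dataclass
class ProduceProgressSketch:
    # Assumed fields: cumulative target and consumed counts.
    target: int = 0
    consumed: int = 0


@dataclass
class ProduceContextSketch:
    # Assumed bundle of what a strategy needs at runtime.
    sampler: Iterator[Any]
    replay_buffer: List[Any]
    progress: ProduceProgressSketch
    abort_event: asyncio.Event
    model_step: int = 0


class ProduceStrategySketch(ABC):
    @abstractmethod
    async def produce_batch(self, ctx: ProduceContextSketch) -> None: ...

    @abstractmethod
    async def pause_produce(self, ctx: ProduceContextSketch) -> None: ...

    @abstractmethod
    def pending_task_count(self) -> int: ...


class GreedyStrategy(ProduceStrategySketch):
    """Hypothetical strategy: fills the buffer up to the target, no async tasks."""

    async def produce_batch(self, ctx: ProduceContextSketch) -> None:
        while (
            len(ctx.replay_buffer) < ctx.progress.target
            and not ctx.abort_event.is_set()
        ):
            ctx.replay_buffer.append(next(ctx.sampler))

    async def pause_produce(self, ctx: ProduceContextSketch) -> None:
        # Signal any in-progress production to stop.
        ctx.abort_event.set()

    def pending_task_count(self) -> int:
        # This strategy never leaves tasks in flight.
        return 0


def save_allowed(strategy: ProduceStrategySketch) -> bool:
    # Manager-side check: only the public count is consulted, no private fields.
    return strategy.pending_task_count() == 0


async def demo():
    ctx = ProduceContextSketch(
        sampler=iter(range(100)),
        replay_buffer=[],
        progress=ProduceProgressSketch(target=4),
        abort_event=asyncio.Event(),  # created inside the running loop
    )
    strategy = GreedyStrategy()
    await strategy.produce_batch(ctx)
    produced = list(ctx.replay_buffer)
    await strategy.pause_produce(ctx)
    return produced, ctx.abort_event.is_set(), save_allowed(strategy)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Passing one context object keeps the strategy signature stable when new runtime state is added, and the `pending_task_count()` query gives the manager a narrow, public way to decide whether saving is safe.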