-
Notifications
You must be signed in to change notification settings - Fork 744
[Cherry-Pick][Scheduler] Defer block recycling to accelerate LRU node freeing #7886
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release/2.6
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1363,20 +1363,24 @@ def free_nodes_directly(self, node): | |
| logger.error(f"free_nodes_directly: error: {type(e)} {e}") | ||
| raise e | ||
|
|
||
| def _handle_free_gpu_node_without_cpu(self, node): | ||
| def _handle_free_gpu_node_without_cpu(self, node, defer_recycle=False): | ||
| """ | ||
| GPU node eviction | ||
| """ | ||
| node.cache_status = CacheStatus.CPU | ||
|
|
||
| self.node_id_pool.append(node.node_id) | ||
| if node.node_id in self.node_map: | ||
| del self.node_map[node.node_id] | ||
| logger.info(f"free_block_ids_async: free node {node}") | ||
| logger.info(f"_handle_free_gpu_node_without_cpu: free node {node.node_id}") | ||
|
|
||
| self.recycle_gpu_blocks(node.reverved_dec_block_ids) | ||
| node.reverved_dec_block_ids = [] | ||
| self.recycle_gpu_blocks(node.block_id) | ||
| blocks_to_recycle = list(node.reverved_dec_block_ids) + [node.block_id] | ||
This comment was marked as outdated.
Sorry, something went wrong.
This comment was marked as outdated.
Sorry, something went wrong. |
||
| if not defer_recycle: | ||
| self.recycle_gpu_blocks(blocks_to_recycle) | ||
| logger.info( | ||
| f"_handle_free_gpu_node_without_cpu: recycle blocks for node {node.node_id}, {blocks_to_recycle}" | ||
| ) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ❓ 疑问 若节点对象存在池化复用(`node_id` 回收后重新分配给新节点),旧的 `node.reverved_dec_block_ids = []` |
||
| return [] | ||
| else: | ||
| return blocks_to_recycle | ||
|
|
||
| def _handle_free_gpu_node_with_cpu( | ||
| self, | ||
|
|
@@ -1472,6 +1476,9 @@ def free_block_ids_async(self, need_block_num): | |
| hash_value_flush_info = {} # {input_hash_value: (token_ids, min_depth)} | ||
| total_gpu_free_count = 0 | ||
|
|
||
| # Defer block recycling to avoid busy heap operations in loop | ||
| blocks_deferred_to_recycle = [] | ||
|
|
||
| while True: | ||
| if len(self.gpu_lru_leaf_heap) == 0: | ||
| logger.info("free_block_ids_async: no more gpu leaf node available.") | ||
|
|
@@ -1486,24 +1493,34 @@ def free_block_ids_async(self, need_block_num): | |
| key = node.input_hash_value | ||
| if key not in hash_value_flush_info or node.depth < hash_value_flush_info[key][1]: | ||
| hash_value_flush_info[key] = (node.input_ids, node.depth) | ||
| self._handle_free_gpu_node_without_cpu(node) | ||
| blocks_deferred_to_recycle.extend( | ||
| self._handle_free_gpu_node_without_cpu(node, defer_recycle=True) | ||
| ) | ||
| total_gpu_free_count += 1 | ||
| cur_node = node | ||
| node = node.parent | ||
| if cur_node.hash_value in node.children: | ||
| del node.children[cur_node.hash_value] | ||
| if not node.children: | ||
| if node in self.gpu_lru_leaf_set: | ||
|
|
||
| # Disconnect node from its parent node | ||
| parent = node.parent | ||
| if node.hash_value in parent.children: | ||
| del parent.children[node.hash_value] | ||
|
|
||
| if not parent.children: | ||
| if parent in self.gpu_lru_leaf_set: | ||
This comment was marked as outdated.
Sorry, something went wrong. |
||
| logger.warning( | ||
| f"Node {parent.node_id} is already in gpu lru leaf heap, duplicated node free may occured!" | ||
| ) | ||
| continue | ||
| if ( | ||
| node != self.radix_tree_root | ||
| and node.shared_count == 0 | ||
| and node.is_gpu_leaf_node | ||
| and node.is_persistent is False | ||
| parent != self.radix_tree_root | ||
| and parent.shared_count == 0 | ||
| and parent.is_gpu_leaf_node | ||
| and parent.is_persistent is False | ||
| ): | ||
| heapq.heappush(self.gpu_lru_leaf_heap, node) | ||
| self.gpu_lru_leaf_set.add(node) | ||
| heapq.heappush(self.gpu_lru_leaf_heap, parent) | ||
| self.gpu_lru_leaf_set.add(parent) | ||
| else: | ||
| logger.warning( | ||
| f"Node {node.node_id} popped out of gpu lru leaf heap, but its shared count is not zero." | ||
| ) | ||
| continue | ||
| else: | ||
| if node.shared_count == 0 and node.is_gpu_leaf_node: | ||
|
|
@@ -1535,6 +1552,12 @@ def free_block_ids_async(self, need_block_num): | |
| f"free_block_ids_async: need_block_num {need_block_num}, free_block_num {total_gpu_free_count}." | ||
| ) | ||
|
|
||
| if blocks_deferred_to_recycle: | ||
| self.recycle_gpu_blocks(blocks_deferred_to_recycle) | ||
| logger.info( | ||
| f"free_block_ids_async: deferred recycling {len(blocks_deferred_to_recycle)} blocks, {blocks_deferred_to_recycle}" | ||
| ) | ||
|
|
||
| if ( | ||
| envs.FD_AS_ONLY_FLUSH | ||
| and self.kvcache_storage_backend == "attention_store" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🟡 建议 静默移除了 `node.cache_status = CacheStatus.CPU`,未在 PR Modifications 中说明。原函数名为 `_handle_free_gpu_node_without_cpu`(无 CPU 卸载路径),原先设置 `cache_status = CacheStatus.CPU` 确实语义矛盾,疑似原始代码 bug。但此行为变更影响节点状态,建议在 PR 描述 Modifications 中补充说明该删除是有意修复,并确认无下游逻辑依赖 `cache_status == CacheStatus.CPU` 作为节点已释放的判断依据。