Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 43 additions & 20 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1363,20 +1363,24 @@ def free_nodes_directly(self, node):
logger.error(f"free_nodes_directly: error: {type(e)} {e}")
raise e

def _handle_free_gpu_node_without_cpu(self, node):
def _handle_free_gpu_node_without_cpu(self, node, defer_recycle=False):
"""
GPU node eviction
"""
node.cache_status = CacheStatus.CPU

self.node_id_pool.append(node.node_id)
if node.node_id in self.node_map:
del self.node_map[node.node_id]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议 静默移除了 node.cache_status = CacheStatus.CPU,未在 PR Modifications 中说明。

原函数名为 _handle_free_gpu_node_without_cpu(无 CPU 卸载路径),原先设置 cache_status = CacheStatus.CPU 确实语义矛盾,疑似原始代码 bug。但此行为变更影响节点状态,建议在 PR 描述 Modifications 中补充说明该删除是有意修复,并确认无下游逻辑依赖 cache_status == CacheStatus.CPU 作为节点已释放的判断依据。

logger.info(f"free_block_ids_async: free node {node}")
logger.info(f"_handle_free_gpu_node_without_cpu: free node {node.node_id}")

self.recycle_gpu_blocks(node.reverved_dec_block_ids)
node.reverved_dec_block_ids = []
self.recycle_gpu_blocks(node.block_id)
blocks_to_recycle = list(node.reverved_dec_block_ids) + [node.block_id]

This comment was marked as outdated.

This comment was marked as outdated.

if not defer_recycle:
self.recycle_gpu_blocks(blocks_to_recycle)
logger.info(
f"_handle_free_gpu_node_without_cpu: recycle blocks for node {node.node_id}, {blocks_to_recycle}"
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 defer_recycle=True 路径下,blocks_to_recycle = list(node.reverved_dec_block_ids) + [node.block_id] 仅复制了列表,但原属性 node.reverved_dec_block_ids 未置空(原代码有 node.reverved_dec_block_ids = [])。

若节点对象存在池化复用(node_id 回收后重新分配给新节点),旧的 reverved_dec_block_ids 未清空可能携带脏数据。请确认节点对象是否会被复用,若会则需补充清空:

node.reverved_dec_block_ids = []

return []
else:
return blocks_to_recycle

def _handle_free_gpu_node_with_cpu(
self,
Expand Down Expand Up @@ -1472,6 +1476,9 @@ def free_block_ids_async(self, need_block_num):
hash_value_flush_info = {} # {input_hash_value: (token_ids, min_depth)}
total_gpu_free_count = 0

# Defer block recycling to avoid busy heap operations in loop
blocks_deferred_to_recycle = []

while True:
if len(self.gpu_lru_leaf_heap) == 0:
logger.info("free_block_ids_async: no more gpu leaf node available.")
Expand All @@ -1486,24 +1493,34 @@ def free_block_ids_async(self, need_block_num):
key = node.input_hash_value
if key not in hash_value_flush_info or node.depth < hash_value_flush_info[key][1]:
hash_value_flush_info[key] = (node.input_ids, node.depth)
self._handle_free_gpu_node_without_cpu(node)
blocks_deferred_to_recycle.extend(
self._handle_free_gpu_node_without_cpu(node, defer_recycle=True)
)
total_gpu_free_count += 1
cur_node = node
node = node.parent
if cur_node.hash_value in node.children:
del node.children[cur_node.hash_value]
if not node.children:
if node in self.gpu_lru_leaf_set:

# Disconnect node from its parent node
parent = node.parent
if node.hash_value in parent.children:
del parent.children[node.hash_value]

if not parent.children:
if parent in self.gpu_lru_leaf_set:

This comment was marked as outdated.

logger.warning(
f"Node {parent.node_id} is already in gpu lru leaf heap, duplicated node free may occured!"
)
continue
if (
node != self.radix_tree_root
and node.shared_count == 0
and node.is_gpu_leaf_node
and node.is_persistent is False
parent != self.radix_tree_root
and parent.shared_count == 0
and parent.is_gpu_leaf_node
and parent.is_persistent is False
):
heapq.heappush(self.gpu_lru_leaf_heap, node)
self.gpu_lru_leaf_set.add(node)
heapq.heappush(self.gpu_lru_leaf_heap, parent)
self.gpu_lru_leaf_set.add(parent)
else:
logger.warning(
f"Node {node.node_id} popped out of gpu lru leaf heap, but its shared count is not zero."
)
continue
else:
if node.shared_count == 0 and node.is_gpu_leaf_node:
Expand Down Expand Up @@ -1535,6 +1552,12 @@ def free_block_ids_async(self, need_block_num):
f"free_block_ids_async: need_block_num {need_block_num}, free_block_num {total_gpu_free_count}."
)

if blocks_deferred_to_recycle:
self.recycle_gpu_blocks(blocks_deferred_to_recycle)
logger.info(
f"free_block_ids_async: deferred recycling {len(blocks_deferred_to_recycle)} blocks, {blocks_deferred_to_recycle}"
)

if (
envs.FD_AS_ONLY_FLUSH
and self.kvcache_storage_backend == "attention_store"
Expand Down
Loading