Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 43 additions & 20 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1340,20 +1340,24 @@ def free_nodes_directly(self, node):
logger.error(f"free_nodes_directly: error: {type(e)} {e}, {traceback.format_exc()}")
raise e

def _handle_free_gpu_node_without_cpu(self, node):
def _handle_free_gpu_node_without_cpu(self, node, defer_recycle=False):

This comment was marked as outdated.

"""
GPU node eviction
"""
node.cache_status = CacheStatus.CPU

self.node_id_pool.append(node.node_id)
if node.node_id in self.node_map:
del self.node_map[node.node_id]
logger.info(f"free_block_ids_async: free node {node}")
logger.info(f"_handle_free_gpu_node_without_cpu: free node {node.node_id}")

This comment was marked as outdated.

This comment was marked as outdated.

self.recycle_gpu_blocks(node.reverved_dec_block_ids)
node.reverved_dec_block_ids = []
self.recycle_gpu_blocks(node.block_id)
blocks_to_recycle = list(node.reverved_dec_block_ids) + [node.block_id]
if not defer_recycle:
self.recycle_gpu_blocks(blocks_to_recycle)
logger.info(
f"_handle_free_gpu_node_without_cpu: recycle blocks for node {node.node_id}, {blocks_to_recycle}"
)
return []
else:
return blocks_to_recycle
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug defer_recycle=True 路径未清空 node.reverved_dec_block_ids,存在双重回收风险

原代码在 _handle_free_gpu_node_without_cpu 中有 node.reverved_dec_block_ids = [],确保节点释放后引用被清空。PR 将此逻辑移除,当 defer_recycle=True 时,调用方收到 blocks 列表后延迟回收,但节点的 reverved_dec_block_ids 字段仍保留旧值。若该节点在批量回收完成前被其他路径再次访问(例如 free_nodes_directly 或 swap 路径),可能引发同一 block 被重复 recycle。

建议修复:在计算出 blocks_to_recycle 之后、进入 if/else 分支之前立即清空 node.reverved_dec_block_ids(这样 defer_recycle=True 与 defer_recycle=False 两条路径都会生效):

blocks_to_recycle = list(node.reverved_dec_block_ids) + [node.block_id]
node.reverved_dec_block_ids = []  # 立即清空,防止双重回收
if not defer_recycle:
    ...
else:
    return blocks_to_recycle

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Bug node.cache_status 未在 _handle_free_gpu_node_without_cpu 中更新

原代码在函数入口处设置 node.cache_status = CacheStatus.CPU(标记节点已被驱逐),PR 将该行完整删除,两条路径(defer_recycle=True/False)均未更新 cache_status。节点已从 node_map 删除但 cache_status 仍为 GPU,若外部代码(如 PD 分离传输路径 recv_data_transfer_result 中的 node.cache_status.value == CacheStatus.GPU.value 判断)通过其他引用访问该节点,会误判其仍为 GPU 状态。

建议修复:在函数开头恢复状态更新(或调整为更合适的驱逐态枚举值):

node.cache_status = CacheStatus.CPU  # 或新增 EVICTED 状态


def _handle_free_gpu_node_with_cpu(
self,
Expand Down Expand Up @@ -1449,6 +1453,9 @@ def free_block_ids_async(self, need_block_num):
hash_value_flush_info = {} # {input_hash_value: (token_ids, min_depth)}
total_gpu_free_count = 0

# Defer block recycling to avoid busy heap operations in loop
blocks_deferred_to_recycle = []

while True:
if len(self.gpu_lru_leaf_heap) == 0:
logger.info("free_block_ids_async: no more gpu leaf node available.")
Expand All @@ -1463,24 +1470,34 @@ def free_block_ids_async(self, need_block_num):
key = node.input_hash_value
if key not in hash_value_flush_info or node.depth < hash_value_flush_info[key][1]:
hash_value_flush_info[key] = (node.input_ids, node.depth)
self._handle_free_gpu_node_without_cpu(node)
blocks_deferred_to_recycle.extend(
self._handle_free_gpu_node_without_cpu(node, defer_recycle=True)
)
total_gpu_free_count += 1
cur_node = node
node = node.parent
if cur_node.hash_value in node.children:
del node.children[cur_node.hash_value]
if not node.children:
if node in self.gpu_lru_leaf_set:

# Disconnect node from its parent node
parent = node.parent
if node.hash_value in parent.children:
del parent.children[node.hash_value]

if not parent.children:
if parent in self.gpu_lru_leaf_set:
logger.warning(
f"Node {parent.node_id} is already in gpu lru leaf heap, duplicated node free may occured!"
)
continue
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 continue 跳过时父节点已在 heap 中——但 total_gpu_free_count 未增加,是否会导致循环提前退出?

parent in self.gpu_lru_leaf_set 时执行 continue:当前被驱逐叶节点的计数已在上方通过 total_gpu_free_count += 1 完成,continue 只是跳过了父节点的入堆,不会造成漏计,因此计数在逻辑上是正确的。但 warning 日志描述为 "duplicated node free may occured":如果父节点此前已经从 heap 中被弹出并处理过(已从 node_map 删除),再次被加入 heap 就会导致真正的重复释放。

建议确认:父节点被加入 gpu_lru_leaf_heap 前,是否应先检查其是否已被从 node_map 删除(即是否已经历过 _handle_free_gpu_node_without_cpu)?若已删除则跳过入堆。

if (
node != self.radix_tree_root
and node.shared_count == 0
and node.is_gpu_leaf_node
and node.is_persistent is False
parent != self.radix_tree_root
and parent.shared_count == 0
and parent.is_gpu_leaf_node
and parent.is_persistent is False
):
heapq.heappush(self.gpu_lru_leaf_heap, node)
self.gpu_lru_leaf_set.add(node)
heapq.heappush(self.gpu_lru_leaf_heap, parent)
self.gpu_lru_leaf_set.add(parent)
else:
logger.warning(
f"Node {node.node_id} popped out of gpu lru leaf heap, but its shared count is not zero."
)
continue
else:
if node.shared_count == 0 and node.is_gpu_leaf_node:
Expand Down Expand Up @@ -1512,6 +1529,12 @@ def free_block_ids_async(self, need_block_num):
f"free_block_ids_async: need_block_num {need_block_num}, free_block_num {total_gpu_free_count}."
)

if blocks_deferred_to_recycle:
self.recycle_gpu_blocks(blocks_deferred_to_recycle)
logger.info(
f"free_block_ids_async: deferred recycling {len(blocks_deferred_to_recycle)} blocks, {blocks_deferred_to_recycle}"
)

if (
envs.FD_AS_ONLY_FLUSH
and self.kvcache_storage_backend == "attention_store"
Expand Down
Loading