diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index c41a6109029..490cec6cc11 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -1363,20 +1363,24 @@ def free_nodes_directly(self, node): logger.error(f"free_nodes_directly: error: {type(e)} {e}") raise e - def _handle_free_gpu_node_without_cpu(self, node): + def _handle_free_gpu_node_without_cpu(self, node, defer_recycle=False): """ GPU node eviction """ - node.cache_status = CacheStatus.CPU - self.node_id_pool.append(node.node_id) if node.node_id in self.node_map: del self.node_map[node.node_id] - logger.info(f"free_block_ids_async: free node {node}") + logger.info(f"_handle_free_gpu_node_without_cpu: free node {node.node_id}") - self.recycle_gpu_blocks(node.reverved_dec_block_ids) - node.reverved_dec_block_ids = [] - self.recycle_gpu_blocks(node.block_id) + blocks_to_recycle = list(node.reverved_dec_block_ids) + [node.block_id] + if not defer_recycle: + self.recycle_gpu_blocks(blocks_to_recycle) + logger.info( + f"_handle_free_gpu_node_without_cpu: recycle blocks for node {node.node_id}, {blocks_to_recycle}" + ) + return [] + else: + return blocks_to_recycle def _handle_free_gpu_node_with_cpu( self, @@ -1472,6 +1476,9 @@ def free_block_ids_async(self, need_block_num): hash_value_flush_info = {} # {input_hash_value: (token_ids, min_depth)} total_gpu_free_count = 0 + # Defer block recycling to avoid busy heap operations in loop + blocks_deferred_to_recycle = [] + while True: if len(self.gpu_lru_leaf_heap) == 0: logger.info("free_block_ids_async: no more gpu leaf node available.") @@ -1486,24 +1493,34 @@ def free_block_ids_async(self, need_block_num): key = node.input_hash_value if key not in hash_value_flush_info or node.depth < hash_value_flush_info[key][1]: hash_value_flush_info[key] = (node.input_ids, node.depth) - self._handle_free_gpu_node_without_cpu(node) + blocks_deferred_to_recycle.extend( + self._handle_free_gpu_node_without_cpu(node, defer_recycle=True) + ) total_gpu_free_count += 1 - cur_node = node - node = node.parent - if cur_node.hash_value in node.children: - del node.children[cur_node.hash_value] - if not node.children: - if node in self.gpu_lru_leaf_set: + + # Disconnect node from its parent node + parent = node.parent + if node.hash_value in parent.children: + del parent.children[node.hash_value] + + if not parent.children: + if parent in self.gpu_lru_leaf_set: + logger.warning( + f"Node {parent.node_id} is already in gpu lru leaf heap, duplicated node free may occured!" + ) continue if ( - node != self.radix_tree_root - and node.shared_count == 0 - and node.is_gpu_leaf_node - and node.is_persistent is False + parent != self.radix_tree_root + and parent.shared_count == 0 + and parent.is_gpu_leaf_node + and parent.is_persistent is False ): - heapq.heappush(self.gpu_lru_leaf_heap, node) - self.gpu_lru_leaf_set.add(node) + heapq.heappush(self.gpu_lru_leaf_heap, parent) + self.gpu_lru_leaf_set.add(parent) else: + logger.warning( + f"Node {node.node_id} popped out of gpu lru leaf heap, but its shared count is not zero." + ) continue else: if node.shared_count == 0 and node.is_gpu_leaf_node: @@ -1535,6 +1552,12 @@ def free_block_ids_async(self, need_block_num): f"free_block_ids_async: need_block_num {need_block_num}, free_block_num {total_gpu_free_count}." ) + if blocks_deferred_to_recycle: + self.recycle_gpu_blocks(blocks_deferred_to_recycle) + logger.info( + f"free_block_ids_async: deferred recycling {len(blocks_deferred_to_recycle)} blocks, {blocks_deferred_to_recycle}" + ) + if ( envs.FD_AS_ONLY_FLUSH and self.kvcache_storage_backend == "attention_store"