From d208a4727b361e0a54fe3485fde156e08a415352 Mon Sep 17 00:00:00 2001
From: juncaipeng <13006307475@163.com>
Date: Tue, 31 Mar 2026 07:54:07 +0000
Subject: [PATCH 1/3] Write the cache of preempted req to storage

---
 .../cache_manager/prefix_cache_manager.py    | 25 +++++++++++--------
 fastdeploy/engine/common_engine.py           |  2 +-
 .../engine/sched/resource_manager_v1.py      | 15 ++++++++---
 fastdeploy/envs.py                           |  4 +++
 4 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py
index dd64d7fb712..1c41c24a511 100644
--- a/fastdeploy/cache_manager/prefix_cache_manager.py
+++ b/fastdeploy/cache_manager/prefix_cache_manager.py
@@ -462,12 +462,12 @@ def update_cache_config(self, cache_config):
         main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
         main_process_metrics.available_gpu_resource.set(1.0)

-    def can_allocate_gpu_blocks(self, num_blocks: int):
+    def can_allocate_gpu_blocks(self, num_blocks: int, try_free_gpu_blocks: bool = True):
         """
         Check if num_blocks gpu blocks can be allocated.
         """
         if len(self.gpu_free_block_list) < num_blocks:
-            if self.cache_config.enable_prefix_caching:
+            if self.cache_config.enable_prefix_caching and try_free_gpu_blocks:
                 self.free_block_ids(num_blocks)
             if len(self.gpu_free_block_list) < num_blocks:
                 return False
@@ -814,7 +814,7 @@ def request_match_blocks(self, task: Request, block_size, *args):
         # 2. prepare cpu cache: allocate gpu cache for matched cpu blocks, wait for data transfer to complete
         gpu_recv_block_ids = []
         match_cpu_blocks_num = len(match_cpu_block_ids)
-        if self.can_allocate_gpu_blocks(num_blocks=match_cpu_blocks_num):
+        if self.can_allocate_gpu_blocks(num_blocks=match_cpu_blocks_num, try_free_gpu_blocks=False):
             if match_cpu_blocks_num > 0:
                 logger.debug(
                     f"request_match_blocks: req_id {req_id}, allocate {match_cpu_blocks_num} block to receive cpu cache"
                 )
@@ -845,7 +845,7 @@ def request_match_blocks(self, task: Request, block_size, *args):

         match_storage_block_ids = []
         if self.kvcache_storage_backend and no_match_token_num >= block_size:
-            if not self.can_allocate_gpu_blocks(num_blocks=no_match_block_num):
+            if not self.can_allocate_gpu_blocks(num_blocks=no_match_block_num, try_free_gpu_blocks=False):
                 raise Exception(
                     "request_match_blocks: Not enough GPU memory to allocate cache for matched Storage Cache"
                 )
@@ -1141,8 +1141,11 @@ def write_cache_to_storage(self, request: Request):
         token_ids = request.prompt_token_ids
         if isinstance(token_ids, np.ndarray):
             token_ids = token_ids.tolist()
+
         if self.config.cache_config.enable_output_caching:
-            token_ids += request.output_token_ids
+            input_token_ids = token_ids + request.output_token_ids
+        else:
+            input_token_ids = token_ids

         req_id = request.request_id
         keys = []
@@ -1159,7 +1162,7 @@ def write_cache_to_storage(self, request: Request):
         write_storage_task = WriteStorageTask(
             task_id=req_id,
             keys=keys,
-            token_ids=token_ids,
+            token_ids=input_token_ids,
             gpu_block_ids=gpu_block_ids,
         )
         logger.debug(f"issue write storage task: {write_storage_task}")
@@ -1193,7 +1196,9 @@ def write_cache_to_storage_decode(self, request: Request):
             token_ids = list(token_ids)

         if self.config.cache_config.enable_output_caching:
-            token_ids = token_ids + request.output_token_ids
+            input_token_ids = token_ids + request.output_token_ids
+        else:
+            input_token_ids = token_ids

         # 2. Calculate cache keys using chained hash (consistent with P instance)
         keys = []
@@ -1201,8 +1206,8 @@ def write_cache_to_storage_decode(self, request: Request):
         block_size = self.config.cache_config.block_size
         mm_idx = 0  # Multimodal index for tracking position in mm_inputs

-        for i in range(0, len(token_ids), block_size):
-            block_token_ids = token_ids[i : i + block_size]
+        for i in range(0, len(input_token_ids), block_size):
+            block_token_ids = input_token_ids[i : i + block_size]
             if len(block_token_ids) < block_size:
                 break  # Do not cache incomplete block
@@ -1236,7 +1241,7 @@ def write_cache_to_storage_decode(self, request: Request):
         write_storage_task = WriteStorageTask(
             task_id=req_id,
             keys=keys,
-            token_ids=token_ids,
+            token_ids=input_token_ids,
             gpu_block_ids=gpu_block_ids,
         )

diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py
index 28776b53ede..f8a3ce540d9 100644
--- a/fastdeploy/engine/common_engine.py
+++ b/fastdeploy/engine/common_engine.py
@@ -899,7 +899,7 @@ def _fetch_request():
                         self.split_connector.send_splitwise_tasks([task], task.idx)
                         status, msg = self.split_connector.check_decode_allocated(task)
                         if not status:
-                            self.llm_logger.error(
+                            self.llm_logger.warning(
                                 f"D failed to allocate resource for request {task.request_id}, try again."
                             )
                             time.sleep(0.05)
diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py
index 0d91ea4d8bc..5763c310592 100644
--- a/fastdeploy/engine/sched/resource_manager_v1.py
+++ b/fastdeploy/engine/sched/resource_manager_v1.py
@@ -366,9 +366,15 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
                     del self.requests[preempted_req.request_id]
                     if preempted_req.request_id in self.req_dict:
                         del self.req_dict[preempted_req.request_id]
+                    if envs.FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST:
+                        if self.config.cache_config.kvcache_storage_backend:
+                            self.cache_manager.write_cache_to_storage_decode(preempted_req)
                     self._free_blocks(preempted_req)
                     llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
                 else:
+                    if envs.FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST:
+                        if self.config.cache_config.kvcache_storage_backend:
+                            self.cache_manager.write_cache_to_storage(preempted_req)
                     self._free_blocks(preempted_req)
                     preempted_req.num_cached_blocks = 0
                     self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
@@ -398,7 +404,7 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
             self.can_relax_prefill_strategy = False
         return can_schedule

-    def _get_can_schedule_prefill_threshold_block(self, request, num_chunk_new_block):
+    def _get_can_schedule_prefill_threshold_block(self, num_chunk_new_block):
         if self.can_relax_prefill_strategy:
             can_schedule_block_num_threshold = num_chunk_new_block
         else:
@@ -986,7 +992,7 @@ def _allocate_decode_and_extend():
                         continue
                     num_new_block = self.get_new_block_nums(request, num_new_tokens)
                     can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(
-                        request, num_new_block
+                        num_new_block
                     )
                     # Allocate blocks to prefill
                     if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold):
@@ -1051,7 +1057,7 @@ def _allocate_decode_and_extend():
                         continue
                     num_new_block = self.get_new_block_nums(request, num_new_tokens)
                     can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(
-                        request, num_new_block
+                        num_new_block
                     )
                     # Allocate blocks to prefill
                     if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold):
@@ -1388,7 +1394,8 @@ def preallocate_resource_in_d(self, request: Request):
             return False
         if self.available_batch() == 0:
             return False
-        if not self.cache_manager.can_allocate_gpu_blocks(need_prealloc_prefill_blocks):
+        total_need_blocks = self._get_can_schedule_prefill_threshold_block(need_prealloc_prefill_blocks)
+        if not self.cache_manager.can_allocate_gpu_blocks(total_need_blocks):
             return False

         request.block_tables = self.cache_manager.allocate_gpu_blocks(
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index 72cd6dc7c48..6523ed30a27 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -254,6 +254,10 @@ def _validate_split_kv_size(value: int) -> int:
     # When v1 is enabled, the legacy /clear_load_weight and /update_model_weight
     # will adopt this new communication pattern.
     "FD_ENABLE_V1_UPDATE_WEIGHTS": lambda: bool(int(os.getenv("FD_ENABLE_V1_UPDATE_WEIGHTS", "0"))),
+    # Whether to save the cache of output token for preemted request to radix tree or storage.
+    "FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST": lambda: bool(
+        int(os.getenv("FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST", "1"))
+    ),
 }

From b275e52e66972536d4d4beeb6692fd630ad00026 Mon Sep 17 00:00:00 2001
From: juncaipeng <13006307475@163.com>
Date: Tue, 31 Mar 2026 08:03:33 +0000
Subject: [PATCH 2/3] up

---
 fastdeploy/cache_manager/prefix_cache_manager.py | 10 +++++-----
 fastdeploy/envs.py                               |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py
index 1c41c24a511..e5e27405255 100644
--- a/fastdeploy/cache_manager/prefix_cache_manager.py
+++ b/fastdeploy/cache_manager/prefix_cache_manager.py
@@ -881,7 +881,7 @@ def request_match_blocks(self, task: Request, block_size, *args):
                 read_storage_task = ReadStorageTask(
                     task_id=req_id,
                     keys=no_match_block_keys,
-                    token_ids=input_token_ids,
+                    token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None,
                     gpu_block_ids=gpu_recv_storage_block_ids,
                     start_read_block_idx=match_token_num // block_size,
                 )
@@ -1162,7 +1162,7 @@ def write_cache_to_storage(self, request: Request):
         write_storage_task = WriteStorageTask(
             task_id=req_id,
             keys=keys,
-            token_ids=input_token_ids,
+            token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None,
             gpu_block_ids=gpu_block_ids,
         )
         logger.debug(f"issue write storage task: {write_storage_task}")
@@ -1241,7 +1241,7 @@ def write_cache_to_storage_decode(self, request: Request):
         write_storage_task = WriteStorageTask(
             task_id=req_id,
             keys=keys,
-            token_ids=input_token_ids,
+            token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None,
             gpu_block_ids=gpu_block_ids,
         )

@@ -2171,7 +2171,7 @@ def recv_data_transfer_result(self):
                 event_type = data[0]

                 if event_type.value == CacheStatus.STORAGE2GPU.value:
-                    logger.info(f"recv_data_transfer_result: {data}")
+                    logger.debug(f"recv_data_transfer_result: {data}")
                     task_id, hash_keys, block_ids = data[1:]
                     if task_id not in self.storage_prefetch_block_ids:
                         self.storage_prefetch_block_ids[task_id] = []
@@ -2182,7 +2182,7 @@ def recv_data_transfer_result(self):
                     if task_id in self.task_prefetch_event:
                         self.task_prefetch_event[task_id].set()
                 elif event_type.value == CacheStatus.GPU2STORAGE.value:
-                    logger.info(f"recv_data_transfer_result: {data}")
+                    logger.debug(f"recv_data_transfer_result: {data}")
                     task_id, hash_keys, block_ids = data[1:]
                     if task_id in self.task_write_back_event:
                         self.task_write_back_event[task_id].set()
diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py
index 6523ed30a27..389023dc815 100644
--- a/fastdeploy/envs.py
+++ b/fastdeploy/envs.py
@@ -254,7 +254,7 @@ def _validate_split_kv_size(value: int) -> int:
     # When v1 is enabled, the legacy /clear_load_weight and /update_model_weight
     # will adopt this new communication pattern.
     "FD_ENABLE_V1_UPDATE_WEIGHTS": lambda: bool(int(os.getenv("FD_ENABLE_V1_UPDATE_WEIGHTS", "0"))),
-    # Whether to save the cache of output token for preemted request to radix tree or storage.
+    # Whether to save the cache of output token for preempted request to storage.
     "FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST": lambda: bool(
         int(os.getenv("FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST", "1"))
     ),

From ef30e0c6fffcc14f2ad93061e53a504c4da14c1d Mon Sep 17 00:00:00 2001
From: juncaipeng <13006307475@163.com>
Date: Tue, 31 Mar 2026 12:20:25 +0000
Subject: [PATCH 3/3] fix

---
 fastdeploy/cache_manager/cache_transfer_manager.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fastdeploy/cache_manager/cache_transfer_manager.py b/fastdeploy/cache_manager/cache_transfer_manager.py
index b264f03b753..8c5499cafde 100644
--- a/fastdeploy/cache_manager/cache_transfer_manager.py
+++ b/fastdeploy/cache_manager/cache_transfer_manager.py
@@ -796,7 +796,7 @@ def read_storage_task(self, task: ReadStorageTask):
         try:
             valid_gpu_block_ids = self._run_read_storage(
                 task.task_id,
-                task.token_ids[: match_block_num * self.block_size],
+                task.token_ids[: match_block_num * self.block_size] if task.token_ids else None,
                 task.start_read_block_idx,
                 k_cache_keys,
                 v_cache_keys,