From 6473a75b72bc7c27bf5331934111b44b2919b497 Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Thu, 21 May 2026 03:06:47 +0000 Subject: [PATCH] [Scheduler] Increase sleep interval in fetch loops and cancel schedule threshold for prefill instance --- fastdeploy/engine/common_engine_prepare_mixin.py | 4 ++-- fastdeploy/engine/sched/resource_manager_v1.py | 10 ++++++++-- fastdeploy/envs.py | 2 +- fastdeploy/output/token_processor.py | 2 +- fastdeploy/splitwise/splitwise_connector.py | 2 +- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/fastdeploy/engine/common_engine_prepare_mixin.py b/fastdeploy/engine/common_engine_prepare_mixin.py index 71327025458..60ccb7ccd09 100644 --- a/fastdeploy/engine/common_engine_prepare_mixin.py +++ b/fastdeploy/engine/common_engine_prepare_mixin.py @@ -248,10 +248,10 @@ def _fetch_loop(self, fetch_fn, thread_idx: int): with self._pause_cond: self._pause_cond.wait_for(lambda: not self.is_paused) fetch_fn() - time.sleep(0.002) + time.sleep(0.02) except Exception as e: self.llm_logger.error(f"fetching request error in worker-{thread_idx}: {e} {traceback.format_exc()}") - time.sleep(0.002) + time.sleep(0.02) def _prepare_request_v1(self): """Prepare request and send to the queue for scheduling""" diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index de89ab3adca..2f7e6dcbf6e 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -245,7 +245,7 @@ def get_new_block_nums(self, request: Request, num_new_tokens: int): block_num = ( request.num_computed_tokens + num_new_tokens + self.config.cache_config.block_size - 1 ) // self.config.cache_config.block_size - len(request.block_tables) - + block_num = max(block_num, 0) if self.config.speculative_config.method is not None: block_num = min(block_num + 1, self.config.cache_config.max_block_num_per_seq) else: @@ -1001,7 +1001,13 @@ def 
_allocate_decode_and_extend(): req_index += 1 continue num_new_block = self.get_new_block_nums(request, num_new_tokens) - can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(num_new_block) + if self.config.scheduler_config.splitwise_role == "prefill": + # for prefill instance, do not set threshold for running requests + can_schedule_block_num_threshold = 0 + else: + can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block( + num_new_block + ) # Allocate blocks to prefill if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold): request.block_tables.extend( diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 509f9a768d9..955f3dfdd39 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -192,7 +192,7 @@ def _validate_split_kv_size(value: int) -> int: # "Enable FP8 calibration on HPU" "FD_HPU_MEASUREMENT_MODE": lambda: os.getenv("FD_HPU_MEASUREMENT_MODE", "0"), # Number of worker threads for prepare requests in prefill instance - "FD_PREFILL_PREPARE_REQ_THREAD_NUM": lambda: int(os.getenv("FD_PREFILL_PREPARE_REQ_THREAD_NUM", "5")), + "FD_PREFILL_PREPARE_REQ_THREAD_NUM": lambda: int(os.getenv("FD_PREFILL_PREPARE_REQ_THREAD_NUM", "3")), "FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")), "FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE": lambda: int( os.getenv("FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE", "1") diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index a8544cb5979..6451305b837 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -668,7 +668,7 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False # TODO: Refine checking sending cache and do not keep waiting if time.time() - start_time > 30: llm_logger.warning(f"wait for sending cache, {task_id}") - time.sleep(0.002) + time.sleep(0.005) else: if 
envs.ENABLE_V1_KVCACHE_SCHEDULER: self.resource_manager.finish_requests_async(task_id) diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py index 27c608a4d1f..9f896b694a3 100644 --- a/fastdeploy/splitwise/splitwise_connector.py +++ b/fastdeploy/splitwise/splitwise_connector.py @@ -267,7 +267,7 @@ def check_decode_allocated(self, task): return True, "" while self.current_request_ids[task.request_id] == "init": - time.sleep(0.001) + time.sleep(0.005) if time.time() - start_time > envs.FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS: del self.current_request_ids[task.request_id] return False, "prefill waits for decode resource timeout"