Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fastdeploy/engine/common_engine_prepare_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,10 @@ def _fetch_loop(self, fetch_fn, thread_idx: int):
with self._pause_cond:
self._pause_cond.wait_for(lambda: not self.is_paused)
fetch_fn()
time.sleep(0.002)
time.sleep(0.02)

This comment was marked as outdated.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ 疑问 PR 标题称 "reduce sleep time in loops",但此处 sleep 从 0.002s(2ms)增大0.02s(20ms),增大了 10 倍。

同样地,token_processor.py(2ms→5ms)和 splitwise_connector.py(1ms→5ms)也均为增大。请确认:

  1. PR 标题是否表述有误(应为 "adjust" 而非 "reduce")?
  2. _fetch_loop 增大 sleep 的预期效果是什么(减少 CPU 轮询开销)?降低轮询频率会增加请求入队延迟,是否已在高并发场景验证过影响?

except Exception as e:
self.llm_logger.error(f"fetching request error in worker-{thread_idx}: {e} {traceback.format_exc()}")
time.sleep(0.002)
time.sleep(0.02)

def _prepare_request_v1(self):
"""Prepare request and send to the queue for scheduling"""
Expand Down
10 changes: 8 additions & 2 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def get_new_block_nums(self, request: Request, num_new_tokens: int):
block_num = (
request.num_computed_tokens + num_new_tokens + self.config.cache_config.block_size - 1
) // self.config.cache_config.block_size - len(request.block_tables)

block_num = max(block_num, 0)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 建议block_num 计算结果为负值时(已分配的 block 超出当前所需),此处静默截断为 0。

截断逻辑本身正确,但建议补充 warning 日志,便于排查潜在的 block 过度分配根因:

if block_num < 0:
    self.llm_logger.warning(
        f"block_num negative ({block_num}) for req {request.request_id}, "
        f"num_computed={request.num_computed_tokens}, num_new={num_new_tokens}, "
        f"allocated={len(request.block_tables)}, clamping to 0"
    )
block_num = max(block_num, 0)

if self.config.speculative_config.method is not None:
block_num = min(block_num + 1, self.config.cache_config.max_block_num_per_seq)
else:
Expand Down Expand Up @@ -1001,7 +1001,13 @@ def _allocate_decode_and_extend():
req_index += 1
continue
num_new_block = self.get_new_block_nums(request, num_new_tokens)
can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(num_new_block)
if self.config.scheduler_config.splitwise_role == "prefill":
# for prefill instance, do not set threshold for running requests
can_schedule_block_num_threshold = 0
else:
can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(
num_new_block
)
# Allocate blocks to prefill
if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold):
request.block_tables.extend(
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ def _validate_split_kv_size(value: int) -> int:
# "Enable FP8 calibration on HPU"
"FD_HPU_MEASUREMENT_MODE": lambda: os.getenv("FD_HPU_MEASUREMENT_MODE", "0"),
# Number of worker threads for prepare requests in prefill instance
"FD_PREFILL_PREPARE_REQ_THREAD_NUM": lambda: int(os.getenv("FD_PREFILL_PREPARE_REQ_THREAD_NUM", "5")),
"FD_PREFILL_PREPARE_REQ_THREAD_NUM": lambda: int(os.getenv("FD_PREFILL_PREPARE_REQ_THREAD_NUM", "3")),
"FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS": lambda: int(os.getenv("FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS", "30")),
"FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE": lambda: int(
os.getenv("FD_ENABLE_REQUEST_DISCONNECT_STOP_INFERENCE", "1")
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/output/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False
# TODO: Refine checking sending cache and do not keep waiting
if time.time() - start_time > 30:
llm_logger.warning(f"wait for sending cache, {task_id}")
time.sleep(0.002)
time.sleep(0.005)
else:
if envs.ENABLE_V1_KVCACHE_SCHEDULER:
self.resource_manager.finish_requests_async(task_id)
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/splitwise/splitwise_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def check_decode_allocated(self, task):
return True, ""

while self.current_request_ids[task.request_id] == "init":
time.sleep(0.001)
time.sleep(0.005)
if time.time() - start_time > envs.FD_PREFILL_WAIT_DECODE_RESOURCE_SECONDS:
del self.current_request_ids[task.request_id]
return False, "prefill waits for decode resource timeout"
Expand Down
Loading