Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fastdeploy/cache_manager/cache_transfer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ def read_storage_task(self, task: ReadStorageTask):
try:
valid_gpu_block_ids = self._run_read_storage(
task.task_id,
task.token_ids[: match_block_num * self.block_size],
task.token_ids[: match_block_num * self.block_size] if task.token_ids else None,
task.start_read_block_idx,
k_cache_keys,
v_cache_keys,
Expand Down
31 changes: 18 additions & 13 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,12 @@ def update_cache_config(self, cache_config):
main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_resource.set(1.0)

def can_allocate_gpu_blocks(self, num_blocks: int):
def can_allocate_gpu_blocks(self, num_blocks: int, try_free_gpu_blocks: bool = True):
"""
Check if num_blocks gpu blocks can be allocated.
"""
if len(self.gpu_free_block_list) < num_blocks:
if self.cache_config.enable_prefix_caching:
if self.cache_config.enable_prefix_caching and try_free_gpu_blocks:
self.free_block_ids(num_blocks)
if len(self.gpu_free_block_list) < num_blocks:
return False
Expand Down Expand Up @@ -814,7 +814,7 @@ def request_match_blocks(self, task: Request, block_size, *args):
# 2. prepare cpu cache: allocate gpu cache for matched cpu blocks, wait for data transfer to complete
gpu_recv_block_ids = []
match_cpu_blocks_num = len(match_cpu_block_ids)
if self.can_allocate_gpu_blocks(num_blocks=match_cpu_blocks_num):
if self.can_allocate_gpu_blocks(num_blocks=match_cpu_blocks_num, try_free_gpu_blocks=False):
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

避免可能出现的死锁:此处传入 try_free_gpu_blocks=False,GPU 空闲 block 不足时不再调用 free_block_ids 尝试回收,而是直接返回 False。

if match_cpu_blocks_num > 0:
logger.debug(
f"request_match_blocks: req_id {req_id}, allocate {match_cpu_blocks_num} block to receive cpu cache"
Expand Down Expand Up @@ -845,7 +845,7 @@ def request_match_blocks(self, task: Request, block_size, *args):
match_storage_block_ids = []

if self.kvcache_storage_backend and no_match_token_num >= block_size:
if not self.can_allocate_gpu_blocks(num_blocks=no_match_block_num):
if not self.can_allocate_gpu_blocks(num_blocks=no_match_block_num, try_free_gpu_blocks=False):
raise Exception(
"request_match_blocks: Not enough GPU memory to allocate cache for matched Storage Cache"
)
Expand Down Expand Up @@ -881,7 +881,7 @@ def request_match_blocks(self, task: Request, block_size, *args):
read_storage_task = ReadStorageTask(
task_id=req_id,
keys=no_match_block_keys,
token_ids=input_token_ids,
token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None,
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

减少跨进程传输的数据量

gpu_block_ids=gpu_recv_storage_block_ids,
start_read_block_idx=match_token_num // block_size,
)
Expand Down Expand Up @@ -1141,8 +1141,11 @@ def write_cache_to_storage(self, request: Request):
token_ids = request.prompt_token_ids
if isinstance(token_ids, np.ndarray):
token_ids = token_ids.tolist()

if self.config.cache_config.enable_output_caching:
token_ids += request.output_token_ids
input_token_ids = token_ids + request.output_token_ids
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

修复 bug:原先的 `token_ids += request.output_token_ids` 会原地修改 request 中的 prompt_token_ids,现改为生成新的 input_token_ids。

else:
input_token_ids = token_ids

req_id = request.request_id
keys = []
Expand All @@ -1159,7 +1162,7 @@ def write_cache_to_storage(self, request: Request):
write_storage_task = WriteStorageTask(
task_id=req_id,
keys=keys,
token_ids=token_ids,
token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None,
gpu_block_ids=gpu_block_ids,
)
logger.debug(f"issue write storage task: {write_storage_task}")
Expand Down Expand Up @@ -1193,16 +1196,18 @@ def write_cache_to_storage_decode(self, request: Request):
token_ids = list(token_ids)

if self.config.cache_config.enable_output_caching:
token_ids = token_ids + request.output_token_ids
input_token_ids = token_ids + request.output_token_ids
else:
input_token_ids = token_ids

# 2. Calculate cache keys using chained hash (consistent with P instance)
keys = []
prefix_block_key = [] # Initial is empty list
block_size = self.config.cache_config.block_size
mm_idx = 0 # Multimodal index for tracking position in mm_inputs

for i in range(0, len(token_ids), block_size):
block_token_ids = token_ids[i : i + block_size]
for i in range(0, len(input_token_ids), block_size):
block_token_ids = input_token_ids[i : i + block_size]
if len(block_token_ids) < block_size:
break # Do not cache incomplete block

Expand Down Expand Up @@ -1236,7 +1241,7 @@ def write_cache_to_storage_decode(self, request: Request):
write_storage_task = WriteStorageTask(
task_id=req_id,
keys=keys,
token_ids=token_ids,
token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None,
gpu_block_ids=gpu_block_ids,
)

Expand Down Expand Up @@ -2166,7 +2171,7 @@ def recv_data_transfer_result(self):
event_type = data[0]

if event_type.value == CacheStatus.STORAGE2GPU.value:
logger.info(f"recv_data_transfer_result: {data}")
logger.debug(f"recv_data_transfer_result: {data}")
task_id, hash_keys, block_ids = data[1:]
if task_id not in self.storage_prefetch_block_ids:
self.storage_prefetch_block_ids[task_id] = []
Expand All @@ -2177,7 +2182,7 @@ def recv_data_transfer_result(self):
if task_id in self.task_prefetch_event:
self.task_prefetch_event[task_id].set()
elif event_type.value == CacheStatus.GPU2STORAGE.value:
logger.info(f"recv_data_transfer_result: {data}")
logger.debug(f"recv_data_transfer_result: {data}")
task_id, hash_keys, block_ids = data[1:]
if task_id in self.task_write_back_event:
self.task_write_back_event[task_id].set()
Expand Down
2 changes: 1 addition & 1 deletion fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ def _fetch_request():
self.split_connector.send_splitwise_tasks([task], task.idx)
status, msg = self.split_connector.check_decode_allocated(task)
if not status:
self.llm_logger.error(
self.llm_logger.warning(
f"D failed to allocate resource for request {task.request_id}, try again."
)
time.sleep(0.05)
Expand Down
15 changes: 11 additions & 4 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,15 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
del self.requests[preempted_req.request_id]
if preempted_req.request_id in self.req_dict:
del self.req_dict[preempted_req.request_id]
if envs.FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST:
if self.config.cache_config.kvcache_storage_backend:
self.cache_manager.write_cache_to_storage_decode(preempted_req)
self._free_blocks(preempted_req)
Comment on lines +369 to 372
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里在持有 ResourceManager 的 lock 的情况下同步调用 write_cache_to_storage_decode(),该调用会等待 cache_transfer 线程回执(is_sync=True),可能阻塞调度线程并放大抢占路径的尾延迟。建议将写回逻辑移出锁(或提交到线程池异步执行),同时确保在写回完成前不要 recycle 对应的 GPU block(例如延后 _free_blocks 或在写回任务中持有必要的 block_ids 快照)。

Copilot uses AI. Check for mistakes.
llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
else:
if envs.FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST:
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

将被重调度请求的 cache 写出到 storage。

if self.config.cache_config.kvcache_storage_backend:
self.cache_manager.write_cache_to_storage(preempted_req)
self._free_blocks(preempted_req)
preempted_req.num_cached_blocks = 0
self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)
Expand Down Expand Up @@ -398,7 +404,7 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
self.can_relax_prefill_strategy = False
return can_schedule

def _get_can_schedule_prefill_threshold_block(self, request, num_chunk_new_block):
def _get_can_schedule_prefill_threshold_block(self, num_chunk_new_block):
if self.can_relax_prefill_strategy:
can_schedule_block_num_threshold = num_chunk_new_block
else:
Expand Down Expand Up @@ -986,7 +992,7 @@ def _allocate_decode_and_extend():
continue
num_new_block = self.get_new_block_nums(request, num_new_tokens)
can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(
request, num_new_block
num_new_block
)
# Allocate blocks to prefill
if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold):
Expand Down Expand Up @@ -1051,7 +1057,7 @@ def _allocate_decode_and_extend():
continue
num_new_block = self.get_new_block_nums(request, num_new_tokens)
can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block(
request, num_new_block
num_new_block
)
# Allocate blocks to prefill
if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold):
Expand Down Expand Up @@ -1388,7 +1394,8 @@ def preallocate_resource_in_d(self, request: Request):
return False
if self.available_batch() == 0:
return False
if not self.cache_manager.can_allocate_gpu_blocks(need_prealloc_prefill_blocks):
total_need_blocks = self._get_can_schedule_prefill_threshold_block(need_prealloc_prefill_blocks)
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P 实例的请求向 D 实例申请 block 时,D 实例需考虑为 running 的请求预留 block ids。

if not self.cache_manager.can_allocate_gpu_blocks(total_need_blocks):
return False

request.block_tables = self.cache_manager.allocate_gpu_blocks(
Expand Down
4 changes: 4 additions & 0 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ def _validate_split_kv_size(value: int) -> int:
# When v1 is enabled, the legacy /clear_load_weight and /update_model_weight
# will adopt this new communication pattern.
"FD_ENABLE_V1_UPDATE_WEIGHTS": lambda: bool(int(os.getenv("FD_ENABLE_V1_UPDATE_WEIGHTS", "0"))),
# Whether to save the cache of output token for preempted request to storage.
"FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST": lambda: bool(
int(os.getenv("FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST", "1"))
),
}


Expand Down
Loading