-
Notifications
You must be signed in to change notification settings - Fork 735
[PD Disaggregation] Write the cache of preempted req to storage and refine PD Disaggregation #7107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -462,12 +462,12 @@ def update_cache_config(self, cache_config): | |
| main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks) | ||
| main_process_metrics.available_gpu_resource.set(1.0) | ||
|
|
||
| def can_allocate_gpu_blocks(self, num_blocks: int): | ||
| def can_allocate_gpu_blocks(self, num_blocks: int, try_free_gpu_blocks: bool = True): | ||
| """ | ||
| Check if num_blocks gpu blocks can be allocated. | ||
| """ | ||
| if len(self.gpu_free_block_list) < num_blocks: | ||
| if self.cache_config.enable_prefix_caching: | ||
| if self.cache_config.enable_prefix_caching and try_free_gpu_blocks: | ||
| self.free_block_ids(num_blocks) | ||
| if len(self.gpu_free_block_list) < num_blocks: | ||
| return False | ||
|
|
@@ -814,7 +814,7 @@ def request_match_blocks(self, task: Request, block_size, *args): | |
| # 2. prepare cpu cache: allocate gpu cache for matched cpu blocks, wait for data transfer to complete | ||
| gpu_recv_block_ids = [] | ||
| match_cpu_blocks_num = len(match_cpu_block_ids) | ||
| if self.can_allocate_gpu_blocks(num_blocks=match_cpu_blocks_num): | ||
| if self.can_allocate_gpu_blocks(num_blocks=match_cpu_blocks_num, try_free_gpu_blocks=False): | ||
| if match_cpu_blocks_num > 0: | ||
| logger.debug( | ||
| f"request_match_blocks: req_id {req_id}, allocate {match_cpu_blocks_num} block to receive cpu cache" | ||
|
|
@@ -845,7 +845,7 @@ def request_match_blocks(self, task: Request, block_size, *args): | |
| match_storage_block_ids = [] | ||
|
|
||
| if self.kvcache_storage_backend and no_match_token_num >= block_size: | ||
| if not self.can_allocate_gpu_blocks(num_blocks=no_match_block_num): | ||
| if not self.can_allocate_gpu_blocks(num_blocks=no_match_block_num, try_free_gpu_blocks=False): | ||
| raise Exception( | ||
| "request_match_blocks: Not enough GPU memory to allocate cache for matched Storage Cache" | ||
| ) | ||
|
|
@@ -881,7 +881,7 @@ def request_match_blocks(self, task: Request, block_size, *args): | |
| read_storage_task = ReadStorageTask( | ||
| task_id=req_id, | ||
| keys=no_match_block_keys, | ||
| token_ids=input_token_ids, | ||
| token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None, | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 减少跨进程传输的数据量
juncaipeng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| gpu_block_ids=gpu_recv_storage_block_ids, | ||
| start_read_block_idx=match_token_num // block_size, | ||
| ) | ||
|
|
@@ -1141,8 +1141,11 @@ def write_cache_to_storage(self, request: Request): | |
| token_ids = request.prompt_token_ids | ||
| if isinstance(token_ids, np.ndarray): | ||
| token_ids = token_ids.tolist() | ||
|
|
||
| if self.config.cache_config.enable_output_caching: | ||
| token_ids += request.output_token_ids | ||
| input_token_ids = token_ids + request.output_token_ids | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 修复修改request中的prompt_token_ids的bug |
||
| else: | ||
| input_token_ids = token_ids | ||
|
|
||
| req_id = request.request_id | ||
| keys = [] | ||
|
|
@@ -1159,7 +1162,7 @@ def write_cache_to_storage(self, request: Request): | |
| write_storage_task = WriteStorageTask( | ||
| task_id=req_id, | ||
| keys=keys, | ||
| token_ids=token_ids, | ||
| token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None, | ||
juncaipeng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| gpu_block_ids=gpu_block_ids, | ||
| ) | ||
| logger.debug(f"issue write storage task: {write_storage_task}") | ||
|
|
@@ -1193,16 +1196,18 @@ def write_cache_to_storage_decode(self, request: Request): | |
| token_ids = list(token_ids) | ||
|
|
||
| if self.config.cache_config.enable_output_caching: | ||
| token_ids = token_ids + request.output_token_ids | ||
| input_token_ids = token_ids + request.output_token_ids | ||
| else: | ||
| input_token_ids = token_ids | ||
|
|
||
| # 2. Calculate cache keys using chained hash (consistent with P instance) | ||
| keys = [] | ||
| prefix_block_key = [] # Initial is empty list | ||
| block_size = self.config.cache_config.block_size | ||
| mm_idx = 0 # Multimodal index for tracking position in mm_inputs | ||
|
|
||
| for i in range(0, len(token_ids), block_size): | ||
| block_token_ids = token_ids[i : i + block_size] | ||
| for i in range(0, len(input_token_ids), block_size): | ||
| block_token_ids = input_token_ids[i : i + block_size] | ||
| if len(block_token_ids) < block_size: | ||
| break # Do not cache incomplete block | ||
|
|
||
|
|
@@ -1236,7 +1241,7 @@ def write_cache_to_storage_decode(self, request: Request): | |
| write_storage_task = WriteStorageTask( | ||
| task_id=req_id, | ||
| keys=keys, | ||
| token_ids=token_ids, | ||
| token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None, | ||
juncaipeng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| gpu_block_ids=gpu_block_ids, | ||
| ) | ||
|
|
||
|
|
@@ -2166,7 +2171,7 @@ def recv_data_transfer_result(self): | |
| event_type = data[0] | ||
|
|
||
| if event_type.value == CacheStatus.STORAGE2GPU.value: | ||
| logger.info(f"recv_data_transfer_result: {data}") | ||
| logger.debug(f"recv_data_transfer_result: {data}") | ||
| task_id, hash_keys, block_ids = data[1:] | ||
| if task_id not in self.storage_prefetch_block_ids: | ||
| self.storage_prefetch_block_ids[task_id] = [] | ||
|
|
@@ -2177,7 +2182,7 @@ def recv_data_transfer_result(self): | |
| if task_id in self.task_prefetch_event: | ||
| self.task_prefetch_event[task_id].set() | ||
| elif event_type.value == CacheStatus.GPU2STORAGE.value: | ||
| logger.info(f"recv_data_transfer_result: {data}") | ||
| logger.debug(f"recv_data_transfer_result: {data}") | ||
| task_id, hash_keys, block_ids = data[1:] | ||
| if task_id in self.task_write_back_event: | ||
| self.task_write_back_event[task_id].set() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -366,9 +366,15 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re | |
| del self.requests[preempted_req.request_id] | ||
| if preempted_req.request_id in self.req_dict: | ||
| del self.req_dict[preempted_req.request_id] | ||
| if envs.FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST: | ||
| if self.config.cache_config.kvcache_storage_backend: | ||
| self.cache_manager.write_cache_to_storage_decode(preempted_req) | ||
| self._free_blocks(preempted_req) | ||
|
Comment on lines
+369
to
372
|
||
| llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}") | ||
| else: | ||
| if envs.FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST: | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 重调度请求的cache写出到storage |
||
| if self.config.cache_config.kvcache_storage_backend: | ||
| self.cache_manager.write_cache_to_storage(preempted_req) | ||
juncaipeng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self._free_blocks(preempted_req) | ||
juncaipeng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| preempted_req.num_cached_blocks = 0 | ||
| self.to_be_rescheduled_request_id_set.add(preempted_req.request_id) | ||
|
|
@@ -398,7 +404,7 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re | |
| self.can_relax_prefill_strategy = False | ||
| return can_schedule | ||
|
|
||
| def _get_can_schedule_prefill_threshold_block(self, request, num_chunk_new_block): | ||
| def _get_can_schedule_prefill_threshold_block(self, num_chunk_new_block): | ||
| if self.can_relax_prefill_strategy: | ||
| can_schedule_block_num_threshold = num_chunk_new_block | ||
| else: | ||
|
|
@@ -986,7 +992,7 @@ def _allocate_decode_and_extend(): | |
| continue | ||
| num_new_block = self.get_new_block_nums(request, num_new_tokens) | ||
| can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block( | ||
| request, num_new_block | ||
| num_new_block | ||
| ) | ||
| # Allocate blocks to prefill | ||
| if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold): | ||
|
|
@@ -1051,7 +1057,7 @@ def _allocate_decode_and_extend(): | |
| continue | ||
| num_new_block = self.get_new_block_nums(request, num_new_tokens) | ||
| can_schedule_block_num_threshold = self._get_can_schedule_prefill_threshold_block( | ||
| request, num_new_block | ||
| num_new_block | ||
| ) | ||
| # Allocate blocks to prefill | ||
| if self.cache_manager.can_allocate_gpu_blocks(can_schedule_block_num_threshold): | ||
|
|
@@ -1388,7 +1394,8 @@ def preallocate_resource_in_d(self, request: Request): | |
| return False | ||
| if self.available_batch() == 0: | ||
| return False | ||
| if not self.cache_manager.can_allocate_gpu_blocks(need_prealloc_prefill_blocks): | ||
| total_need_blocks = self._get_can_schedule_prefill_threshold_block(need_prealloc_prefill_blocks) | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. p实例的请求向d申请block,d实例考虑给running的请求预留block ids |
||
| if not self.cache_manager.can_allocate_gpu_blocks(total_need_blocks): | ||
| return False | ||
|
|
||
| request.block_tables = self.cache_manager.allocate_gpu_blocks( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
避免可能的死锁卡住