diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py
index dd64d7fb712..db559ea4c95 100644
--- a/fastdeploy/cache_manager/prefix_cache_manager.py
+++ b/fastdeploy/cache_manager/prefix_cache_manager.py
@@ -1249,17 +1249,20 @@ def issue_write_back_storage_task(self, task: WriteStorageTask, is_sync=True):
         if self.kvcache_storage_backend is None:
             return
 
-        if len(task.keys) != len(task.gpu_block_ids):
-            err_msg = (
-                f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})"
-            )
-            logger.error(err_msg)
-            raise ValueError(err_msg)
-
-        self.task_write_back_event[task.task_id] = Event()
-        self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task))
-        if is_sync:
-            self.wait_write_storage_task(task.task_id)
+        assert is_sync, "Only support is_sync=True for now."
+        self._acquire_kvcache_lock()
+        try:
+            if len(task.keys) != len(task.gpu_block_ids):
+                err_msg = f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})"
+                logger.error(err_msg)
+                raise ValueError(err_msg)
+
+            self.task_write_back_event[task.task_id] = Event()
+            self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task))
+            if is_sync:
+                self.wait_write_storage_task(task.task_id)
+        finally:
+            self._release_kvcache_lock()
 
     def wait_write_storage_task(self, req_id):
         """
@@ -1276,12 +1279,18 @@ def issue_prefetch_storage_task(self, task: ReadStorageTask, is_sync=True):
         if self.kvcache_storage_backend is None:
             return []
 
-        storage_block_ids = []
-        self.task_prefetch_event[task.task_id] = Event()
-        # issue task to cache_transfer_manager
-        self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task))
-        if is_sync:
-            storage_block_ids = self.wait_prefetch_storage_task(task.task_id)
+        assert is_sync, "Only support is_sync=True for now."
+        self._acquire_kvcache_lock()
+
+        try:
+            storage_block_ids = []
+            self.task_prefetch_event[task.task_id] = Event()
+            # issue task to cache_transfer_manager
+            self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task))
+            if is_sync:
+                storage_block_ids = self.wait_prefetch_storage_task(task.task_id)
+        finally:
+            self._release_kvcache_lock()
         return storage_block_ids
 
     def wait_prefetch_storage_task(self, req_id):
diff --git a/fastdeploy/cache_manager/transfer_factory/mooncake_store/mooncake_store.py b/fastdeploy/cache_manager/transfer_factory/mooncake_store/mooncake_store.py
index 13273b11e5a..becbff9b97e 100644
--- a/fastdeploy/cache_manager/transfer_factory/mooncake_store/mooncake_store.py
+++ b/fastdeploy/cache_manager/transfer_factory/mooncake_store/mooncake_store.py
@@ -150,13 +150,12 @@ def __init__(self, tp_rank=None):
 
     def warmup(self):
         warmup_key = "fastdeploy_mooncake_store_warmup_key" + str(uuid.uuid4())
-        warmup_value = bytes(1 * 1024 * 1024)  # 1 MB
+        warmup_value = bytes(4 * 1024)  # 4 kb
         rc = self.store.put(warmup_key, warmup_value)
         assert rc == 0, f"Failed to put warmup key, key:{warmup_key}, error code: {rc}"
         rc = self.store.is_exist(warmup_key)
         assert rc == 1, f"Failed to check existence, key:{warmup_key}, error code: {rc}"
         self.store.get(warmup_key)
-        self.store.remove(warmup_key)
 
     def register_buffer(self, buffer_ptr, buffer_size) -> None:
         try: