Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1249,17 +1249,20 @@ def issue_write_back_storage_task(self, task: WriteStorageTask, is_sync=True):
if self.kvcache_storage_backend is None:
return

if len(task.keys) != len(task.gpu_block_ids):
err_msg = (
f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})"
)
logger.error(err_msg)
raise ValueError(err_msg)

self.task_write_back_event[task.task_id] = Event()
self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task))
if is_sync:
self.wait_write_storage_task(task.task_id)
assert is_sync, "Only support is_sync=True for now."
self._acquire_kvcache_lock()
try:
if len(task.keys) != len(task.gpu_block_ids):
err_msg = f"write_back_storage error: hash_keys({len(task.keys)}) != gpu_block_ids({len(task.gpu_block_ids)})"
logger.error(err_msg)
raise ValueError(err_msg)

self.task_write_back_event[task.task_id] = Event()
self.cache_task_queue.put_transfer_task((CacheStatus.GPU2STORAGE, task))
if is_sync:
self.wait_write_storage_task(task.task_id)
finally:
self._release_kvcache_lock()

def wait_write_storage_task(self, req_id):
"""
Expand All @@ -1276,12 +1279,18 @@ def issue_prefetch_storage_task(self, task: ReadStorageTask, is_sync=True):
if self.kvcache_storage_backend is None:
return []

storage_block_ids = []
self.task_prefetch_event[task.task_id] = Event()
# issue task to cache_transfer_manager
self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task))
if is_sync:
storage_block_ids = self.wait_prefetch_storage_task(task.task_id)
assert is_sync, "Only support is_sync=True for now."
self._acquire_kvcache_lock()

try:
storage_block_ids = []
self.task_prefetch_event[task.task_id] = Event()
# issue task to cache_transfer_manager
self.cache_task_queue.put_transfer_task((CacheStatus.STORAGE2GPU, task))
if is_sync:
storage_block_ids = self.wait_prefetch_storage_task(task.task_id)
finally:
self._release_kvcache_lock()
return storage_block_ids

def wait_prefetch_storage_task(self, req_id):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,12 @@ def __init__(self, tp_rank=None):

def warmup(self):
    """Smoke-test the mooncake store with a put / exists / get / remove round trip.

    A fresh UUID-suffixed key is used so concurrent workers never collide,
    and the key is removed afterwards so no residue is left in the store.

    Raises:
        AssertionError: if the store rejects the put or cannot see the key
            it just wrote.
    """
    warmup_key = "fastdeploy_mooncake_store_warmup_key" + str(uuid.uuid4())
    # Small zero-filled payload keeps warmup cheap while still exercising
    # the full write path. (The diff left both the old 1 MB and new 4 KB
    # assignments in place; only the 4 KB value is intended.)
    warmup_value = bytes(4 * 1024)  # 4 KB
    # NOTE(review): `assert` is stripped under `python -O`; kept for
    # backward compatibility since callers may expect AssertionError.
    rc = self.store.put(warmup_key, warmup_value)
    assert rc == 0, f"Failed to put warmup key, key:{warmup_key}, error code: {rc}"
    rc = self.store.is_exist(warmup_key)
    assert rc == 1, f"Failed to check existence, key:{warmup_key}, error code: {rc}"
    self.store.get(warmup_key)
    self.store.remove(warmup_key)

def register_buffer(self, buffer_ptr, buffer_size) -> None:
try:
Expand Down
Loading