Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
764f0dd
[Feature][KVCache] Support cache manager v1 architecture
kevincheng2 Mar 19, 2026
9cf02f2
Update cache manager and related modules
kevincheng2 Mar 23, 2026
991881c
chore: update cache_manager and related modules
kevincheng2 Mar 24, 2026
48fc4af
fix: add node to evictable set in complete_swap_to_device
kevincheng2 Mar 24, 2026
1c644f6
feat: update cache manager v1 and related modules
kevincheng2 Mar 25, 2026
201e862
feat(cache): add cache controller v1 implementation
kevincheng2 Mar 26, 2026
37f0a20
feat(cache_manager): update cache manager v1
kevincheng2 Mar 27, 2026
2979ee9
fix(cache_manager): 修复 swap_cache H2D/D2H 方向的 block_ids 逻辑并清理 Forward…
kevincheng2 Mar 27, 2026
2dd64b1
feat(cache_manager): refactor cache manager v1 and optimize swap ops
kevincheng2 Mar 30, 2026
e00f86d
[KVCache][MTP] 支持 cache_manager_v1 下的 MTP KV Cache 初始化及多模态 hash
kevincheng2 Mar 30, 2026
6089180
fix(cache_manager): multi-GPU fix, mm hash boundary fix, and remove b…
kevincheng2 Mar 31, 2026
55a798c
[BugFix][KVCache] fix List import and move write_policy normalization…
kevincheng2 Mar 31, 2026
e328e92
[BugFix][KVCache] fix pre-commit code style issues
kevincheng2 Mar 31, 2026
228f1ee
[Feature][KVCache] update cache_manager_v1 modules
kevincheng2 Mar 31, 2026
ed3b33b
[Feature][KVCache] add BatchRequest.from_tasks and refactor worker ta…
kevincheng2 Mar 31, 2026
4d7a745
[Feature][KVCache] add NUMA affinity for host cache and skip swap cac…
kevincheng2 Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion custom_ops/gpu_ops/swap_cache_batch.cu
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ void SwapCacheAllLayers(
const std::vector<int64_t>& swap_block_ids_cpu,
int rank,
int mode) {
checkCudaErrors(cudaSetDevice(rank)); // used for distributed launch
assert(cache_gpu_tensors.size() > 0 &&
cache_gpu_tensors.size() == cache_cpu_ptrs.size());
switch (cache_gpu_tensors[0].dtype()) {
Expand Down
400 changes: 400 additions & 0 deletions custom_ops/gpu_ops/swap_cache_optimized.cu

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions custom_ops/setup_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ def find_end_files(directory, end_str):
"gpu_ops/swap_cache_batch.cu",
"gpu_ops/swap_cache.cu",
"gpu_ops/swap_cache_layout.cu",
"gpu_ops/swap_cache_optimized.cu", # 新增:优化的 KV cache 换入算子
"gpu_ops/step_system_cache.cu",
"gpu_ops/cpp_extensions.cc",
"gpu_ops/share_external_data.cu",
Expand Down
22 changes: 22 additions & 0 deletions fastdeploy/cache_manager/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

try:
if current_platform.is_cuda():
from fastdeploy.model_executor.ops.gpu import (
swap_cache_per_layer, # 单层 KV cache 换入算子(同步)
)
from fastdeploy.model_executor.ops.gpu import (
swap_cache_per_layer_async, # 单层 KV cache 换入算子(异步,无强制 sync)
)
from fastdeploy.model_executor.ops.gpu import (
cuda_host_alloc,
cuda_host_free,
Expand All @@ -43,6 +49,12 @@ def get_peer_mem_addr(*args, **kwargs):
raise RuntimeError("CUDA no need of get_peer_mem_addr!")

elif current_platform.is_maca():
from fastdeploy.model_executor.ops.gpu import (
swap_cache_per_layer, # 单层 KV cache 换入算子(同步)
)
from fastdeploy.model_executor.ops.gpu import (
swap_cache_per_layer_async, # 单层 KV cache 换入算子(异步,无强制 sync)
)
from fastdeploy.model_executor.ops.gpu import ( # get_output_kv_signal,; ipc_sent_key_value_cache_by_remote_ptr_block_sync,
cuda_host_alloc,
cuda_host_free,
Expand Down Expand Up @@ -89,6 +101,12 @@ def ipc_sent_key_value_cache_by_remote_ptr(*args, **kwargs):
def ipc_sent_key_value_cache_by_remote_ptr_block_sync(*args, **kwargs):
raise RuntimeError("XPU No ipc_sent_key_value_cache_by_remote_ptr UNIMPLENENTED")

def swap_cache_per_layer(*args, **kwargs): # 单层 KV cache 换入算子(同步)
raise RuntimeError("XPU swap_cache_per_layer UNIMPLENENTED")

def swap_cache_per_layer_async(*args, **kwargs): # 单层 KV cache 换入算子(异步)
raise RuntimeError("XPU swap_cache_per_layer_async UNIMPLENENTED")

else:
raise RuntimeError("Prefix cache ops only supported CUDA nor XPU platform ")

Expand Down Expand Up @@ -128,6 +146,8 @@ def get_all_visible_devices():
set_data_ipc = None
share_external_data_ = None
swap_cache_all_layers = None
swap_cache_per_layer = None # 单层 KV cache 换入算子(同步)
swap_cache_per_layer_async = None # 单层 KV cache 换入算子(异步)
unset_data_ipc = None
set_device = None
memory_allocated = None
Expand All @@ -146,6 +166,8 @@ def get_all_visible_devices():
"set_data_ipc",
"share_external_data_",
"swap_cache_all_layers",
"swap_cache_per_layer", # 单层 KV cache 换入算子(同步)
"swap_cache_per_layer_async", # 单层 KV cache 换入算子(异步,无强制 sync)
"unset_data_ipc", # XPU是 None
"set_device",
"memory_allocated",
Expand Down
71 changes: 71 additions & 0 deletions fastdeploy/cache_manager/v1/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""

from .base import KVCacheBase
from .cache_controller import CacheController
from .cache_manager import CacheManager
from .cache_utils import LayerDoneCounter, LayerSwapTimeoutError
from .metadata import (
AsyncTaskHandler,
BlockNode,
CacheBlockMetadata,
CacheStatus,
MatchResult,
PDTransferMetadata,
StorageConfig,
StorageMetadata,
StorageType,
TransferConfig,
TransferResult,
TransferStatus,
TransferTask,
TransferType,
)
from .storage import create_storage_connector, create_storage_scheduler
from .transfer import create_transfer_connector
from .transfer_manager import CacheTransferManager

__all__ = [
# Base classes
"KVCacheBase",
# Managers
"CacheManager",
"CacheController",
"CacheTransferManager",
# Exceptions
"LayerSwapTimeoutError",
# Utils
"LayerDoneCounter",
# Metadata
"CacheBlockMetadata",
"BlockNode",
"CacheStatus",
"TransferTask",
"TransferStatus",
"TransferConfig",
"TransferResult",
"AsyncTaskHandler",
"MatchResult",
"StorageMetadata",
"PDTransferMetadata",
"StorageConfig",
"StorageType",
"TransferType",
# Factory functions
"create_storage_scheduler",
"create_storage_connector",
"create_transfer_connector",
]
80 changes: 80 additions & 0 deletions fastdeploy/cache_manager/v1/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from fastdeploy.config import FDConfig


class KVCacheBase(ABC):
    """
    Abstract base for KV cache management components.

    Defines the interface shared by the cache-management roles:

    CacheManager (Scheduler process):
        - Owns DeviceBlockPool and HostBlockPool
        - Allocates and releases cache blocks
        - Drives storage operations through StorageScheduler

    CacheController (Worker process):
        - Executes cache transfer operations
        - Synchronizes layer-by-layer transfers
        - Performs cross-node transfer via TransferConnector
    """

    def __init__(self, config: "FDConfig"):
        """
        Store the configuration and expose commonly used sub-configs.

        Args:
            config: FDConfig instance containing all fastdeploy configuration
        """
        # Keep the full config, then surface the sub-configs that
        # cache implementations consult most frequently.
        self.config = config
        for section in ("model_config", "cache_config", "quant_config", "parallel_config"):
            setattr(self, section, getattr(config, section))

        # Subclasses flip this once their setup has completed.
        self._initialized = False

    @abstractmethod
    def reset_cache(self) -> bool:
        """
        Reset the cache state.

        Subclasses implement this to clear their role-specific state
        (e.g. empty block pools, drop in-flight transfer bookkeeping).

        Returns:
            True if reset was successful, False otherwise
        """
        pass

    def is_initialized(self) -> bool:
        """
        Report whether initialization has completed.

        Returns:
            True if initialized, False otherwise
        """
        return self._initialized
Loading
Loading