Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion fastdeploy/cache_manager/cache_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,35 @@
# limitations under the License.
"""

from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional

from fastdeploy.utils import get_logger

logger = get_logger("prefix_cache_manager", "cache_manager.log")


@dataclass
class AuxBlockDataSpec:
    """
    Specification for one kind of auxiliary data bound to KVCache blocks.

    CacheTransferManager walks every registered spec during swap/storage
    operations and moves the matching auxiliary payload alongside the KV
    data.
    """

    # Identifier of the auxiliary payload (e.g. "routing").
    name: str
    # Number of model layers this payload covers.
    num_layers: int
    # Elements stored per token (e.g. MoE top_k); 0 when unset.
    per_token_size: int = 0
    # Tokens per KVCache block; 0 when unset.
    block_size: int = 0
    # Numpy dtype name used for the payload buffer.
    dtype: str = "uint8"
    # Optional CPU-side swap buffer owned by the transfer manager.
    swap_buffer: Optional[Any] = None
    # Allows a spec to stay registered while temporarily disabled.
    enabled: bool = True

    def get_storage_key(self, key_prefix: str, block_hash: str, rank: int) -> str:
        """Build the storage-backend key for one block of this payload."""
        parts = (f"prefix{key_prefix}", block_hash, str(rank), self.name)
        return "_".join(parts)


class CacheStatus(Enum):
"""
cache status enum class
Expand Down Expand Up @@ -56,6 +78,7 @@ def __init__(
cache_status=CacheStatus.GPU,
is_persistent=False,
persistent_shared_count=0,
aux_data_names=None,
):
"""
Args:
Expand Down Expand Up @@ -89,6 +112,7 @@ def __init__(
self.cache_status = cache_status
self.is_persistent = is_persistent
self.persistent_shared_count = persistent_shared_count
self.aux_data_names = aux_data_names or []
self.req_id_set = set()

def __lt__(self, other):
Expand All @@ -102,7 +126,7 @@ def __lt__(self, other):
else:
return self.depth > other.depth

def __str__(self):
def __str__(self) -> str:
"""
return node info
"""
Expand Down
186 changes: 186 additions & 0 deletions fastdeploy/cache_manager/cache_transfer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,15 @@ def parse_args():
)
parser.add_argument("--model_path", type=str, help="The path of model")

# Routing replay (R3) arguments
parser.add_argument("--enable_routing_replay", type=int, default=0, help="Enable routing replay")
parser.add_argument("--routing_num_moe_layers", type=int, default=0, help="Number of MoE layers for routing")
parser.add_argument("--routing_moe_top_k", type=int, default=0, help="MoE top_k for routing")
parser.add_argument("--routing_dtype", type=str, default="uint8", help="Routing data dtype")

args = parser.parse_args()
# Convert int flag to bool
args.enable_routing_replay = bool(args.enable_routing_replay)
return args


Expand Down Expand Up @@ -241,6 +249,13 @@ def __init__(self, args):
self._init_cpu_cache()
if self.storage_backend_type is not None:
self._init_storage(args)

# Initialize auxiliary data specs (e.g., routing replay)
self.aux_data_specs = {}
self.routing_host_view = None
self.routing_swap_buffer = None
self._init_routing_aux_data(args)

self._init_control()

cache_task_broadcast_data = np.zeros(shape=[1], dtype=np.int32)
Expand Down Expand Up @@ -307,6 +322,162 @@ def __init__(self, args):
)
self.cache_transfer_inited_signal.value[self.rank] = 1

def _init_routing_aux_data(self, args):
    """Set up routing-replay (R3) auxiliary buffers used during swap sync."""
    if not getattr(args, "enable_routing_replay", False):
        return

    try:
        from fastdeploy.cache_manager.cache_data import AuxBlockDataSpec
        from fastdeploy.model_executor.layers.moe.routing_indices_cache import (
            RoutingHostBufferView,
            RoutingSwapBuffer,
        )

        num_moe_layers = getattr(args, "routing_num_moe_layers", 0)
        moe_top_k = getattr(args, "routing_moe_top_k", 0)
        routing_dtype = getattr(args, "routing_dtype", "uint8")

        # Nothing to register when no MoE routing dimensions were passed.
        if not (num_moe_layers and moe_top_k):
            return

        spec = AuxBlockDataSpec(
            name="routing",
            num_layers=num_moe_layers,
            per_token_size=moe_top_k,
            block_size=self.block_size,
            dtype=routing_dtype,
        )

        dp_suffix = str(getattr(args, "engine_worker_queue_port", ""))

        # CPU-side staging buffer is only needed when CPU blocks exist.
        if self.num_cpu_blocks > 0:
            self.routing_swap_buffer = RoutingSwapBuffer(
                num_cpu_blocks=self.num_cpu_blocks,
                block_size=self.block_size,
                num_moe_layers=num_moe_layers,
                top_k=moe_top_k,
                dtype=routing_dtype,
                dp_suffix=dp_suffix,
            )
            spec.swap_buffer = self.routing_swap_buffer

        # Attach to the SharedMemory host buffer created by the Engine.
        shm_name = f"routing_host_buffer.{dp_suffix}"
        max_num_kv_tokens = self.num_gpu_blocks * self.block_size
        shape = (max_num_kv_tokens, num_moe_layers, moe_top_k)
        try:
            self.routing_host_view = RoutingHostBufferView(shape=shape, dtype=routing_dtype, shm_name=shm_name)
            logger.info(f"[R3] CTM attached to RoutingHostBuffer: {shm_name}")
        except FileNotFoundError:
            logger.warning(f"[R3] CTM RoutingHostBuffer {shm_name} not found")

        self.aux_data_specs["routing"] = spec
        logger.info(f"[R3] CTM registered routing aux data: layers={num_moe_layers}, top_k={moe_top_k}")

    except Exception as e:
        logger.warning(f"[R3] CTM failed to init routing aux data: {e}")

def _swap_routing(self, gpu_block_ids, cpu_block_ids, direction):
"""
Swap routing data between routing_host_buffer and routing_swap_buffer.
Pure CPU-to-CPU numpy memcpy, no GPU DMA.
"""
if self.routing_host_view is None or self.routing_swap_buffer is None:
return
bs = self.block_size
for gpu_bid, cpu_bid in zip(gpu_block_ids, cpu_block_ids):
gpu_start = gpu_bid * bs
gpu_end = gpu_start + bs
cpu_start = cpu_bid * bs
cpu_end = cpu_start + bs
if direction == "to_cpu":
self.routing_swap_buffer.buffer[cpu_start:cpu_end] = self.routing_host_view.buffer[gpu_start:gpu_end]
else: # to_gpu
self.routing_host_view.buffer[gpu_start:gpu_end] = self.routing_swap_buffer.buffer[cpu_start:cpu_end]

def _write_routing_to_storage(self, task_keys, gpu_block_ids):
"""
Write routing data from routing_host_buffer to storage backend.
Only for mooncake/file backends; only tp_rank=0 writes routing.
"""
if self.routing_host_view is None or self.rank != 0:
return
if self.storage_backend_type not in ("mooncake", "file"):
return

try:
spec = self.aux_data_specs.get("routing")
if spec is None or not spec.enabled:
return

bs = self.block_size
routing_keys = []
routing_ptrs = []
routing_sizes = []
per_block_bytes = bs * spec.num_layers * spec.per_token_size * np.dtype(spec.dtype).itemsize

for block_hash, gpu_bid in zip(task_keys, gpu_block_ids):
key = spec.get_storage_key(self.key_prefix, block_hash, self.rank)
start = gpu_bid * bs
end = start + bs
block_data = self.routing_host_view.buffer[start:end]
if not block_data.flags["C_CONTIGUOUS"]:
block_data = np.ascontiguousarray(block_data)
routing_keys.append(key)
routing_ptrs.append(block_data.ctypes.data)
routing_sizes.append(per_block_bytes)

if routing_keys:
self.storage_backend.batch_set(
keys=routing_keys, target_locations=routing_ptrs, target_sizes=routing_sizes
)
logger.debug(f"[R3] Wrote {len(routing_keys)} routing blocks to storage")
except Exception as e:
logger.warning(f"[R3] Failed to write routing to storage: {e}")

def _read_routing_from_storage(self, task_keys, gpu_block_ids):
"""
Read routing data from storage backend into routing_host_buffer.
Only for mooncake/file backends; only tp_rank=0 reads routing.
"""
if self.routing_host_view is None or self.rank != 0:
return
if self.storage_backend_type not in ("mooncake", "file"):
return

try:
spec = self.aux_data_specs.get("routing")
if spec is None or not spec.enabled:
return

bs = self.block_size
per_block_bytes = bs * spec.num_layers * spec.per_token_size * np.dtype(spec.dtype).itemsize

for block_hash, gpu_bid in zip(task_keys, gpu_block_ids):
key = spec.get_storage_key(self.key_prefix, block_hash, self.rank)
start = gpu_bid * bs
end = start + bs
target_slice = self.routing_host_view.buffer[start:end]
if not target_slice.flags["C_CONTIGUOUS"]:
# Need contiguous target for ctypes pointer
tmp = np.ascontiguousarray(target_slice)
result = self.storage_backend.get(
key=key, target_location=tmp.ctypes.data, target_size=per_block_bytes
)
if result is not None and result >= 0:
self.routing_host_view.buffer[start:end] = tmp
else:
self.storage_backend.get(
key=key, target_location=target_slice.ctypes.data, target_size=per_block_bytes
)

logger.debug(f"[R3] Read {len(task_keys)} routing blocks from storage")
except Exception as e:
logger.warning(f"[R3] Failed to read routing from storage: {e}")

def _init_control(self):
dp_rank = self.local_data_parallel_id
tp_rank = self.rank
Expand Down Expand Up @@ -809,6 +980,9 @@ def read_storage_task(self, task: ReadStorageTask):
logger.info(
f"Successfully read {len(valid_gpu_block_ids)} blocks from cache storage for task {task.task_id}"
)
# Read routing data from storage for matched blocks
matched_keys = task.keys[: len(valid_gpu_block_ids)]
self._read_routing_from_storage(matched_keys, valid_gpu_block_ids)
except Exception as e:
logger.error(
f"Failed to read cache for task {task.task_id}, error: {e}, traceback: {traceback.format_exc()}"
Expand Down Expand Up @@ -1000,6 +1174,9 @@ def write_back_storage_task(self, task: WriteStorageTask):
logger.info(
f"Successfully wrote {write_block_num} blocks to cache storage for task {task.task_id}"
)
# Write routing data to storage (shares dedup with KVCache)
remaining_keys = task.keys[match_block_num:]
self._write_routing_to_storage(remaining_keys, gpu_block_ids)
except Exception as e:
logger.error(f"Error in write back storage task: {e}, traceback:{traceback.format_exc()}")
gpu_block_ids = []
Expand Down Expand Up @@ -1375,6 +1552,10 @@ def _transfer_data(
0,
)

# Routing: routing_host_buffer → routing_swap_buffer
if "routing" in self.aux_data_specs:
self._swap_routing(gpu_block_ids, cpu_block_ids, "to_cpu")

elif event_type.value == CacheStatus.SWAP2GPU.value:
swap_cache_all_layers(
self.gpu_cache_k_tensors,
Expand Down Expand Up @@ -1413,6 +1594,11 @@ def _transfer_data(
self.device,
1,
)

# Routing: routing_swap_buffer → routing_host_buffer
if "routing" in self.aux_data_specs:
self._swap_routing(gpu_block_ids, cpu_block_ids, "to_gpu")

else:
logger.warning(
f"transfer data: Get unexpected event type {event_type}, only SWAP2CPU and SWAP2GPU supported"
Expand Down
20 changes: 20 additions & 0 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,25 @@ def launch_cache_manager(
else:
storage_arg_str = " "

# Compute routing replay args for CTM
routing_arg_str = ""
routing_replay_config = getattr(self.config, "routing_replay_config", None)
if routing_replay_config is not None and routing_replay_config.enable_routing_replay:
model_config = self.config.model_config
num_moe_layers = model_config.num_hidden_layers - model_config.moe_layer_start_index
if model_config.architectures[0] == "Glm4MoeForCausalLM":
moe_top_k = model_config.num_experts_per_tok
else:
moe_top_k = model_config.moe_k
num_experts = model_config.moe_num_experts + model_config.moe_num_shared_experts
routing_dtype = "uint8" if num_experts + 1 <= 255 else ("uint16" if num_experts + 1 <= 65535 else "uint32")
routing_arg_str = (
f" --enable_routing_replay 1"
f" --routing_num_moe_layers {num_moe_layers}"
f" --routing_moe_top_k {moe_top_k}"
f" --routing_dtype {routing_dtype}"
)

if self.cache_config.num_cpu_blocks > 0 or self.cache_config.kvcache_storage_backend:
for i in range(tensor_parallel_size):
launch_cmd = (
Expand Down Expand Up @@ -324,6 +343,7 @@ def launch_cache_manager(
+ f" --write_policy {cache_config.write_policy}"
+ f" --max_model_len {self.config.model_config.max_model_len}"
+ f" --model_path {self.config.model_config.model}"
+ routing_arg_str
+ f" >{log_dir}/launch_cache_transfer_manager_{int(device_ids[i])}.log 2>&1"
)
logger.info(f"Launch cache transfer manager, command:{launch_cmd}")
Expand Down
41 changes: 41 additions & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2274,10 +2274,51 @@ def _stop_profile(self):
num_gpu_blocks = self.get_profile_block_num_signal.value[0]
self.cfg.cache_config.reset(num_gpu_blocks)
self.resource_manager.reset_cache_config(self.cfg.cache_config)

# Create RoutingHostBuffer (SharedMemory) after num_gpu_blocks is known
if self.cfg.routing_replay_config.enable_routing_replay:
self._init_routing_host_buffer(num_gpu_blocks)

if self.cfg.cache_config.enable_prefix_caching or self.cfg.scheduler_config.splitwise_role != "mixed":
device_ids = self.cfg.parallel_config.device_ids.split(",")
self.cache_manager_processes = self.start_cache_service(device_ids, self.ipc_signal_suffix)

def _init_routing_host_buffer(self, num_gpu_blocks: int):
    """Create the RoutingHostBuffer SharedMemory once profiling has fixed num_gpu_blocks."""
    from fastdeploy.model_executor.layers.moe.routing_indices_cache import (
        RoutingHostBuffer,
        RoutingHostBufferView,
    )

    model_config = self.cfg.model_config
    num_moe_layers = model_config.num_hidden_layers - model_config.moe_layer_start_index
    if model_config.architectures[0] == "Glm4MoeForCausalLM":
        moe_top_k = model_config.num_experts_per_tok
    else:
        moe_top_k = model_config.moe_k

    # Smallest unsigned dtype that holds num_experts + 1 distinct values
    # (the +1 presumably reserves a sentinel id — confirm).
    num_experts = model_config.moe_num_experts + model_config.moe_num_shared_experts
    if num_experts + 1 <= 255:
        dtype = "uint8"
    elif num_experts + 1 <= 65535:
        dtype = "uint16"
    else:
        dtype = "uint32"

    block_size = self.cfg.cache_config.block_size
    dp_suffix = str(self.cfg.parallel_config.local_engine_worker_queue_port)
    self.routing_host_buffer = RoutingHostBuffer(
        num_gpu_blocks=num_gpu_blocks,
        block_size=block_size,
        num_moe_layers=num_moe_layers,
        top_k=moe_top_k,
        dtype=dtype,
        dp_suffix=dp_suffix,
    )

    # Expose a read view to the resource manager for PD disaggregation (D side).
    if hasattr(self, "resource_manager") and hasattr(self.resource_manager, "routing_host_view"):
        shm_name = f"routing_host_buffer.{dp_suffix}"
        shape = (num_gpu_blocks * block_size, num_moe_layers, moe_top_k)
        self.resource_manager.routing_host_view = RoutingHostBufferView(
            shape=shape, dtype=dtype, shm_name=shm_name
        )

def check_health(self, time_interval_threashold=30):
"""
Check the health of the model server by checking whether all workers are alive.
Expand Down
5 changes: 5 additions & 0 deletions fastdeploy/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,11 @@ def _stop_profile(self):
num_gpu_blocks = self.get_profile_block_num_signal.value[0]
self.cfg.cache_config.reset(num_gpu_blocks)
self.engine.resource_manager.reset_cache_config(self.cfg.cache_config)

# Create RoutingHostBuffer (SharedMemory) before starting cache service
if self.cfg.routing_replay_config.enable_routing_replay:
self.engine._init_routing_host_buffer(num_gpu_blocks)

if self.cfg.cache_config.enable_prefix_caching or self.cfg.scheduler_config.splitwise_role != "mixed":
if not current_platform.is_intel_hpu():
device_ids = self.cfg.parallel_config.device_ids.split(",")
Expand Down
Loading
Loading