Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion custom_ops/xpu_ops/test/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ addopts =
--ignore=test_set_data_ipc.py
--ignore=test_read_data_ipc.py
--ignore=test_set_get_data_ipc.py
--ignore=test_draft_model_preprocess.py
--ignore=test_draft_model_preprocess.py
8 changes: 4 additions & 4 deletions fastdeploy/cache_manager/cache_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ def _update_history_hit_metrics(self):
self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num
self.storage_hit_token_ratio = self.total_storage_matched_token_num / self.total_token_num

main_process_metrics.hit_req_rate.set(self.hit_req_ratio)
main_process_metrics.hit_token_rate.set(self.hit_token_ratio)
main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio)
main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio)
main_process_metrics.set_value("hit_req_rate", self.hit_req_ratio)
main_process_metrics.set_value("hit_token_rate", self.hit_token_ratio)
main_process_metrics.set_value("cpu_hit_token_rate", self.cpu_hit_token_ratio)
main_process_metrics.set_value("gpu_hit_token_rate", self.gpu_hit_token_ratio)

logger.info(
f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}"
Expand Down
34 changes: 17 additions & 17 deletions fastdeploy/cache_manager/prefix_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ def __init__(
f"{self.cache_config.bytes_per_token_per_layer / self.config.parallel_config.tensor_parallel_size}"
)

main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks)
main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_resource.set(1.0)
main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks)
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_resource", 1.0)

def _get_kv_cache_shape(self, max_block_num):
from fastdeploy.model_executor.layers.attention import get_attention_backend
Expand Down Expand Up @@ -462,11 +462,11 @@ def update_cache_config(self, cache_config):
heapq.heapify(self.gpu_free_block_list)
self.node_id_pool = list(range(self.num_gpu_blocks + self.num_cpu_blocks))

main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks)
main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks)
main_process_metrics.available_gpu_resource.set(1.0)
main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks)
main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks)
main_process_metrics.set_value("available_gpu_resource", 1.0)

def can_allocate_gpu_blocks(self, num_blocks: int, try_free_gpu_blocks: bool = True):
"""
Expand Down Expand Up @@ -494,8 +494,8 @@ def allocate_gpu_blocks(self, num_blocks, req_id=None):
logger.info(
f"req_id:{req_id} allocate_gpu_blocks: {allocated_block_ids}, len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}"
)
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)
return allocated_block_ids

def recycle_gpu_blocks(self, gpu_block_ids, req_id=None):
Expand Down Expand Up @@ -529,8 +529,8 @@ def recycle_gpu_blocks(self, gpu_block_ids, req_id=None):
else:
heapq.heappush(self.gpu_free_block_list, gpu_block_ids)
logger.debug(f"req_id:{req_id} recycle blocks end")
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)

def allocate_cpu_blocks(self, num_blocks):
"""
Expand Down Expand Up @@ -2296,9 +2296,9 @@ def reset(self, wait_for_tasks_done=False):

# reset metrics
self.metrics.reset_metrics()
main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_block_num.set(len(self.gpu_free_block_list))
main_process_metrics.available_gpu_resource.set(self.available_gpu_resource)
main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_block_num", len(self.gpu_free_block_list))
main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource)

def clear_prefix_cache(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@
from typing import List

import paddle
import rdma_comm
import zmq

import rdma_comm

if paddle.is_compiled_with_xpu():
from custom_setup_ops import get_peer_mem_addr

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
import time

import paddle
import rdma_comm
import zmq

import rdma_comm

if paddle.is_compiled_with_xpu():
from custom_setup_ops import get_peer_mem_addr

Expand Down
18 changes: 9 additions & 9 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -881,8 +881,8 @@ def _schedule_request_to_worker(self):
else:
continue

main_process_metrics.num_requests_waiting.dec(len(tasks))
main_process_metrics.num_requests_running.inc(len(tasks))
main_process_metrics.dec_value("num_requests_waiting", len(tasks))
main_process_metrics.inc_value("num_requests_running", len(tasks))
except Exception as e:
err_msg = f"Error happened while insert task to engine: {e}, {traceback.format_exc()!s}."
self.llm_logger.error(err_msg)
Expand Down Expand Up @@ -1010,7 +1010,7 @@ def _fetch_request():
)
]
)
main_process_metrics.reschedule_req_num.inc()
main_process_metrics.inc_value("reschedule_req_num")
need_delete_tasks.append(task)
continue
for tmp_task in need_delete_tasks:
Expand Down Expand Up @@ -1118,7 +1118,7 @@ def _fetch_request():
f"preallocated request. req:{task.request_id} "
)
self.llm_logger.error(msg)
main_process_metrics.reschedule_req_num.inc()
main_process_metrics.inc_value("reschedule_req_num")
self.scheduler.put_results(
[
RequestOutput(
Expand Down Expand Up @@ -1325,7 +1325,7 @@ def _insert_zmq_task_to_scheduler(self):
request = Request.from_dict(data)

request.metrics.scheduler_recv_req_time = time.time()
main_process_metrics.requests_number.inc()
main_process_metrics.inc_value("requests_number")
trace_carrier = data.get("trace_carrier")
if trace_carrier:
request_id = get_base_request_id(data["request_id"])
Expand Down Expand Up @@ -1388,7 +1388,7 @@ def _insert_zmq_task_to_scheduler(self):
added_requests.pop(request_id)

if failed is None:
main_process_metrics.num_requests_waiting.inc(1)
main_process_metrics.inc_value("num_requests_waiting", 1)
continue

self._send_error_response(request_id, failed)
Expand Down Expand Up @@ -2084,7 +2084,7 @@ def _process_allocate_resource_requests():
self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}")
processed_indices.append(idx)
is_success = True
main_process_metrics.decode_preallocated_req_num.inc()
main_process_metrics.inc_value("decode_preallocated_req_num")
else:
if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len):
self.llm_logger.debug(f"D Resource available, processing task {task.request_id}")
Expand Down Expand Up @@ -2161,7 +2161,7 @@ def _process_prefilled_requests():
else:
for req_output in ready_request_outputs:
request_id = req_output.request_id
main_process_metrics.decode_preallocated_req_num.dec()
main_process_metrics.dec_value("decode_preallocated_req_num")
trace_print(LoggingEventName.DECODE_PROCESS_PREFILLED_REQUEST_END, request_id, "")
if envs.FD_ENABLE_INTERNAL_ADAPTER and not req_output.outputs.token_ids:
# first token is eos in Prefill, just recycle resource and continue
Expand All @@ -2176,7 +2176,7 @@ def _process_prefilled_requests():
self.llm_logger.warning(
f"{request_id} prefill failed with msg:{req_output.error_msg}, recycle resource."
)
main_process_metrics.failed_recv_first_token_req_num.inc()
main_process_metrics.inc_value("failed_recv_first_token_req_num")
self.resource_manager.pre_recycle_resource(request_id)
if request_id in self.token_processor.tokens_counter:
del self.token_processor.tokens_counter[request_id]
Expand Down
18 changes: 9 additions & 9 deletions fastdeploy/engine/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(
self.real_bsz = 0
self.abort_req_ids_set = set()
llm_logger.info(f"{self.info()}")
main_process_metrics.max_batch_size.set(max_num_seqs)
main_process_metrics.set_value("max_batch_size", max_num_seqs)

def reset_cache_config(self, cfg):
"""
Expand Down Expand Up @@ -180,7 +180,7 @@ def _recycle_block_tables(self, task):
ori_number = self.available_block_num()
self.cache_manager.recycle_gpu_blocks(block_tables)
cur_number = self.available_block_num()
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
llm_logger.info(f"recycle {req_id} {cur_number - ori_number} blocks.")

def available_batch(self):
Expand Down Expand Up @@ -322,14 +322,14 @@ def allocate_resources_for_new_tasks(self, tasks):

# record batch size here
num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks)
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
llm_logger.info(
f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}"
)
llm_logger.info(f"{self.info()}")
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())

return processed_tasks

Expand Down Expand Up @@ -357,9 +357,9 @@ def _record_request_cache_info(self, task, common_block_ids, unique_block_ids, h
task.cache_info = (cache_block_num, no_cache_block_num)

# Report the number of cached tokens to Prometheus metrics
main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num)
main_process_metrics.inc_value("prefix_cache_token_num", task.num_cached_tokens)
main_process_metrics.inc_value("prefix_gpu_cache_token_num", task.gpu_cache_token_num)
main_process_metrics.inc_value("prefix_cpu_cache_token_num", task.cpu_cache_token_num)

cached_len = len(common_block_ids) * self.cfg.block_size
task.block_tables = common_block_ids + unique_block_ids
Expand Down
22 changes: 12 additions & 10 deletions fastdeploy/engine/sched/resource_manager_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l
self.finish_execution_pool = ThreadPoolExecutor(max_workers=1)
self.lock = threading.Lock()
self.to_be_rescheduled_request_id_set = set()
main_process_metrics.max_batch_size.set(max_num_seqs)
main_process_metrics.set_value("max_batch_size", max_num_seqs)

self.using_extend_tables_req_id = set()
self.reuse_block_num_map = dict()
Expand Down Expand Up @@ -1466,9 +1466,9 @@ def get_prefix_cached_blocks(self, request: Request):
request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"]
request.metrics.prompt_token_ids_len = request.prompt_token_ids_len

main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens)
main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num)
main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num)
main_process_metrics.inc_value("prefix_cache_token_num", request.num_computed_tokens)
main_process_metrics.inc_value("prefix_gpu_cache_token_num", request.metrics.gpu_cache_token_num)
main_process_metrics.inc_value("prefix_cpu_cache_token_num", request.metrics.cpu_cache_token_num)

trace_print(LoggingEventName.PREPARE_PREFIX_CACHE_END, request.request_id, getattr(request, "user", ""))

Expand Down Expand Up @@ -1743,12 +1743,14 @@ def update_metrics(self, verbose=False):
if task is not None:
blocks_used_by_tasks.update(getattr(task, "block_tables", []))
blocks_used_by_tasks.update(getattr(task, "extend_block_tables", []))
main_process_metrics.available_gpu_block_num.set(self.total_block_number() - len(blocks_used_by_tasks))
main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
main_process_metrics.num_requests_running.set(num_requests_running)
main_process_metrics.num_requests_waiting.set(num_requests_waiting)
main_process_metrics.num_requests_queuing.set(num_requests_queuing)
main_process_metrics.set_value(
"available_gpu_block_num", self.total_block_number() - len(blocks_used_by_tasks)
)
main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch())
main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc())
main_process_metrics.set_value("num_requests_running", num_requests_running)
main_process_metrics.set_value("num_requests_waiting", num_requests_waiting)
main_process_metrics.set_value("num_requests_queuing", num_requests_queuing)
if verbose:
llm_logger.info(
f"update metrics: running={num_requests_running}, "
Expand Down
6 changes: 3 additions & 3 deletions fastdeploy/entrypoints/engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,9 +369,9 @@ async def add_requests(self, task):

if "messages" in task:
task["messages"] = None
main_process_metrics.request_params_max_tokens.observe(task["max_tokens"])
main_process_metrics.prompt_tokens_total.inc(input_ids_len)
main_process_metrics.request_prompt_tokens.observe(input_ids_len)
main_process_metrics.obs_value("request_params_max_tokens", task["max_tokens"])
main_process_metrics.inc_value("prompt_tokens_total", input_ids_len)
main_process_metrics.obs_value("request_prompt_tokens", input_ids_len)
except Exception as e:
log_request_error(
message="request[{request_id}] add_requests error: {error}, {traceback}",
Expand Down
8 changes: 4 additions & 4 deletions fastdeploy/entrypoints/openai/serving_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,8 +479,8 @@ async def chat_completion_stream_generator(
if "trace_carrier" in res:
del res["trace_carrier"]
num_choices -= 1
main_process_metrics.e2e_request_latency.observe(
time.time() - res["metrics"]["request_start_time"]
main_process_metrics.obs_value(
"e2e_request_latency", time.time() - res["metrics"]["request_start_time"]
)
if previous_num_tokens[idx] != max_tokens:
choice.finish_reason = "stop"
Expand Down Expand Up @@ -829,8 +829,8 @@ async def _create_chat_completion_choice(
return_completion_token_ids = True

if output is not None and output.get("metrics") and output["metrics"].get("request_start_time"):
main_process_metrics.e2e_request_latency.observe(
time.time() - data.get("metrics").get("request_start_time")
main_process_metrics.obs_value(
"e2e_request_latency", time.time() - data.get("metrics").get("request_start_time")
)
message = ChatMessage(
role="assistant",
Expand Down
8 changes: 5 additions & 3 deletions fastdeploy/entrypoints/openai/v1/serving_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ async def _build_stream_response(

if request_output.finished:
if request_output.metrics and request_output.metrics.request_start_time:
main_process_metrics.e2e_request_latency.observe(
time.time() - request_output.metrics.request_start_time
main_process_metrics.obs_value(
"e2e_request_latency", time.time() - request_output.metrics.request_start_time
)
max_tokens = request.max_completion_tokens or request.max_tokens
choice_completion_tokens = response_ctx.choice_completion_tokens_dict[output.index]
Expand Down Expand Up @@ -393,7 +393,9 @@ async def _create_chat_completion_choice(
message.reasoning_content = output.reasoning_content
message.tool_calls = request_output.accumulate_tool_calls if request_output.accumulate_tool_calls else None
if output is not None and request_output.metrics and request_output.metrics.request_start_time:
main_process_metrics.e2e_request_latency.observe(time.time() - request_output.metrics.request_start_time)
main_process_metrics.obs_value(
"e2e_request_latency", time.time() - request_output.metrics.request_start_time
)

if request.return_token_ids:
message.prompt_token_ids = request_output.prompt_token_ids
Expand Down
Loading
Loading