From eb12fd3759e5eb521c9521d8b28d2911fa7ec3ed Mon Sep 17 00:00:00 2001 From: liyonghua0910 Date: Wed, 20 May 2026 10:12:08 +0000 Subject: [PATCH] [Metric] Support model_id as metric labels by redefining metric update interface Introduce MetricsManagerInterface with unified set_value/inc_value/dec_value/obs_value methods. When FD_DEFAULT_METRIC_LABEL_VALUES is set to a valid non-empty JSON dict, metric labels (e.g. model_id) are automatically applied. Otherwise, operations fall back to the raw prometheus_client calls. Co-Authored-By: Claude Opus 4.6 --- custom_ops/xpu_ops/test/pytest.ini | 2 +- fastdeploy/cache_manager/cache_metrics.py | 8 +- .../cache_manager/prefix_cache_manager.py | 34 ++--- .../kvcache_transfer/benchmark.py | 3 +- .../transfer_factory/kvcache_transfer/test.py | 3 +- fastdeploy/engine/common_engine.py | 18 +-- fastdeploy/engine/resource_manager.py | 18 +-- .../engine/sched/resource_manager_v1.py | 22 +-- fastdeploy/entrypoints/engine_client.py | 6 +- fastdeploy/entrypoints/openai/serving_chat.py | 8 +- .../entrypoints/openai/v1/serving_chat.py | 8 +- fastdeploy/envs.py | 4 + fastdeploy/metrics/interface.py | 72 +++++++++ fastdeploy/metrics/metrics.py | 144 +++++++++++++++--- fastdeploy/metrics/metrics_middleware.py | 8 +- fastdeploy/output/token_processor.py | 52 ++++--- fastdeploy/splitwise/splitwise_connector.py | 2 +- 17 files changed, 305 insertions(+), 107 deletions(-) create mode 100644 fastdeploy/metrics/interface.py diff --git a/custom_ops/xpu_ops/test/pytest.ini b/custom_ops/xpu_ops/test/pytest.ini index 15438a3cf7f..62d02c1d64e 100644 --- a/custom_ops/xpu_ops/test/pytest.ini +++ b/custom_ops/xpu_ops/test/pytest.ini @@ -40,4 +40,4 @@ addopts = --ignore=test_set_data_ipc.py --ignore=test_read_data_ipc.py --ignore=test_set_get_data_ipc.py - --ignore=test_draft_model_preprocess.py \ No newline at end of file + --ignore=test_draft_model_preprocess.py diff --git a/fastdeploy/cache_manager/cache_metrics.py b/fastdeploy/cache_manager/cache_metrics.py index 2dd3137d328..ade0718d56b 100644 --- a/fastdeploy/cache_manager/cache_metrics.py +++ b/fastdeploy/cache_manager/cache_metrics.py @@ -57,10 +57,10 @@ def _update_history_hit_metrics(self): self.gpu_hit_token_ratio = self.total_gpu_matched_token_num / self.total_token_num self.storage_hit_token_ratio = self.total_storage_matched_token_num / self.total_token_num - main_process_metrics.hit_req_rate.set(self.hit_req_ratio) - main_process_metrics.hit_token_rate.set(self.hit_token_ratio) - main_process_metrics.cpu_hit_token_rate.set(self.cpu_hit_token_ratio) - main_process_metrics.gpu_hit_token_rate.set(self.gpu_hit_token_ratio) + main_process_metrics.set_value("hit_req_rate", self.hit_req_ratio) + main_process_metrics.set_value("hit_token_rate", self.hit_token_ratio) + main_process_metrics.set_value("cpu_hit_token_rate", self.cpu_hit_token_ratio) + main_process_metrics.set_value("gpu_hit_token_rate", self.gpu_hit_token_ratio) logger.info( f"Metrics for all requests: req_count {self.req_count} hit_req_count {self.hit_req_count}" diff --git a/fastdeploy/cache_manager/prefix_cache_manager.py b/fastdeploy/cache_manager/prefix_cache_manager.py index d28c1e6f6b0..88e3a9e5391 100644 --- a/fastdeploy/cache_manager/prefix_cache_manager.py +++ b/fastdeploy/cache_manager/prefix_cache_manager.py @@ -131,11 +131,11 @@ def __init__( f"{self.cache_config.bytes_per_token_per_layer / self.config.parallel_config.tensor_parallel_size}" ) - main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks) - main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.available_gpu_resource.set(1.0) + main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks) + main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("available_gpu_resource", 1.0) def _get_kv_cache_shape(self, max_block_num): from fastdeploy.model_executor.layers.attention import get_attention_backend @@ -462,11 +462,11 @@ def update_cache_config(self, cache_config): heapq.heapify(self.gpu_free_block_list) self.node_id_pool = list(range(self.num_gpu_blocks + self.num_cpu_blocks)) - main_process_metrics.max_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.max_cpu_block_num.set(self.num_cpu_blocks) - main_process_metrics.available_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.free_gpu_block_num.set(self.num_gpu_blocks) - main_process_metrics.available_gpu_resource.set(1.0) + main_process_metrics.set_value("max_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("max_cpu_block_num", self.num_cpu_blocks) + main_process_metrics.set_value("available_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("free_gpu_block_num", self.num_gpu_blocks) + main_process_metrics.set_value("available_gpu_resource", 1.0) def can_allocate_gpu_blocks(self, num_blocks: int, try_free_gpu_blocks: bool = True): """ @@ -494,8 +494,8 @@ def allocate_gpu_blocks(self, num_blocks, req_id=None): logger.info( f"req_id:{req_id} allocate_gpu_blocks: {allocated_block_ids}, len(self.gpu_free_block_list) {len(self.gpu_free_block_list)}" ) - main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list)) - main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list)) + main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource) return allocated_block_ids def recycle_gpu_blocks(self, gpu_block_ids, req_id=None): @@ -529,8 +529,8 @@ def recycle_gpu_blocks(self, gpu_block_ids, req_id=None): else: heapq.heappush(self.gpu_free_block_list, gpu_block_ids) logger.debug(f"req_id:{req_id} recycle blocks end") - main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list)) - main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list)) + main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource) def allocate_cpu_blocks(self, num_blocks): """ @@ -2296,9 +2296,9 @@ def reset(self, wait_for_tasks_done=False): # reset metrics self.metrics.reset_metrics() - main_process_metrics.free_gpu_block_num.set(len(self.gpu_free_block_list)) - main_process_metrics.available_gpu_block_num.set(len(self.gpu_free_block_list)) - main_process_metrics.available_gpu_resource.set(self.available_gpu_resource) + main_process_metrics.set_value("free_gpu_block_num", len(self.gpu_free_block_list)) + main_process_metrics.set_value("available_gpu_block_num", len(self.gpu_free_block_list)) + main_process_metrics.set_value("available_gpu_resource", self.available_gpu_resource) def clear_prefix_cache(self): """ diff --git a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/benchmark.py b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/benchmark.py index 5272305ce40..5a4fd861b6b 100644 --- a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/benchmark.py +++ b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/benchmark.py @@ -32,9 +32,10 @@ from typing import List import paddle -import rdma_comm import zmq +import rdma_comm + if paddle.is_compiled_with_xpu(): from custom_setup_ops import get_peer_mem_addr diff --git a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/test.py b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/test.py index effc9a388e1..7918a280194 100644 --- a/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/test.py +++ b/fastdeploy/cache_manager/transfer_factory/kvcache_transfer/test.py @@ -12,9 +12,10 @@ import time import paddle -import rdma_comm import zmq +import rdma_comm + if paddle.is_compiled_with_xpu(): from custom_setup_ops import get_peer_mem_addr diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index d3f27122dba..427cca000d4 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -881,8 +881,8 @@ def _schedule_request_to_worker(self): else: continue - main_process_metrics.num_requests_waiting.dec(len(tasks)) - main_process_metrics.num_requests_running.inc(len(tasks)) + main_process_metrics.dec_value("num_requests_waiting", len(tasks)) + main_process_metrics.inc_value("num_requests_running", len(tasks)) except Exception as e: err_msg = f"Error happened while insert task to engine: {e}, {traceback.format_exc()!s}." self.llm_logger.error(err_msg) @@ -1010,7 +1010,7 @@ def _fetch_request(): ) ] ) - main_process_metrics.reschedule_req_num.inc() + main_process_metrics.inc_value("reschedule_req_num") need_delete_tasks.append(task) continue for tmp_task in need_delete_tasks: @@ -1118,7 +1118,7 @@ def _fetch_request(): f"preallocated request. req:{task.request_id} " ) self.llm_logger.error(msg) - main_process_metrics.reschedule_req_num.inc() + main_process_metrics.inc_value("reschedule_req_num") self.scheduler.put_results( [ RequestOutput( @@ -1325,7 +1325,7 @@ def _insert_zmq_task_to_scheduler(self): request = Request.from_dict(data) request.metrics.scheduler_recv_req_time = time.time() - main_process_metrics.requests_number.inc() + main_process_metrics.inc_value("requests_number") trace_carrier = data.get("trace_carrier") if trace_carrier: request_id = get_base_request_id(data["request_id"]) @@ -1388,7 +1388,7 @@ def _insert_zmq_task_to_scheduler(self): added_requests.pop(request_id) if failed is None: - main_process_metrics.num_requests_waiting.inc(1) + main_process_metrics.inc_value("num_requests_waiting", 1) continue self._send_error_response(request_id, failed) @@ -2084,7 +2084,7 @@ def _process_allocate_resource_requests(): self.llm_logger.debug(f"D has successfully sent cache infos for task {task.request_id}") processed_indices.append(idx) is_success = True - main_process_metrics.decode_preallocated_req_num.inc() + main_process_metrics.inc_value("decode_preallocated_req_num") else: if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len): self.llm_logger.debug(f"D Resource available, processing task {task.request_id}") @@ -2161,7 +2161,7 @@ def _process_prefilled_requests(): else: for req_output in ready_request_outputs: request_id = req_output.request_id - main_process_metrics.decode_preallocated_req_num.dec() + main_process_metrics.dec_value("decode_preallocated_req_num") trace_print(LoggingEventName.DECODE_PROCESS_PREFILLED_REQUEST_END, request_id, "") if envs.FD_ENABLE_INTERNAL_ADAPTER and not req_output.outputs.token_ids: # first token is eos in Prefill, just recycle resource and continue @@ -2176,7 +2176,7 @@ def _process_prefilled_requests(): self.llm_logger.warning( f"{request_id} prefill failed with msg:{req_output.error_msg}, recycle resource." ) - main_process_metrics.failed_recv_first_token_req_num.inc() + main_process_metrics.inc_value("failed_recv_first_token_req_num") self.resource_manager.pre_recycle_resource(request_id) if request_id in self.token_processor.tokens_counter: del self.token_processor.tokens_counter[request_id] diff --git a/fastdeploy/engine/resource_manager.py b/fastdeploy/engine/resource_manager.py index 98b5d7190bf..3c957ffdef0 100644 --- a/fastdeploy/engine/resource_manager.py +++ b/fastdeploy/engine/resource_manager.py @@ -70,7 +70,7 @@ def __init__( self.real_bsz = 0 self.abort_req_ids_set = set() llm_logger.info(f"{self.info()}") - main_process_metrics.max_batch_size.set(max_num_seqs) + main_process_metrics.set_value("max_batch_size", max_num_seqs) def reset_cache_config(self, cfg): """ @@ -180,7 +180,7 @@ def _recycle_block_tables(self, task): ori_number = self.available_block_num() self.cache_manager.recycle_gpu_blocks(block_tables) cur_number = self.available_block_num() - main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) + main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc()) llm_logger.info(f"recycle {req_id} {cur_number - ori_number} blocks.") def available_batch(self): @@ -322,14 +322,14 @@ def allocate_resources_for_new_tasks(self, tasks): # record batch size here num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list]) - main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks) - main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch()) - main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) + main_process_metrics.set_value("available_gpu_block_num", self.total_block_number() - num_blocks_used_by_tasks) + main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch()) + main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc()) llm_logger.info( f"Number of allocated requests: {len(tasks)}, number of " f"running requests in worker: {self.real_bsz}" ) llm_logger.info(f"{self.info()}") - main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) + main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc()) return processed_tasks @@ -357,9 +357,9 @@ def _record_request_cache_info(self, task, common_block_ids, unique_block_ids, h task.cache_info = (cache_block_num, no_cache_block_num) # Report the number of cached tokens to Prometheus metrics - main_process_metrics.prefix_cache_token_num.inc(task.num_cached_tokens) - main_process_metrics.prefix_gpu_cache_token_num.inc(task.gpu_cache_token_num) - main_process_metrics.prefix_cpu_cache_token_num.inc(task.cpu_cache_token_num) + main_process_metrics.inc_value("prefix_cache_token_num", task.num_cached_tokens) + main_process_metrics.inc_value("prefix_gpu_cache_token_num", task.gpu_cache_token_num) + main_process_metrics.inc_value("prefix_cpu_cache_token_num", task.cpu_cache_token_num) cached_len = len(common_block_ids) * self.cfg.block_size task.block_tables = common_block_ids + unique_block_ids diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 1437ebc850f..846dad4ebd8 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -200,7 +200,7 @@ def __init__(self, max_num_seqs, config, tensor_parallel_size, splitwise_role, l self.finish_execution_pool = ThreadPoolExecutor(max_workers=1) self.lock = threading.Lock() self.to_be_rescheduled_request_id_set = set() - main_process_metrics.max_batch_size.set(max_num_seqs) + main_process_metrics.set_value("max_batch_size", max_num_seqs) self.using_extend_tables_req_id = set() self.reuse_block_num_map = dict() @@ -1466,9 +1466,9 @@ def get_prefix_cached_blocks(self, request: Request): request.metrics.storage_cache_prepare_time = metrics["storage_cache_prepare_time"] request.metrics.prompt_token_ids_len = request.prompt_token_ids_len - main_process_metrics.prefix_cache_token_num.inc(request.num_computed_tokens) - main_process_metrics.prefix_gpu_cache_token_num.inc(request.metrics.gpu_cache_token_num) - main_process_metrics.prefix_cpu_cache_token_num.inc(request.metrics.cpu_cache_token_num) + main_process_metrics.inc_value("prefix_cache_token_num", request.num_computed_tokens) + main_process_metrics.inc_value("prefix_gpu_cache_token_num", request.metrics.gpu_cache_token_num) + main_process_metrics.inc_value("prefix_cpu_cache_token_num", request.metrics.cpu_cache_token_num) trace_print(LoggingEventName.PREPARE_PREFIX_CACHE_END, request.request_id, getattr(request, "user", "")) @@ -1743,12 +1743,14 @@ def update_metrics(self, verbose=False): if task is not None: blocks_used_by_tasks.update(getattr(task, "block_tables", [])) blocks_used_by_tasks.update(getattr(task, "extend_block_tables", [])) - main_process_metrics.available_gpu_block_num.set(self.total_block_number() - len(blocks_used_by_tasks)) - main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch()) - main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc()) - main_process_metrics.num_requests_running.set(num_requests_running) - main_process_metrics.num_requests_waiting.set(num_requests_waiting) - main_process_metrics.num_requests_queuing.set(num_requests_queuing) + main_process_metrics.set_value( + "available_gpu_block_num", self.total_block_number() - len(blocks_used_by_tasks) + ) + main_process_metrics.set_value("batch_size", self.max_num_seqs - self.available_batch()) + main_process_metrics.set_value("gpu_cache_usage_perc", self.get_gpu_cache_usage_perc()) + main_process_metrics.set_value("num_requests_running", num_requests_running) + main_process_metrics.set_value("num_requests_waiting", num_requests_waiting) + main_process_metrics.set_value("num_requests_queuing", num_requests_queuing) if verbose: llm_logger.info( f"update metrics: running={num_requests_running}, " diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index a650ed4ad04..f27656f56bf 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -369,9 +369,9 @@ async def add_requests(self, task): if "messages" in task: task["messages"] = None - main_process_metrics.request_params_max_tokens.observe(task["max_tokens"]) - main_process_metrics.prompt_tokens_total.inc(input_ids_len) - main_process_metrics.request_prompt_tokens.observe(input_ids_len) + main_process_metrics.obs_value("request_params_max_tokens", task["max_tokens"]) + main_process_metrics.inc_value("prompt_tokens_total", input_ids_len) + main_process_metrics.obs_value("request_prompt_tokens", input_ids_len) except Exception as e: log_request_error( message="request[{request_id}] add_requests error: {error}, {traceback}", diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index d6429521f05..18bcad5b61c 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -479,8 +479,8 @@ async def chat_completion_stream_generator( if "trace_carrier" in res: del res["trace_carrier"] num_choices -= 1 - main_process_metrics.e2e_request_latency.observe( - time.time() - res["metrics"]["request_start_time"] + main_process_metrics.obs_value( + "e2e_request_latency", time.time() - res["metrics"]["request_start_time"] ) if previous_num_tokens[idx] != max_tokens: choice.finish_reason = "stop" @@ -829,8 +829,8 @@ async def _create_chat_completion_choice( return_completion_token_ids = True if output is not None and output.get("metrics") and output["metrics"].get("request_start_time"): - main_process_metrics.e2e_request_latency.observe( - time.time() - data.get("metrics").get("request_start_time") + main_process_metrics.obs_value( + "e2e_request_latency", time.time() - data.get("metrics").get("request_start_time") ) message = ChatMessage( role="assistant", diff --git a/fastdeploy/entrypoints/openai/v1/serving_chat.py b/fastdeploy/entrypoints/openai/v1/serving_chat.py index a199df1ae6e..a130c3aa9d6 100644 --- a/fastdeploy/entrypoints/openai/v1/serving_chat.py +++ b/fastdeploy/entrypoints/openai/v1/serving_chat.py @@ -301,8 +301,8 @@ async def _build_stream_response( if request_output.finished: if request_output.metrics and request_output.metrics.request_start_time: - main_process_metrics.e2e_request_latency.observe( - time.time() - request_output.metrics.request_start_time + main_process_metrics.obs_value( + "e2e_request_latency", time.time() - request_output.metrics.request_start_time ) max_tokens = request.max_completion_tokens or request.max_tokens choice_completion_tokens = response_ctx.choice_completion_tokens_dict[output.index] @@ -393,7 +393,9 @@ async def _create_chat_completion_choice( message.reasoning_content = output.reasoning_content message.tool_calls = request_output.accumulate_tool_calls if request_output.accumulate_tool_calls else None if output is not None and request_output.metrics and request_output.metrics.request_start_time: - main_process_metrics.e2e_request_latency.observe(time.time() - request_output.metrics.request_start_time) + main_process_metrics.obs_value( + "e2e_request_latency", time.time() - request_output.metrics.request_start_time + ) if request.return_token_ids: message.prompt_token_ids = request_output.prompt_token_ids diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 99c5ab776f7..c2fb72c45f9 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -294,6 +294,10 @@ def _validate_split_kv_size(value: int) -> int: # When set to 1, print which op / shape enters the block-wise CUDA Graph # during the capture phase. Defaults to 0 (silent). "FD_BLOCK_WISE_DEBUG": lambda: bool(int(os.getenv("FD_BLOCK_WISE_DEBUG", "0"))), + # Default label values for Prometheus metrics, specified as a JSON dict string. + # When set to a valid JSON dict, metric labels are automatically enabled. + # Example: '{"model_id":"my_model"}' adds model_id label to all metrics. + "FD_DEFAULT_METRIC_LABEL_VALUES": lambda: os.getenv("FD_DEFAULT_METRIC_LABEL_VALUES", "{}"), } diff --git a/fastdeploy/metrics/interface.py b/fastdeploy/metrics/interface.py new file mode 100644 index 00000000000..716f8f9eda7 --- /dev/null +++ b/fastdeploy/metrics/interface.py @@ -0,0 +1,72 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + +""" +MetricsManagerInterface provides a unified interface for metric operations. +When FD_DEFAULT_METRIC_LABEL_VALUES is set to a valid JSON dict, metric labels +(e.g. model_id) are automatically applied. Otherwise, operations fall back to +the raw prometheus_client calls. +""" + +from abc import ABC, abstractmethod + + +class MetricsManagerInterface(ABC): + """Abstract base class that defines the unified metrics interface.""" + + @abstractmethod + def set_value(self, name: str, value, labelvalues: dict = None): + """Set a Gauge metric to the given value. + + Args: + name: The attribute name of the metric on the MetricsManager. + value: The value to set. + labelvalues: Optional dict of label key-value pairs. + """ + raise NotImplementedError + + @abstractmethod + def inc_value(self, name: str, value=1, labelvalues: dict = None): + """Increment a Counter or Gauge metric by the given value. + + Args: + name: The attribute name of the metric on the MetricsManager. + value: The amount to increment by (default 1). + labelvalues: Optional dict of label key-value pairs. + """ + raise NotImplementedError + + @abstractmethod + def dec_value(self, name: str, value=1, labelvalues: dict = None): + """Decrement a Gauge metric by the given value. + + Args: + name: The attribute name of the metric on the MetricsManager. + value: The amount to decrement by (default 1). + labelvalues: Optional dict of label key-value pairs. + """ + raise NotImplementedError + + @abstractmethod + def obs_value(self, name: str, value, labelvalues: dict = None): + """Observe a value on a Histogram metric. + + Args: + name: The attribute name of the metric on the MetricsManager. + value: The value to observe. + labelvalues: Optional dict of label key-value pairs. + """ + raise NotImplementedError diff --git a/fastdeploy/metrics/metrics.py b/fastdeploy/metrics/metrics.py index 0daa36ad58a..15531edbd67 100644 --- a/fastdeploy/metrics/metrics.py +++ b/fastdeploy/metrics/metrics.py @@ -17,6 +17,8 @@ """ metrics """ +import copy +import json import os from typing import Set @@ -32,6 +34,7 @@ from fastdeploy import envs from fastdeploy.metrics import build_1_2_5_buckets +from fastdeploy.metrics.interface import MetricsManagerInterface from fastdeploy.metrics.prometheus_multiprocess_setup import ( setup_multiprocess_prometheus, ) @@ -127,7 +130,7 @@ def get_filtered_metrics() -> str: ] -class MetricsManager: +class MetricsManager(MetricsManagerInterface): """Prometheus Metrics Manager handles all metric updates""" _instance = None @@ -653,21 +656,55 @@ class MetricsManager: }, } + def _patch_labelnames(self, metrics_dict: dict) -> dict: + """When _enable_labels is True, add keys from _default_labelvalues to + labelnames for all metrics. Does not modify the original dict. + + Returns a deep-copied dict with patched kwargs. + """ + if not self._enable_labels: + return metrics_dict + patched = {} + for name, config in metrics_dict.items(): + new_config = copy.deepcopy(config) + kwargs = new_config["kwargs"] + if "labelnames" in kwargs: + for label in self._default_labelvalues: + if label not in kwargs["labelnames"]: + kwargs["labelnames"].append(label) + else: + kwargs["labelnames"] = list(self._default_labelvalues.keys()) + patched[name] = new_config + return patched + def __init__(self): """Initializes the Prometheus metrics and starts the HTTP server if not already initialized.""" + # 解析 FD_DEFAULT_METRIC_LABEL_VALUES + # 当值为合法 JSON dict 且非空时启用 metric labels + try: + self._default_labelvalues = json.loads(envs.FD_DEFAULT_METRIC_LABEL_VALUES) + except (json.JSONDecodeError, TypeError): + self._default_labelvalues = {} + self._enable_labels = isinstance(self._default_labelvalues, dict) and len(self._default_labelvalues) > 0 + # 在模块加载,指标注册先设置Prometheus环境变量 setup_multiprocess_prometheus() + # 用 _patch_labelnames 处理后的副本创建指标,不修改类级别原始 dict + patched_metrics = self._patch_labelnames(self.METRICS) + patched_gauge_metrics = self._patch_labelnames(self.GAUGE_METRICS) + patched_server_metrics = self._patch_labelnames(self.SERVER_METRICS) + # 动态创建所有非 gauge 型指标 - for metric_name, config in self.METRICS.items(): + for metric_name, config in patched_metrics.items(): setattr( self, metric_name, config["type"](config["name"], config["description"], **config["kwargs"]), ) # 动态创建所有 gauge 型指标,统一配置 multiprocess_mode 为 livesum - for metric_name, config in self.GAUGE_METRICS.items(): + for metric_name, config in patched_gauge_metrics.items(): kwargs = config["kwargs"].copy() if "multiprocess_mode" not in kwargs: kwargs["multiprocess_mode"] = "livesum" @@ -677,13 +714,60 @@ def __init__(self): config["type"](config["name"], config["description"], **kwargs), ) # 动态创建server metrics - for metric_name, config in self.SERVER_METRICS.items(): + for metric_name, config in patched_server_metrics.items(): setattr( self, metric_name, config["type"](config["name"], config["description"], **config["kwargs"]), ) + def _get_metric_and_labels(self, name: str, labelvalues: dict = None): + """Get the metric object and merged labelvalues. + + When _enable_labels is True, returns (metric, merged_labels) where + merged_labels is the union of _default_labelvalues and caller-provided + labelvalues. When False, returns (metric, None). + """ + metric = getattr(self, name) + if not self._enable_labels: + return metric, None + merged = dict(self._default_labelvalues) + if labelvalues: + merged.update(labelvalues) + return metric, merged + + def set_value(self, name: str, value, labelvalues: dict = None): + """Set a Gauge metric to the given value.""" + metric, merged = self._get_metric_and_labels(name, labelvalues) + if merged is not None: + metric.labels(**merged).set(value) + else: + metric.set(value) + + def inc_value(self, name: str, value=1, labelvalues: dict = None): + """Increment a Counter or Gauge metric by the given value.""" + metric, merged = self._get_metric_and_labels(name, labelvalues) + if merged is not None: + metric.labels(**merged).inc(value) + else: + metric.inc(value) + + def dec_value(self, name: str, value=1, labelvalues: dict = None): + """Decrement a Gauge metric by the given value.""" + metric, merged = self._get_metric_and_labels(name, labelvalues) + if merged is not None: + metric.labels(**merged).dec(value) + else: + metric.dec(value) + + def obs_value(self, name: str, value, labelvalues: dict = None): + """Observe a value on a Histogram metric.""" + metric, merged = self._get_metric_and_labels(name, labelvalues) + if merged is not None: + metric.labels(**merged).observe(value) + else: + metric.observe(value) + def _init_speculative_metrics(self, speculative_method, num_speculative_tokens): self.SPECULATIVE_METRICS = { "spec_decode_draft_acceptance_rate": { @@ -724,8 +808,12 @@ def _init_speculative_metrics(self, speculative_method, num_speculative_tokens): "description": "Single head acceptance rate of speculative decoding", "kwargs": {}, } - for metric_name, config in self.SPECULATIVE_METRICS.items(): + + patched_spec_metrics = self._patch_labelnames(self.SPECULATIVE_METRICS) + + for metric_name, config in patched_spec_metrics.items(): if metric_name == "spec_decode_draft_single_head_acceptance_rate": + # list[Gauge] — keep original behavior, no label migration gauges = [] for i in range(num_speculative_tokens): gauges.append( @@ -752,8 +840,9 @@ def _init_speculative_metrics(self, speculative_method, num_speculative_tokens): ) def init_zmq_metrics(self): - # 动态创建所有指标 - for metric_name, config in self.ZMQ_METRICS.items(): + # 用 _patch_labelnames 处理 ZMQ_METRICS dict 后再创建指标 + patched_zmq_metrics = self._patch_labelnames(self.ZMQ_METRICS) + for metric_name, config in patched_zmq_metrics.items(): setattr( self, metric_name, @@ -769,35 +858,54 @@ def record_zmq_stats(self, zmq_metrics_stats: ZMQMetricsStats, address: str = "u if not self._collect_zmq_metrics: return + # 构建 zmq labelvalues: address + _default_labelvalues + zmq_labels = {"address": address} + if self._enable_labels: + zmq_labels.update(self._default_labelvalues) + # 记录zmq统计信息 - self.msg_send_total.labels(address=address).inc(zmq_metrics_stats.msg_send_total) - self.msg_send_failed_total.labels(address=address).inc(zmq_metrics_stats.msg_send_failed_total) - self.msg_bytes_send_total.labels(address=address).inc(zmq_metrics_stats.msg_bytes_send_total) - self.msg_recv_total.labels(address=address).inc(zmq_metrics_stats.msg_recv_total) - self.msg_bytes_recv_total.labels(address=address).inc(zmq_metrics_stats.msg_bytes_recv_total) + self.msg_send_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_send_total) + self.msg_send_failed_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_send_failed_total) + self.msg_bytes_send_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_bytes_send_total) + self.msg_recv_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_recv_total) + self.msg_bytes_recv_total.labels(**zmq_labels).inc(zmq_metrics_stats.msg_bytes_recv_total) if zmq_metrics_stats.zmq_latency > 0.0: # trans to millisecond - self.zmq_latency.labels(address=address).observe(zmq_metrics_stats.zmq_latency * 1000) + self.zmq_latency.labels(**zmq_labels).observe(zmq_metrics_stats.zmq_latency * 1000) def set_cache_config_info(self, obj) -> None: + metrics_info = obj.metrics_info() + if hasattr(self, "cache_config_info") and isinstance(self.cache_config_info, Gauge): - metrics_info = obj.metrics_info() if metrics_info: - self.cache_config_info.labels(**metrics_info).set(1) + # 合并 default labelvalues + merged = dict(metrics_info) + if self._enable_labels: + merged.update(self._default_labelvalues) + self.cache_config_info.labels(**merged).set(1) return - metrics_info = obj.metrics_info() if not metrics_info: return + # 动态创建 cache_config_info gauge,追加 default labelvalues 的 labelnames + labelnames = list(metrics_info.keys()) + if self._enable_labels: + for label in self._default_labelvalues: + if label not in labelnames: + labelnames.append(label) + self.cache_config_info = Gauge( name="fastdeploy:cache_config_info", documentation="Information of the engine's CacheConfig", - labelnames=list(metrics_info.keys()), + labelnames=labelnames, multiprocess_mode="mostrecent", ) - self.cache_config_info.labels(**metrics_info).set(1) + merged = dict(metrics_info) + if self._enable_labels: + merged.update(self._default_labelvalues) + self.cache_config_info.labels(**merged).set(1) def register_speculative_metrics(self, registry: CollectorRegistry): """Register all speculative metrics to the specified registry""" diff --git a/fastdeploy/metrics/metrics_middleware.py b/fastdeploy/metrics/metrics_middleware.py index 1ab1198e600..883218f2eb9 100644 --- a/fastdeploy/metrics/metrics_middleware.py +++ b/fastdeploy/metrics/metrics_middleware.py @@ -52,7 +52,11 @@ async def dispatch(self, request: Request, call_next): process_time = end_time - start_time # record http metrics - main_process_metrics.http_requests_total.labels(method=method, path=path, status_code=status_code).inc() - main_process_metrics.http_request_duration_seconds.labels(method=method, path=path).observe(process_time) + main_process_metrics.inc_value( + "http_requests_total", labelvalues={"method": method, "path": path, "status_code": status_code} + ) + main_process_metrics.obs_value( + "http_request_duration_seconds", process_time, labelvalues={"method": method, "path": path} + ) return response diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index ad554657bc5..cfbfe9428f1 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -283,7 +283,7 @@ def _process_per_token(self, task, batch_id: int, token_ids: np.ndarray, result: preempted_count=getattr(task.metrics, "preempted_count", 0), ) - main_process_metrics.request_token_ratio.observe(token_ratio) + main_process_metrics.obs_value("request_token_ratio", token_ratio) llm_logger.info(self.resource_manager.info()) if self.cfg.speculative_config.method: self._compute_speculative_status() @@ -626,13 +626,13 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False num_blocks_used_by_tasks = sum( [len(task.block_tables) if task else 0 for task in self.resource_manager.tasks_list] ) - main_process_metrics.available_gpu_block_num.set( - self.resource_manager.total_block_number() - num_blocks_used_by_tasks + main_process_metrics.set_value( + "available_gpu_block_num", self.resource_manager.total_block_number() - num_blocks_used_by_tasks ) - main_process_metrics.batch_size.set( - self.resource_manager.max_num_seqs - self.resource_manager.available_batch() + main_process_metrics.set_value( + "batch_size", self.resource_manager.max_num_seqs - self.resource_manager.available_batch() ) - main_process_metrics.available_batch_size.set(self.resource_manager.available_batch()) + main_process_metrics.set_value("available_batch_size", self.resource_manager.available_batch()) if task_id in self.tokens_counter: del self.tokens_counter[task_id] @@ -1045,7 +1045,7 @@ def _process_batch_output(self): preempted_count=getattr(task.metrics, "preempted_count", 0), ) - main_process_metrics.request_token_ratio.observe(token_ratio) + main_process_metrics.obs_value("request_token_ratio", token_ratio) if self.cfg.speculative_config.method: self._compute_speculative_status(result) self._record_completion_metrics(task, current_time) @@ -1083,20 +1083,22 @@ def _record_metrics(self, task, current_time, token_ids): """Record all metrics for a task""" if hasattr(task, "last_token_time") and task.last_token_time is not None: token_gen_time = current_time - task.last_token_time - main_process_metrics.time_per_output_token.observe(token_gen_time) + main_process_metrics.obs_value("time_per_output_token", token_gen_time) task.last_token_time = current_time # Record generation metrics - main_process_metrics.generation_tokens_total.inc(len(token_ids)) + main_process_metrics.inc_value("generation_tokens_total", len(token_ids)) def _record_first_token_metrics(self, task, current_time): """Record metrics for first token""" metrics = task.metrics trace_print(LoggingEventName.FIRST_TOKEN_GENERATED, task.request_id, getattr(task, "user", "")) trace_print(LoggingEventName.DECODE_START, task.request_id, getattr(task, "user", "")) - main_process_metrics.time_to_first_token.observe(current_time - metrics.arrival_time) - main_process_metrics.request_queue_time.observe(metrics.inference_start_time - metrics.preprocess_end_time) - main_process_metrics.request_prefill_time.observe(current_time - metrics.inference_start_time) + main_process_metrics.obs_value("time_to_first_token", current_time - metrics.arrival_time) + main_process_metrics.obs_value( + "request_queue_time", metrics.inference_start_time - metrics.preprocess_end_time + ) + main_process_metrics.obs_value("request_prefill_time", current_time - metrics.inference_start_time) def _record_completion_metrics(self, task, current_time): """Record metrics when request completes""" @@ -1106,7 +1108,7 @@ def _record_completion_metrics(self, task, current_time): if role in ("mixed", "decode"): if metrics.engine_recv_first_token_time: decode_time = current_time - metrics.engine_recv_first_token_time - main_process_metrics.request_decode_time.observe(decode_time) + main_process_metrics.obs_value("request_decode_time", decode_time) trace_print(LoggingEventName.INFERENCE_END, task.request_id, getattr(task, "user", "")) if role == "prefill": @@ -1115,9 +1117,9 @@ def _record_completion_metrics(self, task, current_time): trace_print(LoggingEventName.DECODE_INFERENCE_END, task.request_id, getattr(task, "user", "")) trace_print(LoggingEventName.POSTPROCESSING_START, task.request_id, getattr(task, "user", "")) - main_process_metrics.request_success_total.inc() - main_process_metrics.request_inference_time.observe(current_time - metrics.inference_start_time) - main_process_metrics.request_generation_tokens.observe(self.tokens_counter[task.request_id]) + main_process_metrics.inc_value("request_success_total") + main_process_metrics.obs_value("request_inference_time", current_time - metrics.inference_start_time) + main_process_metrics.obs_value("request_generation_tokens", self.tokens_counter[task.request_id]) def _record_speculative_decoding_metrics(self, accept_num): """Record metrics of speculative decoding""" @@ -1133,12 +1135,12 @@ def _record_speculative_decoding_metrics(self, accept_num): if self.num_emitted_tokens == 0: return - main_process_metrics.spec_decode_num_accepted_tokens_total.set(self.num_accepted_tokens) - main_process_metrics.spec_decode_num_emitted_tokens_total.set(self.num_emitted_tokens) + main_process_metrics.set_value("spec_decode_num_accepted_tokens_total", self.num_accepted_tokens) + main_process_metrics.set_value("spec_decode_num_emitted_tokens_total", self.num_emitted_tokens) if self.cfg.speculative_config.method == SpecMethod.NGRAM: - main_process_metrics.spec_decode_draft_acceptance_rate.set( - self.num_accepted_tokens / self.num_emitted_tokens + main_process_metrics.set_value( + "spec_decode_draft_acceptance_rate", self.num_accepted_tokens / self.num_emitted_tokens ) if self.cfg.speculative_config.method == SpecMethod.MTP: @@ -1149,11 +1151,13 @@ def _record_speculative_decoding_metrics(self, accept_num): self.cfg.speculative_config.num_speculative_tokens + 1 ) - main_process_metrics.spec_decode_draft_acceptance_rate.set( - self.num_accepted_tokens / self.num_draft_tokens + main_process_metrics.set_value( + "spec_decode_draft_acceptance_rate", self.num_accepted_tokens / self.num_draft_tokens + ) + main_process_metrics.set_value( + "spec_decode_efficiency", self.num_emitted_tokens / self.max_num_emitted_tokens ) - main_process_metrics.spec_decode_efficiency.set(self.num_emitted_tokens / self.max_num_emitted_tokens) - main_process_metrics.spec_decode_num_draft_tokens_total.inc(num_draft_tokens) + main_process_metrics.inc_value("spec_decode_num_draft_tokens_total", num_draft_tokens) for i in range(1, self.cfg.speculative_config.num_speculative_tokens + 1): if self.accept_token_num_per_head[i - 1] != 0: diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py index 8d2091c7d31..6cc4dc930d1 100644 --- a/fastdeploy/splitwise/splitwise_connector.py +++ b/fastdeploy/splitwise/splitwise_connector.py @@ -177,7 +177,7 @@ def _send_message(self, addr, msg_type: str, payload): self.logger.warning(f"_send_message: Send queue full for {addr}") except Exception as e: self.logger.error(f"_send_message: Send to {addr} failed: {e}, {str(traceback.format_exc())}") - main_process_metrics.send_cache_failed_num.inc() + main_process_metrics.inc_value("send_cache_failed_num") self._close_connection(addr) except Exception as e: self.logger.error(f"_send_message: Message preparation failed: {e}, {traceback.format_exc()}")