diff --git a/fastdeploy/__init__.py b/fastdeploy/__init__.py
index bf6a000826d..be1203cea87 100644
--- a/fastdeploy/__init__.py
+++ b/fastdeploy/__init__.py
@@ -15,19 +15,11 @@
 """

 import os
-import uuid

 # suppress warning log from paddlepaddle
 os.environ["GLOG_minloglevel"] = "2"
 # suppress log from aistudio
 os.environ["AISTUDIO_LOG"] = "critical"
-# set prometheus dir
-if os.getenv("PROMETHEUS_MULTIPROC_DIR", "") == "":
-    prom_dir = f"/tmp/fd_prom_{str(uuid.uuid4())}"
-    os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir
-    if os.path.exists(prom_dir):
-        os.rmdir(prom_dir)
-    os.mkdir(prom_dir)

 import typing
diff --git a/fastdeploy/entrypoints/openai/multi_api_server.py b/fastdeploy/entrypoints/openai/multi_api_server.py
index 08aa3efbf80..206a0e649d9 100644
--- a/fastdeploy/entrypoints/openai/multi_api_server.py
+++ b/fastdeploy/entrypoints/openai/multi_api_server.py
@@ -107,6 +107,15 @@ def start_servers(
         env = os.environ.copy()
         env["FD_ENABLE_MULTI_API_SERVER"] = "1"
         env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}"
+        if "PROMETHEUS_MULTIPROC_DIR" in env:
+            prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
+            prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}")
+            # Create the directory if it doesn't exist
+            if not os.path.exists(prom_dir_i):
+                os.makedirs(prom_dir_i, exist_ok=True)
+            env["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_i
+            logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {i}: {prom_dir_i}")
+
         cmd = [
             sys.executable,
             "-m",
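The per-DP directory is derived by suffixing the data-parallel rank onto the parent `PROMETHEUS_MULTIPROC_DIR`. A minimal sketch of the resulting layout (the `/tmp/fd_prom` parent path is illustrative, not a FastDeploy default):

```python
import os

prom_dir = "/tmp/fd_prom"  # hypothetical parent PROMETHEUS_MULTIPROC_DIR

for i in range(2):  # two data-parallel API servers
    # Same derivation as in start_servers: keep the parent directory,
    # suffix the basename with the DP rank.
    prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}")
    print(prom_dir_i)

# Prints:
#   /tmp/fd_prom_dp0
#   /tmp/fd_prom_dp1
```

Each server therefore writes its multiprocess mmap files into its own directory, so one DP rank's samples can never collide with another's.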
""" for metric in self.base_registry.collect(): - if not any(name.startswith(metric.name) for name in self.exclude_names): + if not any(metric.name.startswith(name) for name in self.exclude_names): yield metric @@ -83,11 +83,15 @@ def get_filtered_metrics() -> str: multiprocess.MultiProcessCollector(base_registry) filtered_registry = CollectorRegistry() - # 注册一个新的colletor,过滤gauge指标 - filtered_registry.register(SimpleCollector(base_registry, EXCLUDE_LABELS)) + # 动态获取需要排除的 gauge 指标列表 + exclude_labels = main_process_metrics.get_excluded_metrics() + # 注册一个新的collector,过滤gauge指标 + filtered_registry.register(SimpleCollector(base_registry, exclude_labels)) # 将gauge指标重新注册到filtered_registry中,从内存中读取 main_process_metrics.re_register_gauge(filtered_registry) + # 将speculative中的gauge指标也重新注册 + main_process_metrics.re_register_speculative_gauge(filtered_registry) return generate_latest(filtered_registry).decode("utf-8") @@ -195,7 +199,7 @@ class MetricsManager: "type": Gauge, "name": "fastdeploy:num_requests_running", "description": "Number of requests currently running", - "kwargs": {"multiprocess_mode": "sum"}, + "kwargs": {}, }, "num_requests_waiting": { "type": Gauge, @@ -625,19 +629,22 @@ def __init__(self): # 在模块加载,指标注册先设置Prometheus环境变量 setup_multiprocess_prometheus() - # 动态创建所有指标 + # 动态创建所有非 gauge 型指标 for metric_name, config in self.METRICS.items(): setattr( self, metric_name, config["type"](config["name"], config["description"], **config["kwargs"]), ) - # 动态创建所有指标 + # 动态创建所有 gauge 型指标,统一配置 multiprocess_mode 为 livesum for metric_name, config in self.GAUGE_METRICS.items(): + kwargs = config["kwargs"].copy() + if "multiprocess_mode" not in kwargs: + kwargs["multiprocess_mode"] = "livesum" setattr( self, metric_name, - config["type"](config["name"], config["description"], **config["kwargs"]), + config["type"](config["name"], config["description"], **kwargs), ) # 动态创建server metrics for metric_name, config in self.SERVER_METRICS.items(): @@ -695,17 +702,22 @@ def _init_speculative_metrics(self, speculative_method, num_speculative_tokens): Gauge( f"{config['name']}_{i}", f"{config['description']} (head {i})", + multiprocess_mode="livesum", ) ) setattr(self, metric_name, gauges) else: + # For Gauge metrics, automatically add multiprocess_mode="livesum" + kwargs = config["kwargs"].copy() + if config["type"] == Gauge and "multiprocess_mode" not in kwargs: + kwargs["multiprocess_mode"] = "livesum" setattr( self, metric_name, config["type"]( config["name"], config["description"], - **config["kwargs"], + **kwargs, ), ) @@ -766,6 +778,19 @@ def register_speculative_metrics(self, registry: CollectorRegistry): else: registry.register(getattr(self, metric_name)) + def re_register_speculative_gauge(self, registry: CollectorRegistry): + """Re-register gauge metrics from SPECULATIVE_METRICS to the specified registry""" + # Check if SPECULATIVE_METRICS was initialized in this process + # (it's an instance attribute set by _init_speculative_metrics, not the class-level empty dict) + if not hasattr(self, "spec_decode_draft_acceptance_rate"): + return + for metric_name, config in self.SPECULATIVE_METRICS.items(): + if metric_name == "spec_decode_draft_single_head_acceptance_rate": + for gauge in getattr(self, metric_name): + registry.register(gauge) + elif config["type"] == Gauge: + registry.register(getattr(self, metric_name)) + def re_register_gauge(self, registry: CollectorRegistry): """Re-register gauge to the specified registry""" for metric_name in self.GAUGE_METRICS: @@ -789,10 +814,15 @@ def register_all(self, 
diff --git a/tests/entrypoints/openai/test_multi_api_server.py b/tests/entrypoints/openai/test_multi_api_server.py
index bf533b95762..58aa07cbf45 100644
--- a/tests/entrypoints/openai/test_multi_api_server.py
+++ b/tests/entrypoints/openai/test_multi_api_server.py
@@ -180,6 +180,58 @@ def test_main_function(self, mock_check_param, mock_sleep, mock_start_servers, m
         mock_proc1.wait.assert_called_once()
         mock_proc2.wait.assert_called_once()

+    @patch("fastdeploy.entrypoints.openai.multi_api_server.subprocess.Popen")
+    @patch("fastdeploy.entrypoints.openai.multi_api_server.is_port_available")
+    def test_prometheus_multiprocess_dir_per_dp(self, mock_is_port_available, mock_popen):
+        """Test that each DP server gets a unique PROMETHEUS_MULTIPROC_DIR"""
+        # Mock the port-availability check
+        mock_is_port_available.return_value = True
+
+        # Mock subprocess.Popen to capture the env passed to each server
+        envs_captured = []
+
+        def capture_popen(*args, **kwargs):
+            envs_captured.append(kwargs.get("env", {}).copy())
+            mock_proc = MagicMock()
+            mock_proc.pid = 1000 + len(envs_captured)
+            return mock_proc
+
+        mock_popen.side_effect = capture_popen
+
+        # Call start_servers with 2 servers
+        processes = start_servers(
+            server_count=2,
+            device_count=2,
+            server_args=self.test_server_args,
+            ports="8000,8001",
+            metrics_ports="8800,8801",
+            controller_ports="-1",
+        )
+
+        # Verify subprocess.Popen was called twice
+        self.assertEqual(mock_popen.call_count, 2)
+        self.assertEqual(len(envs_captured), 2)
+        self.assertEqual(len(processes), 2)
+
+        # Verify each server has PROMETHEUS_MULTIPROC_DIR set
+        prom_dirs = []
+        for i, env in enumerate(envs_captured):
+            prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
+            print(f"Server {i} PROMETHEUS_MULTIPROC_DIR: {prom_dir}")
+            self.assertIsNotNone(prom_dir, f"Server {i} should have PROMETHEUS_MULTIPROC_DIR set")
+            prom_dirs.append(prom_dir)
+
+        # Verify all PROMETHEUS_MULTIPROC_DIR values are unique
+        self.assertEqual(
+            len(prom_dirs), len(set(prom_dirs)), "Each DP server should have a unique PROMETHEUS_MULTIPROC_DIR"
+        )
+
+        # Verify each directory carries the server index (_dp0, _dp1, ...)
+        for i, prom_dir in enumerate(prom_dirs):
+            # The directory must contain the server index to uniquely
+            # identify each server's metrics directory
+            self.assertIn(f"_dp{i}", prom_dir, f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain _dp{i}")
+

 if __name__ == "__main__":
     unittest.main()
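The corrected `startswith` direction in `SimpleCollector.collect` is what the next test file exercises: a collected metric is dropped when its name begins with an excluded name, not when it merely happens to be a prefix of one. A small illustration of the fixed predicate (metric names are examples, not an exhaustive list of FastDeploy metrics):

```python
exclude_names = {"fastdeploy:num_requests_running"}

def is_excluded(metric_name: str) -> bool:
    # Fixed direction: the collected metric's name must start with an
    # excluded name; the old reversed check asked whether the excluded
    # name started with the metric's name.
    return any(metric_name.startswith(name) for name in exclude_names)

assert is_excluded("fastdeploy:num_requests_running")
assert not is_excluded("fastdeploy:request_params_max_tokens")
```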
diff --git a/tests/metrics/test_metrics.py b/tests/metrics/test_metrics.py
index 65c228bcb65..009e93f7178 100644
--- a/tests/metrics/test_metrics.py
+++ b/tests/metrics/test_metrics.py
@@ -14,46 +14,64 @@
 # limitations under the License.
 """

+import os
 import unittest
 from unittest.mock import patch

 from prometheus_client import Gauge

-from fastdeploy.metrics.metrics import get_filtered_metrics
+from fastdeploy.metrics.metrics import get_filtered_metrics, main_process_metrics


 class TestGetFilteredMetrics(unittest.TestCase):
-    def test_filtered_and_custom_metrics(self):
-        """
-        Test get_filtered_metrics function:
-        1. Exclude specific metrics from base_registry
-        2. Keep other metrics in base_registry
-        3. Ensure metrics registered by extra_register_func are effective
-        """
-
-        # Simulated metrics in base_registry (Gauge instances)
-        g_keep = Gauge("metric_to_keep", "Kept metric")
-        g_keep.set(1.23)
-
-        g_exclude = Gauge("metric_to_exclude", "Excluded metric")
-        g_exclude.set(99)
-
-        # Fake MultiProcessCollector: register our simulated metrics
+    def _collect_metrics_with_mocked_multiprocess(self, metric_name, multiprocess_value):
         def fake_multiprocess_collector(registry):
-            registry.register(g_keep)
-            registry.register(g_exclude)
+            gauge = Gauge(metric_name, f"fake metric for {metric_name}", ["pid"], registry=registry)
+            gauge.labels(pid="10001").set(multiprocess_value)

-        with patch(
-            "fastdeploy.metrics.metrics.multiprocess.MultiProcessCollector", side_effect=fake_multiprocess_collector
+        with (
+            patch.dict(os.environ, {"PROMETHEUS_MULTIPROC_DIR": "/tmp/fake-prometheus-multiproc-dir"}, clear=False),
+            patch(
+                "fastdeploy.metrics.metrics.multiprocess.MultiProcessCollector",
+                side_effect=fake_multiprocess_collector,
+            ),
         ):
-            result = get_filtered_metrics()
+            return get_filtered_metrics()

-        print("==== result ====\n", result)
+    def _assert_unique_metric_value(self, metrics_text, metric_name, expected_value):
+        metric_lines = [line for line in metrics_text.splitlines() if line.startswith(f"{metric_name} ")]
+        self.assertEqual(metric_lines, [f"{metric_name} {expected_value}"])
+        self.assertNotIn("pid=", metrics_text)

-        # 2. Kept metric should appear
-        self.assertIn("metric_to_keep", result)
+    def test_regular_gauge_returns_single_value_without_pid(self):
+        metric = main_process_metrics.batch_size
+        metric.set(8.0)

-        self.assertIn("metric_to_exclude", result)
+        result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1008.0)
+
+        self._assert_unique_metric_value(result, metric._name, 8.0)
+
+    def test_speculative_gauge_returns_single_value_without_pid(self):
+        if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
+            main_process_metrics._init_speculative_metrics("mtp", 2)
+
+        metric = main_process_metrics.spec_decode_draft_acceptance_rate
+        metric.set(0.75)
+
+        result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1000.75)
+
+        self._assert_unique_metric_value(result, metric._name, 0.75)
+
+    def test_speculative_single_head_gauge_returns_single_value_without_pid(self):
+        if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
+            main_process_metrics._init_speculative_metrics("mtp", 2)
+
+        metric = main_process_metrics.spec_decode_draft_single_head_acceptance_rate[0]
+        metric.set(0.6)
+
+        result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1000.6)
+
+        self._assert_unique_metric_value(result, metric._name, 0.6)


 if __name__ == "__main__":
     unittest.main()
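Assuming a standard unittest layout with the repository root on `sys.path`, the suites touched by this change can be run together:

```python
import unittest

# Module paths are assumptions about the local checkout layout.
suite = unittest.defaultTestLoader.loadTestsFromNames(
    [
        "tests.metrics.test_metrics",
        "tests.entrypoints.openai.test_multi_api_server",
    ]
)
unittest.TextTestRunner(verbosity=2).run(suite)
```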