Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions fastdeploy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,11 @@
"""

import os
import uuid

# suppress warning log from paddlepaddle
os.environ["GLOG_minloglevel"] = "2"
# suppress log from aistudio
os.environ["AISTUDIO_LOG"] = "critical"
# set prometheus dir
if os.getenv("PROMETHEUS_MULTIPROC_DIR", "") == "":
prom_dir = f"/tmp/fd_prom_{str(uuid.uuid4())}"
os.environ["PROMETHEUS_MULTIPROC_DIR"] = prom_dir
if os.path.exists(prom_dir):
os.rmdir(prom_dir)
os.mkdir(prom_dir)

import typing

Expand Down
9 changes: 9 additions & 0 deletions fastdeploy/entrypoints/openai/multi_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ def start_servers(
env = os.environ.copy()
env["FD_ENABLE_MULTI_API_SERVER"] = "1"
env["FD_LOG_DIR"] = env.get("FD_LOG_DIR", "log") + f"/log_{i}"
if "PROMETHEUS_MULTIPROC_DIR" in env:
prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
prom_dir_i = os.path.join(os.path.dirname(prom_dir), os.path.basename(prom_dir) + f"_dp{i}")
# Create the directory if it doesn't exist
if not os.path.exists(prom_dir_i):
os.makedirs(prom_dir_i, exist_ok=True)
env["PROMETHEUS_MULTIPROC_DIR"] = prom_dir_i
logger.info(f"Set PROMETHEUS_MULTIPROC_DIR for DP {i}: {prom_dir_i}")

cmd = [
sys.executable,
"-m",
Expand Down
54 changes: 41 additions & 13 deletions fastdeploy/metrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def collect(self):
Metric: Prometheus Metric objects that are not excluded.
"""
for metric in self.base_registry.collect():
if not any(name.startswith(metric.name) for name in self.exclude_names):
if not any(metric.name.startswith(name) for name in self.exclude_names):
yield metric


Expand All @@ -83,11 +83,15 @@ def get_filtered_metrics() -> str:
multiprocess.MultiProcessCollector(base_registry)

filtered_registry = CollectorRegistry()
# 注册一个新的colletor,过滤gauge指标
filtered_registry.register(SimpleCollector(base_registry, EXCLUDE_LABELS))
# 动态获取需要排除的 gauge 指标列表
exclude_labels = main_process_metrics.get_excluded_metrics()
# 注册一个新的collector,过滤gauge指标
filtered_registry.register(SimpleCollector(base_registry, exclude_labels))

# 将gauge指标重新注册到filtered_registry中,从内存中读取
main_process_metrics.re_register_gauge(filtered_registry)
# 将speculative中的gauge指标也重新注册
main_process_metrics.re_register_speculative_gauge(filtered_registry)

return generate_latest(filtered_registry).decode("utf-8")

Expand Down Expand Up @@ -195,7 +199,7 @@ class MetricsManager:
"type": Gauge,
"name": "fastdeploy:num_requests_running",
"description": "Number of requests currently running",
"kwargs": {"multiprocess_mode": "sum"},
"kwargs": {},
},
"num_requests_waiting": {
"type": Gauge,
Expand Down Expand Up @@ -625,19 +629,22 @@ def __init__(self):
# 在模块加载,指标注册先设置Prometheus环境变量
setup_multiprocess_prometheus()

# 动态创建所有指标
# 动态创建所有非 gauge 型指标
for metric_name, config in self.METRICS.items():
setattr(
self,
metric_name,
config["type"](config["name"], config["description"], **config["kwargs"]),
)
# 动态创建所有指标
# 动态创建所有 gauge 型指标,统一配置 multiprocess_mode 为 livesum
for metric_name, config in self.GAUGE_METRICS.items():
kwargs = config["kwargs"].copy()
if "multiprocess_mode" not in kwargs:
kwargs["multiprocess_mode"] = "livesum"
setattr(
self,
metric_name,
config["type"](config["name"], config["description"], **config["kwargs"]),
config["type"](config["name"], config["description"], **kwargs),
)
# 动态创建server metrics
for metric_name, config in self.SERVER_METRICS.items():
Expand Down Expand Up @@ -695,17 +702,22 @@ def _init_speculative_metrics(self, speculative_method, num_speculative_tokens):
Gauge(
f"{config['name']}_{i}",
f"{config['description']} (head {i})",
multiprocess_mode="livesum",
)
)
setattr(self, metric_name, gauges)
else:
# For Gauge metrics, automatically add multiprocess_mode="livesum"
kwargs = config["kwargs"].copy()
if config["type"] == Gauge and "multiprocess_mode" not in kwargs:
kwargs["multiprocess_mode"] = "livesum"
setattr(
self,
metric_name,
config["type"](
config["name"],
config["description"],
**config["kwargs"],
**kwargs,
),
)

Expand Down Expand Up @@ -766,6 +778,19 @@ def register_speculative_metrics(self, registry: CollectorRegistry):
else:
registry.register(getattr(self, metric_name))

def re_register_speculative_gauge(self, registry: CollectorRegistry):
    """Register the speculative-decoding gauges of this manager on ``registry``.

    Nothing is registered when ``spec_decode_draft_acceptance_rate`` has not
    been created on this instance, i.e. speculative metrics were never
    initialised in this process.

    Args:
        registry: Registry that receives the gauge objects.
    """
    single_head_attr = "spec_decode_draft_single_head_acceptance_rate"

    if hasattr(self, "spec_decode_draft_acceptance_rate"):
        for attr_name, spec in self.SPECULATIVE_METRICS.items():
            if attr_name == single_head_attr:
                # This attribute holds one gauge per head; register each of them.
                gauges = getattr(self, attr_name)
            elif spec["type"] == Gauge:
                gauges = (getattr(self, attr_name),)
            else:
                # Non-gauge speculative metrics are not re-registered here.
                continue
            for gauge in gauges:
                registry.register(gauge)

def re_register_gauge(self, registry: CollectorRegistry):
"""Re-register gauge to the specified registry"""
for metric_name in self.GAUGE_METRICS:
Expand All @@ -789,16 +814,19 @@ def register_all(self, registry: CollectorRegistry):
if hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
self.register_speculative_metrics(registry)

def get_excluded_metrics(self) -> Set[str]:
    """Return the names of all gauge metrics that must be excluded.

    The result always contains the name of every entry in ``GAUGE_METRICS``.
    Entries of ``SPECULATIVE_METRICS`` whose ``type`` is ``Gauge`` or
    ``list[Gauge]`` are added as well.

    This is an instance method on purpose: ``SPECULATIVE_METRICS`` is
    populated on the instance by ``_init_speculative_metrics``, so a
    ``@classmethod`` would only ever see the empty class-level dict and
    would never report the speculative gauges.

    Returns:
        A new set of metric names. For the per-head gauges only the base
        name is included; consumers that match by prefix (as
        ``SimpleCollector.collect`` does) also cover the ``<name>_<i>``
        variants.
    """
    excluded = {config["name"] for config in self.GAUGE_METRICS.values()}
    # getattr with a default keeps the "attribute may be missing" tolerance of
    # the previous hasattr() guard without a separate branch.
    speculative = getattr(self, "SPECULATIVE_METRICS", {})
    excluded.update(
        config["name"] for config in speculative.values() if config["type"] in (Gauge, list[Gauge])
    )
    return excluded


# Process-wide metrics manager shared by every importer of this module.
main_process_metrics = MetricsManager()

# Recording ZMQ metrics is relatively expensive, so it is off by default and
# only switched on through the FD_DEBUG setting.
if envs.FD_DEBUG:
    main_process_metrics.init_zmq_metrics()

# Import-time snapshot of the excluded gauge names, kept for callers that
# still import EXCLUDE_LABELS. It is taken through the instance so that it
# works whether get_excluded_metrics is a classmethod or an instance method;
# calling it on the class raises TypeError once the method takes ``self``.
EXCLUDE_LABELS = main_process_metrics.get_excluded_metrics()
52 changes: 52 additions & 0 deletions tests/entrypoints/openai/test_multi_api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,58 @@ def test_main_function(self, mock_check_param, mock_sleep, mock_start_servers, m
mock_proc1.wait.assert_called_once()
mock_proc2.wait.assert_called_once()

@patch("fastdeploy.entrypoints.openai.multi_api_server.subprocess.Popen")
@patch("fastdeploy.entrypoints.openai.multi_api_server.is_port_available")
def test_prometheus_multiprocess_dir_per_dp(self, mock_is_port_available, mock_popen):
    """Each DP server must be launched with its own PROMETHEUS_MULTIPROC_DIR.

    ``start_servers`` only derives a per-server directory when
    PROMETHEUS_MULTIPROC_DIR is already present in the environment, so the
    test sets it explicitly instead of relying on whatever the surrounding
    process happens to export. The base path lives under a temporary root so
    the ``<base>_dp<i>`` directories created by ``start_servers`` are removed
    when the test finishes.
    """
    # Function-scope imports keep this test self-contained.
    import os
    import tempfile

    mock_is_port_available.return_value = True

    # Record the environment handed to every spawned server process.
    envs_captured = []

    def capture_popen(*args, **kwargs):
        envs_captured.append(kwargs.get("env", {}).copy())
        mock_proc = MagicMock()
        mock_proc.pid = 1000 + len(envs_captured)
        return mock_proc

    mock_popen.side_effect = capture_popen

    with tempfile.TemporaryDirectory() as tmp_root:
        base_dir = os.path.join(tmp_root, "fd_prom")

        with patch.dict(os.environ, {"PROMETHEUS_MULTIPROC_DIR": base_dir}):
            processes = start_servers(
                server_count=2,
                device_count=2,
                server_args=self.test_server_args,
                ports="8000,8001",
                metrics_ports="8800,8801",
                controller_ports="-1",
            )

        # One Popen call, one captured env and one returned process per server.
        self.assertEqual(mock_popen.call_count, 2)
        self.assertEqual(len(envs_captured), 2)
        self.assertEqual(len(processes), 2)

        prom_dirs = []
        for i, env in enumerate(envs_captured):
            prom_dir = env.get("PROMETHEUS_MULTIPROC_DIR")
            self.assertIsNotNone(prom_dir, f"Server {i} should have PROMETHEUS_MULTIPROC_DIR set")
            prom_dirs.append(prom_dir)

        # No two servers may share a metrics directory.
        self.assertEqual(
            len(prom_dirs), len(set(prom_dirs)), "Each DP server should have a unique PROMETHEUS_MULTIPROC_DIR"
        )

        for i, prom_dir in enumerate(prom_dirs):
            # The per-server directory is the base path with a "_dp<i>" suffix.
            self.assertIn(f"_dp{i}", prom_dir, f"PROMETHEUS_MULTIPROC_DIR for server {i} should contain _dp{i}")
            self.assertEqual(prom_dir, f"{base_dir}_dp{i}")
            self.assertTrue(os.path.isdir(prom_dir), f"{prom_dir} should have been created")


if __name__ == "__main__":
unittest.main()
70 changes: 44 additions & 26 deletions tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,64 @@
# limitations under the License.
"""

import os
import unittest
from unittest.mock import patch

from prometheus_client import Gauge

from fastdeploy.metrics.metrics import get_filtered_metrics
from fastdeploy.metrics.metrics import get_filtered_metrics, main_process_metrics


class TestGetFilteredMetrics(unittest.TestCase):
    """Checks that get_filtered_metrics() reports gauges once, from in-process state."""

    def _collect_metrics_with_mocked_multiprocess(self, metric_name, multiprocess_value):
        """Run get_filtered_metrics() with MultiProcessCollector replaced by a fake.

        The fake registers a ``pid``-labelled gauge named ``metric_name`` with
        ``multiprocess_value`` on the registry it is given, so the tests can
        tell a value coming from that registry apart from the value held by
        ``main_process_metrics``.

        Returns:
            The metrics text produced by get_filtered_metrics().
        """

        def fake_multiprocess_collector(registry):
            gauge = Gauge(metric_name, f"fake metric for {metric_name}", ["pid"], registry=registry)
            gauge.labels(pid="10001").set(multiprocess_value)

        with (
            patch.dict(os.environ, {"PROMETHEUS_MULTIPROC_DIR": "/tmp/fake-prometheus-multiproc-dir"}, clear=False),
            patch(
                "fastdeploy.metrics.metrics.multiprocess.MultiProcessCollector",
                side_effect=fake_multiprocess_collector,
            ),
        ):
            return get_filtered_metrics()

    def _assert_unique_metric_value(self, metrics_text, metric_name, expected_value):
        """Assert ``metric_name`` has exactly one unlabelled sample equal to ``expected_value``."""
        metric_lines = [line for line in metrics_text.splitlines() if line.startswith(f"{metric_name} ")]
        self.assertEqual(metric_lines, [f"{metric_name} {expected_value}"])
        # A "pid=" label would mean the fake multiprocess sample leaked through the filter.
        self.assertNotIn("pid=", metrics_text)

    def _ensure_speculative_metrics(self):
        """Create the speculative gauges on main_process_metrics if they do not exist yet."""
        if not hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
            main_process_metrics._init_speculative_metrics("mtp", 2)

    def test_regular_gauge_returns_single_value_without_pid(self):
        metric = main_process_metrics.batch_size
        metric.set(8.0)

        result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1008.0)

        self._assert_unique_metric_value(result, metric._name, 8.0)

    def test_speculative_gauge_returns_single_value_without_pid(self):
        self._ensure_speculative_metrics()

        metric = main_process_metrics.spec_decode_draft_acceptance_rate
        metric.set(0.75)

        result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1000.75)

        self._assert_unique_metric_value(result, metric._name, 0.75)

    def test_speculative_single_head_gauge_returns_single_value_without_pid(self):
        self._ensure_speculative_metrics()

        metric = main_process_metrics.spec_decode_draft_single_head_acceptance_rate[0]
        metric.set(0.6)

        result = self._collect_metrics_with_mocked_multiprocess(metric._name, multiprocess_value=1000.6)

        self._assert_unique_metric_value(result, metric._name, 0.6)


if __name__ == "__main__":
Expand Down
Loading