Skip to content

Commit f04bbbc

Browse files
committed
[Feature] Add logging parameters and error output to terminal
1 parent dd2aa10 commit f04bbbc

9 files changed

Lines changed: 276 additions & 14 deletions

File tree

docs/usage/environment_variables.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,24 @@ environment_variables: dict[str, Callable[[], Any]] = {
1515
# Log directory
1616
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),
1717

18+
# Global log level; takes precedence over FD_DEBUG. Supports "INFO" and "DEBUG".
19+
"FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None),
20+
1821
# Enable debug mode (0 or 1)
1922
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),
2023

24+
# Request logging master switch. Set to 0 to disable request logging.
25+
"FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")),
26+
27+
# Request logging detail level (0-3). Higher level means more verbose output.
28+
"FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")),
29+
30+
# Max field length for request logging truncation.
31+
"FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")),
32+
33+
# Unified trace mode: off, local, otel, all.
34+
"FD_TRACE": lambda: os.getenv("FD_TRACE", "off"),
35+
2136
# FastDeploy log retention days
2237
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),
2338

docs/zh/usage/environment_variables.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,24 @@ environment_variables: dict[str, Callable[[], Any]] = {
1515
# 日志目录
1616
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),
1717

18+
# 全局日志级别,优先于 FD_DEBUG。支持 "INFO" 和 "DEBUG"。
19+
"FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None),
20+
1821
# 是否启用调试模式,可设置为 0 或 1
1922
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),
2023

24+
# 请求日志总开关。设置为 0 禁用请求日志。
25+
"FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")),
26+
27+
# 请求日志详细级别 (0-3)。级别越高输出越详细。
28+
"FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")),
29+
30+
# 请求日志字段截断最大长度。
31+
"FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")),
32+
33+
# 统一的 trace 开关:off, local, otel, all。
34+
"FD_TRACE": lambda: os.getenv("FD_TRACE", "off"),
35+
2136
# FastDeploy 日志保留天数
2237
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),
2338

fastdeploy/engine/common_engine.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import traceback
3232
import weakref
3333
from concurrent.futures import ThreadPoolExecutor
34+
from pathlib import Path
3435
from typing import Dict, List, Optional, Tuple
3536

3637
import numpy as np
@@ -83,6 +84,37 @@
8384
from fastdeploy.output.token_processor import TokenProcessor
8485

8586

87+
def _read_latest_worker_traceback(log_dir: str) -> Optional[str]:
88+
"""读取 workerlog.* 文件中的最新 traceback。"""
89+
90+
try:
91+
candidates = sorted(Path(log_dir).glob("workerlog.*"), key=lambda path: path.stat().st_mtime, reverse=True)
92+
except OSError:
93+
return None
94+
95+
for path in candidates:
96+
try:
97+
content = path.read_text(encoding="utf-8", errors="ignore")
98+
except OSError:
99+
continue
100+
101+
marker = "Traceback (most recent call last):"
102+
start = content.rfind(marker)
103+
if start != -1:
104+
return content[start:].strip()
105+
106+
return None
107+
108+
109+
def _format_worker_launch_failure_message(log_dir: str) -> str:
    """Build the error message reported when worker processes fail to launch.

    The message points at the ``workerlog.*`` files inside ``log_dir`` (so it
    stays accurate when the log directory is not the default ``log``) and,
    when a traceback can be read from those files, appends it on a new line.

    Args:
        log_dir: Directory that holds the ``workerlog.*`` files.

    Returns:
        The failure message, followed by the latest worker traceback if one
        was found.
    """
    # With the default directory "log" this renders as "log/workerlog.*".
    log_pattern = Path(log_dir) / "workerlog.*"
    message = f"Failed to launch worker processes, check {log_pattern} for more details."
    traceback_text = _read_latest_worker_traceback(log_dir)
    if traceback_text:
        return f"{message}\n{traceback_text}"
    return message
116+
117+
86118
class EngineService:
87119
"""
88120
Base class containing common engine functionality
@@ -258,7 +290,7 @@ def start_worker_service(self, async_llm_pid=None):
258290
def check_worker_initialize_status_func(res: dict):
259291
res["worker_is_alive"] = True
260292
if not self.check_worker_initialize_status():
261-
self.llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
293+
self.llm_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
262294
res["worker_is_alive"] = False
263295

264296
self.check_worker_initialize_status_func_thread = threading.Thread(
@@ -284,7 +316,7 @@ def check_worker_initialize_status_func(res: dict):
284316
# Worker launched
285317
self.check_worker_initialize_status_func_thread.join()
286318
if not result_container["worker_is_alive"]:
287-
self.llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
319+
self.llm_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
288320
return False
289321

290322
# Start ZMQ service for communication with AsyncLLM

fastdeploy/engine/engine.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@
3737

3838
import fastdeploy.metrics.trace as tracing
3939
from fastdeploy.engine.args_utils import EngineArgs
40-
from fastdeploy.engine.common_engine import EngineService
40+
from fastdeploy.engine.common_engine import (
41+
EngineService,
42+
_format_worker_launch_failure_message,
43+
)
4144
from fastdeploy.engine.expert_service import start_data_parallel_service
4245
from fastdeploy.engine.request import Request
4346
from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal
@@ -157,7 +160,7 @@ def start(self, api_server_pid=None):
157160
def check_worker_initialize_status_func(res: dict):
158161
res["worker_is_alive"] = True
159162
if not self.check_worker_initialize_status():
160-
console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
163+
console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
161164
res["worker_is_alive"] = False
162165

163166
self.check_worker_initialize_status_func_thread = threading.Thread(
@@ -175,7 +178,8 @@ def check_worker_initialize_status_func(res: dict):
175178
# If block number is not specified, let workers do profiling to determine the block number,
176179
# and then start the cache manager
177180
if self.do_profile:
178-
self._stop_profile()
181+
if not self._stop_profile():
182+
return False
179183
elif self.cfg.scheduler_config.splitwise_role == "mixed" and self.cfg.cache_config.enable_prefix_caching:
180184
if not current_platform.is_intel_hpu():
181185
device_ids = self.cfg.parallel_config.device_ids.split(",")
@@ -203,7 +207,7 @@ def check_worker_initialize_status_func(res: dict):
203207
# Worker launched
204208
self.check_worker_initialize_status_func_thread.join()
205209
if not result_container["worker_is_alive"]:
206-
console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
210+
console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
207211
return False
208212

209213
console_logger.info(f"Worker processes are launched with {time.time() - start_time} seconds.")
@@ -752,7 +756,8 @@ def _stop_profile(self):
752756
while self.get_profile_block_num_signal.value[0] == 0:
753757
if hasattr(self, "worker_proc") and self.worker_proc is not None:
754758
if self.worker_proc.poll() is not None:
755-
raise RuntimeError("Worker process failed to start." "Please check log/workerlog.* for details.")
759+
console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
760+
return False
756761
time.sleep(1)
757762
num_gpu_blocks = self.get_profile_block_num_signal.value[0]
758763
self.cfg.cache_config.reset(num_gpu_blocks)
@@ -761,6 +766,7 @@ def _stop_profile(self):
761766
if not current_platform.is_intel_hpu():
762767
device_ids = self.cfg.parallel_config.device_ids.split(",")
763768
self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix)
769+
return True
764770

765771
def check_health(self, time_interval_threashold=30):
766772
"""

fastdeploy/envs.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,18 @@ def _validate_split_kv_size(value: int) -> int:
3636
"FD_BUILDING_ARCS": lambda: os.getenv("FD_BUILDING_ARCS", "[]"),
3737
# Log directory.
3838
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),
39+
# Global log level; takes precedence over FD_DEBUG. Supports "INFO" and "DEBUG".
40+
"FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None),
3941
# Whether to use debug mode, can set 0 or 1
4042
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),
43+
# Request logging master switch. Set to 0 to disable request logging.
44+
"FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")),
45+
# Request logging detail level (0-3). Higher level means more verbose output.
46+
"FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")),
47+
# Max field length for request logging truncation.
48+
"FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")),
49+
# Unified trace mode: off, local, otel, all.
50+
"FD_TRACE": lambda: os.getenv("FD_TRACE", "off"),
4151
# Number of days to keep fastdeploy logs.
4252
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),
4353
# Model download source, can set "AISTUDIO", "MODELSCOPE" or "HUGGINGFACE".

fastdeploy/logger/handlers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ def _open(self):
323323
"""
324324
open new log file
325325
"""
326+
self.current_log_path.parent.mkdir(parents=True, exist_ok=True)
326327
if self.encoding is None:
327328
stream = open(str(self.current_log_path), self.mode)
328329
else:

fastdeploy/logger/setup_logging.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,20 @@
2626
from fastdeploy import envs
2727

2828

29+
class MaxLevelFilter(logging.Filter):
    """Let through only records whose level is strictly below a threshold.

    Used to route INFO/DEBUG records to stdout while ERROR/CRITICAL records
    are routed to stderr.
    """

    def __init__(self, level):
        """Create the filter.

        Args:
            level: Exclusive upper bound, given either as a numeric level or
                as a level name such as ``"ERROR"`` (case-insensitive).

        Raises:
            ValueError: If ``level`` is a string that names no known level.
            TypeError: If ``level`` is neither an int nor a string.
        """
        super().__init__()
        self.level = self._resolve_level(level)

    @staticmethod
    def _resolve_level(level):
        """Convert a level name or number into its numeric value."""
        if isinstance(level, int):
            return level
        if isinstance(level, str):
            name = level.strip()
            if name.isdigit():
                return int(name)
            # Try the exact spelling first so custom level names still match,
            # then the upper-cased form for inputs such as "error".
            for candidate in (name, name.upper()):
                resolved = logging.getLevelName(candidate)
                if isinstance(resolved, int):
                    return resolved
            # Fail at construction time instead of storing a string that
            # would only break later, when compared inside filter().
            raise ValueError(f"Unknown log level: {level!r}")
        raise TypeError(f"Log level must be an int or str, got {type(level).__name__}")

    def filter(self, record):
        """Return True when ``record`` is below the configured level."""
        return record.levelno < self.level
41+
42+
2943
def setup_logging(log_dir=None, config_file=None):
3044
"""
3145
设置FastDeploy的日志配置
@@ -41,7 +55,7 @@ def setup_logging(log_dir=None, config_file=None):
4155

4256
# 使用环境变量中的日志目录,如果没有则使用传入的参数或默认值
4357
if log_dir is None:
44-
log_dir = getattr(envs, "FD_LOG_DIR", "logs")
58+
log_dir = getattr(envs, "FD_LOG_DIR", "log")
4559

4660
# 确保日志目录存在
4761
Path(log_dir).mkdir(parents=True, exist_ok=True)
@@ -58,6 +72,12 @@ def setup_logging(log_dir=None, config_file=None):
5872
default_config = {
5973
"version": 1,
6074
"disable_existing_loggers": False,
75+
"filters": {
76+
"below_error": {
77+
"()": "fastdeploy.logger.setup_logging.MaxLevelFilter",
78+
"level": "ERROR",
79+
}
80+
},
6181
"formatters": {
6282
"standard": {
6383
"class": "logging.Formatter",
@@ -71,12 +91,21 @@ def setup_logging(log_dir=None, config_file=None):
7191
},
7292
},
7393
"handlers": {
74-
"console": {
94+
# 控制台标准输出,用于 INFO/DEBUG(低于 ERROR 级别)
95+
"console_stdout": {
7596
"class": "logging.StreamHandler",
7697
"level": FASTDEPLOY_LOGGING_LEVEL,
98+
"filters": ["below_error"],
7799
"formatter": "colored",
78100
"stream": "ext://sys.stdout",
79101
},
102+
# 控制台错误输出,用于 ERROR/CRITICAL
103+
"console_stderr": {
104+
"class": "logging.StreamHandler",
105+
"level": "ERROR",
106+
"formatter": "colored",
107+
"stream": "ext://sys.stderr",
108+
},
80109
# 默认错误日志,保留最新1个小时的日志,位置在log/error.log
81110
"error_file": {
82111
"class": "logging.handlers.TimedRotatingFileHandler",
@@ -122,7 +151,14 @@ def setup_logging(log_dir=None, config_file=None):
122151
# 默认日志记录器,全局共享
123152
"fastdeploy": {
124153
"level": "DEBUG",
125-
"handlers": ["error_file", "default_file", "error_archive", "default_archive"],
154+
"handlers": [
155+
"console_stdout",
156+
"console_stderr",
157+
"error_file",
158+
"default_file",
159+
"error_archive",
160+
"default_archive",
161+
],
126162
"propagate": False,
127163
}
128164
},

tests/engine/test_common_engine.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import asyncio
1818
import os
1919
import sys
20+
import tempfile
2021
import threading
2122
import time
2223
import types
@@ -39,7 +40,11 @@ def enable_torch_proxy(scope=None):
3940
paddle.compat = _PaddleCompat()
4041

4142
from fastdeploy.engine.args_utils import EngineArgs
42-
from fastdeploy.engine.common_engine import EngineService
43+
from fastdeploy.engine.common_engine import (
44+
EngineService,
45+
_format_worker_launch_failure_message,
46+
_read_latest_worker_traceback,
47+
)
4348
from fastdeploy.engine.request import (
4449
ControlRequest,
4550
ControlResponse,
@@ -3722,3 +3727,87 @@ def fake_time():
37223727

37233728
eng.resource_manager.recycle_abort_task.assert_called_with("req-1_0")
37243729
self._detach_finalizer(eng)
3730+
3731+
3732+
class TestWorkerTracebackFunctions(unittest.TestCase):
    """Tests for _read_latest_worker_traceback and _format_worker_launch_failure_message."""

    @staticmethod
    def _write_worker_log(log_dir, name, content, mtime=None):
        """Create a worker log file and optionally pin its modification time."""
        log_path = os.path.join(log_dir, name)
        with open(log_path, "w", encoding="utf-8") as fp:
            fp.write(content)
        if mtime is not None:
            os.utime(log_path, (mtime, mtime))
        return log_path

    def test_read_latest_worker_traceback_finds_traceback(self):
        """The traceback in a workerlog file is read correctly."""
        with tempfile.TemporaryDirectory() as temp_dir:
            self._write_worker_log(
                temp_dir,
                "workerlog.0",
                "Some normal log output\n"
                "Traceback (most recent call last):\n"
                '  File "worker_process.py", line 1, in <module>\n'
                "    run_worker_proc()\n"
                "ValueError: The total number of blocks cannot be less than zero.\n",
            )

            result = _read_latest_worker_traceback(temp_dir)
            self.assertIsNotNone(result)
            self.assertIn("Traceback (most recent call last):", result)
            self.assertIn("ValueError:", result)

    def test_read_latest_worker_traceback_returns_none_when_no_traceback(self):
        """None is returned when the log contains no traceback."""
        with tempfile.TemporaryDirectory() as temp_dir:
            self._write_worker_log(temp_dir, "workerlog.0", "Normal log output without any errors\n")

            result = _read_latest_worker_traceback(temp_dir)
            self.assertIsNone(result)

    def test_read_latest_worker_traceback_returns_none_when_no_files(self):
        """None is returned when there are no workerlog files."""
        with tempfile.TemporaryDirectory() as temp_dir:
            result = _read_latest_worker_traceback(temp_dir)
            self.assertIsNone(result)

    def test_read_latest_worker_traceback_returns_none_for_nonexistent_dir(self):
        """None is returned when the directory does not exist."""
        result = _read_latest_worker_traceback("/nonexistent/path")
        self.assertIsNone(result)

    def test_read_latest_worker_traceback_picks_latest_file(self):
        """With several workerlog files, the most recently modified one wins."""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Pin the modification times explicitly instead of sleeping between
            # writes: a short sleep is not reliable on filesystems with coarse
            # timestamp resolution.
            self._write_worker_log(
                temp_dir,
                "workerlog.0",
                "Traceback (most recent call last):\nOldError: old error\n",
                mtime=1_000_000_000,
            )
            self._write_worker_log(
                temp_dir,
                "workerlog.1",
                "Traceback (most recent call last):\nNewError: new error\n",
                mtime=1_000_000_100,
            )

            result = _read_latest_worker_traceback(temp_dir)
            self.assertIsNotNone(result)
            self.assertIn("NewError", result)

    def test_format_worker_launch_failure_message_with_traceback(self):
        """The formatted message includes the traceback when one exists."""
        with tempfile.TemporaryDirectory() as temp_dir:
            self._write_worker_log(
                temp_dir,
                "workerlog.0",
                "Traceback (most recent call last):\nValueError: Test error message\n",
            )

            result = _format_worker_launch_failure_message(temp_dir)
            self.assertIn("Failed to launch worker processes", result)
            self.assertIn("workerlog.*", result)
            self.assertIn("Traceback (most recent call last):", result)
            self.assertIn("ValueError: Test error message", result)

    def test_format_worker_launch_failure_message_without_traceback(self):
        """The formatted message omits the traceback when none exists."""
        with tempfile.TemporaryDirectory() as temp_dir:
            result = _format_worker_launch_failure_message(temp_dir)
            self.assertIn("Failed to launch worker processes", result)
            self.assertIn("workerlog.*", result)
            self.assertNotIn("Traceback", result)

0 commit comments

Comments
 (0)