Skip to content

Commit d7c2166

Browse files
committed
[Feature] Add logging parameters and error output to terminal
1 parent 5c60e2f commit d7c2166

9 files changed

Lines changed: 276 additions & 14 deletions

File tree

docs/usage/environment_variables.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,24 @@ environment_variables: dict[str, Callable[[], Any]] = {
1515
# Log directory
1616
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),
1717

18+
# Global log level, prefer this over FD_DEBUG. Supports "INFO" and "DEBUG".
19+
"FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None),
20+
1821
# Enable debug mode (0 or 1)
1922
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),
2023

24+
# Request logging master switch. Set to 0 to disable request logging.
25+
"FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")),
26+
27+
# Request logging detail level (0-3). Higher level means more verbose output.
28+
"FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")),
29+
30+
# Max field length for request logging truncation.
31+
"FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")),
32+
33+
# Unified trace mode: off, local, otel, all.
34+
"FD_TRACE": lambda: os.getenv("FD_TRACE", "off"),
35+
2136
# FastDeploy log retention days
2237
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),
2338

docs/zh/usage/environment_variables.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,24 @@ environment_variables: dict[str, Callable[[], Any]] = {
1515
# 日志目录
1616
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),
1717

18+
# 全局日志级别,优先于 FD_DEBUG。支持 "INFO" 和 "DEBUG"。
19+
"FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None),
20+
1821
# 是否启用调试模式,可设置为 0 或 1
1922
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),
2023

24+
# 请求日志总开关。设置为 0 禁用请求日志。
25+
"FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")),
26+
27+
# 请求日志详细级别 (0-3)。级别越高输出越详细。
28+
"FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")),
29+
30+
# 请求日志字段截断最大长度。
31+
"FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")),
32+
33+
# 统一的 trace 开关:off, local, otel, all。
34+
"FD_TRACE": lambda: os.getenv("FD_TRACE", "off"),
35+
2136
# FastDeploy 日志保留天数
2237
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),
2338

fastdeploy/engine/common_engine.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import traceback
3232
import weakref
3333
from concurrent.futures import ThreadPoolExecutor
34+
from pathlib import Path
3435
from typing import Dict, List, Optional, Tuple
3536

3637
import numpy as np
@@ -81,6 +82,37 @@
8182
from fastdeploy.output.token_processor import TokenProcessor
8283

8384

85+
def _read_latest_worker_traceback(log_dir: str) -> Optional[str]:
86+
"""读取 workerlog.* 文件中的最新 traceback。"""
87+
88+
try:
89+
candidates = sorted(Path(log_dir).glob("workerlog.*"), key=lambda path: path.stat().st_mtime, reverse=True)
90+
except OSError:
91+
return None
92+
93+
for path in candidates:
94+
try:
95+
content = path.read_text(encoding="utf-8", errors="ignore")
96+
except OSError:
97+
continue
98+
99+
marker = "Traceback (most recent call last):"
100+
start = content.rfind(marker)
101+
if start != -1:
102+
return content[start:].strip()
103+
104+
return None
105+
106+
107+
def _format_worker_launch_failure_message(log_dir: str) -> str:
    """Build the error message for a worker launch failure.

    Appends the latest worker traceback (when one can be recovered from
    ``log_dir``) so the root cause is visible directly in the terminal.

    Args:
        log_dir: Directory containing the ``workerlog.*`` files.

    Returns:
        A human-readable error message, with the traceback appended when
        available.
    """
    # Point the user at the directory actually configured: FD_LOG_DIR may
    # differ from the default "log", so don't hardcode it in the hint.
    message = f"Failed to launch worker processes, check {log_dir}/workerlog.* for more details."
    traceback_text = _read_latest_worker_traceback(log_dir)
    if traceback_text:
        return f"{message}\n{traceback_text}"
    return message
115+
84116
class EngineService:
85117
"""
86118
Base class containing common engine functionality
@@ -256,7 +288,7 @@ def start_worker_service(self, async_llm_pid=None):
256288
def check_worker_initialize_status_func(res: dict):
257289
res["worker_is_alive"] = True
258290
if not self.check_worker_initialize_status():
259-
self.llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
291+
self.llm_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
260292
res["worker_is_alive"] = False
261293

262294
self.check_worker_initialize_status_func_thread = threading.Thread(
@@ -282,7 +314,7 @@ def check_worker_initialize_status_func(res: dict):
282314
# Worker launched
283315
self.check_worker_initialize_status_func_thread.join()
284316
if not result_container["worker_is_alive"]:
285-
self.llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
317+
self.llm_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
286318
return False
287319

288320
# Start ZMQ service for communication with AsyncLLM

fastdeploy/engine/engine.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
import fastdeploy.metrics.trace as tracing
3939
from fastdeploy.engine.args_utils import EngineArgs
40-
from fastdeploy.engine.common_engine import EngineService
40+
from fastdeploy.engine.common_engine import EngineService, _format_worker_launch_failure_message
4141
from fastdeploy.engine.expert_service import start_data_parallel_service
4242
from fastdeploy.engine.request import Request
4343
from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal
@@ -157,7 +157,7 @@ def start(self, api_server_pid=None):
157157
def check_worker_initialize_status_func(res: dict):
158158
res["worker_is_alive"] = True
159159
if not self.check_worker_initialize_status():
160-
console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
160+
console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
161161
res["worker_is_alive"] = False
162162

163163
self.check_worker_initialize_status_func_thread = threading.Thread(
@@ -175,7 +175,8 @@ def check_worker_initialize_status_func(res: dict):
175175
# If block number is not specified, let workers do profiling to determine the block number,
176176
# and then start the cache manager
177177
if self.do_profile:
178-
self._stop_profile()
178+
if not self._stop_profile():
179+
return False
179180
elif self.cfg.scheduler_config.splitwise_role == "mixed" and self.cfg.cache_config.enable_prefix_caching:
180181
if not current_platform.is_intel_hpu():
181182
device_ids = self.cfg.parallel_config.device_ids.split(",")
@@ -203,7 +204,7 @@ def check_worker_initialize_status_func(res: dict):
203204
# Worker launched
204205
self.check_worker_initialize_status_func_thread.join()
205206
if not result_container["worker_is_alive"]:
206-
console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
207+
console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
207208
return False
208209

209210
console_logger.info(f"Worker processes are launched with {time.time() - start_time} seconds.")
@@ -752,7 +753,8 @@ def _stop_profile(self):
752753
while self.get_profile_block_num_signal.value[0] == 0:
753754
if hasattr(self, "worker_proc") and self.worker_proc is not None:
754755
if self.worker_proc.poll() is not None:
755-
raise RuntimeError("Worker process failed to start." "Please check log/workerlog.* for details.")
756+
console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
757+
return False
756758
time.sleep(1)
757759
num_gpu_blocks = self.get_profile_block_num_signal.value[0]
758760
self.cfg.cache_config.reset(num_gpu_blocks)
@@ -761,6 +763,7 @@ def _stop_profile(self):
761763
if not current_platform.is_intel_hpu():
762764
device_ids = self.cfg.parallel_config.device_ids.split(",")
763765
self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix)
766+
return True
764767

765768
def check_health(self, time_interval_threashold=30):
766769
"""

fastdeploy/envs.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,18 @@ def _validate_split_kv_size(value: int) -> int:
3636
"FD_BUILDING_ARCS": lambda: os.getenv("FD_BUILDING_ARCS", "[]"),
3737
# Log directory.
3838
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),
39+
# Global log level, prefer this over FD_DEBUG. Supports "INFO" and "DEBUG".
40+
"FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None),
3941
# Whether to use debug mode, can set 0 or 1
4042
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),
43+
# Request logging master switch. Set to 0 to disable request logging.
44+
"FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")),
45+
# Request logging detail level (0-3). Higher level means more verbose output.
46+
"FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")),
47+
# Max field length for request logging truncation.
48+
"FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")),
49+
# Unified trace mode: off, local, otel, all.
50+
"FD_TRACE": lambda: os.getenv("FD_TRACE", "off"),
4151
# Number of days to keep fastdeploy logs.
4252
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),
4353
# Model download source, can set "AISTUDIO", "MODELSCOPE" or "HUGGINGFACE".

fastdeploy/logger/handlers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ def _open(self):
323323
"""
324324
open new log file
325325
"""
326+
self.current_log_path.parent.mkdir(parents=True, exist_ok=True)
326327
if self.encoding is None:
327328
stream = open(str(self.current_log_path), self.mode)
328329
else:

fastdeploy/logger/setup_logging.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,20 @@
2626
from fastdeploy import envs
2727

2828

29+
class MaxLevelFilter(logging.Filter):
    """Logging filter that keeps only records strictly BELOW a given level.

    Used to route INFO/DEBUG records to stdout while ERROR/CRITICAL records
    go to stderr.
    """

    def __init__(self, level):
        super().__init__()
        # Accept either a level name ("ERROR") or a numeric level value.
        # NOTE(review): relies on the private logging._nameToLevel table.
        if level in logging._nameToLevel:
            self.level = logging._nameToLevel[level]
        else:
            self.level = level

    def filter(self, record):
        # Keep the record only when it is below the configured threshold.
        below_threshold = record.levelno < self.level
        return below_threshold
42+
2943
def setup_logging(log_dir=None, config_file=None):
3044
"""
3145
设置FastDeploy的日志配置
@@ -41,7 +55,7 @@ def setup_logging(log_dir=None, config_file=None):
4155

4256
# 使用环境变量中的日志目录,如果没有则使用传入的参数或默认值
4357
if log_dir is None:
44-
log_dir = getattr(envs, "FD_LOG_DIR", "logs")
58+
log_dir = getattr(envs, "FD_LOG_DIR", "log")
4559

4660
# 确保日志目录存在
4761
Path(log_dir).mkdir(parents=True, exist_ok=True)
@@ -58,6 +72,12 @@ def setup_logging(log_dir=None, config_file=None):
5872
default_config = {
5973
"version": 1,
6074
"disable_existing_loggers": False,
75+
"filters": {
76+
"below_error": {
77+
"()": "fastdeploy.logger.setup_logging.MaxLevelFilter",
78+
"level": "ERROR",
79+
}
80+
},
6181
"formatters": {
6282
"standard": {
6383
"class": "logging.Formatter",
@@ -71,12 +91,21 @@ def setup_logging(log_dir=None, config_file=None):
7191
},
7292
},
7393
"handlers": {
74-
"console": {
94+
# 控制台标准输出,用于 INFO/DEBUG(低于 ERROR 级别)
95+
"console_stdout": {
7596
"class": "logging.StreamHandler",
7697
"level": FASTDEPLOY_LOGGING_LEVEL,
98+
"filters": ["below_error"],
7799
"formatter": "colored",
78100
"stream": "ext://sys.stdout",
79101
},
102+
# 控制台错误输出,用于 ERROR/CRITICAL
103+
"console_stderr": {
104+
"class": "logging.StreamHandler",
105+
"level": "ERROR",
106+
"formatter": "colored",
107+
"stream": "ext://sys.stderr",
108+
},
80109
# 默认错误日志,保留最新1个小时的日志,位置在log/error.log
81110
"error_file": {
82111
"class": "logging.handlers.TimedRotatingFileHandler",
@@ -122,7 +151,14 @@ def setup_logging(log_dir=None, config_file=None):
122151
# 默认日志记录器,全局共享
123152
"fastdeploy": {
124153
"level": "DEBUG",
125-
"handlers": ["error_file", "default_file", "error_archive", "default_archive"],
154+
"handlers": [
155+
"console_stdout",
156+
"console_stderr",
157+
"error_file",
158+
"default_file",
159+
"error_archive",
160+
"default_archive",
161+
],
126162
"propagate": False,
127163
}
128164
},

tests/engine/test_common_engine.py

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import asyncio
1818
import os
1919
import sys
20+
import tempfile
2021
import threading
2122
import time
2223
import types
@@ -39,7 +40,11 @@ def enable_torch_proxy(scope=None):
3940
paddle.compat = _PaddleCompat()
4041

4142
from fastdeploy.engine.args_utils import EngineArgs
42-
from fastdeploy.engine.common_engine import EngineService
43+
from fastdeploy.engine.common_engine import (
44+
EngineService,
45+
_format_worker_launch_failure_message,
46+
_read_latest_worker_traceback,
47+
)
4348
from fastdeploy.engine.request import (
4449
ControlRequest,
4550
ControlResponse,
@@ -3504,3 +3509,90 @@ def _fake_sleep(s):
35043509
# At least one sleep call was made, confirming the inner function executed
35053510
self.assertGreaterEqual(call_count[0], 1)
35063511
self._detach_finalizer(eng)
3512+
3513+
3514+
class TestWorkerTracebackFunctions(unittest.TestCase):
    """Tests for _read_latest_worker_traceback and _format_worker_launch_failure_message."""

    @staticmethod
    def _write_log(directory, name, text):
        # Helper: create a workerlog file with the given content.
        with open(os.path.join(directory, name), "w", encoding="utf-8") as fp:
            fp.write(text)

    def test_read_latest_worker_traceback_finds_traceback(self):
        """A traceback embedded in a workerlog file is extracted."""
        with tempfile.TemporaryDirectory() as temp_dir:
            self._write_log(
                temp_dir,
                "workerlog.0",
                "Some normal log output\n"
                "Traceback (most recent call last):\n"
                '  File "worker_process.py", line 1, in <module>\n'
                "    run_worker_proc()\n"
                "ValueError: The total number of blocks cannot be less than zero.\n",
            )
            extracted = _read_latest_worker_traceback(temp_dir)
            self.assertIsNotNone(extracted)
            self.assertIn("Traceback (most recent call last):", extracted)
            self.assertIn("ValueError:", extracted)

    def test_read_latest_worker_traceback_returns_none_when_no_traceback(self):
        """A log file without a traceback yields None."""
        with tempfile.TemporaryDirectory() as temp_dir:
            self._write_log(temp_dir, "workerlog.0", "Normal log output without any errors\n")
            self.assertIsNone(_read_latest_worker_traceback(temp_dir))

    def test_read_latest_worker_traceback_returns_none_when_no_files(self):
        """An empty log directory yields None."""
        with tempfile.TemporaryDirectory() as temp_dir:
            self.assertIsNone(_read_latest_worker_traceback(temp_dir))

    def test_read_latest_worker_traceback_returns_none_for_nonexistent_dir(self):
        """A missing log directory yields None rather than raising."""
        self.assertIsNone(_read_latest_worker_traceback("/nonexistent/path"))

    def test_read_latest_worker_traceback_picks_latest_file(self):
        """With several workerlog files, the newest one wins."""
        with tempfile.TemporaryDirectory() as temp_dir:
            self._write_log(
                temp_dir,
                "workerlog.0",
                "Traceback (most recent call last):\nOldError: old error\n",
            )
            # Brief pause so the second file gets a strictly newer mtime.
            time.sleep(0.01)
            self._write_log(
                temp_dir,
                "workerlog.1",
                "Traceback (most recent call last):\nNewError: new error\n",
            )

            extracted = _read_latest_worker_traceback(temp_dir)
            self.assertIsNotNone(extracted)
            self.assertIn("NewError", extracted)

    def test_format_worker_launch_failure_message_with_traceback(self):
        """The formatted message carries both the hint and the traceback."""
        with tempfile.TemporaryDirectory() as temp_dir:
            self._write_log(
                temp_dir,
                "workerlog.0",
                "Traceback (most recent call last):\nValueError: Test error message\n",
            )

            message = _format_worker_launch_failure_message(temp_dir)
            self.assertIn("Failed to launch worker processes", message)
            self.assertIn("workerlog.*", message)
            self.assertIn("Traceback (most recent call last):", message)
            self.assertIn("ValueError: Test error message", message)

    def test_format_worker_launch_failure_message_without_traceback(self):
        """Without a traceback the message stays a bare hint."""
        with tempfile.TemporaryDirectory() as temp_dir:
            message = _format_worker_launch_failure_message(temp_dir)
            self.assertIn("Failed to launch worker processes", message)
            self.assertIn("workerlog.*", message)
            self.assertNotIn("Traceback", message)

0 commit comments

Comments
 (0)