Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/usage/environment_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,24 @@ environment_variables: dict[str, Callable[[], Any]] = {
# Log directory
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),

# Global log level, prefer this over FD_DEBUG. Supports "INFO" and "DEBUG".
"FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None),

# Enable debug mode (0 or 1)
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),

# Request logging master switch. Set to 0 to disable request logging.
"FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")),

# Request logging detail level (0-3). Higher level means more verbose output.
"FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")),

# Max field length for request logging truncation.
"FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")),

# Unified trace mode: off, local, otel, all.
"FD_TRACE": lambda: os.getenv("FD_TRACE", "off"),

# FastDeploy log retention days
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),

Expand Down
15 changes: 15 additions & 0 deletions docs/zh/usage/environment_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,24 @@ environment_variables: dict[str, Callable[[], Any]] = {
# 日志目录
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),

# 全局日志级别,优先于 FD_DEBUG。支持 "INFO" 和 "DEBUG"。
"FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None),

# 是否启用调试模式,可设置为 0 或 1
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),

# 请求日志总开关。设置为 0 禁用请求日志。
"FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")),

# 请求日志详细级别 (0-3)。级别越高输出越详细。
"FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")),

# 请求日志字段截断最大长度。
"FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")),

# 统一的 trace 开关:off, local, otel, all。
"FD_TRACE": lambda: os.getenv("FD_TRACE", "off"),

# FastDeploy 日志保留天数
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),

Expand Down
36 changes: 34 additions & 2 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import traceback
import weakref
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
Expand Down Expand Up @@ -83,6 +84,37 @@
from fastdeploy.output.token_processor import TokenProcessor


def _read_latest_worker_traceback(log_dir: str) -> Optional[str]:
    """Return the most recent Python traceback found in ``workerlog.*`` files.

    Scans ``log_dir`` for files matching ``workerlog.*``, newest first by
    modification time, and returns everything from the last occurrence of the
    traceback marker onward in the first file that contains one.

    Args:
        log_dir: Directory that holds the worker log files.

    Returns:
        The trailing traceback text (stripped), or ``None`` if the directory
        is unreadable, no worker logs exist, or none of them contain a
        traceback. This is a best-effort helper: it never raises.
    """
    try:
        # Newest log first: the most recent failure is what the caller wants.
        # path.stat() may also raise OSError (file removed mid-scan); the
        # surrounding try makes that a soft failure.
        candidates = sorted(Path(log_dir).glob("workerlog.*"), key=lambda path: path.stat().st_mtime, reverse=True)
    except OSError:
        return None

    for path in candidates:
        try:
            content = path.read_text(encoding="utf-8", errors="ignore")
        except OSError:
            # Unreadable file: try the next candidate instead of giving up.
            continue

        marker = "Traceback (most recent call last):"
        # rfind: report the *latest* traceback in the file, not the first.
        start = content.rfind(marker)
        if start != -1:
            return content[start:].strip()

    return None


def _format_worker_launch_failure_message(log_dir: str) -> str:
    """Build the error message reported when worker processes fail to launch.

    Appends the latest traceback harvested from ``workerlog.*`` (if any) to
    the generic failure hint, so the root cause surfaces directly in the
    engine log instead of requiring a manual look at the worker logs.

    Args:
        log_dir: Directory that holds the worker log files.

    Returns:
        The hint message, followed by the traceback on a new line when one
        was found.
    """
    message = "Failed to launch worker processes, check log/workerlog.* for more details."
    traceback_text = _read_latest_worker_traceback(log_dir)
    if traceback_text:
        return f"{message}\n{traceback_text}"
    return message


class EngineService:
"""
Base class containing common engine functionality
Expand Down Expand Up @@ -258,7 +290,7 @@ def start_worker_service(self, async_llm_pid=None):
def check_worker_initialize_status_func(res: dict):
res["worker_is_alive"] = True
if not self.check_worker_initialize_status():
self.llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
self.llm_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
res["worker_is_alive"] = False

self.check_worker_initialize_status_func_thread = threading.Thread(
Expand All @@ -284,7 +316,7 @@ def check_worker_initialize_status_func(res: dict):
# Worker launched
self.check_worker_initialize_status_func_thread.join()
if not result_container["worker_is_alive"]:
self.llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
self.llm_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
return False

# Start ZMQ service for communication with AsyncLLM
Expand Down
16 changes: 11 additions & 5 deletions fastdeploy/engine/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@

import fastdeploy.metrics.trace as tracing
from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.common_engine import EngineService
from fastdeploy.engine.common_engine import (
EngineService,
_format_worker_launch_failure_message,
)
from fastdeploy.engine.expert_service import start_data_parallel_service
from fastdeploy.engine.request import Request
from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal
Expand Down Expand Up @@ -157,7 +160,7 @@ def start(self, api_server_pid=None):
def check_worker_initialize_status_func(res: dict):
res["worker_is_alive"] = True
if not self.check_worker_initialize_status():
console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
res["worker_is_alive"] = False

self.check_worker_initialize_status_func_thread = threading.Thread(
Expand All @@ -175,7 +178,8 @@ def check_worker_initialize_status_func(res: dict):
# If block number is not specified, let workers do profiling to determine the block number,
# and then start the cache manager
if self.do_profile:
self._stop_profile()
if not self._stop_profile():
return False
elif self.cfg.scheduler_config.splitwise_role == "mixed" and self.cfg.cache_config.enable_prefix_caching:
if not current_platform.is_intel_hpu():
device_ids = self.cfg.parallel_config.device_ids.split(",")
Expand Down Expand Up @@ -203,7 +207,7 @@ def check_worker_initialize_status_func(res: dict):
# Worker launched
self.check_worker_initialize_status_func_thread.join()
if not result_container["worker_is_alive"]:
console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.")
console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
return False

console_logger.info(f"Worker processes are launched with {time.time() - start_time} seconds.")
Expand Down Expand Up @@ -752,7 +756,8 @@ def _stop_profile(self):
while self.get_profile_block_num_signal.value[0] == 0:
if hasattr(self, "worker_proc") and self.worker_proc is not None:
if self.worker_proc.poll() is not None:
raise RuntimeError("Worker process failed to start." "Please check log/workerlog.* for details.")
console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR))
return False
time.sleep(1)
num_gpu_blocks = self.get_profile_block_num_signal.value[0]
self.cfg.cache_config.reset(num_gpu_blocks)
Expand All @@ -761,6 +766,7 @@ def _stop_profile(self):
if not current_platform.is_intel_hpu():
device_ids = self.cfg.parallel_config.device_ids.split(",")
self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix)
return True

def check_health(self, time_interval_threashold=30):
"""
Expand Down
10 changes: 10 additions & 0 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,18 @@ def _validate_split_kv_size(value: int) -> int:
"FD_BUILDING_ARCS": lambda: os.getenv("FD_BUILDING_ARCS", "[]"),
# Log directory.
"FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"),
# Global log level, prefer this over FD_DEBUG. Supports "INFO" and "DEBUG".
"FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None),
# Whether to use debug mode, can set 0 or 1
"FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")),
# Request logging master switch. Set to 0 to disable request logging.
"FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")),
# Request logging detail level (0-3). Higher level means more verbose output.
"FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")),
# Max field length for request logging truncation.
"FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")),
# Unified trace mode: off, local, otel, all.
"FD_TRACE": lambda: os.getenv("FD_TRACE", "off"),
# Number of days to keep fastdeploy logs.
"FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"),
# Model download source, can set "AISTUDIO", "MODELSCOPE" or "HUGGINGFACE".
Expand Down
1 change: 1 addition & 0 deletions fastdeploy/logger/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ def _open(self):
"""
open new log file
"""
self.current_log_path.parent.mkdir(parents=True, exist_ok=True)
if self.encoding is None:
stream = open(str(self.current_log_path), self.mode)
else:
Expand Down
42 changes: 39 additions & 3 deletions fastdeploy/logger/setup_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,20 @@
from fastdeploy import envs


class MaxLevelFilter(logging.Filter):
    """Pass only records strictly below a given level.

    Attached to the stdout console handler so that INFO/DEBUG records go to
    stdout while ERROR/CRITICAL records are routed to stderr only.
    """

    def __init__(self, level):
        super().__init__()
        # Resolve a level name such as "ERROR" to its numeric value; a value
        # that is not a known name (e.g. already an int) is kept as-is.
        # NOTE(review): logging._nameToLevel is a private mapping — confirm
        # it is acceptable here (public alternatives exist on newer Pythons).
        resolved = logging._nameToLevel.get(level, level)
        self.level = resolved

    def filter(self, record):
        # True (keep) only for records below the threshold.
        below_threshold = record.levelno < self.level
        return below_threshold


def setup_logging(log_dir=None, config_file=None):
"""
设置FastDeploy的日志配置
Expand All @@ -41,7 +55,7 @@ def setup_logging(log_dir=None, config_file=None):

# 使用环境变量中的日志目录,如果没有则使用传入的参数或默认值
if log_dir is None:
log_dir = getattr(envs, "FD_LOG_DIR", "logs")
log_dir = getattr(envs, "FD_LOG_DIR", "log")

# 确保日志目录存在
Path(log_dir).mkdir(parents=True, exist_ok=True)
Expand All @@ -58,6 +72,12 @@ def setup_logging(log_dir=None, config_file=None):
default_config = {
"version": 1,
"disable_existing_loggers": False,
"filters": {
"below_error": {
"()": "fastdeploy.logger.setup_logging.MaxLevelFilter",
"level": "ERROR",
}
},
"formatters": {
"standard": {
"class": "logging.Formatter",
Expand All @@ -71,12 +91,21 @@ def setup_logging(log_dir=None, config_file=None):
},
},
"handlers": {
"console": {
# 控制台标准输出,用于 INFO/DEBUG(低于 ERROR 级别)
"console_stdout": {
"class": "logging.StreamHandler",
"level": FASTDEPLOY_LOGGING_LEVEL,
"filters": ["below_error"],
"formatter": "colored",
"stream": "ext://sys.stdout",
},
# 控制台错误输出,用于 ERROR/CRITICAL
"console_stderr": {
"class": "logging.StreamHandler",
"level": "ERROR",
"formatter": "colored",
"stream": "ext://sys.stderr",
},
# 默认错误日志,保留最新1个小时的日志,位置在log/error.log
"error_file": {
"class": "logging.handlers.TimedRotatingFileHandler",
Expand Down Expand Up @@ -122,7 +151,14 @@ def setup_logging(log_dir=None, config_file=None):
# 默认日志记录器,全局共享
"fastdeploy": {
"level": "DEBUG",
"handlers": ["error_file", "default_file", "error_archive", "default_archive"],
"handlers": [
"console_stdout",
"console_stderr",
"error_file",
"default_file",
"error_archive",
"default_archive",
],
"propagate": False,
}
},
Expand Down
91 changes: 90 additions & 1 deletion tests/engine/test_common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import asyncio
import os
import sys
import tempfile
import threading
import time
import types
Expand All @@ -39,7 +40,11 @@ def enable_torch_proxy(scope=None):
paddle.compat = _PaddleCompat()

from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.common_engine import EngineService
from fastdeploy.engine.common_engine import (
EngineService,
_format_worker_launch_failure_message,
_read_latest_worker_traceback,
)
from fastdeploy.engine.request import (
ControlRequest,
ControlResponse,
Expand Down Expand Up @@ -3722,3 +3727,87 @@ def fake_time():

eng.resource_manager.recycle_abort_task.assert_called_with("req-1_0")
self._detach_finalizer(eng)


class TestWorkerTracebackFunctions(unittest.TestCase):
    """Tests for _read_latest_worker_traceback and _format_worker_launch_failure_message."""

    def test_read_latest_worker_traceback_finds_traceback(self):
        """A traceback present in a workerlog file is located and returned."""
        with tempfile.TemporaryDirectory() as temp_dir:
            worker_log = os.path.join(temp_dir, "workerlog.0")
            with open(worker_log, "w", encoding="utf-8") as fp:
                fp.write(
                    "Some normal log output\n"
                    "Traceback (most recent call last):\n"
                    ' File "worker_process.py", line 1, in <module>\n'
                    " run_worker_proc()\n"
                    "ValueError: The total number of blocks cannot be less than zero.\n"
                )

            result = _read_latest_worker_traceback(temp_dir)
            self.assertIsNotNone(result)
            self.assertIn("Traceback (most recent call last):", result)
            self.assertIn("ValueError:", result)

    def test_read_latest_worker_traceback_returns_none_when_no_traceback(self):
        """None is returned when the log contains no traceback."""
        with tempfile.TemporaryDirectory() as temp_dir:
            worker_log = os.path.join(temp_dir, "workerlog.0")
            with open(worker_log, "w", encoding="utf-8") as fp:
                fp.write("Normal log output without any errors\n")

            result = _read_latest_worker_traceback(temp_dir)
            self.assertIsNone(result)

    def test_read_latest_worker_traceback_returns_none_when_no_files(self):
        """None is returned when no workerlog files exist."""
        with tempfile.TemporaryDirectory() as temp_dir:
            result = _read_latest_worker_traceback(temp_dir)
            self.assertIsNone(result)

    def test_read_latest_worker_traceback_returns_none_for_nonexistent_dir(self):
        """None is returned when the log directory does not exist."""
        result = _read_latest_worker_traceback("/nonexistent/path")
        self.assertIsNone(result)

    def test_read_latest_worker_traceback_picks_latest_file(self):
        """With multiple workerlog files, the most recently modified one wins."""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create the older file.
            old_log = os.path.join(temp_dir, "workerlog.0")
            with open(old_log, "w", encoding="utf-8") as fp:
                fp.write("Traceback (most recent call last):\nOldError: old error\n")

            # Sleep briefly so the modification timestamps differ.
            time.sleep(0.01)

            # Create the newer file.
            new_log = os.path.join(temp_dir, "workerlog.1")
            with open(new_log, "w", encoding="utf-8") as fp:
                fp.write("Traceback (most recent call last):\nNewError: new error\n")

            result = _read_latest_worker_traceback(temp_dir)
            self.assertIsNotNone(result)
            self.assertIn("NewError", result)

    def test_format_worker_launch_failure_message_with_traceback(self):
        """The formatted message embeds the traceback when one is available."""
        with tempfile.TemporaryDirectory() as temp_dir:
            worker_log = os.path.join(temp_dir, "workerlog.0")
            with open(worker_log, "w", encoding="utf-8") as fp:
                fp.write("Traceback (most recent call last):\n" "ValueError: Test error message\n")

            result = _format_worker_launch_failure_message(temp_dir)
            self.assertIn("Failed to launch worker processes", result)
            self.assertIn("workerlog.*", result)
            self.assertIn("Traceback (most recent call last):", result)
            self.assertIn("ValueError: Test error message", result)

    def test_format_worker_launch_failure_message_without_traceback(self):
        """The formatted message omits the traceback when none exists."""
        with tempfile.TemporaryDirectory() as temp_dir:
            result = _format_worker_launch_failure_message(temp_dir)
            self.assertIn("Failed to launch worker processes", result)
            self.assertIn("workerlog.*", result)
            self.assertNotIn("Traceback", result)
Loading
Loading