diff --git a/docs/usage/environment_variables.md b/docs/usage/environment_variables.md index 07f9dd2f144..692ad8cd023 100644 --- a/docs/usage/environment_variables.md +++ b/docs/usage/environment_variables.md @@ -15,9 +15,24 @@ environment_variables: dict[str, Callable[[], Any]] = { # Log directory "FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"), + # Global log level, prefer this over FD_DEBUG. Supports "INFO" and "DEBUG". + "FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None), + # Enable debug mode (0 or 1) "FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")), + # Request logging master switch. Set to 0 to disable request logging. + "FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")), + + # Request logging detail level (0-3). Higher level means more verbose output. + "FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")), + + # Max field length for request logging truncation. + "FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")), + + # Unified trace mode: off, local, otel, all. + "FD_TRACE": lambda: os.getenv("FD_TRACE", "off"), + # FastDeploy log retention days "FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"), diff --git a/docs/zh/usage/environment_variables.md b/docs/zh/usage/environment_variables.md index b1fbf23ed33..0a4cfd389db 100644 --- a/docs/zh/usage/environment_variables.md +++ b/docs/zh/usage/environment_variables.md @@ -15,9 +15,24 @@ environment_variables: dict[str, Callable[[], Any]] = { # 日志目录 "FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"), + # 全局日志级别,优先于 FD_DEBUG。支持 "INFO" 和 "DEBUG"。 + "FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None), + # 是否启用调试模式,可设置为 0 或 1 "FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")), + # 请求日志总开关。设置为 0 禁用请求日志。 + "FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")), + + # 请求日志详细级别 (0-3)。级别越高输出越详细。 + "FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")), + + # 请求日志字段截断最大长度。 + "FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")), + + # 统一的 trace 开关:off, local, otel, all。 + "FD_TRACE": lambda: os.getenv("FD_TRACE", "off"), + # FastDeploy 日志保留天数 "FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"), diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index f2f3367ee79..6b4dc49704b 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -31,6 +31,7 @@ import traceback import weakref from concurrent.futures import ThreadPoolExecutor +from pathlib import Path from typing import Dict, List, Optional, Tuple import numpy as np @@ -83,6 +84,37 @@ from fastdeploy.output.token_processor import TokenProcessor +def _read_latest_worker_traceback(log_dir: str) -> Optional[str]: + """读取 workerlog.* 文件中的最新 traceback。""" + + try: + candidates = sorted(Path(log_dir).glob("workerlog.*"), key=lambda path: path.stat().st_mtime, reverse=True) + except OSError: + return None + + for path in candidates: + try: + content = path.read_text(encoding="utf-8", errors="ignore") + except OSError: + continue + + marker = "Traceback (most recent call last):" + start = content.rfind(marker) + if start != -1: + return content[start:].strip() + + return None + + +def _format_worker_launch_failure_message(log_dir: str) -> str: + """格式化 worker 启动失败的错误消息,包含 traceback 信息。""" + message = "Failed to launch worker processes, check log/workerlog.* for more details." + traceback_text = _read_latest_worker_traceback(log_dir) + if traceback_text: + return f"{message}\n{traceback_text}" + return message + + class EngineService: """ Base class containing common engine functionality @@ -258,7 +290,7 @@ def start_worker_service(self, async_llm_pid=None): def check_worker_initialize_status_func(res: dict): res["worker_is_alive"] = True if not self.check_worker_initialize_status(): - self.llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.") + self.llm_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR)) res["worker_is_alive"] = False self.check_worker_initialize_status_func_thread = threading.Thread( @@ -284,7 +316,7 @@ def check_worker_initialize_status_func(res: dict): # Worker launched self.check_worker_initialize_status_func_thread.join() if not result_container["worker_is_alive"]: - self.llm_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.") + self.llm_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR)) return False # Start ZMQ service for communication with AsyncLLM diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 9f78f8584ac..283693fae8c 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -37,7 +37,10 @@ import fastdeploy.metrics.trace as tracing from fastdeploy.engine.args_utils import EngineArgs -from fastdeploy.engine.common_engine import EngineService +from fastdeploy.engine.common_engine import ( + EngineService, + _format_worker_launch_failure_message, +) from fastdeploy.engine.expert_service import start_data_parallel_service from fastdeploy.engine.request import Request from fastdeploy.inter_communicator import EngineWorkerQueue, IPCSignal @@ -157,7 +160,7 @@ def start(self, api_server_pid=None): def check_worker_initialize_status_func(res: dict): res["worker_is_alive"] = True if not self.check_worker_initialize_status(): - console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.") + console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR)) res["worker_is_alive"] = False self.check_worker_initialize_status_func_thread = threading.Thread( @@ -175,7 +178,8 @@ def check_worker_initialize_status_func(res: dict): # If block number is not specified, let workers do profiling to determine the block number, # and then start the cache manager if self.do_profile: - self._stop_profile() + if not self._stop_profile(): + return False elif self.cfg.scheduler_config.splitwise_role == "mixed" and self.cfg.cache_config.enable_prefix_caching: if not current_platform.is_intel_hpu(): device_ids = self.cfg.parallel_config.device_ids.split(",") @@ -203,7 +207,7 @@ def check_worker_initialize_status_func(res: dict): # Worker launched self.check_worker_initialize_status_func_thread.join() if not result_container["worker_is_alive"]: - console_logger.error("Failed to launch worker processes, check log/workerlog.* for more details.") + console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR)) return False console_logger.info(f"Worker processes are launched with {time.time() - start_time} seconds.") @@ -752,7 +756,8 @@ def _stop_profile(self): while self.get_profile_block_num_signal.value[0] == 0: if hasattr(self, "worker_proc") and self.worker_proc is not None: if self.worker_proc.poll() is not None: - raise RuntimeError("Worker process failed to start." "Please check log/workerlog.* for details.") + console_logger.error(_format_worker_launch_failure_message(envs.FD_LOG_DIR)) + return False time.sleep(1) num_gpu_blocks = self.get_profile_block_num_signal.value[0] self.cfg.cache_config.reset(num_gpu_blocks) @@ -761,6 +766,7 @@ def _stop_profile(self): if not current_platform.is_intel_hpu(): device_ids = self.cfg.parallel_config.device_ids.split(",") self.cache_manager_processes = self.engine.start_cache_service(device_ids, self.ipc_signal_suffix) + return True def check_health(self, time_interval_threashold=30): """ diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 19c9a893435..aa047dbb7e8 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -36,8 +36,18 @@ def _validate_split_kv_size(value: int) -> int: "FD_BUILDING_ARCS": lambda: os.getenv("FD_BUILDING_ARCS", "[]"), # Log directory. "FD_LOG_DIR": lambda: os.getenv("FD_LOG_DIR", "log"), + # Global log level, prefer this over FD_DEBUG. Supports "INFO" and "DEBUG". + "FD_LOG_LEVEL": lambda: os.getenv("FD_LOG_LEVEL", None), # Whether to use debug mode, can set 0 or 1 "FD_DEBUG": lambda: int(os.getenv("FD_DEBUG", "0")), + # Request logging master switch. Set to 0 to disable request logging. + "FD_LOG_REQUESTS": lambda: int(os.getenv("FD_LOG_REQUESTS", "1")), + # Request logging detail level (0-3). Higher level means more verbose output. + "FD_LOG_REQUESTS_LEVEL": lambda: int(os.getenv("FD_LOG_REQUESTS_LEVEL", "0")), + # Max field length for request logging truncation. + "FD_LOG_MAX_LEN": lambda: int(os.getenv("FD_LOG_MAX_LEN", "2048")), + # Unified trace mode: off, local, otel, all. + "FD_TRACE": lambda: os.getenv("FD_TRACE", "off"), # Number of days to keep fastdeploy logs. "FD_LOG_BACKUP_COUNT": lambda: os.getenv("FD_LOG_BACKUP_COUNT", "7"), # Model download source, can set "AISTUDIO", "MODELSCOPE" or "HUGGINGFACE". diff --git a/fastdeploy/logger/handlers.py b/fastdeploy/logger/handlers.py index ac6bf191c88..5c15e7bf550 100644 --- a/fastdeploy/logger/handlers.py +++ b/fastdeploy/logger/handlers.py @@ -323,6 +323,7 @@ def _open(self): """ open new log file """ + self.current_log_path.parent.mkdir(parents=True, exist_ok=True) if self.encoding is None: stream = open(str(self.current_log_path), self.mode) else: diff --git a/fastdeploy/logger/setup_logging.py b/fastdeploy/logger/setup_logging.py index 2dd24b379df..e90f215fc90 100644 --- a/fastdeploy/logger/setup_logging.py +++ b/fastdeploy/logger/setup_logging.py @@ -26,6 +26,20 @@ from fastdeploy import envs +class MaxLevelFilter(logging.Filter): + """过滤低于指定级别的日志记录。 + + 用于将 INFO/DEBUG 路由到 stdout,ERROR/CRITICAL 路由到 stderr。 + """ + + def __init__(self, level): + super().__init__() + self.level = logging._nameToLevel.get(level, level) + + def filter(self, record): + return record.levelno < self.level + + def setup_logging(log_dir=None, config_file=None): """ 设置FastDeploy的日志配置 @@ -41,7 +55,7 @@ def setup_logging(log_dir=None, config_file=None): # 使用环境变量中的日志目录,如果没有则使用传入的参数或默认值 if log_dir is None: - log_dir = getattr(envs, "FD_LOG_DIR", "logs") + log_dir = getattr(envs, "FD_LOG_DIR", "log") # 确保日志目录存在 Path(log_dir).mkdir(parents=True, exist_ok=True) @@ -58,6 +72,12 @@ def setup_logging(log_dir=None, config_file=None): default_config = { "version": 1, "disable_existing_loggers": False, + "filters": { + "below_error": { + "()": "fastdeploy.logger.setup_logging.MaxLevelFilter", + "level": "ERROR", + } + }, "formatters": { "standard": { "class": "logging.Formatter", @@ -71,12 +91,21 @@ def setup_logging(log_dir=None, config_file=None): }, }, "handlers": { - "console": { + # 控制台标准输出,用于 INFO/DEBUG(低于 ERROR 级别) + "console_stdout": { "class": "logging.StreamHandler", "level": FASTDEPLOY_LOGGING_LEVEL, + "filters": ["below_error"], "formatter": "colored", "stream": "ext://sys.stdout", }, + # 控制台错误输出,用于 ERROR/CRITICAL + "console_stderr": { + "class": "logging.StreamHandler", + "level": "ERROR", + "formatter": "colored", + "stream": "ext://sys.stderr", + }, # 默认错误日志,保留最新1个小时的日志,位置在log/error.log "error_file": { "class": "logging.handlers.TimedRotatingFileHandler", @@ -122,7 +151,14 @@ def setup_logging(log_dir=None, config_file=None): # 默认日志记录器,全局共享 "fastdeploy": { "level": "DEBUG", - "handlers": ["error_file", "default_file", "error_archive", "default_archive"], + "handlers": [ + "console_stdout", + "console_stderr", + "error_file", + "default_file", + "error_archive", + "default_archive", + ], "propagate": False, } }, diff --git a/tests/engine/test_common_engine.py b/tests/engine/test_common_engine.py index 65e24047c67..f81e1bdb33d 100644 --- a/tests/engine/test_common_engine.py +++ b/tests/engine/test_common_engine.py @@ -17,6 +17,7 @@ import asyncio import os import sys +import tempfile import threading import time import types @@ -39,7 +40,11 @@ def enable_torch_proxy(scope=None): paddle.compat = _PaddleCompat() from fastdeploy.engine.args_utils import EngineArgs -from fastdeploy.engine.common_engine import EngineService +from fastdeploy.engine.common_engine import ( + EngineService, + _format_worker_launch_failure_message, + _read_latest_worker_traceback, +) from fastdeploy.engine.request import ( ControlRequest, ControlResponse, @@ -3722,3 +3727,87 @@ def fake_time(): eng.resource_manager.recycle_abort_task.assert_called_with("req-1_0") self._detach_finalizer(eng) + + +class TestWorkerTracebackFunctions(unittest.TestCase): + """测试 _read_latest_worker_traceback 和 _format_worker_launch_failure_message 函数""" + + def test_read_latest_worker_traceback_finds_traceback(self): + """测试能够正确读取 workerlog 文件中的 traceback""" + with tempfile.TemporaryDirectory() as temp_dir: + worker_log = os.path.join(temp_dir, "workerlog.0") + with open(worker_log, "w", encoding="utf-8") as fp: + fp.write( + "Some normal log output\n" + "Traceback (most recent call last):\n" + ' File "worker_process.py", line 1, in \n' + " run_worker_proc()\n" + "ValueError: The total number of blocks cannot be less than zero.\n" + ) + + result = _read_latest_worker_traceback(temp_dir) + self.assertIsNotNone(result) + self.assertIn("Traceback (most recent call last):", result) + self.assertIn("ValueError:", result) + + def test_read_latest_worker_traceback_returns_none_when_no_traceback(self): + """测试当没有 traceback 时返回 None""" + with tempfile.TemporaryDirectory() as temp_dir: + worker_log = os.path.join(temp_dir, "workerlog.0") + with open(worker_log, "w", encoding="utf-8") as fp: + fp.write("Normal log output without any errors\n") + + result = _read_latest_worker_traceback(temp_dir) + self.assertIsNone(result) + + def test_read_latest_worker_traceback_returns_none_when_no_files(self): + """测试当没有 workerlog 文件时返回 None""" + with tempfile.TemporaryDirectory() as temp_dir: + result = _read_latest_worker_traceback(temp_dir) + self.assertIsNone(result) + + def test_read_latest_worker_traceback_returns_none_for_nonexistent_dir(self): + """测试当目录不存在时返回 None""" + result = _read_latest_worker_traceback("/nonexistent/path") + self.assertIsNone(result) + + def test_read_latest_worker_traceback_picks_latest_file(self): + """测试当有多个 workerlog 文件时选择最新的""" + with tempfile.TemporaryDirectory() as temp_dir: + # 创建较旧的文件 + old_log = os.path.join(temp_dir, "workerlog.0") + with open(old_log, "w", encoding="utf-8") as fp: + fp.write("Traceback (most recent call last):\nOldError: old error\n") + + # 短暂等待以确保时间戳不同 + time.sleep(0.01) + + # 创建较新的文件 + new_log = os.path.join(temp_dir, "workerlog.1") + with open(new_log, "w", encoding="utf-8") as fp: + fp.write("Traceback (most recent call last):\nNewError: new error\n") + + result = _read_latest_worker_traceback(temp_dir) + self.assertIsNotNone(result) + self.assertIn("NewError", result) + + def test_format_worker_launch_failure_message_with_traceback(self): + """测试带有 traceback 的错误消息格式化""" + with tempfile.TemporaryDirectory() as temp_dir: + worker_log = os.path.join(temp_dir, "workerlog.0") + with open(worker_log, "w", encoding="utf-8") as fp: + fp.write("Traceback (most recent call last):\n" "ValueError: Test error message\n") + + result = _format_worker_launch_failure_message(temp_dir) + self.assertIn("Failed to launch worker processes", result) + self.assertIn("workerlog.*", result) + self.assertIn("Traceback (most recent call last):", result) + self.assertIn("ValueError: Test error message", result) + + def test_format_worker_launch_failure_message_without_traceback(self): + """测试没有 traceback 时的错误消息格式化""" + with tempfile.TemporaryDirectory() as temp_dir: + result = _format_worker_launch_failure_message(temp_dir) + self.assertIn("Failed to launch worker processes", result) + self.assertIn("workerlog.*", result) + self.assertNotIn("Traceback", result) diff --git a/tests/engine/test_engine.py b/tests/engine/test_engine.py new file mode 100644 index 00000000000..762db4ea4ed --- /dev/null +++ b/tests/engine/test_engine.py @@ -0,0 +1,128 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for fastdeploy/engine/engine.py (LLMEngine) +""" + +import os +import tempfile +import types +import unittest +from unittest.mock import Mock, patch + +import numpy as np + +from fastdeploy.engine.engine import LLMEngine + + +class TestLLMEngineStopProfile(unittest.TestCase): + """测试 LLMEngine._stop_profile 方法""" + + def test_stop_profile_logs_worker_traceback_and_returns_false(self): + """测试 worker 进程失败时,_stop_profile 打印 traceback 并返回 False""" + eng = object.__new__(LLMEngine) + eng.do_profile = 1 + eng.get_profile_block_num_signal = type("Sig", (), {"value": np.array([0])})() + eng.worker_proc = Mock(poll=lambda: 1) + + with tempfile.TemporaryDirectory() as temp_dir: + worker_log = os.path.join(temp_dir, "workerlog.0") + with open(worker_log, "w", encoding="utf-8") as fp: + fp.write( + "Traceback (most recent call last):\n" + "ValueError: The total number of blocks cannot be less than zero.\n" + ) + + with ( + patch("fastdeploy.engine.engine.time.sleep", lambda *_: None), + patch("fastdeploy.engine.engine.envs.FD_LOG_DIR", temp_dir), + patch("fastdeploy.engine.engine.console_logger.error") as mock_error, + ): + result = eng._stop_profile() + + self.assertFalse(result) + error_messages = [call.args[0] for call in mock_error.call_args_list] + self.assertTrue(any("Traceback (most recent call last):" in msg for msg in error_messages)) + self.assertTrue(any("The total number of blocks cannot be less than zero" in msg for msg in error_messages)) + + def test_stop_profile_returns_true_on_success(self): + """测试 _stop_profile 正常完成时返回 True""" + eng = object.__new__(LLMEngine) + eng.do_profile = 1 + eng.get_profile_block_num_signal = type("Sig", (), {"value": np.array([100])})() + eng.worker_proc = Mock(poll=lambda: None) + eng.ipc_signal_suffix = "_test" + eng.cfg = types.SimpleNamespace( + parallel_config=types.SimpleNamespace(device_ids="0"), + scheduler_config=types.SimpleNamespace(splitwise_role="decode"), + cache_config=Mock(enable_prefix_caching=False, reset=Mock()), + ) + eng.engine = types.SimpleNamespace( + start_cache_service=lambda *_: None, + resource_manager=Mock(reset_cache_config=Mock()), + ) + eng.cache_manager_processes = None + + result = eng._stop_profile() + + self.assertTrue(result) + + +class TestLLMEngineStart(unittest.TestCase): + """测试 LLMEngine.start 方法中的错误处理""" + + class _Sig: + def __init__(self, val): + self.value = np.array([val]) + + def test_start_returns_false_when_profile_worker_dies(self): + """测试当 profile worker 失败时,start 返回 False""" + eng = object.__new__(LLMEngine) + eng.is_started = False + eng.api_server_pid = None + eng.do_profile = 1 + port = int(os.getenv("FD_ENGINE_QUEUE_PORT", "6778")) + eng.cfg = types.SimpleNamespace( + parallel_config=types.SimpleNamespace(engine_worker_queue_port=[port], device_ids="0"), + scheduler_config=types.SimpleNamespace(splitwise_role="mixed", max_num_seqs=8), + cache_config=types.SimpleNamespace( + enable_prefix_caching=True, + block_size=64, + num_gpu_blocks_override=None, + total_block_num=0, + num_cpu_blocks=0, + ), + model_config=types.SimpleNamespace(max_model_len=128), + ) + eng._init_worker_signals = lambda: setattr(eng, "loaded_model_signal", self._Sig(1)) + eng.launch_components = lambda: None + eng.worker_proc = None + eng.engine = types.SimpleNamespace( + start=lambda: None, + create_data_processor=lambda: setattr(eng.engine, "data_processor", object()), + data_processor=object(), + ) + eng._start_worker_service = lambda: Mock(stdout=Mock(), poll=lambda: 1) + eng.check_worker_initialize_status = lambda: False + eng._stop_profile = lambda: False + + with patch("fastdeploy.engine.engine.time.sleep", lambda *_: None): + result = eng.start(api_server_pid=None) + + self.assertFalse(result) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/logger/test_setup_logging.py b/tests/logger/test_setup_logging.py index e83c25964a7..0bf02c14e88 100644 --- a/tests/logger/test_setup_logging.py +++ b/tests/logger/test_setup_logging.py @@ -21,12 +21,14 @@ from pathlib import Path from unittest.mock import patch -from fastdeploy.logger.setup_logging import setup_logging +from fastdeploy.logger.setup_logging import MaxLevelFilter, setup_logging class TestSetupLogging(unittest.TestCase): def setUp(self): self.temp_dir = tempfile.mkdtemp(prefix="logger_setup_test_") + if hasattr(setup_logging, "_configured"): + delattr(setup_logging, "_configured") self.patches = [ patch("fastdeploy.envs.FD_LOG_DIR", self.temp_dir), patch("fastdeploy.envs.FD_DEBUG", 0), @@ -52,7 +54,7 @@ def test_default_config_fallback(self): logger = logging.getLogger("fastdeploy") self.assertTrue(logger.handlers) handler_classes = [h.__class__.__name__ for h in logger.handlers] - self.assertIn("TimedRotatingFileHandler", handler_classes[0]) + self.assertIn("TimedRotatingFileHandler", handler_classes) def test_debug_level_affects_handlers(self): """FD_DEBUG=1 should force DEBUG level""" @@ -62,7 +64,7 @@ def test_debug_level_affects_handlers(self): called_config = mock_cfg.call_args[0][0] for handler in called_config["handlers"].values(): self.assertIn("formatter", handler) - self.assertEqual(called_config["handlers"]["console"]["level"], "DEBUG") + self.assertEqual(called_config["handlers"]["console_stdout"]["level"], "DEBUG") @patch("logging.config.dictConfig") def test_custom_config_with_dailyrotating_and_debug(self, mock_dict): @@ -122,6 +124,62 @@ def test_backup_count_merging(self, mock_dict): config_used = mock_dict.call_args[0][0] self.assertEqual(config_used["handlers"]["daily"]["backupCount"], 3) + @patch("logging.config.dictConfig") + def test_error_logs_use_stderr_handler(self, mock_dict): + """ERROR级别日志应该使用stderr输出""" + setup_logging() + config_used = mock_dict.call_args[0][0] + self.assertIn("console_stderr", config_used["handlers"]) + self.assertEqual(config_used["handlers"]["console_stderr"]["stream"], "ext://sys.stderr") + self.assertEqual(config_used["handlers"]["console_stderr"]["level"], "ERROR") + + @patch("logging.config.dictConfig") + def test_console_stdout_filters_below_error(self, mock_dict): + """console_stdout应该只输出低于ERROR级别的日志""" + setup_logging() + config_used = mock_dict.call_args[0][0] + self.assertIn("console_stdout", config_used["handlers"]) + self.assertIn("below_error", config_used["handlers"]["console_stdout"]["filters"]) + self.assertEqual(config_used["handlers"]["console_stdout"]["stream"], "ext://sys.stdout") + + +class TestMaxLevelFilter(unittest.TestCase): + def test_filter_allows_below_level(self): + """MaxLevelFilter应该允许低于指定级别的日志通过""" + filter = MaxLevelFilter("ERROR") + record = logging.LogRecord( + name="test", level=logging.INFO, pathname="", lineno=0, msg="test", args=(), exc_info=None + ) + self.assertTrue(filter.filter(record)) + + def test_filter_blocks_at_level(self): + """MaxLevelFilter应该阻止等于指定级别的日志""" + filter = MaxLevelFilter("ERROR") + record = logging.LogRecord( + name="test", level=logging.ERROR, pathname="", lineno=0, msg="test", args=(), exc_info=None + ) + self.assertFalse(filter.filter(record)) + + def test_filter_blocks_above_level(self): + """MaxLevelFilter应该阻止高于指定级别的日志""" + filter = MaxLevelFilter("ERROR") + record = logging.LogRecord( + name="test", level=logging.CRITICAL, pathname="", lineno=0, msg="test", args=(), exc_info=None + ) + self.assertFalse(filter.filter(record)) + + def test_filter_with_numeric_level(self): + """MaxLevelFilter应该支持数字级别""" + filter = MaxLevelFilter(logging.WARNING) + info_record = logging.LogRecord( + name="test", level=logging.INFO, pathname="", lineno=0, msg="test", args=(), exc_info=None + ) + warning_record = logging.LogRecord( + name="test", level=logging.WARNING, pathname="", lineno=0, msg="test", args=(), exc_info=None + ) + self.assertTrue(filter.filter(info_record)) + self.assertFalse(filter.filter(warning_record)) + if __name__ == "__main__": unittest.main()