|
2 | 2 |
|
3 | 3 | from __future__ import annotations |
4 | 4 |
|
| 5 | +import asyncio |
| 6 | +import contextlib |
5 | 7 | import logging |
6 | 8 | import os |
| 9 | +import signal |
7 | 10 | import sys |
8 | 11 | import time |
| 12 | +import tracemalloc |
9 | 13 | import uuid |
10 | 14 | from collections.abc import AsyncIterator |
11 | 15 | from contextlib import asynccontextmanager |
|
17 | 21 | from prometheus_client import CONTENT_TYPE_LATEST, generate_latest |
18 | 22 | from structlog.stdlib import BoundLogger |
19 | 23 |
|
| 24 | +from . import concurrency |
20 | 25 | from .client import SigV4Verifier |
21 | 26 | from .config import Settings |
22 | 27 | from .errors import S3Error, get_s3_error_code |
@@ -65,6 +70,78 @@ def _silence_health_probe_access_logs() -> None: |
65 | 70 | access_logger.addFilter(_health_probe_filter) |
66 | 71 |
|
67 | 72 |
|
def _rss_mb() -> float | None:
    """Return this process's resident set size in MB, or None when unavailable.

    The value comes from the ``VmRSS:`` row of ``/proc/self/status``, which is
    expressed in kB. None is returned when that file cannot be opened or read
    (``OSError``, e.g. on systems without /proc) or when it has no ``VmRSS:`` row.
    """
    try:
        with open("/proc/self/status") as status_file:
            # Fields of the first row that starts with "VmRSS:", or None if
            # the file has no such row.
            rss_fields = next(
                (row.split() for row in status_file if row.startswith("VmRSS:")),
                None,
            )
    except OSError:
        return None
    if rss_fields is None:
        return None
    return int(rss_fields[1]) / 1024  # kB -> MB
| 83 | + |
| 84 | + |
def _dump_tracemalloc(limit: int = 20) -> None:
    """Log process RSS beside the tracemalloc-tracked heap, then the largest call sites.

    Diagnostic only (memory debug mode); returns immediately unless tracemalloc
    is currently tracing.

    The value of this dump is the comparison it enables. RSS is what the kernel
    OOM-kills on, whereas tracemalloc only accounts for Python allocations. When
    RSS greatly exceeds the tracked total, the memory lives at the C level
    (uvicorn/httptools socket buffers, openssl, allocator retention) and no call
    site in the top list will explain it. When the two are close, the memory IS
    Python-owned and the top list points at the responsible line. Emitting both
    on every dump shows which of the two situations applies.

    Args:
        limit: Upper bound on the number of per-line statistics logged as
            ``MEMORY_DEBUG_TOP`` events (applied as a slice of the statistics
            list, in the order ``Snapshot.statistics`` returns them).
    """
    if not tracemalloc.is_tracing():
        return

    def to_mb(num_bytes):
        return num_bytes / 1024 / 1024

    by_line = tracemalloc.take_snapshot().statistics("lineno")
    tracked_mb = to_mb(sum(entry.size for entry in by_line))
    rss_mb = _rss_mb()
    governed_mb = to_mb(concurrency.get_active_memory())

    # Both RSS-derived fields are reported as None when RSS cannot be read.
    if rss_mb is None:
        rss_field = None
        untracked_field = None
    else:
        rss_field = round(rss_mb, 1)
        untracked_field = round(rss_mb - tracked_mb, 1)

    logger.warning(
        "MEMORY_DEBUG",
        rss_mb=rss_field,
        tracked_mb=round(tracked_mb, 1),
        untracked_mb=untracked_field,
        governed_active_mb=round(governed_mb, 1),
        shown=limit,
    )

    for rank, entry in enumerate(by_line[:limit], start=1):
        frame = entry.traceback[0]
        logger.warning(
            "MEMORY_DEBUG_TOP",
            rank=rank,
            size_mb=round(to_mb(entry.size), 2),
            count=entry.count,
            loc=f"{frame.filename}:{frame.lineno}",
        )
| 119 | + |
| 120 | + |
async def _periodic_tracemalloc(interval: int) -> None:
    """Emit a memory debug dump every *interval* seconds until the task is cancelled.

    Each cycle sleeps first, so the first dump happens one interval after the
    task starts. A dump that raises is logged and the loop carries on: this is
    a background diagnostic, and an unhandled exception would otherwise end the
    task and stop all further periodic dumps without any visible error.

    Args:
        interval: Seconds to sleep between dumps.
    """
    while True:
        await asyncio.sleep(interval)
        # Only the dump is guarded; cancellation is delivered at the await
        # above and must propagate so shutdown can stop this task.
        try:
            _dump_tracemalloc()
        except Exception:
            logger.exception("MEMORY_DEBUG_DUMP_FAILED")
| 125 | + |
| 126 | + |
def _memory_debug_int(env_name: str, default: int) -> int:
    """Read environment variable *env_name* as an integer >= 1, else return *default*.

    An unset variable yields *default* silently. A value that is not an integer,
    or is below 1, is reported with a warning and replaced by *default* so that a
    typo in a diagnostic knob cannot abort application startup.
    """
    raw = os.environ.get(env_name)
    if raw is None:
        return default
    try:
        value = int(raw)
    except ValueError:
        value = 0  # falls through to the out-of-range handling below
    if value < 1:
        logger.warning("MEMORY_DEBUG_BAD_SETTING", setting=env_name, raw=raw, fallback=default)
        return default
    return value


def _maybe_start_tracemalloc() -> asyncio.Task | None:
    """Enable memory debug mode (RSS + tracemalloc heap dumps) when requested.

    Gated by S3PROXY_MEMORY_DEBUG (alias: S3PROXY_TRACEMALLOC): any non-empty
    value of either variable enables the mode (the value itself is not
    interpreted, so "0" also enables it). When neither is set, nothing is
    started and None is returned.

    When enabled, tracemalloc is started with S3PROXY_MEMORY_DEBUG_FRAMES frames
    (default 4) and a dump is produced every S3PROXY_MEMORY_DEBUG_INTERVAL
    seconds (default 15) and, where the platform and event loop support it, on
    SIGUSR1. Both settings must be integers >= 1; anything else falls back to
    the default with a warning, because a frame count below 1 is rejected by
    tracemalloc and a non-positive interval would make the periodic task dump
    back-to-back.

    Must be called while an event loop is running, since it schedules a task.

    Returns:
        The periodic dump task (the caller is responsible for cancelling it),
        or None when memory debug mode is not enabled.
    """
    if not (os.environ.get("S3PROXY_MEMORY_DEBUG") or os.environ.get("S3PROXY_TRACEMALLOC")):
        return None
    frames = _memory_debug_int("S3PROXY_MEMORY_DEBUG_FRAMES", 4)
    interval = _memory_debug_int("S3PROXY_MEMORY_DEBUG_INTERVAL", 15)
    tracemalloc.start(frames)
    logger.warning("MEMORY_DEBUG_ENABLED", frames=frames, interval_sec=interval, rss_mb=_rss_mb())
    # SIGUSR1 is not defined on every platform; look it up defensively so the
    # on-demand trigger is simply skipped there instead of raising AttributeError.
    sigusr1 = getattr(signal, "SIGUSR1", None)
    if sigusr1 is not None:
        # Best effort: some event loops do not implement signal handlers.
        with contextlib.suppress(NotImplementedError, RuntimeError):
            asyncio.get_running_loop().add_signal_handler(sigusr1, _dump_tracemalloc)
    return asyncio.create_task(_periodic_tracemalloc(interval))
| 143 | + |
| 144 | + |
68 | 145 | def create_lifespan(settings: Settings, credentials_store: dict[str, str]) -> AsyncIterator[None]: |
69 | 146 | """Create lifespan context manager for FastAPI app. |
70 | 147 |
|
@@ -114,8 +191,12 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: |
114 | 191 | app.state.stats_store = stats_store |
115 | 192 | app.state.start_time = time.monotonic() |
116 | 193 |
|
| 194 | + tracemalloc_task = _maybe_start_tracemalloc() |
| 195 | + |
117 | 196 | yield |
118 | 197 |
|
| 198 | + if tracemalloc_task is not None: |
| 199 | + tracemalloc_task.cancel() |
119 | 200 | await stats_store.aclose() # flush buffered samples before Redis closes |
120 | 201 | await close_redis() |
121 | 202 | await close_http_client() |
|
0 commit comments