diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index d6c92f9fa..82136b65f 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -281,7 +281,17 @@ def read( stream_message_counter: DefaultDict[HashableStreamDescriptor, float] = defaultdict(float) for message in self.source.read(self.logger, config, catalog, state): yield self.handle_record_counts(message, stream_message_counter) - self._memory_monitor.check_memory_usage() + try: + self._memory_monitor.check_memory_usage() + except AirbyteTracedException: + # Flush queued messages (state checkpoints, logs) before propagating + # the memory fail-fast exception, so the platform receives the last + # committed state for the next sync. + for queued_message in self._emit_queued_messages(self.source): + yield self.handle_record_counts(queued_message, stream_message_counter) + raise + + # Flush queued messages after normal completion of the read loop. for message in self._emit_queued_messages(self.source): yield self.handle_record_counts(message, stream_message_counter) diff --git a/airbyte_cdk/utils/memory_monitor.py b/airbyte_cdk/utils/memory_monitor.py index 0767ce3bf..8382b3c65 100644 --- a/airbyte_cdk/utils/memory_monitor.py +++ b/airbyte_cdk/utils/memory_monitor.py @@ -2,12 +2,15 @@ # Copyright (c) 2026 Airbyte, Inc., all rights reserved. # -"""Source-side memory introspection to log memory usage approaching container limits.""" +"""Source-side memory introspection with fail-fast shutdown on memory threshold.""" import logging from pathlib import Path from typing import Optional +from airbyte_cdk.models import FailureType +from airbyte_cdk.utils.traced_exception import AirbyteTracedException + logger = logging.getLogger("airbyte") # cgroup v2 paths @@ -18,23 +21,71 @@ _CGROUP_V1_USAGE = Path("/sys/fs/cgroup/memory/memory.usage_in_bytes") _CGROUP_V1_LIMIT = Path("/sys/fs/cgroup/memory/memory.limit_in_bytes") -# Log when usage is at or above 90% -_MEMORY_THRESHOLD = 0.90 +# Process-level anonymous RSS from /proc/self/status (Linux only, no extra dependency) +_PROC_SELF_STATUS = Path("/proc/self/status") + +# Log when usage is at or above 95% +_MEMORY_THRESHOLD = 0.95 + +# Raise AirbyteTracedException when BOTH conditions are met: +# 1. cgroup usage >= critical threshold +# 2. process anonymous RSS (RssAnon) >= anon threshold of the container limit +# This dual-condition avoids false positives from reclaimable kernel page cache +# and file-backed / shared resident pages that inflate VmRSS. +_CRITICAL_THRESHOLD = 0.98 +_ANON_RSS_THRESHOLD = 0.90 # Check interval (every N messages) _DEFAULT_CHECK_INTERVAL = 5000 +def _read_process_anon_rss_bytes() -> Optional[int]: + """Read process-private anonymous resident memory from /proc/self/status. + + Parses the ``RssAnon`` field which represents private anonymous pages — the + closest proxy for Python-heap memory pressure. Unlike ``VmRSS`` (which is + ``RssAnon + RssFile + RssShmem``), ``RssAnon`` is not inflated by mmap'd + file-backed or shared resident pages. + + Returns anonymous RSS in bytes, or None if unavailable (non-Linux, + permission error, or ``RssAnon`` field not present in the kernel). + """ + try: + status_text = _PROC_SELF_STATUS.read_text() + for line in status_text.splitlines(): + if line.startswith("RssAnon:"): + # Format: "RssAnon: 12345 kB" + parts = line.split() + if len(parts) >= 2: + return int(parts[1]) * 1024 # Convert kB to bytes + return None + except (OSError, ValueError): + return None + + class MemoryMonitor: - """Monitors container memory usage via cgroup files and logs warnings when usage is high. + """Monitors container memory usage via cgroup files and raises on critical pressure. Lazily probes cgroup v2 then v1 files on the first call to ``check_memory_usage()``. Caches which version exists. If neither is found (local dev / CI), all subsequent calls are instant no-ops. - Logs a WARNING on every check interval (default 5000 messages) when memory - usage is at or above 90% of the container limit. This gives breadcrumb - trails showing whether memory is climbing, plateauing, or sawtoothing. + **Logging:** Logs a WARNING on every check interval (default 5000 messages) + when cgroup memory usage is at or above 95% of the container limit. + + **Fail-fast:** Raises ``AirbyteTracedException`` with + ``FailureType.system_error`` when *both*: + + 1. Cgroup usage >= 98% of the container limit (container is near OOM-kill) + 2. Process anonymous RSS (``RssAnon``) >= 90% of the container limit + (pressure is from process-private anonymous memory, not elastic kernel + page cache or file-backed resident pages) + + This dual-condition avoids false positives from SQLite mmap'd pages, shared + memory, or other kernel-reclaimable memory that inflates cgroup usage but + does not represent real process memory pressure. If ``RssAnon`` is not + available, the monitor logs a warning and skips fail-fast rather than + falling back to cgroup-only raising. """ def __init__( @@ -102,14 +153,19 @@ def _read_memory(self) -> Optional[tuple[int, int]]: return None def check_memory_usage(self) -> None: - """Check memory usage and log when above 90%. + """Check memory usage; log at 95% and raise at critical dual-condition. Intended to be called on every message. The monitor internally tracks a message counter and only reads cgroup files every ``check_interval`` messages (default 5000) to minimise I/O overhead. - Logs a WARNING on every check above 90% to provide breadcrumb trails - showing memory trends over the sync lifetime. + **Logging:** WARNING on every check above 95%. + + **Fail-fast:** If cgroup usage >= 98% *and* process anonymous RSS + (``RssAnon``) >= 90% of the container limit, raises + ``AirbyteTracedException`` with ``FailureType.system_error`` so the + platform receives a clear error message instead of an opaque OOM-kill. + If ``RssAnon`` is unavailable, logs a warning and skips fail-fast. This method is a no-op if cgroup files are unavailable. """ @@ -138,3 +194,36 @@ def check_memory_usage(self) -> None: usage_gb, limit_gb, ) + + # Fail-fast: dual-condition check + if usage_ratio >= _CRITICAL_THRESHOLD: + anon_rss_bytes = _read_process_anon_rss_bytes() + if anon_rss_bytes is not None: + anon_ratio = anon_rss_bytes / limit_bytes + anon_percent = int(anon_ratio * 100) + if anon_ratio >= _ANON_RSS_THRESHOLD: + raise AirbyteTracedException( + message=f"Source memory usage exceeded critical threshold ({usage_percent}% of container limit).", + internal_message=( + f"Cgroup memory: {usage_bytes} / {limit_bytes} bytes ({usage_percent}%). " + f"Process anonymous RSS (RssAnon): {anon_rss_bytes} bytes ({anon_percent}% of limit). " + f"Thresholds: cgroup >= {int(_CRITICAL_THRESHOLD * 100)}%, " + f"anonymous RSS >= {int(_ANON_RSS_THRESHOLD * 100)}%." + ), + failure_type=FailureType.system_error, + ) + else: + logger.info( + "Cgroup usage at %d%% but process anonymous RSS only %d%% of limit; " + "pressure likely from file-backed or reclaimable pages — not raising.", + usage_percent, + anon_percent, + ) + else: + # RssAnon unavailable — log and skip rather than cgroup-only raising, + # so the implementation stays truly dual-condition. + logger.warning( + "Cgroup usage at %d%% but RssAnon unavailable from /proc/self/status; " + "skipping fail-fast (cannot confirm anonymous memory pressure).", + usage_percent, + ) diff --git a/unit_tests/test_entrypoint.py b/unit_tests/test_entrypoint.py index 520131881..35cd608a3 100644 --- a/unit_tests/test_entrypoint.py +++ b/unit_tests/test_entrypoint.py @@ -856,3 +856,72 @@ def test_given_serialization_error_using_orjson_then_fallback_on_json( # There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here record_messages = list(filter(lambda message: "RECORD" in message, messages)) assert len(record_messages) == 2 + + +def test_memory_failfast_flushes_queued_state_before_raising(mocker): + """Record emitted → check_memory_usage raises → queued STATE flushed with recordCount → exception propagates.""" + queued_state = AirbyteMessage( + type=Type.STATE, + state=AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="users", namespace=None), + stream_state=AirbyteStateBlob({"cursor": "abc123"}), + ), + ), + ) + + message_repository = MagicMock() + message_repository.consume_queue.side_effect = [ + [queued_state], # flush during fail-fast exception handling + [], # normal end-of-loop flush (not reached) + ] + mocker.patch.object( + MockSource, + "message_repository", + new_callable=mocker.PropertyMock, + return_value=message_repository, + ) + + record = AirbyteMessage( + record=AirbyteRecordMessage(stream="users", data={"id": 1}, emitted_at=1), + type=Type.RECORD, + ) + mocker.patch.object(MockSource, "read_state", return_value={}) + mocker.patch.object(MockSource, "read_catalog", return_value={}) + mocker.patch.object(MockSource, "read", return_value=[record]) + + fail_fast_exc = AirbyteTracedException( + message="Memory usage exceeded critical threshold (98%)", + failure_type=FailureType.system_error, + ) + + entrypoint_obj = AirbyteEntrypoint(MockSource()) + mocker.patch.object( + entrypoint_obj._memory_monitor, "check_memory_usage", side_effect=fail_fast_exc + ) + + spec = ConnectorSpecification(connectionSpecification={}) + config: dict[str, str] = {} + + # Call read() directly to get AirbyteMessage objects (not serialised strings) + gen = entrypoint_obj.read(spec, config, {}, []) + + # 1. First yielded message is the RECORD + first = next(gen) + assert first.type == Type.RECORD + assert first.record.stream == "users" # type: ignore[union-attr] + + # 2. Second yielded message is the queued STATE (flushed before exception) + second = next(gen) + assert second.type == Type.STATE + assert second.state.stream.stream_state == AirbyteStateBlob({"cursor": "abc123"}) # type: ignore[union-attr] + + # 3. The STATE passed through handle_record_counts, so sourceStats.recordCount == 1.0 + assert second.state.sourceStats is not None # type: ignore[union-attr] + assert second.state.sourceStats.recordCount == 1.0 # type: ignore[union-attr] + + # 4. Next iteration re-raises the AirbyteTracedException + with pytest.raises(AirbyteTracedException) as exc_info: + next(gen) + assert exc_info.value is fail_fast_exc diff --git a/unit_tests/utils/test_memory_monitor.py b/unit_tests/utils/test_memory_monitor.py index cf8250465..99bd254d0 100644 --- a/unit_tests/utils/test_memory_monitor.py +++ b/unit_tests/utils/test_memory_monitor.py @@ -8,18 +8,34 @@ import pytest +from airbyte_cdk.models import FailureType from airbyte_cdk.utils.memory_monitor import ( _CGROUP_V1_LIMIT, _CGROUP_V1_USAGE, _CGROUP_V2_CURRENT, _CGROUP_V2_MAX, + _PROC_SELF_STATUS, MemoryMonitor, + _read_process_anon_rss_bytes, ) +from airbyte_cdk.utils.traced_exception import AirbyteTracedException _MOCK_USAGE_BELOW = "500000000\n" # 50% of 1 GB -_MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB +_MOCK_USAGE_AT_90 = "910000000\n" # 91% of 1 GB (below 95% logging threshold) +_MOCK_USAGE_AT_95 = "960000000\n" # 96% of 1 GB (above 95% logging threshold) +_MOCK_USAGE_AT_97 = "970000000\n" # 97% of 1 GB (below 98% critical threshold) +_MOCK_USAGE_AT_98 = "980000000\n" # 98% of 1 GB (at critical threshold) _MOCK_LIMIT = "1000000000\n" # 1 GB +# Anonymous RSS mock values (in kB as they appear in /proc/self/status RssAnon field). +# VmRSS is intentionally kept high in the "low anon" mock to prove the metric choice matters: +# VmRSS can be inflated by file-backed pages while RssAnon stays low. +_MOCK_ANON_HIGH = "RssAnon:\t 920000 kB\n" # ~92% of 1 GB (above 90% threshold) +_MOCK_ANON_LOW_VMRSS_HIGH = ( + "VmRSS:\t 900000 kB\n" # ~90% of 1 GB — high total RSS + "RssAnon:\t 500000 kB\n" # ~50% of 1 GB — low anonymous RSS +) + def _v2_exists(self: Path) -> bool: return self in (_CGROUP_V2_CURRENT, _CGROUP_V2_MAX) @@ -105,43 +121,43 @@ def test_noop_when_limit_is_zero(caplog: pytest.LogCaptureFixture) -> None: def test_no_warning_below_threshold(caplog: pytest.LogCaptureFixture) -> None: - """No warning should be emitted when usage is below 90%.""" + """No warning should be emitted when usage is below 95%.""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_BELOW)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), ): monitor.check_memory_usage() assert not caplog.records # --------------------------------------------------------------------------- -# check_memory_usage — at/above 90% threshold +# check_memory_usage — at/above 95% threshold # --------------------------------------------------------------------------- -def test_logs_at_90_percent(caplog: pytest.LogCaptureFixture) -> None: - """Warning log should be emitted at 91% usage (above 90% threshold).""" +def test_logs_at_95_percent(caplog: pytest.LogCaptureFixture) -> None: + """Warning log should be emitted at 96% usage (above 95% threshold).""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), ): monitor.check_memory_usage() assert len(caplog.records) == 1 - assert "91%" in caplog.records[0].message + assert "96%" in caplog.records[0].message -def test_logs_on_every_check_above_90_percent(caplog: pytest.LogCaptureFixture) -> None: - """Warning should be logged on EVERY check interval when above 90%, not just once.""" +def test_logs_on_every_check_above_95_percent(caplog: pytest.LogCaptureFixture) -> None: + """Warning should be logged on EVERY check interval when above 95%, not just once.""" monitor = MemoryMonitor(check_interval=1) with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), ): monitor.check_memory_usage() monitor.check_memory_usage() @@ -150,7 +166,7 @@ def test_logs_on_every_check_above_90_percent(caplog: pytest.LogCaptureFixture) # All three checks should produce a warning (no one-shot flag) assert len(caplog.records) == 3 for record in caplog.records: - assert "91%" in record.message + assert "96%" in record.message # --------------------------------------------------------------------------- @@ -163,7 +179,7 @@ def test_cgroup_v1_emits_warning(caplog: pytest.LogCaptureFixture) -> None: def mock_read_text(self: Path) -> str: if self == _CGROUP_V1_USAGE: - return _MOCK_USAGE_AT_90 + return _MOCK_USAGE_AT_95 if self == _CGROUP_V1_LIMIT: return _MOCK_LIMIT return "" @@ -177,7 +193,7 @@ def mock_read_text(self: Path) -> str: monitor.check_memory_usage() assert len(caplog.records) == 1 - assert "91%" in caplog.records[0].message + assert "96%" in caplog.records[0].message # --------------------------------------------------------------------------- @@ -191,7 +207,7 @@ def test_check_interval_skips_intermediate_calls(caplog: pytest.LogCaptureFixtur with ( caplog.at_level(logging.WARNING, logger="airbyte"), patch.object(Path, "exists", _v2_exists), - patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_90)), + patch.object(Path, "read_text", _v2_mock_read(usage=_MOCK_USAGE_AT_95)), ): # First 4999 calls should be skipped for _ in range(4999): @@ -245,3 +261,131 @@ def mock_read_text(self: Path) -> str: ): monitor.check_memory_usage() assert not caplog.records + + +# --------------------------------------------------------------------------- +# _read_process_anon_rss_bytes — unit tests +# --------------------------------------------------------------------------- + + +def test_read_process_anon_rss_bytes_parses_rssanon() -> None: + """Correctly parses RssAnon from /proc/self/status content.""" + status_content = ( + "Name:\tpython3\nVmRSS:\t 1000000 kB\nRssAnon:\t 512000 kB\nRssShmem:\t 0 kB\n" + ) + with patch.object(Path, "read_text", return_value=status_content): + result = _read_process_anon_rss_bytes() + assert result == 512000 * 1024 + + +def test_read_process_anon_rss_bytes_returns_none_on_missing_file() -> None: + """Returns None when /proc/self/status is unreadable.""" + + def raise_oserror(self: Path) -> str: + raise OSError("No such file") + + with patch.object(Path, "read_text", raise_oserror): + assert _read_process_anon_rss_bytes() is None + + +def test_read_process_anon_rss_bytes_returns_none_when_rssanon_absent() -> None: + """Returns None when RssAnon line is not present (e.g. older kernel).""" + with patch.object(Path, "read_text", return_value="Name:\tpython3\nVmRSS:\t 512000 kB\n"): + assert _read_process_anon_rss_bytes() is None + + +def test_read_process_anon_rss_bytes_ignores_vmrss() -> None: + """Ensures the parser reads RssAnon specifically, not VmRSS.""" + # Only VmRSS present, no RssAnon — should return None + status_content = "VmRSS:\t 900000 kB\n" + with patch.object(Path, "read_text", return_value=status_content): + assert _read_process_anon_rss_bytes() is None + + +# --------------------------------------------------------------------------- +# check_memory_usage — fail-fast (dual-condition) +# --------------------------------------------------------------------------- + + +def _proc_status_read(anon_content: str, usage: str = _MOCK_USAGE_AT_98): + """Return a mock read_text that serves cgroup v2 AND /proc/self/status.""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return usage + if self == _CGROUP_V2_MAX: + return _MOCK_LIMIT + if self == _PROC_SELF_STATUS: + return anon_content + return "" + + return mock_read_text + + +def test_raises_when_both_cgroup_and_anon_rss_above_thresholds() -> None: + """Fail-fast raises AirbyteTracedException when both cgroup >= 98% and RssAnon >= 90%.""" + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _proc_status_read(_MOCK_ANON_HIGH)), + ): + with pytest.raises(AirbyteTracedException) as exc_info: + monitor.check_memory_usage() + assert exc_info.value.failure_type == FailureType.system_error + assert "critical threshold" in (exc_info.value.message or "") + assert "98%" in (exc_info.value.message or "") + assert "anonymous RSS" in (exc_info.value.internal_message or "") + + +def test_no_raise_when_cgroup_high_but_anon_rss_low(caplog: pytest.LogCaptureFixture) -> None: + """No exception when cgroup >= 98% but RssAnon < 90% (file-backed pages scenario). + + This test also proves the metric choice matters: VmRSS is 90% (high) but + RssAnon is only 50% (low), so the pressure is from file-backed pages. + """ + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.INFO, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", _proc_status_read(_MOCK_ANON_LOW_VMRSS_HIGH)), + ): + monitor.check_memory_usage() # Should NOT raise + info_records = [r for r in caplog.records if r.levelno == logging.INFO] + assert any("file-backed" in r.message for r in info_records) + + +def test_no_raise_when_cgroup_below_critical() -> None: + """No exception when cgroup at 97% (< 98% threshold), even with high RssAnon.""" + monitor = MemoryMonitor(check_interval=1) + with ( + patch.object(Path, "exists", _v2_exists), + patch.object( + Path, "read_text", _proc_status_read(_MOCK_ANON_HIGH, usage=_MOCK_USAGE_AT_97) + ), + ): + monitor.check_memory_usage() # Should NOT raise + + +def test_no_raise_when_anon_rss_unavailable_and_cgroup_critical( + caplog: pytest.LogCaptureFixture, +) -> None: + """Logs warning and skips fail-fast when RssAnon is unavailable (stays truly dual-condition).""" + + def mock_read_text(self: Path) -> str: + if self == _CGROUP_V2_CURRENT: + return _MOCK_USAGE_AT_98 + if self == _CGROUP_V2_MAX: + return _MOCK_LIMIT + if self == _PROC_SELF_STATUS: + raise OSError("No such file") + return "" + + monitor = MemoryMonitor(check_interval=1) + with ( + caplog.at_level(logging.WARNING, logger="airbyte"), + patch.object(Path, "exists", _v2_exists), + patch.object(Path, "read_text", mock_read_text), + ): + monitor.check_memory_usage() # Should NOT raise + warning_records = [r for r in caplog.records if r.levelno == logging.WARNING] + assert any("RssAnon unavailable" in r.message for r in warning_records)